import json
import pandas as pd
import tmtk
import tqdm

from ..utils import Mappings, Exceptions, path_join, path_converter, Message
from ..clinical.Variable import VarID

[docs]def create_concept_tree(column_object): """ :param column_object: tmtk.Study object, tmtk.Clinical object, or ColumnMapping dataframe :return: json string to be interpreted by the JSTree """ if isinstance(column_object, tmtk.Study): concept_tree = create_tree_from_study(column_object) elif isinstance(column_object, tmtk.Clinical): concept_tree = create_tree_from_clinical(column_object) else: raise Exceptions.ClassError(type(column_object, 'tmtk.Clinical or tmtk.Study')) return concept_tree.jstree.json_data_string
def _get_hd_args(path, high_dim_node, annotation): """ Create dict with meta tags that belong to a certain high dimensional node. """ map_file = high_dim_node.sample_mapping s = map_file.slice_path(path).iloc[:, 5].unique() t = map_file.slice_path(path).iloc[:, 6].unique() hd_args = {'hd_sample': ', '.join(s.astype(str)) if pd.notnull(s[0]) else '', 'hd_tissue': ', '.join(t.astype(str)) if pd.notnull(t[0]) else '', 'hd_type': Mappings.annotation_data_types.get(high_dim_node.params.datatype), } if annotation: hd_args.update({'pl_marker_type': annotation.marker_type, 'pl_genome_build': annotation.params.get('GENOME_RELEASE', ''), 'pl_title': annotation.params.get('TITLE', ''), 'pl_id': annotation.platform}) return hd_args
[docs]def create_tree_from_study(study, concept_tree=None): """ :param study: :param concept_tree: :return: """ if not concept_tree: concept_tree = ConceptTree() concept_tree = create_tree_from_clinical(study.Clinical, concept_tree) for high_dim_node in study.high_dim_files: annotation = study.find_annotation(high_dim_node.platform) for md5, path in high_dim_node.sample_mapping.get_concept_paths.items(): path = path_converter(path, to_internal=True) hd_args = _get_hd_args(path, high_dim_node, annotation) concept_tree.add_node(path, var_id=md5, node_type='highdim', data_args={'hd_args': hd_args}) if hasattr(study, 'Tags'): for i, (path, tags_dict) in enumerate(study.Tags.get_tags()): # Don't add empty folder if Tags are at study level path_in_tree = path_join(path, Mappings.tags_node_name) if path != "" else Mappings.tags_node_name path_in_tree = path_converter(path_in_tree, to_internal=True) data_args = {'tags': tags_dict} concept_tree.add_node(path_in_tree, var_id="tags_id_{}".format(i), node_type='tag', data_args=data_args) return concept_tree
[docs]def create_tree_from_clinical(clinical_object, concept_tree=None): """ :param clinical_object: :param concept_tree: :return: """ if not concept_tree: concept_tree = ConceptTree() column_map_ids = clinical_object.ColumnMapping.ids no_bar = True if len(column_map_ids) < 200 else False bar_format = '{l_bar}{bar} | {n_fmt}/{total_fmt} nodes ready, {rate_fmt}' for var_id, variable in tqdm.tqdm_notebook(clinical_object.all_variables.items(), bar_format=bar_format, unit=' nodes', leave=False, dynamic_ncols=True, disable=no_bar): data_args = variable.column_map_data # Don't need these, they're in the tree. for k in [Mappings.cat_cd_s, Mappings.data_label_s]: data_args.pop(k) concept_path = path_converter(variable.concept_path, to_internal=True) categories = {} if variable.is_numeric else variable.word_map_dict if categories: node_type = 'categorical' else: node_type = 'empty' if variable.is_empty else 'numeric' # Store node type in `data` so it can be changed back after renaming OMIT data_args.update({'ctype': node_type}) # Store column header of variable. data_args.update({'dfh': variable.header}) # Add filename to SUBJ_ID and OMIT, this is a work around for unique path constraint. if variable.data_label in {"SUBJ_ID", "OMIT"}: concept_path = concept_path.replace("SUBJ ID", "SUBJ_ID") node_type = 'codeleaf' # Add categorical values to concept tree (if any) for i, datafile_value in enumerate(categories): oid = var_id.create_category(i + 1) mapped = categories[datafile_value] mapped = mapped if not pd.isnull(mapped) else '' categorical_path = path_join(concept_path, mapped) concept_tree.add_node(categorical_path, oid, node_type='alpha', data_args={Mappings.df_value_s: datafile_value}) concept_tree.add_node(concept_path, var_id, node_type=node_type, data_args=data_args) return concept_tree
[docs]class ConceptTree: """ Build a ConceptTree to be used in the graphical tree editor. """ def __init__(self, json_data=None): """ :param json_data: Optional json data that initiates the ConceptTree object and populates it with ConceptNode objects. """ self.nodes = [] self.paths = set() if json_data: if type(json_data) == str: json_data = json.loads(json_data) self._extract_node_list(json_data)
[docs] def add_node(self, path, var_id=None, node_type=None, data_args=None): """ Add ConceptNode object nodes list. :param path: Concept path for this node. :param var_id: Unique ID that allows to keep track of a node. :param node_type: Explicitly set node type (highdim, numerical, categorical) :param data_args: Any additional parameters are put a 'data' dictionary. """ # Check if node already exists. if path in self.paths and node_type not in {'alpha', 'codeleaf'}: Message.warning('Trying to add duplicate to ConceptTree: {}\n' 'This might fail in the GUI.'.format(path)) new_node = ConceptNode(path, var_id=var_id, node_type=node_type, data_args=data_args) self.nodes.append(new_node) self.paths.add(new_node.path)
@property def jstree(self): return JSTree(self.nodes) @property def column_mapping_file(self): """ :return: Column Mapping file based on ConceptTree object. """ df = pd.concat([self._extract_column_mapping_row(node) for node in self.nodes], axis=1).T df.columns = Mappings.column_mapping_header return df @property def high_dim_paths(self): """ All high dimensional nodes in concept tree as dict """ return {node.var_id: path_converter(node.path, from_internal=True) for node in self.nodes if node.type == 'highdim'} @property def word_mapping(self): all_mappings = [self._extract_word_mapping_row(node) for node in self.nodes] df = pd.concat(all_mappings, axis=1).T # Fillna needs to happen because for some reason this expression below # returns True for NaN and NaN, which introduces unnecessary rows in word mapping. # This issue might need to be resolved earlier in the ConceptTree! changed_values = df.fillna('').iloc[:, 2] != df.fillna('').iloc[:, 3] # Set None to NaN, else empty fields in dataframes are not recognized (None != NaN) df.fillna(, inplace=True) df.columns = Mappings.word_mapping_header return df[changed_values].reset_index(drop=True) @property def tags_file(self): all_mappings = [self._extract_node_tags(node) for node in self.nodes] # This reduces the nested dictionary to a flat one. flat_mapping = [row for nest_list in all_mappings for row in nest_list] column_names = Mappings.tags_header try: df = pd.concat([pd.Series(row) for row in flat_mapping], axis=1).T df.columns = column_names except ValueError: # This happens when there are no tags in the file df = pd.DataFrame(columns=column_names) return df @staticmethod def _extract_column_mapping_row(node): if node.type not in {'numeric', 'categorical', 'codeleaf', 'empty'}: return filename = *path, data_label = node.path.rsplit(Mappings.PATH_DELIM, 1) path = path_converter(path[0], from_internal=True) if path else Mappings.EXT_PATH_DELIM # Remove file names from SUBJ_ID, they were added as workaround for unique constraints. if data_label.startswith("SUBJ_ID"): data_label = "SUBJ_ID" # Remove variable ID from OMIT variables. if data_label.startswith("OMIT"): data_label = "OMIT" column = magic5 = magic6 = concept_type = new_row = pd.Series([filename, path, column, data_label, magic5, magic6, concept_type]) if all([filename, data_label, column]): return new_row @staticmethod def _extract_node_tags(node): list_of_rows = [] tags_dict ='tags', {}) if tags_dict: # Tag paths need to start with slash path = node.path.rsplit(Mappings.tags_node_name, 1)[0].strip(Mappings.PATH_DELIM) path = path_converter(path, from_internal=True) path = Mappings.EXT_PATH_DELIM + path for title, (description, weight, *_) in tags_dict.items(): if not all([title, description, weight]): continue list_of_rows.append([path, title, description, weight]) return list_of_rows @staticmethod def _extract_word_mapping_row(node): if node.type == 'alpha': filename, column, c = node.var_id datafile_value = mapped_value = node.path.rsplit(Mappings.PATH_DELIM, 1)[1] return pd.Series([filename, column, datafile_value, mapped_value]) def _extract_node_list(self, json_data): path = [] for node in json_data: self._get_children(node, path) def _get_children(self, node, path): node_type = node.get('type', 'default') node_children = node.get('children', []) node_text = node['text'] node_path = path + [node_text] if node_type != 'default': concept_path = path_join(*node_path) var_id = VarID(node.get('id')) if node_type != 'tag' else None self.add_node(path=concept_path, var_id=var_id, node_type=node_type, data_args=node.get('data', {}), ) for child in node_children: self._get_children(child, node_path)
[docs]class ConceptNode: def __init__(self, path, var_id=None, node_type='numeric', data_args=None): """ Object to be put into a list and interpreted by JSTree. :param path: Concept path for this node. :param var_id: Unique ID that allows to keep track of a node. :param node_type: If None, this concept node is considered to be numerical. :param data_args: Any additional parameters are put a 'data' dictionary. """ self.path = path self.var_id = var_id = data_args if data_args else {} self.type = node_type def __repr__(self): return self.path def __str__(self): return self.path
[docs]class JSNode: """ This class exists as a helper to the JSTree. Its "json_data" method can generate sub-tree JSON without putting the logic directly into the JSTree. """ def __init__(self, path, oid=None, **kwargs): """ kwargs allows users to pass arbitrary information into a Node that will later be output in json_data(). It allows for more advanced configuration than the default path handling that jsTree currently allows. For example, users may want to pass "attr" or some other valid jsTree options. """ self.children = {} self.helper_children = {} if not all([isinstance(self.children[child], JSNode) for child in self.children]): raise TypeError("One or more children were not instances of '{}'".format(JSNode)) if 'children' in kwargs: del kwargs['children'] = kwargs.get('data', {}) if del kwargs['data'] self.__dict__.update(id=oid) self.__dict__.update(**kwargs) self.__dict__['text'] = path
[docs] def get_child(self, var_id, text): return self.children.get(var_id) or self.helper_children.get(text) or self.children.get(text)
def __repr__(self): return self.text
[docs] def json_data(self): children = [k.json_data() for k in self.children.values()] output = {} for k, v in self.__dict__.items(): if k in {'children', 'helper_children'}: continue output[k] = v if children: output['children'] = children return output
[docs]class JSTree: """ An json like object that converts a list of nodes into something that jQuery jstree can use. """ def __init__(self, concept_nodes): """ Take a list of paths and put them into a tree. """ if not all([isinstance(p, ConceptNode) for p in concept_nodes]): raise TypeError("All paths must be instances of {}".format(ConceptNode.__name__)) self._root = JSNode('', None) # Sort paths, not sure if this is really necessary. concept_nodes.sort(key=lambda x: x.path) for node in concept_nodes: curr = self._root sub_paths = node.path.split(Mappings.PATH_DELIM) data = node.__dict__.get('data', {}) node_type = node.__dict__.get('type', 'default') # Will be used to add the categories to the right categorical node. parent = node.var_id.parent if node_type == 'alpha' else 0 # And now for the tricky bit. for i, sub_path in enumerate(sub_paths): # Arrived at leaf. Add final JSNode of path and give it the VarID if i == len(sub_paths) - 1: # Arrived at leaf new_node = JSNode(sub_path, oid=node.var_id, data=data, type=node_type) curr.children[node.var_id] = new_node curr.helper_children[new_node.text] = new_node continue # next path # Not a leaf, check if current path already in tree. next_child = curr.get_child(var_id=parent, text=sub_path) if not next_child: new_node = JSNode(sub_path) curr.children[sub_path] = new_node curr = new_node else: curr = next_child def __repr__(self): """ This outputs the tree to terminal as class representation. """ return self.pretty()
[docs] def pretty(self, root=None, depth=0, spacing=2): """ Create a pretty representation of tree. """ if root is None: root = self._root fmt = "%s%s/" if root.children else "%s%s" s = fmt % (" " * depth * spacing, root.text) for child in root.children: s += "\n%s" % self.pretty(child, depth + 1, spacing) return s
@property def json_data(self): """ Convert this object to json ready to be consumed by jstree. """ return [k.json_data() for k in self._root.children.values()] @property def json_data_string(self): """ :return: Returns the json_data properly formatted as string. """ return json.dumps(self.json_data, cls=MyEncoder)
[docs] def to_clipboard(self): pd.DataFrame.to_clipboard(self.json_data_string)
[docs]class MyEncoder(json.JSONEncoder): """ Overwriting the standard JSON Encoder to treat numpy ints as native ints."""
[docs] def default(self, obj): if isinstance(obj, (, return int(obj) elif isinstance(obj, VarID): return str(obj) else: return super(MyEncoder, self).default(obj)