Source code for tmtk.clinical.Clinical

import os
import numpy as np
import pandas as pd

import tmtk
from .ColumnMapping import ColumnMapping
from .DataFile import DataFile
from .Ontology import OntologyMapping
from .Variable import Variable, VarID
from .WordMapping import WordMapping
from .modifier import Modifiers
from .trial_vists import TrialVisits
from ..utils import PathError, clean_for_namespace, FileBase, ValidateMixin, path_converter, BlueprintException
from ..utils.batch import TransmartBatch


class Clinical(ValidateMixin):
    """
    Container class for all clinical data related objects, i.e. the column
    mapping, word mapping, and clinical data files. This object has methods
    to add data files and to look up clinical files and variables.
    """

    def __init__(self, clinical_params=None):
        self._ColumnMapping = None
        self.WordMapping = None
        self.OntologyMapping = None
        self.Modifiers = None
        self.TrialVisits = None
        self._params = clinical_params

    def __str__(self):
        return "ClinicalObject ({})".format(self.params.path)

    def __repr__(self):
        return "ClinicalObject ({})".format(self.params.path)

    @property
    def params(self):
        return self._params

    @params.setter
    def params(self, value):
        self._params = value
        self.ColumnMapping = ColumnMapping(params=self.params)
        self.WordMapping = WordMapping(params=self.params)

        if not tmtk.options.transmart_batch_mode:
            self.OntologyMapping = OntologyMapping(params=self.params)
            self.Modifiers = Modifiers(params=self.params)
            self.TrialVisits = TrialVisits(params=self.params)

    @property
    def ColumnMapping(self):
        return self._ColumnMapping

    @ColumnMapping.setter
    def ColumnMapping(self, value):
        self._ColumnMapping = value
        for file in self.ColumnMapping.included_datafiles:
            clinical_data_path = os.path.join(self.params.dirname, file)
            self.add_datafile(clinical_data_path)
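    # Usage sketch (editor's addition, not part of the original module): a
    # Clinical object is normally reached through a tmtk.Study rather than
    # constructed directly; the study path below is hypothetical.
    #
    #   import tmtk
    #   study = tmtk.Study('./studies/GSE8581/study.params')
    #   clinical = study.Clinical
    #   print(clinical)  # ClinicalObject (<path to clinical params>)
    #
    # Assigning to `params` (re)creates the ColumnMapping and WordMapping
    # objects, and the ColumnMapping setter in turn loads every datafile it
    # references via add_datafile().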
    def apply_blueprint(self, blueprint, omit_missing=False):
        """
        Update the column mapping by applying a template.

        :param blueprint: expected input is a dictionary where keys are column
            names as found in clinical datafiles. Each column header name has
            a dictionary describing the concept path, data label and other
            information. For example:

            {
                "GENDER": {
                    "path": "Characteristics\\Demographics",
                    "label": "Gender",
                    "concept_code": "SNOMEDCT/263495000",
                    "metadata_tags": {
                        "Info": "As measured when born."
                    },
                    "force_categorical": "Y",
                    "word_map": {
                        "goo": "values",
                        "pile": "list"
                    },
                    "expected_categorical": [
                        "pile",
                        "of",
                        "goo"
                    ]
                },
                "BPBASE": {
                    "path": "Lab results\\Blood",
                    "label": "Blood pressure (baseline)",
                    "expected_numerical": {
                        "min": 1,
                        "max": 9
                    }
                }
            }

        :param omit_missing: if True, variables that are not present in the
            blueprint will have their data label set to OMIT.
        """
        for var_id, variable in self.all_variables.items():
            blueprint_var = blueprint.get(variable.header.strip())

            if not blueprint_var:
                self.msgs.info("Column with header {!r} not found in blueprint.".format(variable.header))
                if omit_missing:
                    variable.data_label = 'OMIT'
                continue

            if blueprint_var.get('path') is not None:
                variable.category_code = path_converter(blueprint_var.get('path'))

            if blueprint_var.get('label') is not None:
                variable.data_label = blueprint_var.get('label')

            if blueprint_var.get('word_map'):
                variable.word_map_dict = blueprint_var.get('word_map')

            if blueprint_var.get('concept_code'):
                variable.concept_code = blueprint_var.get('concept_code')

            if blueprint_var.get('force_categorical') and blueprint_var.get('data_type'):
                msg = "Both 'force_categorical' and 'data_type' found for {!r}".format(variable.header)
                raise BlueprintException(msg)
            elif blueprint_var.get('data_type'):
                variable.column_type = blueprint_var.get('data_type')
            elif blueprint_var.get('force_categorical'):
                variable.forced_categorical = blueprint_var.get('force_categorical') == "Y"

            reference_column = blueprint_var.get('reference_column')
            if reference_column is not None:
                try:
                    variable.reference_column = variable.datafile.df.columns.get_loc(reference_column) + 1
                except KeyError:
                    msg = 'Cannot find reference column {!r} within dataframe header'.format(reference_column)
                    raise BlueprintException(msg)

            expected_numerical = blueprint_var.get('expected_numerical')
            if expected_numerical and variable.is_numeric_in_datafile:
                # Fall back to unbounded constraints when a blueprint value
                # cannot be parsed as a number.
                min_expected = expected_numerical.get('min', '')
                min_const = float('-Inf')
                try:
                    min_const = float(min_expected if min_expected != '' else '-Inf')
                except ValueError:
                    self.msgs.warning("Expected numerical for min constraint ({}), got {!r}."
                                      .format(variable.header, min_expected))

                max_expected = expected_numerical.get('max', '')
                max_const = float('Inf')
                try:
                    max_const = float(max_expected if max_expected != '' else 'Inf')
                except ValueError:
                    self.msgs.warning("Expected numerical for max constraint ({}), got {!r}."
                                      .format(variable.header, max_expected))

                if min_const > variable.min or max_const < variable.max:
                    self.msgs.warning(
                        "Value constraints exceeded for {}: {} to {}, where datafile has min:{}, max:{}".format(
                            variable.header, min_const, max_const, variable.min, variable.max)
                    )

            expected_categorical = blueprint_var.get('expected_categorical')
            if expected_categorical:
                unexpected = set(variable.unique_values) - set(expected_categorical)
                if unexpected:
                    self.msgs.warning(
                        "Unexpected values for {}. Expected: {}. Also found: {}".format(
                            variable.header, expected_categorical, list(unexpected))
                    )
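    # Minimal blueprint sketch (editor's addition): the column header and
    # concept path below are hypothetical, mirroring the docstring above.
    #
    #   blueprint = {
    #       "GENDER": {
    #           "path": "Characteristics\\Demographics",
    #           "label": "Gender",
    #           "force_categorical": "Y",
    #       },
    #   }
    #   clinical.apply_blueprint(blueprint, omit_missing=True)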
    def add_datafile(self, filename, dataframe=None):
        """
        Add a clinical data file to study.

        :param filename: path to file or filename of file in clinical directory.
        :param dataframe: if given, add `pd.DataFrame` to study.
        """
        if isinstance(dataframe, pd.DataFrame):
            datafile = DataFile()
            datafile.df = dataframe

        else:
            if os.path.exists(filename):
                file_path = filename
            else:
                file_path = os.path.join(self.params.dirname, filename)
            assert os.path.exists(file_path), PathError(file_path)

            datafile = DataFile(file_path)

            # Check if file is in the clinical directory
            if not os.path.dirname(os.path.abspath(filename)) == self.params.dirname:
                datafile.df  # Force load df
                datafile.path = os.path.join(self.params.dirname, os.path.basename(filename))

        while self.get_datafile(datafile.name):
            new_name = input("Filename {!r} already taken, try again. ".format(datafile.name))
            datafile.name = new_name if not new_name == '' else datafile.name

        safe_name = clean_for_namespace(datafile.name)
        self.__dict__[safe_name] = datafile

        if datafile.name not in self.ColumnMapping.included_datafiles:
            self.msgs.okay('Adding {!r} as clinical datafile to study.'.format(datafile.name))
            self.ColumnMapping.append_from_datafile(datafile)
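    # Usage sketch (editor's addition): point to a file on disk, or pass an
    # in-memory DataFrame; the filenames are hypothetical.
    #
    #   clinical.add_datafile('extra_measurements.txt')
    #   clinical.add_datafile('derived.txt', dataframe=my_dataframe)
    #
    # A file outside the clinical directory has its path rewritten so it is
    # saved alongside the other clinical datafiles.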
    def get_variable(self, var_id: tuple):
        """
        Return a Variable object based on variable id.

        :param var_id: tuple of filename and column number.
        :return: `tmtk.Variable`.
        """
        df_name, column = var_id
        datafile = self.get_datafile(df_name)
        return Variable(datafile, column, self)
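    # Usage sketch (editor's addition): a variable id is a (filename, column
    # number) pair, 1-based, as used in the column mapping file. The filename
    # is hypothetical.
    #
    #   variable = clinical.get_variable(('Cell-line_clinical.txt', 2))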
    def find_variables_by_label(self, label: str, in_file: str = None) -> list:
        """
        Search for variables based on data label. Labels are matched exactly
        (case-sensitive).

        :param label: data label to search for.
        :param in_file: optionally restrict the search to this datafile.
        :return: list of `tmtk.Variable` objects.
        """
        sliced = self.ColumnMapping.df.iloc[:, 3] == label
        if in_file:
            sliced = sliced & (self.ColumnMapping.df.iloc[:, 0] == in_file)
        return [self.get_variable(x[0]) for x in self.ColumnMapping.df[sliced].iterrows()]
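    # Usage sketch (editor's addition): find variables by data label,
    # optionally within one datafile (names are hypothetical).
    #
    #   age_vars = clinical.find_variables_by_label('Age')
    #   age_vars = clinical.find_variables_by_label('Age', in_file='Cell-line_clinical.txt')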
    def get_patients(self):
        """
        Creates a dictionary that has subject identifiers as keys. Each value
        is a map that contains nothing, or an 'age' and/or 'gender' key that
        maps to the subject's value.

        :return: patients dict.
        """
        def add_patient_properties(subjects_dict, destination, labels):
            vars_ = [v for label in labels for v in self.find_variables_by_label(label)]
            if len(vars_) > 1:
                print("More than one {!r} defined, will pick last "
                      "value found for each subject.".format(destination))
            for var in vars_:
                for k, v in zip(var.subj_id.values, var.values):
                    if v in (None, '', np.nan):
                        continue
                    subjects_dict[k].update({destination: v})

        subj_id_vars = self.find_variables_by_label('SUBJ_ID')

        # Create dictionary where every subject is a key with an empty dictionary as value
        subjects = {subj_id: {} for var in subj_id_vars for subj_id in var.values}

        add_patient_properties(subjects, 'gender', ('gender', 'Gender', 'GENDER', 'sex', 'Sex', 'SEX'))
        add_patient_properties(subjects, 'age', ('Age', 'age', 'AGE'))

        return subjects
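    # Shape of the returned mapping (editor's addition, values hypothetical):
    #
    #   {'SUBJ_1': {'gender': 'Male', 'age': 37.0},
    #    'SUBJ_2': {}}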
    def get_trial_visits(self):
        """
        Returns a list of all trial visits present in this study. Visits are
        identified by the TRIAL_VISIT_LABEL keyword in column mapping and can
        be annotated with a value and unit using the TrialVisits object.

        :return: list of dicts.
        """
        visit_labels = {'General': {'name': 'General'}}
        visit_labels.update({label: {'name': label}
                             for var in self.find_variables_by_label('TRIAL_VISIT_LABEL')
                             for label in var.values})
        visit_labels.update({label: {'name': label} for label in self.TrialVisits.df.iloc[:, 0]})

        self.TrialVisits.df.apply(
            lambda x: visit_labels[x.iloc[0]].update(
                {'relative_time': x.iloc[1], 'time_unit': x.iloc[2]}
            ), axis=1
        )

        for missing in ('', None, np.nan):
            try:
                visit_labels.pop(missing)
            except KeyError:
                pass

        return list(visit_labels.values())
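    # Shape of the returned list (editor's addition, values hypothetical):
    #
    #   [{'name': 'General'},
    #    {'name': 'Week 1', 'relative_time': 7, 'time_unit': 'days'}]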
    @property
    def all_variables(self):
        """
        Dictionary where {`tmtk.VarID`: `tmtk.Variable`} for all variables
        in the column mapping file.
        """
        return {VarID(var_id): self.get_variable(var_id) for var_id in self.ColumnMapping.ids}

    @property
    def filtered_variables(self):
        """
        Dictionary where {`tmtk.VarID`: `tmtk.Variable`} for all variables in
        the column mapping file that do not have a data label in the
        RESERVED_KEYWORDS list.
        """
        return {k: v for k, v in self.all_variables.items()
                if v.data_label not in self.ColumnMapping.RESERVED_KEYWORDS}
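    # Usage sketch (editor's addition): iterate over the mapping to inspect
    # every non-keyword variable, e.g. to collect all concept paths.
    #
    #   paths = {v.concept_path for v in clinical.filtered_variables.values()}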
    def validate_all(self, verbosity=3):
        """Run validate() on every object in this study that supports it."""
        for key, obj in self.__dict__.items():
            if hasattr(obj, 'validate'):
                obj.validate(verbosity=verbosity)
    def get_datafile(self, name: str):
        """
        Find datafile object by filename.

        :param name: name of file.
        :return: `tmtk.DataFile` object.
        """
        for key, obj in self.__dict__.items():
            if isinstance(obj, DataFile):
                if obj.name == name:
                    return obj
    def __hash__(self):
        """
        Calculate hash for in memory pd.DataFrame objects. The sum of these
        hashes is returned.

        :return: sum of hashes.
        """
        hashes = 0
        for key, obj in self.__dict__.items():
            if hasattr(obj, 'df'):
                hashes += hash(obj)
        return hashes
    def show_changes(self):
        """Print changes made to the column mapping and word mapping file."""
        column_changes = self.ColumnMapping.path_changes(silent=True)
        word_map_changes = self.WordMapping.word_map_changes(silent=True)

        changes = set().union(column_changes, word_map_changes)

        for var_id in changes:
            print("{}: {}".format(*var_id))
            path_change = column_changes.get(var_id)
            if path_change:
                print("  {}".format(path_change[0]))
                print("  -> {}".format(path_change[1]))
            else:
                print("  {}".format(self.get_variable(var_id).concept_path))

            map_change = word_map_changes.get(var_id)
            if map_change:
                for k, v in map_change.items():
                    print("  - {!r} -> {!r}".format(k, v))

        return changes
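    # Example output (editor's addition, contents hypothetical):
    #
    #   Cell-line_clinical.txt: 2
    #     Characteristics\Demographics\Gender
    #     - 'm' -> 'Male'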
@property def load_to(self): return TransmartBatch(param=self.params.path, items_expected=self._get_lazy_batch_items() ).get_loading_namespace() def _get_lazy_batch_items(self): return {self.params.path: [self.get_datafile(f).path for f in self.ColumnMapping.included_datafiles]} @property def clinical_files(self): return [x for k, x in self.__dict__.items() if issubclass(type(x), FileBase)] def _validate_clinical_params(self): if os.path.exists(self.params.path): self.msgs.okay('Clinical params found on disk.') else: self.msgs.error('Clinical params not on disk.') def _validate_SUBJ_IDs(self): for datafile in self.ColumnMapping.included_datafiles: var_id_list = [var_id for var_id in self.ColumnMapping.subj_id_columns if var_id[0] == datafile] # Check for one SUBJ_ID per file if len(var_id_list) == 1: subj_id = self.get_variable(var_id_list[0]) if len(subj_id.values) == len(subj_id.unique_values): self.msgs.okay('Found a SUBJ_ID for {} and it has unique values, thats good!'.format(datafile)) else: self.msgs.error('Found a SUBJ_ID for {}, but it has duplicate values.'.format(datafile), warning_list=subj_id.values[subj_id.values.duplicated()].unique()) else: self.msgs.error('Found {} SUBJ_ID for {}'.format(len(var_id_list), datafile)) def _validate_word_mappings(self): # check presence of all data files filenames = self.WordMapping.included_datafiles valid_filenames = [] for filename in filenames: if filename not in self.ColumnMapping.included_datafiles: msg = "The file {} isn't included in the column map".format(filename) self.msgs.error(msg) else: valid_filenames.append(filename) column_number = self.WordMapping.df.columns[1] for filename in valid_filenames: datafile = self.get_datafile(filename) amount_of_columns = datafile.df.shape[1] columns = set(self.WordMapping.df.loc[filename, column_number]) out_of_bound = {index for index in columns if index > amount_of_columns} for index in out_of_bound: msg = "File {} doesn't has {} columns, but {} columns".format(filename, index, amount_of_columns) self.msgs.error(msg) correct_columns = columns - out_of_bound for column in correct_columns: variable = self.get_variable((filename, column)) unmapped = variable.word_mapped_not_present() for unmapped_value in unmapped: msg = "Value {} is mapped at column {} in file {}. " \ "However the value is not present in the column".format(unmapped_value, column, filename) self.msgs.warning(msg)