Source code for tmtk.clinical.Clinical

import pandas as pd
import os

from .DataFile import DataFile
from .Variable import Variable, VarID
from .ColumnMapping import ColumnMapping
from .WordMapping import WordMapping
from ..utils import PathError, clean_for_namespace, FileBase, ValidateMixin, path_converter
from .. import arborist
from ..utils.batch import TransmartBatch


[docs]class Clinical(ValidateMixin): """ Container class for all clinical data related objects, i.e. the column mapping, word mapping, and clinical data files. This object has methods that add data files, and for lookups of clinical files and variables. """ def __init__(self, clinical_params=None): self._WordMapping = None self._ColumnMapping = None self._params = clinical_params def __str__(self): return "ClinicalObject ({})".format(self.params.path) def __repr__(self): return "ClinicalObject ({})".format(self.params.path) @property def params(self): return self._params @params.setter def params(self, value): self._params = value self.ColumnMapping = ColumnMapping(params=self.params) self.WordMapping = WordMapping(params=self.params) @property def ColumnMapping(self): return self._ColumnMapping @ColumnMapping.setter def ColumnMapping(self, value): self._ColumnMapping = value for file in self.ColumnMapping.included_datafiles: clinical_data_path = os.path.join(self.params.dirname, file) self.add_datafile(clinical_data_path) @property def WordMapping(self): return self._WordMapping @WordMapping.setter def WordMapping(self, value): self._WordMapping = value
[docs] def apply_blueprint(self, blueprint, omit_missing=False): """ Update the column mapping by applying a template. :param blueprint: expected input is a dictionary where keys are column names as found in clinical datafiles. Each column header name has a dictionary describing the path and data label and other information. For example: { "GENDER": { "path": "Characteristics\\Demographics", "label": "Gender", "metadata_tags": { "Info": "As measured when born." }, "force_categorical": "Y", "word_map": { "goo": "values", "pile": "list" }, "expected_categorical": [ "pile", "of", "goo" ] }, "BPBASE": { "path": "Lab results\\Blood", "label": "Blood pressure (baseline)", "expected_numerical": { "min": 1, "max": 9 } } } :param omit_missing: if True, then variable that are not present in the blueprint will be set to OMIT. """ for var_id, variable in self.all_variables.items(): blueprint_var = blueprint.get(variable.header.strip()) if not blueprint_var: self.msgs.info("Removing column with header {!r}. Not found in blueprint.".format(variable.header)) if omit_missing: variable.data_label = 'OMIT' continue if blueprint_var.get('path'): variable.concept_path = path_converter(blueprint_var.get('path')) if blueprint_var.get('label'): variable.data_label = blueprint_var.get('label') if blueprint_var.get('force_categorical'): variable.forced_categorical = blueprint_var.get('force_categorical') == "Y" if blueprint_var.get('word_map'): variable.word_map_dict = blueprint_var.get('word_map') expected_numerical = blueprint_var.get('expected_numerical') if expected_numerical and variable.is_numeric_in_datafile: min_expected = expected_numerical.get('min', '') try: min_const = float(min_expected if min_expected != '' else '-Inf') except ValueError: self.msgs.warning("Expected numerical for min constraint ({}), got {!r}." .format(variable.header, min_expected)) max_expected = expected_numerical.get('max', '') try: max_const = float(max_expected if max_expected != '' else 'Inf') except ValueError: self.msgs.warning("Expected numerical for max constraint ({}), got {!r}." .format(variable.header, max_expected)) if min_const > variable.min or max_const < variable.max: self.msgs.warning("Value constraints exceeded for {}: {} to {}, where datafile has min:{}, max:{}". format(variable.header, min_const, max_const, variable.min, variable.max) ) expected_categorical = blueprint_var.get('expected_categorical') if expected_categorical: unexpected = set(variable.unique_values) - set(expected_categorical) if unexpected: self.msgs.warning("Unexpected values for {}. Expected: {}. Also found: {}". format(variable.header, expected_categorical, list(unexpected)) )
[docs] def add_datafile(self, filename, dataframe=None): """ Add a clinical data file to study. :param filename: path to file or filename of file in clinical directory. :param dataframe: if given, add `pd.DataFrame` to study. """ if isinstance(dataframe, pd.DataFrame): datafile = DataFile() datafile.df = dataframe else: if os.path.exists(filename): file_path = filename else: file_path = os.path.join(self.params.dirname, filename) assert os.path.exists(file_path), PathError(file_path) datafile = DataFile(file_path) # Check if file is in de clinical directory if not os.path.dirname(os.path.abspath(filename)) == self.params.dirname: datafile.df # Force load df datafile.path = os.path.join(self.params.dirname, os.path.basename(filename)) while self.get_datafile(datafile.name): new_name = input("Filename {!r} already taken, try again. ".format(datafile.name)) datafile.name = new_name if not new_name == '' else datafile.name safe_name = clean_for_namespace(datafile.name) self.__dict__[safe_name] = datafile if datafile.name not in self.ColumnMapping.included_datafiles: self.msgs.okay('Adding {!r} as clinical datafile to study.'.format(datafile.name)) self.ColumnMapping.append_from_datafile(datafile)
[docs] def get_variable(self, var_id: tuple): """ Return a Variable object based on variable id. :param var_id: tuple of filename and column number. :return: `tmtk.Variable`. """ df_name, column = var_id datafile = self.get_datafile(df_name) return Variable(datafile, column, self)
@property def all_variables(self): """ Dictionary where {`tmtk.VarID`: `tmtk.Variable`} for all variables in the column mapping file. """ return {VarID(var_id): self.get_variable(var_id) for var_id in self.ColumnMapping.ids}
[docs] def call_boris(self, height=650): """ Use The Arborist to modify only information in the column and word mapping files. :param height: set the height of the output cell """ arborist.call_boris(self, height=height)
[docs] def validate_all(self, verbosity=3): for key, obj in self.__dict__.items(): if hasattr(obj, 'validate'): obj.validate(verbosity=verbosity)
[docs] def get_datafile(self, name: str): """ Find datafile object by filename. :param name: name of file. :return: `tmtk.DataFile` object. """ for key, obj in self.__dict__.items(): if isinstance(obj, DataFile): if obj.name == name: return obj
def __hash__(self): """ Calculate hash for in memory pd.DataFrame objects. The sum of these hashes is returned. :return: sum of hashes. """ hashes = 0 for key, obj in self.__dict__.items(): if hasattr(obj, 'df'): hashes += hash(obj) return hashes
[docs] def show_changes(self): """Print changes made to the column mapping and word mapping file.""" column_changes = self.ColumnMapping.path_changes(silent=True) word_map_changes = self.WordMapping.word_map_changes(silent=True) for var_id in set().union(column_changes, word_map_changes): print("{}: {}".format(*var_id)) path_change = column_changes.get(var_id) if path_change: print(" {}".format(path_change[0])) print(" -> {}".format(path_change[1])) else: print(" {}".format(self.get_variable(var_id).concept_path)) map_change = word_map_changes.get(var_id) if map_change: for k, v in map_change.items(): print(" - {!r} -> {!r}".format(k, v))
@property def load_to(self): return TransmartBatch(param=self.params.path, items_expected=self._get_lazy_batch_items() ).get_loading_namespace() def _get_lazy_batch_items(self): return {self.params.path: [self.get_datafile(f).path for f in self.ColumnMapping.included_datafiles]} @property def clinical_files(self): return [x for k, x in self.__dict__.items() if issubclass(type(x), FileBase)] def _validate_clinical_params(self): if os.path.exists(self.params.path): self.msgs.okay('Clinical params found on disk.') else: self.msgs.error('Clinical params not on disk.') def _validate_SUBJ_IDs(self): for datafile in self.ColumnMapping.included_datafiles: var_id_list = [var_id for var_id in self.ColumnMapping.subj_id_columns if var_id[0] == datafile] # Check for one SUBJ_ID per file if len(var_id_list) == 1: subj_id = self.get_variable(var_id_list[0]) if len(subj_id.values) == len(subj_id.unique_values): self.msgs.okay('Found a SUBJ_ID for {} and it has unique values, thats good!'.format(datafile)) else: self.msgs.error('Found a SUBJ_ID for {}, but it has duplicate values.'.format(datafile), warning_list=subj_id.values[subj_id.values.duplicated()].unique()) else: self.msgs.error('Found {} SUBJ_ID for {}'.format(len(var_id_list), datafile)) def _validate_word_mappings(self): # check presence of all data files filenames = self.WordMapping.included_datafiles valid_filenames = [] for filename in filenames: if filename not in self.ColumnMapping.included_datafiles: msg = "The file {} isn't included in the column map".format(filename) self.msgs.error(msg) else: valid_filenames.append(filename) column_number = self.WordMapping.df.columns[1] for filename in valid_filenames: datafile = self.get_datafile(filename) amount_of_columns = datafile.df.shape[1] columns = set(self.WordMapping.df.loc[filename, column_number]) out_of_bound = {index for index in columns if index > amount_of_columns} for index in out_of_bound: msg = "File {} doesn't has {} columns, but {} columns".format(filename, index, amount_of_columns) self.msgs.error(msg) correct_columns = columns - out_of_bound for column in correct_columns: variable = self.get_variable((filename, column)) unmapped = variable.word_mapped_not_present() for unmapped_value in unmapped: msg = "Value {} is mapped at column {} in file {}. " \ "However the value is not present in the column".format(unmapped_value, column, filename) self.msgs.warning(msg)