Class to encode data from user-provided Excel template.


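| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | template table with clinical data | required |
| hpo_cr | HpoConceptRecognizer | HpoConceptRecognizer for text mining | required |
| created_by | str | biocurator (typically, this should be an ORCID identifier) | required |
| hpo_ontology | MinimalOntology | HPO ontology object; its version is recorded in `HPO_VERSION` | required |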

Source code in pyphetools/creation/
class CaseTemplateEncoder:
    """Class to encode data from user-provided Excel template.

    :param df: template table with clinical data
    :type df: pd.DataFrame
    :param hpo_cr: HpoConceptRecognizer for text mining
    :type hpo_cr: pyphetools.creation.HpoConceptRecognizer
    :param created_by: biocurator (typically, this should be an ORCID identifier)
    :type created_by: str
    :param hpo_ontology: ontology object whose ``version`` attribute is stored in ``HPO_VERSION``
    :type hpo_ontology: hpotk.MinimalOntology
    """

    # Class-level attribute; the constructor overwrites it with ``hpo_ontology.version``.
    HPO_VERSION = None

    def __init__(self, df:pd.DataFrame, hpo_cr:HpoConceptRecognizer, created_by:str, hpo_ontology:hpotk.MinimalOntology) -> None:
        """Validate the template headers and parse every data row.

        The column names of ``df`` are used as the first header row and the row
        with label 0 as the second header row; all following rows are data rows.

        :param df: template table with clinical data
        :type df: pd.DataFrame
        :param hpo_cr: HpoConceptRecognizer for text mining
        :type hpo_cr: pyphetools.creation.HpoConceptRecognizer
        :param created_by: biocurator (typically, this should be an ORCID identifier)
        :type created_by: str
        :param hpo_ontology: ontology object; its ``version`` is stored in ``HPO_VERSION``
        :type hpo_ontology: hpotk.MinimalOntology
        :raises ValueError: if ``df`` is not a DataFrame, a header is malformed, or a row cannot be parsed
        """
        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"argument \"df\" must be pandas DataFrame but was {type(df)}")
        self._individuals = []
        self._errors = []
        self._ntr_set = set()
        header_1 = df.columns.values.tolist()
        header_2 = df.loc[0, :].values.tolist()
        if len(header_1) != len(header_2):
            # should never happen unless the template file is corrupted
            raise ValueError("headers are different lengths. Check template file for correctness.")
        # The deceased field is optional; pick the list of expected fields accordingly
        if "deceased" in header_1:
            required_h1 = OPTIONAL_H1_FIELDS
            required_h2 = OPTIONAL_H2_FIELDS
        else:
            required_h1 = REQUIRED_H1_FIELDS
            required_h2 = REQUIRED_H2_FIELDS
        # Check headers are well formed. ``idx`` is the position in the observed
        # header; it is tracked separately from the position in the required
        # fields so that everything stays aligned when the optional allele_2
        # column is absent from the template.
        idx = 0
        for req_idx, (expected_h1, expected_h2) in enumerate(zip(required_h1, required_h2)):
            if idx >= len(header_1):
                raise ValueError(f"Malformed header 1: expected \"{expected_h1}\" at index {idx} but template has only {len(header_1)} columns")
            if req_idx == ALLELE_2_IDX and header_1[idx] != expected_h1:
                continue  # skip optional field; do not consume an observed column
            if header_1[idx] != expected_h1:
                raise ValueError(f"Malformed header 1 field at index {idx}. Expected \"{expected_h1}\" but got \"{header_1[idx]}\"")
            if header_2[idx] != expected_h2:
                raise ValueError(f"Malformed header 2 field at index {idx}. Expected \"{expected_h2}\" but got \"{header_2[idx]}\"")
            idx += 1
        self._header_fields_1 = header_1
        self._n_columns = len(header_1)
        self._index_to_decoder = self._process_header(header_1=header_1, header_2=header_2, hpo_cr=hpo_cr)
        data_df = df.iloc[1:]
        # NOTE(review): this instance attribute has the same name as the
        # ``_is_biallelic`` method of this class and therefore shadows it.
        self._is_biallelic = "allele_2" in header_1
        self._allele1_d = {}
        self._allele2_d = {}
        for _, row in data_df.iterrows():
            individual = self._parse_individual(row)
            self._individuals.append(individual)
            # NOTE(review): dictionaries are keyed by ``individual.id``; the key
            # expression was lost in the extracted source -- confirm against Individual.
            self._allele1_d[individual.id] = row["allele_1"]
            if self._is_biallelic:
                self._allele2_d[individual.id] = row["allele_2"]
        CaseTemplateEncoder.HPO_VERSION = hpo_ontology.version
        self._created_by = created_by
        self._metadata_d = {}
        for individual in self._individuals:
            cite = individual._citation
            metadata = MetaData(created_by=created_by, citation=cite)
            self._metadata_d[individual.id] = metadata

    def _process_header(self, header_1:List, header_2:List, hpo_cr:HpoConceptRecognizer) -> Dict[int, CellEncoder]:
        """Create one cell encoder per template column.

        Columns before the "HPO" boundary column are data columns, columns after it
        are HPO term columns. New term requests (second header "NTR") are recorded
        in ``self._ntr_set``; problems with HPO columns are recorded in ``self._errors``.

        :param header_1: first header row (column names)
        :type header_1: List
        :param header_2: second header row
        :type header_2: List
        :param hpo_cr: HpoConceptRecognizer passed on to the encoder of the "Miscellaneous" column
        :type hpo_cr: pyphetools.creation.HpoConceptRecognizer
        :returns: dictionary with key column index and value the encoder for that column
        :rtype: Dict[int, CellEncoder]
        :raises ValueError: for empty or unexpected column headers, or if there is no "HPO" boundary column
        """
        index_to_decoder_d = {}
        in_hpo_range = False
        for i, (h1, h2) in enumerate(zip(header_1, header_2)):
            # an empty header cell shows up as a float (NaN) or as an empty string
            if isinstance(h1, float) or len(h1) == 0:
                raise ValueError(f"Error: Empty column header at column {i}")
            if h1 == "HPO":
                # boundary column: carries no data itself. Move on to the next column so
                # that the NullEncoder is not replaced by an HpoEncoder further below.
                in_hpo_range = True
                index_to_decoder_d[i] = NullEncoder()
                continue
            if h1 == "Miscellaneous":
                index_to_decoder_d[i] = MiscEncoder(h1=h1, h2=h2, hpo_cr=hpo_cr)
            elif not in_hpo_range:
                if h1 in EXPECTED_HEADERS or h1 == "deceased":
                    index_to_decoder_d[i] = DataEncoder(h1=h1, h2=h2)
                else:
                    raise ValueError(f"Malformed template header at column {i}: \"{h1}\"")
            else:
                ntr = h2 == "NTR"
                encoder = HpoEncoder(h1=h1, h2=h2, ntr=ntr)
                if ntr:
                    # remember the requested term so that it is reported below
                    self._ntr_set.add(h1)
                    index_to_decoder_d[i] = encoder
                elif encoder.needs_attention():
                    # record the problem so that it is reported below and by to_summary
                    self._errors.append(f"Malformed HPO column {i}: \"{h1}\" / \"{h2}\"")
                    index_to_decoder_d[i] = NullEncoder()
                else:
                    index_to_decoder_d[i] = encoder
        if not in_hpo_range:
            raise ValueError("Did not find HPO boundary column")
        print(f"Created encoders for {len(index_to_decoder_d)} fields")
        if len(self._ntr_set) > 0:
            print("[WARNING] Template contains new term requests (NTR). These columns will be ignored until they are replaced with HPO terms")
            for ntr in self._ntr_set:
                print("\tNTR: ", ntr)
        for e in self._errors:
            print(f"ERROR: {e}")
        return index_to_decoder_d

    def _check_for_duplicate_individual_ids(self, df:pd.DataFrame) -> None:
        """Check that no two individuals in the dataframe have the same identifier

        Duplicate identifiers can lead to other errors in the code.
        An identifier is made from the combination of PMID and individual_id and must be unique.
        If there are one or more duplicates, this function will throw a value error.

        :param df: table with the columns "PMID" and "individual_id"
        :type df: pd.DataFrame
        :raises ValueError: if a required column is missing or an identifier occurs more than once
        """
        if "individual_id" not in df.columns:
            raise ValueError("Malformed template headers - could not find column \"individual_id\"")
        if "PMID" not in df.columns:
            raise ValueError("Malformed template headers - could not find column \"PMID\"")
        seen_ids = set()
        errors = []
        for _, row in df.iterrows():
            individual_id = row["individual_id"]
            pmid = row["PMID"]
            composite_id = f"{pmid}_{individual_id}"
            if composite_id in seen_ids:
                errors.append(f"Duplicate identifier: {composite_id}")
            else:
                # remember the identifier so that later rows can be compared against it
                seen_ids.add(composite_id)
        if errors:
            raise ValueError("\n".join(errors))
        # else, all is OK, no duplicate ids

    def _parse_individual(self, row:pd.Series):
        """Parse one data row of the template into an Individual.

        Every cell is passed to the encoder registered for its column index.
        Data cells are collected in a dictionary, HPO and Miscellaneous cells
        contribute HPO terms.

        :param row: one data row of the template
        :type row: pd.Series
        :returns: the individual described by the row
        :rtype: Individual
        :raises ValueError: if the row is malformed or a required item is missing or invalid
        """
        if not isinstance(row, pd.Series):
            raise ValueError(f"argument row must be pandas Series but was {type(row)}")
        data = row.values.tolist()
        if len(data) != self._n_columns:
            # Should never happen
            raise ValueError(f"Divergent number of columns: header {self._n_columns} but data row {len(data)}: {data}")
        data_items = {}
        hpo_terms = []
        for i, cell_contents in enumerate(data):
            encoder = self._index_to_decoder.get(i)
            if encoder is None:
                print(f"Encoder {i} was None for data \"{cell_contents}\"")
                self._debug_row(i, row)
                raise ValueError(f"Encoder {i} was None for data \"{cell_contents}\"")
            # columntype is a method: it has to be called before comparing with CellType
            encoder_type = encoder.columntype()
            if encoder_type == CellType.NTR:
                continue  # cannot be used yet because it is a new term request
            # NOTE(review): ``encoder.name`` was lost in the extracted source and is
            # reconstructed here -- confirm that CellEncoder exposes ``name``.
            if encoder_type == CellType.DATA and encoder.name in DATA_ITEMS:
                data_items[encoder.name] = encoder.encode(cell_contents)
            elif encoder_type == CellType.HPO:
                try:
                    hpoterm = encoder.encode(cell_contents)
                except Exception as hpo_parse_exception:
                    errr = f"Could not parse contents of HPO column {encoder.name}: {cell_contents} because of {str(hpo_parse_exception)}"
                    raise ValueError(errr) from hpo_parse_exception
                if hpoterm is not None:
                    hpo_terms.append(hpoterm)
            elif encoder_type == CellType.MISC:
                hpo_terms.extend(encoder.encode(cell_contents=cell_contents))
        # Check we have all of the items we need
        for item in data_items:
            if item not in DATA_ITEMS:
                raise ValueError(f"Unrecognized data item: \"{item}\"")
        # Note that allele_2 is optional
        if len(data_items) < len(DATA_ITEMS) - 1:
            raise ValueError(f"Insufficient data items: \"{data_items}\"")
        # If we get here, we can construct an individual
        individual_id = data_items.get('individual_id')
        if individual_id is None or isinstance(individual_id, float) or len(individual_id) == 0:
            raise ValueError(f"Empty individual_id field for {row}")
        pmid = data_items.get("PMID")
        title = data_items.get("title")
        if pmid is None or isinstance(pmid, float) or not pmid.startswith("PMID"):
            raise ValueError(f"Could not find PubMed identifier for {individual_id}")
        if title is None or isinstance(title, float) or len(title) < 5:
            raise ValueError(f"Could not find valid title for {individual_id}")
        citation = Citation(pmid=pmid, title=title)
        sex = data_items.get("sex")
        if sex == "M":
            sex = Constants.MALE_SYMBOL
        elif sex == "F":
            sex = Constants.FEMALE_SYMBOL
        elif sex == "O":
            sex = Constants.OTHER_SEX_SYMBOL
        elif sex == "U":
            sex = Constants.UNKNOWN_SEX_SYMBOL
        else:
            raise ValueError(f"Unrecognized sex symbol: {sex} for individual \"{individual_id}\"")
        # Cells that are not strings (e.g. empty cells) are represented by NoneAge
        onset_age = data_items.get(AGE_OF_ONSET_FIELDNAME)
        if isinstance(onset_age, str):
            onset_age = PyPheToolsAge.get_age(onset_age)
        else:
            onset_age = NoneAge("na")
        encounter_age = data_items.get(AGE_AT_LAST_ENCOUNTER_FIELDNAME)
        if isinstance(encounter_age, str):
            encounter_age = PyPheToolsAge.get_age(encounter_age)
        else:
            encounter_age = NoneAge("na")
        vitStat = None
        if data_items.get("deceased") == "yes":
            # the age at last encounter is used as the time of death if it is available
            if encounter_age.is_valid():
                timeelem = encounter_age.to_ga4gh_time_element()
                vitStat = VitalStatus(status=VitalStatus.Status.DECEASED, time_of_death=timeelem)
            else:
                vitStat = VitalStatus(status=VitalStatus.Status.DECEASED)
        disease_id = data_items.get("disease_id")
        disease_label = data_items.get("disease_label")
        # common error -- e.g. PMID: 3000312 or OMIM: 600123 (whitespace after colon)
        for item in [pmid, disease_id]:
            if " " in item:
                raise ValueError(f"Found illegal whitespace in {item}. Please remove it and try again")
        disease = Disease(disease_id=disease_id, disease_label=disease_label)
        # NOTE(review): the keyword arguments after ``individual_id`` were truncated in
        # the extracted source and are reconstructed -- confirm against Individual.__init__.
        return Individual(individual_id=individual_id,
                          citation=citation,
                          hpo_terms=hpo_terms,
                          sex=sex,
                          age_of_onset=onset_age,
                          age_at_last_encounter=encounter_age,
                          vital_status=vitStat,
                          disease=disease)

    def _debug_row(self, target_idx:int, row:pd.Series):
        """Print every cell of a row together with its column header.

        The cell at ``target_idx`` is highlighted with asterisks so that the
        offending column is easy to spot.

        :param target_idx: index of the column to highlight
        :type target_idx: int
        :param row: the data row to print
        :type row: pd.Series
        """
        for j, cell in enumerate(row):
            hdr = self._header_fields_1[j]
            if j == target_idx:
                print(f"[{j}] *** {hdr}={cell}  ***")
            else:
                print(f"[{j}] {hdr}={cell}")

    def get_individuals(self) -> List[Individual]:
        """Return the internal list of individuals (``self._individuals``).

        :returns: the list created by the constructor; it is returned directly, not copied
        :rtype: List[Individual]
        """
        return self._individuals

    def get_allele1_d(self)-> Dict[str,str]:
        """Return the dictionary that the constructor fills from the "allele_1" cell of each data row.

        :returns: the internal dictionary (not a copy)
        :rtype: Dict[str,str]
        """
        return self._allele1_d

    def get_allele2_d(self)-> Dict[str,str]:
        """Return the dictionary that the constructor fills from the "allele_2" cell of each data row.

        The constructor only adds entries if the template has an "allele_2" column.

        :returns: the internal dictionary (not a copy)
        :rtype: Dict[str,str]
        """
        return self._allele2_d

    def _is_biallelic(self) -> bool:
        """Return the ``_is_biallelic`` attribute.

        NOTE(review): the constructor assigns an instance attribute with this same
        name (True if the template has an "allele_2" column). On an initialized
        instance that attribute shadows this method, so ``obj._is_biallelic`` is a
        bool and calling it fails. Consider renaming the attribute or the method.
        """
        return self._is_biallelic

    def get_metadata_d(self) -> Dict[str,MetaData]:
        """Return the dictionary of MetaData objects that the constructor creates, one per individual.

        :returns: the internal dictionary (not a copy)
        :rtype: Dict[str,MetaData]
        """
        return self._metadata_d

    def get_phenopackets(self) -> List[PPKt.Phenopacket]:
        """Create one GA4GH phenopacket for each individual parsed from the template.

        The metadata of each phenopacket is built from the ``created_by`` value
        passed to the constructor and from the citation of the individual.

        :returns: list of phenopackets, in the order of the individuals
        :rtype: List[PPKt.Phenopacket]
        """
        ppack_list = []
        for individual in self._individuals:
            metadata = MetaData(created_by=self._created_by, citation=individual._citation)
            # collect the phenopacket; without this the method always returns an empty list
            ppack_list.append(individual.to_ga4gh_phenopacket(metadata=metadata))
        return ppack_list

    def _transform_individuals_to_phenopackets(self, individual_list:List[Individual]):
        """Create one phenopacket for each of the individuals

        :param individual_list: List of individual objects
        :type individual_list: List[Individual]
        :returns: list of corresponding phenopackets
        :rtype: List[PPKt.Phenopacket]
        """
        # fall back to a generic creator name if none was passed to the constructor
        created_by = 'pyphetools' if self._created_by is None else self._created_by
        ppkt_list = []
        for individual in individual_list:
            metadata = MetaData(created_by=created_by, citation=individual._citation)
            ppkt_list.append(individual.to_ga4gh_phenopacket(metadata=metadata))
        return ppkt_list

    def output_individuals_as_phenopackets(self, individual_list:List[Individual], outdir="phenopackets") -> None:
        """write a list of Individual objects to file in GA4GH Phenopacket format

        Note that the individual_list needs to be passed to this object, because we expect that
        the QC code will have been used to cleanse the data of redundancies etc before output.
        We use the statefulness to keep track of the created_by argument from the constructor.

        :param individual_list: List of individual objects to be written, one JSON file each
        :type individual_list: List[Individual]
        :param outdir: Path to output directory. Defaults to "phenopackets". Created if not exists.
        :type outdir: str
        :raises ValueError: if ``outdir`` is the name of an existing file
        """
        if os.path.isfile(outdir):
            raise ValueError(f"Attempt to create directory with name of existing file {outdir}")
        if not os.path.isdir(outdir):
            os.makedirs(outdir)
        # fall back to a generic creator name if none was passed to the constructor
        created_by = 'pyphetools' if self._created_by is None else self._created_by
        written = 0
        for individual in individual_list:
            cite = individual._citation
            metadata = MetaData(created_by=created_by, citation=cite)
            phenopckt = individual.to_ga4gh_phenopacket(metadata=metadata)
            json_string = MessageToJson(phenopckt)
            pmid = cite.pmid
            # NOTE(review): the identifier appended to the file name was lost in the
            # extracted source; ``individual.id`` is assumed -- confirm against Individual.
            if pmid is None:
                fname = "phenopacket_" + individual.id
            else:
                pmid = pmid.replace(" ", "").replace(":", "_")
                fname = pmid + "_" + individual.id
            # remove any illegal characters from filename. This also removes blanks, so
            # no separate replacement of blanks is needed afterwards. A raw string is
            # used because "\_" is an invalid escape sequence in a normal string.
            fname = re.sub(r'[^A-Za-z0-9_\-]', '', fname) + ".json"
            outpth = os.path.join(outdir, fname)
            with open(outpth, "wt", encoding="utf-8") as fh:
                fh.write(json_string)
            written += 1
        print(f"We output {written} GA4GH phenopackets to the directory {outdir}")

    def print_individuals_as_phenopackets(self, individual_list:List[Individual]) -> None:
        """Function designed to show all phenopackets in a notebook for Q/C

        Each individual is converted to a phenopacket, which is printed as JSON.

        :param individual_list: List of individual objects
        :type individual_list: List[Individual]
        """
        for ppkt in self._transform_individuals_to_phenopackets(individual_list):
            print(MessageToJson(ppkt))

    def to_summary(self) -> pd.DataFrame:
        """Create a table with the status of the parse.

        The table provides a summary of the table that was parsed from the input file. If there were errors, it
        provides enough feedback so that the user knows what needs to be fixed.
        It has the columns "item" and "value": one row per recorded error, followed by
        the curator, the number of individuals and the number of HPO columns.

        :returns: a table with status of parse
        :rtype: pd.DataFrame
        """
        # one row per error, numbered from 1
        items = [{'item': f"Error {n_error}", 'value': e}
                 for n_error, e in enumerate(self._errors, start=1)]
        items.append({'item': 'created by', 'value': self._created_by})
        items.append({'item': 'number of individuals', 'value': str(len(self._individuals))})
        n_hpo_columns = sum(1 for encoder in self._index_to_decoder.values() if encoder.is_hpo())
        items.append({'item': 'number of HPO columns', 'value': str(n_hpo_columns)})
        return pd.DataFrame(items)

