Skip to content

HpoaTableCreator

Create an HPO "small file" with the following fourteen columns 1. #diseaseID 2. diseaseName 3. phenotypeID 4. phenotypeName 5. onsetID 6. onsetName 7. frequency 8. sex 9. negation 10. modifier 11. description 12. publication 13. evidence 14. biocuration These should be tab separated fields.

Source code in pyphetools/visualization/hpoa_table_creator.py
class HpoaTableCreator:
    """
    Create an HPO "small file" with the following fourteen columns
        1. #diseaseID
        2. diseaseName
        3. phenotypeID
        4. phenotypeName
        5. onsetID
        6. onsetName
        7. frequency
        8. sex
        9. negation
        10. modifier
        11. description
        12. publication
        13. evidence
        14. biocuration
    These should be tab separated fields.
    """
    def __init__(
            self,
            phenopacket_list,
            onset_term_d,
            moi_d,
            created_by: str,
    ) -> None:
        """Constructor

        :param phenopacket_list: List of GA4GH phenopackets
        :type phenopacket_list: List[PPKt.Phenopacket]
        :param onset_term_d: Dictionary with key PMID string and value: OnsetTerm object
        :type: Dict[str, OnsetTerm]
        :param moi_d: Dictionary with key PMID and value Mode of inheritance
        """
        todays_date = datetime.now().strftime("%Y-%m-%d")
        self._created_by = created_by
        self._todays_date = f"[{todays_date}]"
        self._phenopackets = phenopacket_list
        self._all_hpo_d = self._get_all_hpos()
        self._disease = self._get_disease() # only allow one disease, therefore this is a scalar value (string)
        self._hpo_counter_d = self._count_hpos()
        self._biocurator_d = self._get_biocurator_d()
        self._onset_rows = self._add_age_of_onset_terms(onset_term_d)
        self._moi_rows = self._add_moi_rows(moi_d)

    def _get_all_hpos(self) -> Dict[str,HpTerm]:
        """Get a dictionary of HpTerms, with key being HPO id and the value the corresponding HpTerm

        We use this to retrieve the label
        :returns: a dictionary, with key=HPO id, value: HpTerm
        :rtype: Dict[str, HpTerm]
        """
        all_hpo_d = {}
        for ppkt in self._phenopackets:
            for pf in ppkt.phenotypic_features:
                hpterm = HpTerm(hpo_id=pf.type.id, label=pf.type.label)
                all_hpo_d[hpterm.id] = hpterm
        print(f"We found a total of {len(all_hpo_d)} unique HPO terms")
        return all_hpo_d


    @staticmethod
    def get_pmid(ppkt):
        mdata = ppkt.meta_data
        if mdata.external_references is None:
            raise ValueError("MetaData must have external_references element for HPOA conversion")
        eref_list = mdata.external_references
        if len(eref_list) != 1:
            raise ValueError(f"MetaData must have exactly one external_references element for HPOA conversion but had {len(eref_list)}")
        eref = eref_list[0]
        pmid = eref.id
        if not pmid.startswith("PMID:"):
            raise ValueError(f"Malformed PMID: \"{pmid}\"")
        return pmid


    def _get_disease(self):
        disease_set = set()
        for ppkt in self._phenopackets:
            interpretations = ppkt.interpretations
            if len(interpretations) != 1:
                raise ValueError(f"Error: must have only a single disease for HPOA conversion but we found {len(interpretations)}")
            interpretation = interpretations[0]
            if interpretation.diagnosis is None:
                raise ValueError(f"Could not get diagnosis object from interpretation with id {interpretation.id}")
            diagnosis = interpretation.diagnosis
            disease = Disease(disease_id=diagnosis.disease.id, disease_label=diagnosis.disease.label)
            disease_set.add(disease)
        if len(disease_set) == 0:
            raise ValueError("Could not retrieve Disease for cohort")
        elif len(disease_set) > 1:
            disease_lst = '; '.join([disease.id for disease in disease_set])
            raise ValueError(f"Error: must have only a single Disease for HPOA conversion but we found {len(disease_set)}: {disease_lst}")
        [disease] = disease_set
        print(f"Extracted disease: {disease}")
        return disease

    def _get_biocurator_d(self):
        """The unspoken assumption of this function is that there is just one biocurator per PMID.
        This will be true for phenopackets created by pyphetools.

        :returns: dictionary with key=PMID, value=biocurator
        :rtype: Dict[str,str]
        """
        biocurator_d = {}
        for ppkt in self._phenopackets:
            pmid = HpoaTableCreator.get_pmid(ppkt=ppkt)
            mdata = ppkt.meta_data
            created_by = mdata.created_by
            if mdata.HasField("created"):
                created = mdata.created  # created is a TimeStamp object
                created_dt = created.ToDatetime()
                ymd = created_dt.strftime('%Y-%m-%d')
                created_by = f"{created_by}[{ymd}]"
            else:
                created_by = f"{created_by}{self._todays_date}"
            biocurator_d[pmid] = created_by
        return biocurator_d

    def _count_hpos(self):
        hpo_counter_d = defaultdict(HpoaPmidCounter)
        for ppkt in self._phenopackets:
            pmid = HpoaTableCreator.get_pmid(ppkt=ppkt)
            hpo_counter = hpo_counter_d.get(pmid)
            # if not yet present,  initialize with zero counts
            if  hpo_counter is None:
                hpo_counter = HpoaPmidCounter()
                hpo_counter_d[pmid] = hpo_counter
            for pf in ppkt.phenotypic_features:
                hpterm = HpTerm(hpo_id=pf.type.id, label=pf.type.label)
                hpo_counter.increment_measured(hpterm.id)
                if pf.excluded is not None and not pf.excluded:
                    hpo_counter.increment_observed(hpterm.id)
        return hpo_counter_d

    def _add_age_of_onset_terms(self, onset_term_d) -> List[HpoaTableRow]:
        """
        :param onset_term_d: Dictionary with key=pmid, value: list of CountedHpoTerm objects
        :type onset_term_d: Dict[str, List[CountedHpoTerm]]
        """
        onset_rows  = list() # reset
        for pmid, oterm_list in onset_term_d.items():
            biocurator = self._biocurator_d.get(pmid)
            for oterm in oterm_list:
                hpo_onset_term = HpTerm(hpo_id=oterm.id, label=oterm.label)
                row = HpoaTableRow(disease=self._disease, hpo_term=hpo_onset_term, publication=pmid, biocurator=biocurator, freq_num=oterm.numerator, freq_denom=oterm.denominator)
                onset_rows.append(row)
        return onset_rows

    def _add_moi_rows(self, moi_d) -> List[HpoaTableRow]:
        """Add mode of inheritance information
        :param moi_d: dictionary with key: PMID, and value: List of MOI terms
        :type moi_d:Dict[str,List[str]]
        :returns: list of HPOA table rows
        :rtype: List[HpoaTableRow]
        """
        moi_rows = list()
        for pmid, hpterm_list in moi_d.items():
            biocurator = self._biocurator_d.get(pmid)
            # If we add an MOI outside of the template, then it will not have a PMID
            # the template builder requires a created_by field which is designed for this.
            if biocurator is None:
                biocurator = f'{self._created_by}{self._todays_date}'
            for hpterm in hpterm_list:
                row = HpoaTableRow(disease=self._disease, hpo_term=hpterm, publication=pmid, biocurator=biocurator)
                moi_rows.append(row)
        return moi_rows


    def get_dataframe(self):
        rows = []
        column_names = ["#diseaseID", "diseaseName", "phenotypeID", "phenotypeName",
                        "onsetID", "onsetName", "frequency", "sex", "negation",  "modifier",
                        "description", "publication","evidence", "biocuration"]
        for pmid, counter in self._hpo_counter_d.items():
            biocurator = self._biocurator_d.get(pmid)
            measured_d = counter.get_measured_d()
            observed_d = counter.get_observed_d()
            # by construction, there can be no term in observed_d that is not in measured_d
            for hpo_id in measured_d:
                n = observed_d.get(hpo_id, 0)
                m = measured_d.get(hpo_id)
                hpo_term = self._all_hpo_d.get(hpo_id)
                row = HpoaTableRow(disease=self._disease, hpo_term=hpo_term, publication=pmid, biocurator=biocurator, freq_num=n, freq_denom=m)
                rows.append(row.get_dict())
        for onset_row in self._onset_rows:
            rows.append(onset_row.get_dict())
        for moi_row in self._moi_rows:
            rows.append(moi_row.get_dict())
        df = pd.DataFrame.from_records(data=rows, columns=column_names)
        return df

    def write_data_frame(self):
        df = self.get_dataframe()
        dlist = df["#diseaseID"].unique()
        if len(dlist) != 1:
            raise ValueError("Error - expected to get one disease but got {len(dlist)}")
        disease = dlist[0].replace(":", "-")
        fname = f"{disease}.tab"
        df.to_csv(fname, sep="\t", index=False)
        print(f"Wrote HPOA disease file to {fname}")

__init__(phenopacket_list, onset_term_d, moi_d, created_by)

Constructor

Parameters:

Name Type Description Default
phenopacket_list List[PPKt.Phenopacket]

List of GA4GH phenopackets

required
onset_term_d

Dictionary with key PMID string and value: OnsetTerm object

required
moi_d

Dictionary with key PMID and value Mode of inheritance

required
Source code in pyphetools/visualization/hpoa_table_creator.py
def __init__(
        self,
        phenopacket_list,
        onset_term_d,
        moi_d,
        created_by: str,
) -> None:
    """Constructor

    :param phenopacket_list: List of GA4GH phenopackets
    :type phenopacket_list: List[PPKt.Phenopacket]
    :param onset_term_d: Dictionary with key PMID string and value: OnsetTerm object
    :type: Dict[str, OnsetTerm]
    :param moi_d: Dictionary with key PMID and value Mode of inheritance
    """
    todays_date = datetime.now().strftime("%Y-%m-%d")
    self._created_by = created_by
    self._todays_date = f"[{todays_date}]"
    self._phenopackets = phenopacket_list
    self._all_hpo_d = self._get_all_hpos()
    self._disease = self._get_disease() # only allow one disease, therefore this is a scalar value (string)
    self._hpo_counter_d = self._count_hpos()
    self._biocurator_d = self._get_biocurator_d()
    self._onset_rows = self._add_age_of_onset_terms(onset_term_d)
    self._moi_rows = self._add_moi_rows(moi_d)