class HpoaTableCreator:
"""
Create an HPO "small file" with the following fourteen columns
1. #diseaseID
2. diseaseName
3. phenotypeID
4. phenotypeName
5. onsetID
6. onsetName
7. frequency
8. sex
9. negation
10. modifier
11. description
12. publication
13. evidence
14. biocuration
These should be tab separated fields.
"""
def __init__(
self,
phenopacket_list,
onset_term_d,
moi_d,
created_by: str,
) -> None:
"""Constructor
:param phenopacket_list: List of GA4GH phenopackets
:type phenopacket_list: List[PPKt.Phenopacket]
:param onset_term_d: Dictionary with key PMID string and value: OnsetTerm object
:type: Dict[str, OnsetTerm]
:param moi_d: Dictionary with key PMID and value Mode of inheritance
"""
todays_date = datetime.now().strftime("%Y-%m-%d")
self._created_by = created_by
self._todays_date = f"[{todays_date}]"
self._phenopackets = phenopacket_list
self._all_hpo_d = self._get_all_hpos()
self._disease = self._get_disease() # only allow one disease, therefore this is a scalar value (string)
self._hpo_counter_d = self._count_hpos()
self._biocurator_d = self._get_biocurator_d()
self._onset_rows = self._add_age_of_onset_terms(onset_term_d)
self._moi_rows = self._add_moi_rows(moi_d)
def _get_all_hpos(self) -> Dict[str,HpTerm]:
"""Get a dictionary of HpTerms, with key being HPO id and the value the corresponding HpTerm
We use this to retrieve the label
:returns: a dictionary, with key=HPO id, value: HpTerm
:rtype: Dict[str, HpTerm]
"""
all_hpo_d = {}
for ppkt in self._phenopackets:
for pf in ppkt.phenotypic_features:
hpterm = HpTerm(hpo_id=pf.type.id, label=pf.type.label)
all_hpo_d[hpterm.id] = hpterm
print(f"We found a total of {len(all_hpo_d)} unique HPO terms")
return all_hpo_d
@staticmethod
def get_pmid(ppkt):
mdata = ppkt.meta_data
if mdata.external_references is None:
raise ValueError("MetaData must have external_references element for HPOA conversion")
eref_list = mdata.external_references
if len(eref_list) != 1:
raise ValueError(f"MetaData must have exactly one external_references element for HPOA conversion but had {len(eref_list)}")
eref = eref_list[0]
pmid = eref.id
if not pmid.startswith("PMID:"):
raise ValueError(f"Malformed PMID: \"{pmid}\"")
return pmid
def _get_disease(self):
disease_set = set()
for ppkt in self._phenopackets:
interpretations = ppkt.interpretations
if len(interpretations) != 1:
raise ValueError(f"Error: must have only a single disease for HPOA conversion but we found {len(interpretations)}")
interpretation = interpretations[0]
if interpretation.diagnosis is None:
raise ValueError(f"Could not get diagnosis object from interpretation with id {interpretation.id}")
diagnosis = interpretation.diagnosis
disease = Disease(disease_id=diagnosis.disease.id, disease_label=diagnosis.disease.label)
disease_set.add(disease)
if len(disease_set) == 0:
raise ValueError("Could not retrieve Disease for cohort")
elif len(disease_set) > 1:
disease_lst = '; '.join([disease.id for disease in disease_set])
raise ValueError(f"Error: must have only a single Disease for HPOA conversion but we found {len(disease_set)}: {disease_lst}")
[disease] = disease_set
print(f"Extracted disease: {disease}")
return disease
def _get_biocurator_d(self):
"""The unspoken assumption of this function is that there is just one biocurator per PMID.
This will be true for phenopackets created by pyphetools.
:returns: dictionary with key=PMID, value=biocurator
:rtype: Dict[str,str]
"""
biocurator_d = {}
for ppkt in self._phenopackets:
pmid = HpoaTableCreator.get_pmid(ppkt=ppkt)
mdata = ppkt.meta_data
created_by = mdata.created_by
if mdata.HasField("created"):
created = mdata.created # created is a TimeStamp object
created_dt = created.ToDatetime()
ymd = created_dt.strftime('%Y-%m-%d')
created_by = f"{created_by}[{ymd}]"
else:
created_by = f"{created_by}{self._todays_date}"
biocurator_d[pmid] = created_by
return biocurator_d
def _count_hpos(self):
hpo_counter_d = defaultdict(HpoaPmidCounter)
for ppkt in self._phenopackets:
pmid = HpoaTableCreator.get_pmid(ppkt=ppkt)
hpo_counter = hpo_counter_d.get(pmid)
# if not yet present, initialize with zero counts
if hpo_counter is None:
hpo_counter = HpoaPmidCounter()
hpo_counter_d[pmid] = hpo_counter
for pf in ppkt.phenotypic_features:
hpterm = HpTerm(hpo_id=pf.type.id, label=pf.type.label)
hpo_counter.increment_measured(hpterm.id)
if pf.excluded is not None and not pf.excluded:
hpo_counter.increment_observed(hpterm.id)
return hpo_counter_d
def _add_age_of_onset_terms(self, onset_term_d) -> List[HpoaTableRow]:
"""
:param onset_term_d: Dictionary with key=pmid, value: list of CountedHpoTerm objects
:type onset_term_d: Dict[str, List[CountedHpoTerm]]
"""
onset_rows = list() # reset
for pmid, oterm_list in onset_term_d.items():
biocurator = self._biocurator_d.get(pmid)
for oterm in oterm_list:
hpo_onset_term = HpTerm(hpo_id=oterm.id, label=oterm.label)
row = HpoaTableRow(disease=self._disease, hpo_term=hpo_onset_term, publication=pmid, biocurator=biocurator, freq_num=oterm.numerator, freq_denom=oterm.denominator)
onset_rows.append(row)
return onset_rows
def _add_moi_rows(self, moi_d) -> List[HpoaTableRow]:
"""Add mode of inheritance information
:param moi_d: dictionary with key: PMID, and value: List of MOI terms
:type moi_d:Dict[str,List[str]]
:returns: list of HPOA table rows
:rtype: List[HpoaTableRow]
"""
moi_rows = list()
for pmid, hpterm_list in moi_d.items():
biocurator = self._biocurator_d.get(pmid)
# If we add an MOI outside of the template, then it will not have a PMID
# the template builder requires a created_by field which is designed for this.
if biocurator is None:
biocurator = f'{self._created_by}{self._todays_date}'
for hpterm in hpterm_list:
row = HpoaTableRow(disease=self._disease, hpo_term=hpterm, publication=pmid, biocurator=biocurator)
moi_rows.append(row)
return moi_rows
def get_dataframe(self):
rows = []
column_names = ["#diseaseID", "diseaseName", "phenotypeID", "phenotypeName",
"onsetID", "onsetName", "frequency", "sex", "negation", "modifier",
"description", "publication","evidence", "biocuration"]
for pmid, counter in self._hpo_counter_d.items():
biocurator = self._biocurator_d.get(pmid)
measured_d = counter.get_measured_d()
observed_d = counter.get_observed_d()
# by construction, there can be no term in observed_d that is not in measured_d
for hpo_id in measured_d:
n = observed_d.get(hpo_id, 0)
m = measured_d.get(hpo_id)
hpo_term = self._all_hpo_d.get(hpo_id)
row = HpoaTableRow(disease=self._disease, hpo_term=hpo_term, publication=pmid, biocurator=biocurator, freq_num=n, freq_denom=m)
rows.append(row.get_dict())
for onset_row in self._onset_rows:
rows.append(onset_row.get_dict())
for moi_row in self._moi_rows:
rows.append(moi_row.get_dict())
df = pd.DataFrame.from_records(data=rows, columns=column_names)
return df
def write_data_frame(self):
df = self.get_dataframe()
dlist = df["#diseaseID"].unique()
if len(dlist) != 1:
raise ValueError("Error - expected to get one disease but got {len(dlist)}")
disease = dlist[0].replace(":", "-")
fname = f"{disease}.tab"
df.to_csv(fname, sep="\t", index=False)
print(f"Wrote HPOA disease file to {fname}")