Skip to content

Validation module

This module contains code for validating phenopackets created by pyphetools. These functions are provided for convenience. We recommend using phenopacket-tools to validate phenopackets.

CohortValidator

Source code in pyphetools/validation/cohort_validator.py
class CohortValidator:
    """
    Validate all individuals of a cohort and separate those that pass from those with unfixable errors.

    Every individual is wrapped in a ValidatedIndividual and validated when the CohortValidator is created.
    Individuals without unfixed errors are stored with their cleaned terms; validated individuals that still
    have unfixed errors are kept in a separate list so that the problems can be inspected.

    :param cohort: the individuals to be validated
    :type cohort: List[Individual]
    :param ontology: HPO ontology object, passed on to the validation of each individual
    :type ontology: hpotk.MinimalOntology
    :param min_hpo: minimum number of HPO terms, passed on to the validation of each individual
    :type min_hpo: int
    :param allelic_requirement: allelic requirement, passed on to the validation of each individual (may be None)
    :type allelic_requirement: AllelicRequirement
    """

    def __init__(self, cohort:List[Individual], ontology:hpotk.MinimalOntology, min_hpo:int,  allelic_requirement:AllelicRequirement=None) -> None:
        self._cohort = cohort
        self._ontology = ontology
        self._validated_individual_list = []
        self._error_free_individuals = []
        self._v_individuals_with_unfixable_errors = []
        for indi in cohort:
            vindi = ValidatedIndividual(individual=indi)
            vindi.validate(ontology=ontology, min_hpo=min_hpo, allelic_requirement=allelic_requirement)
            self._validated_individual_list.append(vindi)
            # Each validated individual ends up in exactly one of the two result lists
            if vindi.has_unfixed_error():
                self._v_individuals_with_unfixable_errors.append(vindi)
            else:
                self._error_free_individuals.append(vindi.get_individual_with_clean_terms())

    def get_validated_individual_list(self) -> List[ValidatedIndividual]:
        """
        :returns: list of all individuals with QC Validation results
        :rtype: List[ValidatedIndividual]
        """
        return self._validated_individual_list

    def get_error_free_individual_list(self) -> List[Individual]:
        """
        Returns a list of individuals from which the erroneous and redundant terms have been removed and from which individuals with errors (e.g., not enough HPO terms) have been removed.
        :returns: List of individuals with no errors
        :rtype: List[Individual]
        """
        return self._error_free_individuals

    def get_validated_individuals_with_unfixable_errors(self) -> List[ValidatedIndividual]:
        """
        Returns a list of individuals with errors that cannot be automatically fixed.
        :returns: List of individuals with unfixable errors
        :rtype: List[ValidatedIndividual]
        """
        return self._v_individuals_with_unfixable_errors

    def n_removed_individuals(self) -> int:
        """
        :returns: number of individuals that are not part of the error-free list
        :rtype: int
        """
        return len(self._validated_individual_list) - len(self._error_free_individuals)

    def n_individuals(self) -> int:
        """
        :returns: number of individuals that were validated
        :rtype: int
        """
        return len(self._validated_individual_list)

    def n_error_free_individuals(self) -> int:
        """
        :returns: number of individuals without unfixed errors
        :rtype: int
        """
        return len(self._error_free_individuals)

    def get_ontology(self) -> hpotk.MinimalOntology:
        """
        :returns: the ontology object that was passed to the constructor
        :rtype: hpotk.MinimalOntology
        """
        return self._ontology

get_error_free_individual_list()

Returns a list of individuals from which the erroneous and redundant terms have been removed and from which individuals with errors (e.g., not enough HPO terms) have been removed.

Returns:

Type Description
List[Individual]

List of individuals with no errors

Source code in pyphetools/validation/cohort_validator.py
def get_error_free_individual_list(self) -> List[Individual]:
    """
    Get the individuals that passed validation.

    Erroneous and redundant terms have been removed from these individuals, and individuals with errors
    (e.g., not enough HPO terms) are not part of the list.

    :returns: List of individuals with no errors
    :rtype: List[Individual]
    """
    error_free = self._error_free_individuals
    return error_free

get_validated_individual_list()

Returns:

Type Description
List[ValidatedIndividual]

list of all individuals with QC Validation results

Source code in pyphetools/validation/cohort_validator.py
def get_validated_individual_list(self):
    """
    Get the validation results for every individual.

    :returns: list of all individuals with QC Validation results
    :rtype: List[ValidatedIndividual]
    """
    validated_individuals = self._validated_individual_list
    return validated_individuals

get_validated_individuals_with_unfixable_errors()

Returns a list of individuals with errors that cannot be automatically fixed.

Returns:

Type Description
List[ValidatedIndividual]

List of individuals with unfixable errors

Source code in pyphetools/validation/cohort_validator.py
def get_validated_individuals_with_unfixable_errors(self):
    """
    Get the validated individuals whose errors could not be fixed automatically.

    :returns: List of individuals with unfixable errors
    :rtype: List[ValidatedIndividual]
    """
    unfixable = self._v_individuals_with_unfixable_errors
    return unfixable

ContentValidator

Bases: PhenopacketValidator

Validate a list of phenopackets as to whether they have a minimum number of phenotypic features and alleles

The following example shows how to use this class to assess whether each phenopacket in the directory called "phenopackets" contains at least one variant and at least three HPO terms.

from pyphetools.visualization import PhenopacketIngestor
from pyphetools.validation import ContentValidator
ingestor = PhenopacketIngestor(indir="phenopackets")
ppkt_d = ingestor.get_phenopacket_dictionary()
ppkt_list = list(ppkt_d.values())
validator = ContentValidator(min_var=1, min_hpo=3)
errors = validator.validate_phenopacket_list(ppkt_list)
print(f"{len(errors)} errors were identified")

Note that this class does not test for all errors. Use phenopacket-tools to check for redundant or conflicting annotations.

Parameters:

Name Type Description Default
min_hpo int

minimum number of phenotypic features (HP terms) for this phenopacket to be considered valid

required
allelic_requirement AllelicRequirement

used to check number of alleles and variants

None
Source code in pyphetools/validation/content_validator.py
class ContentValidator(PhenopacketValidator):
    """
    Validate phenopackets as to whether they have a minimum number of phenotypic features and diseases and the
    expected number of variants and alleles

    The following example shows how to use this class to assess whether each phenopacket in the directory called
    "phenopackets" contains at least three HPO terms and exactly one variant with one allele (mono-allelic).

        from pyphetools.visualization import PhenopacketIngestor
        from pyphetools.validation import ContentValidator
        # AllelicRequirement needs to be imported as well
        ingestor = PhenopacketIngestor(indir="phenopackets")
        ppkt_d = ingestor.get_phenopacket_dictionary()
        ppkt_list = list(ppkt_d.values())
        validator = ContentValidator(min_hpo=3, allelic_requirement=AllelicRequirement.MONO_ALLELIC)
        errors = []
        for ppkt in ppkt_list:
            errors.extend(validator.validate_phenopacket(ppkt))
        print(f"{len(errors)} errors were identified")

    Note that this class does not test for all errors. Use phenopacket-tools to check for redundant or conflicting
    annotations.

    :param min_hpo: minimum number of phenotypic features (HP terms) for this phenopacket to be considered valid
    :type min_hpo: int
    :param allelic_requirement: used to check number of alleles and variants (no such check is performed if None)
    :type allelic_requirement: AllelicRequirement
    :param minimum_disease_count: minimum number of diseases for this phenopacket to be considered valid
    :type minimum_disease_count: int
    """

    # Number of alleles contributed by one variant, keyed by the label of its allelic state
    # heterozygous: GENO:0000135, homozygous: GENO:0000136, hemizygous: GENO:0000134
    _ALLELES_BY_STATE_LABEL = {"heterozygous": 1, "homozygous": 2, "hemizygous": 1}

    def __init__(self, min_hpo: int, allelic_requirement: AllelicRequirement = None, minimum_disease_count:int=1) -> None:
        super().__init__()
        self._min_hpo = min_hpo
        self._allelic_requirement = allelic_requirement
        self._minimum_disease_count = minimum_disease_count

    @staticmethod
    def _count_alleles(variation_descriptor) -> int:
        """
        private helper: number of alleles represented by a variation descriptor.
        :param variation_descriptor: object with an allelic_state attribute (or None)
        :returns: 1 for heterozygous or hemizygous, 2 for homozygous, 0 if the state is missing or has another label
        :rtype: int
        """
        if variation_descriptor is None:
            return 0
        allelic_state = variation_descriptor.allelic_state
        if allelic_state is None:
            return 0
        return ContentValidator._ALLELES_BY_STATE_LABEL.get(allelic_state.label, 0)

    def validate_individual(self, individual:Individual) -> List[ValidationResult]:
        """
        check a single Individual as to whether there are sufficient HPO terms and alleles/variants
        :param individual: the individual to be checked
        :type individual: Individual
        :returns: a potentially empty list of validations
        :rtype: List[ValidationResult]
        """
        n_pf = len(individual.hpo_terms)
        n_var = 0
        n_alleles = 0
        pp_id = individual.get_phenopacket_id()
        for variant_interpretation in individual.interpretation_list:
            # every interpretation counts as one variant, even if its allelic state is unknown
            n_var += 1
            n_alleles += self._count_alleles(variant_interpretation.variation_descriptor)
        disease_count = individual.disease_count()
        return self._validate(pp_id=pp_id, n_hpo=n_pf, disease_count=disease_count, n_var=n_var, n_alleles=n_alleles)

    def validate_phenopacket(self, phenopacket) -> List[ValidationResult]:
        """
        check a single phenopacket as to whether there are sufficient HPO terms and alleles/variants
        :param phenopacket: path to a phenopacket JSON file or a GA4GH Phenopacket object
        :returns: a potentially empty list of validations
        :rtype: List[ValidationResult]
        :raises FileNotFoundError: if a path was passed that does not point to a file
        :raises ValueError: if the argument is neither a string nor a GA4GH Phenopacket object
        """
        if isinstance(phenopacket, str):
            # the user passed a file
            if not os.path.isfile(phenopacket):
                raise FileNotFoundError(f"Could not find phenopacket file at '{phenopacket}'")
            with open(phenopacket, encoding="utf-8") as f:
                jsondata = json.load(f)
            phpacket = Parse(json.dumps(jsondata), phenopackets.Phenopacket())
        elif isinstance(phenopacket, phenopackets.Phenopacket):
            phpacket = phenopacket
        else:
            raise ValueError("phenopacket argument must be file path or GA4GH Phenopacket "
                             f"object but was {type(phenopacket)}")
        pp_id = phpacket.id
        n_pf = len(phpacket.phenotypic_features)
        n_var = 0
        n_alleles = 0
        if phpacket.interpretations is not None:
            for interpretation in phpacket.interpretations:
                if interpretation.diagnosis is not None:
                    dx = interpretation.diagnosis
                    for genomic_interpretation in dx.genomic_interpretations:
                        n_var += 1
                        vint = genomic_interpretation.variant_interpretation
                        n_alleles += self._count_alleles(vint.variation_descriptor)
        # use the parsed phenopacket here: the argument itself may be a file path
        disease_count = len(phpacket.diseases)
        return self._validate(pp_id=pp_id, n_hpo=n_pf, disease_count=disease_count, n_var=n_var, n_alleles=n_alleles)

    def _validate(self, pp_id:str, n_hpo:int, disease_count:int, n_var:int=None, n_alleles:int=None) -> List[ValidationResult]:
        """
        private method called by validate_individual or validate_phenopacket.

        If no allelic requirement was set, only the HPO term and disease counts are checked. For the mono-allelic
        requirement, exactly one variant and one allele are expected; for the bi-allelic requirement, one or two
        variants and exactly two alleles are expected.

        :param pp_id: phenopacket identifier
        :type pp_id: str
        :param n_hpo: Number of HPO terms
        :type n_hpo: int
        :param disease_count: Number of diseases
        :type disease_count: int
        :param n_var: Number of variants found
        :type n_var: Optional[int]
        :param n_alleles: Number of alleles found
        :type n_alleles: Optional[int]
        :returns: a potentially empty list of validations
        :rtype: List[ValidationResult]
        """
        validation_results = []
        if n_hpo < self._min_hpo:
            validation_results.append(ValidationResultBuilder(phenopacket_id=pp_id).insufficient_hpos(min_hpo=self._min_hpo, n_hpo=n_hpo).build())
        if disease_count < self._minimum_disease_count:
            val_result = ValidationResultBuilder(phenopacket_id=pp_id).insufficient_disease_count(disease_count, self._minimum_disease_count).build()
            validation_results.append(val_result)
        if self._allelic_requirement is None:
            return validation_results
        if self._allelic_requirement == AllelicRequirement.MONO_ALLELIC:
            if n_var != 1:
                val_result = ValidationResultBuilder(phenopacket_id=pp_id).incorrect_variant_count(self._allelic_requirement, n_var).build()
                validation_results.append(val_result)
            if n_alleles != 1:
                val_result = ValidationResultBuilder(phenopacket_id=pp_id).incorrect_allele_count(self._allelic_requirement, n_alleles).build()
                validation_results.append(val_result)
        elif self._allelic_requirement == AllelicRequirement.BI_ALLELIC:
            # one homozygous variant or two heterozygous variants
            if n_var < 1 or n_var > 2:
                val_result = ValidationResultBuilder(phenopacket_id=pp_id).incorrect_variant_count(self._allelic_requirement, n_var).build()
                validation_results.append(val_result)
            if n_alleles != 2:
                val_result = ValidationResultBuilder(phenopacket_id=pp_id).incorrect_allele_count(self._allelic_requirement, n_alleles).build()
                validation_results.append(val_result)
        return validation_results

validate_individual(individual)

check a single Individual as to whether there are sufficient HPO terms and alleles/variants

Returns:

Type Description
List[ValidationResult]

a potential empty list of validations

Source code in pyphetools/validation/content_validator.py
def validate_individual(self, individual:Individual) -> List[ValidationResult]:
    """
    Count the HPO terms, variants, alleles and diseases of one Individual and have the counts validated.

    Each entry of the interpretation list counts as one variant. The allele count depends on the label of the
    allelic state: heterozygous and hemizygous variants add one allele, homozygous variants add two.

    :returns: a potential empty list of validations
    :rtype: List[ValidationResult]
    """
    # heterozygous: GENO:0000135, homozygous: GENO:0000136, hemizygous: GENO:0000134
    alleles_by_label = {"heterozygous": 1, "homozygous": 2, "hemizygous": 1}
    hpo_total = len(individual.hpo_terms)
    packet_id = individual.get_phenopacket_id()
    variant_total = 0
    allele_total = 0
    for interpretation in individual.interpretation_list:
        variant_total += 1
        descriptor = interpretation.variation_descriptor
        if descriptor is None:
            continue
        state = descriptor.allelic_state
        if state is None:
            continue
        # states with any other label do not contribute alleles
        allele_total += alleles_by_label.get(state.label, 0)
    return self._validate(pp_id=packet_id,
                          n_hpo=hpo_total,
                          disease_count=individual.disease_count(),
                          n_var=variant_total,
                          n_alleles=allele_total)

validate_phenopacket(phenopacket)

check a single phenopacket as to whether there are sufficient HPO terms and alleles/variants

Returns:

Type Description
List[ValidationResult]

a potential empty list of validations

Source code in pyphetools/validation/content_validator.py
def validate_phenopacket(self, phenopacket) -> List[ValidationResult]:
    """
    check a single phenopacket as to whether there are sufficient HPO terms and alleles/variants
    :param phenopacket: path to a phenopacket JSON file or a GA4GH Phenopacket object
    :returns: a potentially empty list of validations
    :rtype: List[ValidationResult]
    :raises FileNotFoundError: if a path was passed that does not point to a file
    :raises ValueError: if the argument is neither a string nor a GA4GH Phenopacket object
    """
    if isinstance(phenopacket, str):
        # the user passed a file
        if not os.path.isfile(phenopacket):
            raise FileNotFoundError(f"Could not find phenopacket file at '{phenopacket}'")
        with open(phenopacket, encoding="utf-8") as f:
            jsondata = json.load(f)
        phpacket = Parse(json.dumps(jsondata), phenopackets.Phenopacket())
    elif isinstance(phenopacket, phenopackets.Phenopacket):
        phpacket = phenopacket
    else:
        raise ValueError("phenopacket argument must be file path or GA4GH Phenopacket "
                         f"object but was {type(phenopacket)}")
    # number of alleles contributed by one variant, keyed by the label of its allelic state
    # heterozygous: GENO:0000135, homozygous: GENO:0000136, hemizygous: GENO:0000134
    alleles_by_label = {"heterozygous": 1, "homozygous": 2, "hemizygous": 1}
    pp_id = phpacket.id
    n_pf = len(phpacket.phenotypic_features)
    n_var = 0
    n_alleles = 0
    if phpacket.interpretations is not None:
        for interpretation in phpacket.interpretations:
            if interpretation.diagnosis is None:
                continue
            for genomic_interpretation in interpretation.diagnosis.genomic_interpretations:
                # every genomic interpretation counts as one variant, even if its allelic state is unknown
                n_var += 1
                vdesc = genomic_interpretation.variant_interpretation.variation_descriptor
                if vdesc is not None and vdesc.allelic_state is not None:
                    n_alleles += alleles_by_label.get(vdesc.allelic_state.label, 0)
    # use the parsed phenopacket here: the argument itself may be a file path
    disease_count = len(phpacket.diseases)
    return self._validate(pp_id=pp_id, n_hpo=n_pf, disease_count=disease_count, n_var=n_var, n_alleles=n_alleles)

OntologyQC

This class performs three kinds of checks/cleansing of ontology data: (1) negated superclass and observed subclass (this is an error in the original data); (2) observed superclass and observed subclass (this is a redundancy but arguably not an error); (3) the same term is both excluded and observed (this is an unfixable error in the original data).

Source code in pyphetools/validation/ontology_qc.py
class OntologyQC:
    """
    This class performs three kinds of checks/cleansing of ontology data
    1. negated superclass and observed subclass (this is an error in the original data)
    2. observed superclass and observed subclass (this is a redundancy but arguably not an error)
    3. Same term is excluded and observed (this is an unfixable error in the original data)

    The checks are run when the object is created; the results are available via get_clean_terms and
    get_error_list.

    :param ontology: HPO ontology object
    :type ontology: hpotk.MinimalOntology
    :param individual: the individual whose HPO terms are to be checked
    :type individual: Individual
    :param fix_conflicts: if True, remove excluded ancestors of observed terms (terms with the same onset)
    :type fix_conflicts: bool
    :param fix_redundancies: if True, remove redundant terms (terms with the same onset)
    :type fix_redundancies: bool
    """

    def __init__(self,
                 ontology:hpotk.MinimalOntology,
                 individual:Individual,
                 fix_conflicts=True,
                 fix_redundancies=True):
        self._ontology = ontology
        self._individual = individual
        self._phenopacket_id = individual.get_phenopacket_id()
        self._fix_conflict_flag = fix_conflicts
        self._fix_redundancy_flag = fix_redundancies
        self._errors = []
        # must come last, because _clean_terms reads the attributes set above and appends to self._errors
        self._clean_hpo_terms = self._clean_terms()

    def _fix_conflicts(self,
                       observed_hpo_terms:List[HpTerm],
                       excluded_hpo_terms) -> List[HpTerm]:
        """
        This method detects excluded superclasses that have observed subclasses -- a conflict.

        For instance, if an individual is annotated to the terms (1) excluded: Seizure [HP:0001250] and (2)
        observed - Clonic Seizure [HP:0020221], this is a conflict, because a person with clonic seizure also
        can be said to have seizure. Here, we assume that the excluded annotation is an error that we
        want to remove automatically and issue a warning. Thus, in this example, we would remove the
        annotation excluded: Seizure [HP:0001250], and in general the excluded superclass is removed
        if this kind of conflict is detected

        :param observed_hpo_terms: list of HPO terms (observed), can be empty
        :type observed_hpo_terms: List[HpTerm]
        :param excluded_hpo_terms: list of HPO terms (excluded), can be empty
        :type excluded_hpo_terms: List[HpTerm]
        :returns: the potentially cleansed list of excluded terms (the observed terms are never changed by this method)
        :rtype: List[HpTerm]
        """
        if len(excluded_hpo_terms) == 0:
            # i.e., there can be no conflict
            return excluded_hpo_terms
        all_excluded_term_ids = {term.id for term in excluded_hpo_terms}
        conflicting_term_id_set = set()
        for term in observed_hpo_terms:
            for tid in all_excluded_term_ids:
                if term.id == tid:
                    # same term observed and excluded
                    # we cannot automatically fix this error
                    # this will be reported and the user will need to check the input data
                    error = ValidationResultBuilder(phenopacket_id=self._phenopacket_id).observed_and_excluded_term(term=term).build()
                    self._errors.append(error)
                elif self._ontology.graph.is_ancestor_of(tid, term.id):
                    # the excluded term is an ancestor of an observed term: report it and mark it for removal
                    conflicting_term_id_set.add(tid)
                    conflicting_term = self._ontology.get_term(term_id=tid)
                    cterm = HpTerm.from_hpo_tk_term(conflicting_term)
                    error = ValidationResultBuilder(phenopacket_id=self._phenopacket_id).conflict(term=term, conflicting_term=cterm).build()
                    self._errors.append(error)
        if len(conflicting_term_id_set) > 0:
            excluded_hpo_terms = [term for term in excluded_hpo_terms if term.id not in conflicting_term_id_set]
        return excluded_hpo_terms

    def _fix_redundancies(self,
                          hpo_terms:List[HpTerm]) -> List[HpTerm]:
        """
        Remove redundant terms from a list of HPO terms.

        As a side effect, add a ValidationResult for each duplicated term and for each removed redundant term.
        The order of the remaining terms is the order in which they first appear in the input.

        :param hpo_terms: original term list that might contain redundancies
        :type hpo_terms: List[HpTerm]
        :returns: list of HPO terms without redundancies
        :rtype: List[HpTerm]
        """
        # dict.fromkeys removes duplicates (under the assumption that all components of the HpTerm are equal)
        # but, unlike set(), it keeps the order of the terms, so that the returned list is reproducible
        unique_terms = list(dict.fromkeys(hpo_terms))
        # check for duplicates
        if len(unique_terms) != len(hpo_terms):
            duplicates = [item for item, count in Counter(hpo_terms).items() if count > 1]
            for dup in duplicates:
                error = ValidationResultBuilder(self._phenopacket_id).duplicate_term(redundant_term=dup).build()
                self._errors.append(error)
        # The following code checks for other kinds of redundancies
        # key: redundant ancestor term, value: a more specific term from the same list
        redundant_term_d = {}
        for term in unique_terms:
            for term2 in unique_terms:
                # The ancestor, e.g. Seizure comes first, the other term, e.g. Clonic seizure, second
                # in the following function call
                if self._ontology.graph.is_ancestor_of(term2.id, term.id):
                    redundant_term_d[term2] = term
        # When we get here, we have scanned all terms for redundant ancestors
        non_redundant_terms = [term for term in unique_terms if term not in redundant_term_d]
        for term, descendant in redundant_term_d.items():
            error = ValidationResultBuilder(self._phenopacket_id).redundant_term(ancestor_term=term, descendent_term=descendant).build()
            self._errors.append(error)
        return non_redundant_terms

    def _check_term_ids_and_labels(self,
                                   hpo_terms:List[HpTerm]) -> None:
        """
        Check whether the term identifiers (e.g., HP:0001234) are present in the ontology as primary ids and whether
        the label matches the current primary label; if not, flag the errors in self._errors

        :param hpo_terms: the terms to be checked
        :type hpo_terms: List[HpTerm]
        """
        for term in hpo_terms:
            hpo_id = term.id
            if hpo_id not in self._ontology:
                error = ValidationResultBuilder(self._phenopacket_id).malformed_hpo_id(malformed_term=term).build()
                self._errors.append(error)
            else:
                hpo_term = self._ontology.get_term(term_id=hpo_id)
                if hpo_term.name != term.label:
                    # NOTE(review): the term object from the ontology (not an HpTerm) is passed as valid_term,
                    # as in the original code -- confirm that this is what the builder expects
                    error = ValidationResultBuilder(self._phenopacket_id).malformed_hpo_label(malformed_label=term.label,
                                                                                              valid_term=hpo_term).build()
                    self._errors.append(error)

    def _clean_terms(self) -> List[HpTerm]:
        """
        Check the HPO terms of the individual and assemble the list of terms that are kept.

        Terms that were not measured are reported and dropped. Terms with an onset are grouped by onset and each
        group is cleansed separately; terms without an onset are then compared against all terms kept so far.
        Problems are recorded in self._errors.

        :returns: list of HPO terms without redundancies/conflicts
        :rtype: List[HpTerm]
        """
        by_age_dictionary = defaultdict(list)
        # collect all terms without a defined age of onset
        # We will assume these terms exist at all specific ages of onset, thus we need this to calculate redundancy
        observed_terms_without_onset = list()
        excluded_terms_without_onset = list()
        for term in self._individual.hpo_terms:
            if not term.measured:
                self._errors.append(ValidationResultBuilder(self._phenopacket_id).not_measured(term=term).build())
            else:
                if term.onset is not None:
                    by_age_dictionary[term.onset].append(term)
                else:
                    if term.observed:
                        observed_terms_without_onset.append(term)
                    else:
                        excluded_terms_without_onset.append(term)
        # identifiers and labels are checked for all terms, including those that were not measured
        self._check_term_ids_and_labels(self._individual.hpo_terms)
        clean_terms = []

        for term_list in by_age_dictionary.values():
            observed_hpo_terms = [term for term in term_list if term.observed]
            excluded_hpo_terms = [term for term in term_list if not term.observed]
            if self._fix_redundancy_flag:
                observed_hpo_terms = self._fix_redundancies(observed_hpo_terms)
                excluded_hpo_terms = self._fix_redundancies(excluded_hpo_terms)
            if self._fix_conflict_flag:
                # this method checks and may fix the excluded terms (only)
                excluded_hpo_terms = self._fix_conflicts(observed_hpo_terms, excluded_hpo_terms)
            clean_terms.extend(observed_hpo_terms)
            clean_terms.extend(excluded_hpo_terms)
        # When we get here, clean terms contains terms with specific onsets and conflicting/redundant terms
        # have been removed. There may be terms with no specific onset. We only add such terms if they are neither
        # ancestors or descendants of the specific terms
        # NOTE(review): the terms without onset are cleansed regardless of the fix_redundancies flag -- confirm
        # that this is intended
        observed_terms_without_onset = self._fix_redundancies(observed_terms_without_onset)
        excluded_terms_without_onset = self._fix_redundancies(excluded_terms_without_onset)
        all_term_set = set(clean_terms)
        for t in observed_terms_without_onset:
            addT = True
            for s in all_term_set:
                # keep the term with the age of onset regardless of whether it is more or less specific
                if s.id == t.id:
                    error = ValidationResultBuilder(self._phenopacket_id).duplicate_term(s).build()
                    self._errors.append(error)
                    addT = False
                    break
                if self._ontology.graph.is_ancestor_of(t.id, s.id):
                    error = ValidationResultBuilder(self._phenopacket_id).redundant_term(t, s).build()
                    self._errors.append(error)
                    addT = False
                    break
                if self._ontology.graph.is_ancestor_of(s.id, t.id):
                    error = ValidationResultBuilder(self._phenopacket_id).redundant_term(s, t).build()
                    self._errors.append(error)
                    addT = False
                    break
            if addT:
                clean_terms.append(t)
                all_term_set.add(t)
        # now check for problems with excluded terms
        # NOTE(review): all_term_set also contains excluded terms (those with an onset), so the comparisons
        # below are not restricted to observed terms -- confirm that this is intended
        for t in excluded_terms_without_onset:
            addT = True
            for s in all_term_set:
                # if an excluded term is equal to or ancestor of an observed term this is an error
                if s.id == t.id:
                    # no break here: the remaining terms are still scanned after this error was recorded
                    error = ValidationResultBuilder(self._phenopacket_id).observed_and_excluded_term(term=s).build()
                    self._errors.append(error)
                    addT = False
                elif self._ontology.graph.is_ancestor_of(t.id, s.id):
                    error = ValidationResultBuilder(self._phenopacket_id).conflict(term=s, conflicting_term=t).build()
                    self._errors.append(error)
                    addT = False
                    break
            if addT:
                clean_terms.append(t)
                all_term_set.add(t)

        return clean_terms

    def has_error(self) -> bool:
        """
        :returns: True iff errors were encountered
        :rtype: boolean
        """
        return len(self._errors) > 0

    def get_error_list(self) -> List[ValidationResult]:
        """
        :returns: a potentially empty list of errors
        :rtype: List[ValidationResult]
        """
        return self._errors

    def get_clean_terms(self) -> List[HpTerm]:
        """
        :returns: the HPO terms that remained after the checks performed on construction
        :rtype: List[HpTerm]
        """
        return self._clean_hpo_terms

    def get_error_string(self) -> Optional[str]:
        """
        create and return a string that summarizes the redundancies and conflicts that were corrected

        Errors of other kinds are not mentioned, so the string can be empty even though errors were found.

        :returns: a string summarizing errors or None if there were none
        :rtype: Optional[str]
        """
        if not self.has_error():
            return None
        redundancies = [e for e in self._errors if e.is_redundant()]
        conflicts = [e for e in self._errors if e.is_conflict()]
        e_string = ""
        if len(redundancies) > 0:
            red_terms = [e.hpo_term_and_id for e in redundancies]
            e_string = "The following redundant terms were removed: " + ", ".join(red_terms) + ". "
        if len(conflicts) > 0:
            conf_terms = [e.hpo_term_and_id for e in conflicts]
            e_string = e_string + "The following conflicting excluded terms were removed: " + ", ".join(conf_terms) + ". "
        return e_string

    @staticmethod
    def qc_cohort(individual_list:List[Individual]) -> List[Individual] :
        """
        Currently a placeholder that performs no checks.

        :param individual_list: list of individuals
        :type individual_list: List[Individual]
        :returns: the list that was passed in, unchanged
        :rtype: List[Individual]
        """
        return individual_list

get_error_list()

Returns:

Type Description
List[ValidationResult]

a potential empty list of errors

Source code in pyphetools/validation/ontology_qc.py
def get_error_list(self) -> List[ValidationResult]:
    """
    Get the validation results that were collected.

    :returns: a potentially empty list of errors
    :rtype: List[ValidationResult]
    """
    collected_errors = self._errors
    return collected_errors

get_error_string()

create and return a string that summarizes the redundancies and conflicts that were corrected

Returns:

Type Description
Optional[str]

a string summarizing errors or None if there were none

Source code in pyphetools/validation/ontology_qc.py
def get_error_string(self) -> Optional[str]:
    """
    Summarize the corrected redundancies and conflicts in a single string.

    Only errors that report themselves as redundant or as conflict are mentioned; if errors exist but none
    is of these two kinds, the result is an empty string.

    :returns: a string summarizing errors or None if there were none
    :rtype: Optional[str]
    """
    if not self.has_error():
        return None
    redundant_labels = [err.hpo_term_and_id for err in self._errors if err.is_redundant()]
    conflict_labels = [err.hpo_term_and_id for err in self._errors if err.is_conflict()]
    sentences = []
    if redundant_labels:
        sentences.append("The following redundant terms were removed: " + ", ".join(redundant_labels) + ". ")
    if conflict_labels:
        sentences.append("The following conflicting excluded terms were removed: " + ", ".join(conflict_labels) + ". ")
    return "".join(sentences)

has_error()

Returns:

Type Description
boolean

True iff errors were encountered

Source code in pyphetools/validation/ontology_qc.py
def has_error(self) -> bool:
    """
    Report whether at least one error was recorded.

    :returns: True iff errors were encountered
    :rtype: boolean
    """
    return bool(self._errors)

PhenopacketValidator

Abstract super class for classes that validate phenopackets

Source code in pyphetools/validation/phenopacket_validator.py
class PhenopacketValidator(metaclass=abc.ABCMeta):
    """
    Abstract base class of the phenopacket validators.

    Concrete validators need to implement validate_phenopacket.
    """

    def __init__(self):
        """The base class has no state of its own to initialize."""
        return None

    @abc.abstractmethod
    def validate_phenopacket(self, phenopacket):
        """
        Validate a single phenopacket; to be implemented by subclasses.

        :param phenopacket: the phenopacket to be validated
        """
        return None

ValidationResult

A helper class to store the results of validation

Parameters:

Name Type Description Default
phenopacket_id str

Identifier of the phenopacket being validated

required
message str

description of the error/warning

required
errorlevel ErrorLevel

whether this result is an error or a warning

required
category Category

type of QcError

required
term HpTerm

HpTerm that caused the error

None
Source code in pyphetools/validation/validation_result.py
class ValidationResult:
    """
    A helper class to store the results of validation

    :param phenopacket_id: Identifier of the phenopacket being validated
    :type phenopacket_id: str
    :param message: description of the error/warning
    :type message: str
    :param errorlevel: whether this result is an error or a warning
    :type errorlevel: ErrorLevel
    :param category: type of QcError
    :type category: Category
    :param term: HpTerm that caused the error (defaults to None)
    :type term: HpTerm
    """

    def __init__(self, phenopacket_id:str, message:str, errorlevel:ErrorLevel, category:Category, term:HpTerm=None):
        self._term = term
        self._category = category
        self._error_level = errorlevel
        self._message = message
        self._phenopacket_id = phenopacket_id

    @property
    def id(self):
        """
        :returns: the identifier of the phenopacket that was passed to the constructor
        """
        return self._phenopacket_id

    @property
    def message(self) -> str:
        """
        :returns: description of the cause of ValidationResult
        :rtype: str
        """
        return self._message

    @property
    def error_level(self)-> str:
        """
        :returns: the name of the ErrorLevel this ValidationResult is about
        :rtype: str
        """
        level = self._error_level
        return level.name

    @property
    def term(self) -> Optional[HpTerm]:
        """
        :returns: the HPO term this ValidationResult is about, or None if no term was provided
        :rtype: Optional[HpTerm]
        """
        return self._term

    @property
    def category(self) -> str:
        """
        :returns: the name of the Category this ValidationResult is about
        :rtype: str
        """
        cat = self._category
        return cat.name

    def is_error(self) -> bool:
        """
        :returns: True iff the error level of this result is ErrorLevel.ERROR
        :rtype: bool
        """
        return self._error_level == ErrorLevel.ERROR

    def is_warning(self) -> bool:
        """
        :returns: True iff the error level of this result is ErrorLevel.WARNING
        :rtype: bool
        """
        return self._error_level == ErrorLevel.WARNING

    def is_unfixable_error(self) -> bool:
        """Some errors cannot be fixed automatically and require manual attention.

        :returns: True iff this ValidationResult cannot be fixed automatically.
        :rtype: bool
        """
        # NOTE(review): Category.INSUFFICIENT_DISEASE_COUNT is not part of this set -- confirm that is intended.
        unfixable_categories = {
            Category.INSUFFICIENT_HPOS,
            Category.INCORRECT_ALLELE_COUNT,
            Category.INCORRECT_VARIANT_COUNT,
            Category.MALFORMED_ID,
            Category.MALFORMED_LABEL,
            Category.OBSERVED_AND_EXCLUDED,
        }
        return self._category in unfixable_categories

    def get_items_as_array(self) -> List[str]:
        """
        Collect id, error level, category, message and a display string for the term (in that order).

        :returns: A list of items (strings) intended for display
        :rtype: List[str]
        """
        hpo = self._term
        if hpo is None:
            term_display = ""
        elif isinstance(hpo, HpTerm):
            term_display = hpo.hpo_term_and_id
        else:
            # Not an HpTerm: fall back to the object's name and identifier.value attributes.
            term_display = f"{hpo.name} ({hpo.identifier.value})"
        return [self.id, self.error_level, self.category, self.message, term_display]

    def __repr__(self):
        level, msg = self._error_level, self._message
        return f"{level}: {msg}"

    @staticmethod
    def get_header_fields():
        """
        :returns: column headers matching the order of the items returned by get_items_as_array
        """
        header = ["ID", "Level", "Category", "Message", "HPO Term"]
        return header

category: str property

Returns:

Type Description
str

the name of the Category this ValidationResult is about

error_level: str property

Returns:

Type Description
str

the name of the ErrorLevel this ValidationResult is about

message: str property

Returns:

Type Description
str

description of the cause of ValidationResult

term: Optional[HpTerm] property

Returns:

Type Description
Optional[HpTerm]

The HPO term this ValidationResult is about, if applicable, or None

get_items_as_array()

Returns:

Type Description
List[str]

A list of items (strings) intended for display

Source code in pyphetools/validation/validation_result.py
def get_items_as_array(self) -> List[str]:
    """
    Collect id, error level, category, message and a display string for the term (in that order).

    :returns: A list of items (strings) intended for display
    :rtype: List[str]
    """
    hpo = self._term
    if hpo is None:
        term_display = ""
    elif isinstance(hpo, HpTerm):
        term_display = hpo.hpo_term_and_id
    else:
        # Not an HpTerm: fall back to the object's name and identifier.value attributes.
        term_display = f"{hpo.name} ({hpo.identifier.value})"
    return [self.id, self.error_level, self.category, self.message, term_display]

is_unfixable_error()

Some errors cannot be fixed automatically and require manual attention.

Returns:

Type Description
bool

True iff this ValidationResult cannot be fixed automatically.

Source code in pyphetools/validation/validation_result.py
def is_unfixable_error(self) -> bool:
    """Some errors cannot be fixed automatically and require manual attention.

    :returns: True iff this ValidationResult cannot be fixed automatically.
    :rtype: bool
    """
    # Categories that are treated as not automatically fixable.
    unfixable_categories = {
        Category.INSUFFICIENT_HPOS,
        Category.INCORRECT_ALLELE_COUNT,
        Category.INCORRECT_VARIANT_COUNT,
        Category.MALFORMED_ID,
        Category.MALFORMED_LABEL,
        Category.OBSERVED_AND_EXCLUDED,
    }
    return self._category in unfixable_categories

ValidationResultBuilder

This class is intended for internal use only, and makes constructing ValidationResult objects a little easier.

Source code in pyphetools/validation/validation_result.py
class ValidationResultBuilder:
    """
    This class is intended for internal use only, and makes constructing ValidationResult objects a little easier.

    Each configuration method sets the fields of the result to be created and returns the
    builder itself so that calls can be chained; build() creates the ValidationResult.
    """

    def __init__(self, phenopacket_id:str):
        """
        :param phenopacket_id: Identifier of the phenopacket the result refers to
        :type phenopacket_id: str
        """
        self._phenopacket_id = phenopacket_id
        # Defaults; the configuration methods overwrite them.
        self._term = None
        self._message = ""
        self._category = Category.UNKNOWN
        self._error_level = ErrorLevel.UNKNOWN

    def duplicate_term(self, redundant_term:HpTerm):
        """The same HPO term is listed more than once. This is recorded as a warning with category DUPLICATE.

        :param redundant_term: HPO term that is listed multiple times
        :type redundant_term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = redundant_term
        self._message = f"<b>{redundant_term.label}</b> is listed multiple times"
        self._category = Category.DUPLICATE
        self._error_level = ErrorLevel.WARNING
        return self

    def observed_and_excluded_term(self, term:HpTerm):
        """The HPO term is annotated as observed and excluded in the same individual. This is an unfixable error.

        :param term: HPO term that is annotated as observed and excluded
        :type term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = term
        self._message = f"Term {term.label} ({term.id}) was annotated to be both observed and excluded."
        self._category = Category.OBSERVED_AND_EXCLUDED
        self._error_level = ErrorLevel.ERROR
        return self

    def redundant_term(self, ancestor_term:HpTerm, descendent_term:HpTerm):
        """The HPO term and one of its ancestors are both annotated as observed in the same individual.

        The ancestor is the term stored on the result; the category is REDUNDANT (warning).

        :param ancestor_term: Ancestor HPO term that is annotated as observed
        :type ancestor_term: HpTerm
        :param descendent_term: Descendent HPO term that is annotated as observed
        :type descendent_term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = ancestor_term
        self._message = f"<b>{ancestor_term.label}</b> is redundant because of <b>{descendent_term.label}</b>"
        self._category = Category.REDUNDANT
        self._error_level = ErrorLevel.WARNING
        return self

    def conflict(self, term:HpTerm, conflicting_term:HpTerm):
        """A term conflicts with an excluded term. This is recorded as an error with category CONFLICT.

        :param term: HPO term that conflicts with the excluded term
        :type term: HpTerm
        :param conflicting_term: the excluded HPO term; this is the term stored on the result
        :type conflicting_term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = conflicting_term
        self._message = f"{term.to_string()} conflicts with the excluded term {conflicting_term.to_string()} "
        self._category = Category.CONFLICT
        self._error_level = ErrorLevel.ERROR
        return self

    def not_measured(self, term:HpTerm):
        """A term was listed as not measured. This is recorded at INFORMATION level with category NOT_MEASURED.

        :param term: HPO term that was listed as not measured
        :type term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._message = f"{term.hpo_term_and_id} was listed as not measured and will be omitted"
        self._term = term
        self._category = Category.NOT_MEASURED
        self._error_level = ErrorLevel.INFORMATION
        return self

    def insufficient_hpos(self, min_hpo:int, n_hpo:int):
        """Fewer HPO terms were found than required. This is recorded as an error with category INSUFFICIENT_HPOS.

        :param min_hpo: minimum number of HPO terms required
        :type min_hpo: int
        :param n_hpo: number of HPO terms found
        :type n_hpo: int
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._category = Category.INSUFFICIENT_HPOS
        self._error_level = ErrorLevel.ERROR
        self._message = f"Minimum HPO terms required {min_hpo} but only {n_hpo} found"
        return self

    def incorrect_allele_count(self, allelic_requirement:AllelicRequirement, observed_alleles:int):
        """The number of observed alleles does not match the allelic requirement (error, INCORRECT_ALLELE_COUNT).

        :param allelic_requirement: the requirement that was violated (MONO_ALLELIC or BI_ALLELIC)
        :type allelic_requirement: AllelicRequirement
        :param observed_alleles: number of alleles that were observed
        :type observed_alleles: int
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        :raises ValueError: if allelic_requirement is neither MONO_ALLELIC nor BI_ALLELIC
        """
        if allelic_requirement == AllelicRequirement.MONO_ALLELIC:
            msg = f"Expected one allele for monoallelic but got {observed_alleles} alleles"
        elif allelic_requirement == AllelicRequirement.BI_ALLELIC:
            msg = f"Expected two alleles for biallelic but got {observed_alleles} alleles"
        else:
            # should never happen
            raise ValueError("attempt to create incorrect_allele_count Error without defined allelic requirement")
        self._message = msg
        self._category = Category.INCORRECT_ALLELE_COUNT
        self._error_level = ErrorLevel.ERROR
        return self

    def incorrect_variant_count(self,allelic_requirement:AllelicRequirement, n_var:int):
        """The number of variants does not match the allelic requirement (error, INCORRECT_VARIANT_COUNT).

        :param allelic_requirement: the requirement that was violated (MONO_ALLELIC or BI_ALLELIC)
        :type allelic_requirement: AllelicRequirement
        :param n_var: number of variants that were found
        :type n_var: int
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        :raises ValueError: if allelic_requirement is neither MONO_ALLELIC nor BI_ALLELIC
        """
        if allelic_requirement == AllelicRequirement.MONO_ALLELIC:
            msg = f"Expected one variant for monoallelic but got {n_var} variants"
        elif allelic_requirement == AllelicRequirement.BI_ALLELIC:
            msg = f"Expected one or two variants for biallelic but got {n_var} variants"
        else:
            # should never happen
            raise ValueError("attempt to create incorrect_variant_count Error without defined allelic requirement")
        self._message = msg
        self._category = Category.INCORRECT_VARIANT_COUNT
        self._error_level = ErrorLevel.ERROR
        return self

    def set_message(self, msg):
        """Replace the message of the result to be built.

        :param msg: the new message
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._message = msg
        return self

    def malformed_hpo_id(self, malformed_term:HpTerm):
        """A term has an invalid HPO id. This is recorded as an error with category MALFORMED_ID.

        Only the message is derived from the term; the term itself is not stored on the builder here.

        :param malformed_term: term whose HPO id is invalid
        :type malformed_term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._message = f"Malformed term {malformed_term.label} with invalid HPO id {malformed_term.id}"
        self._category = Category.MALFORMED_ID
        self._error_level = ErrorLevel.ERROR
        return self

    def insufficient_disease_count(self, observed_count:int, minimum_count:int):
        """Fewer disease annotations than required (error, INSUFFICIENT_DISEASE_COUNT).

        :param observed_count: number of disease annotations the individual had
        :type observed_count: int
        :param minimum_count: minimum number of disease annotations required
        :type minimum_count: int
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        # NOTE(review): "mininum" is a typo in the emitted message; left unchanged to keep the output identical.
        self._message = f"Individual had {observed_count} disease annotation(s) but the mininum required count is {minimum_count}"
        self._category = Category.INSUFFICIENT_DISEASE_COUNT
        self._error_level = ErrorLevel.ERROR
        return self

    def malformed_hpo_label(self, malformed_label, valid_term:HpTerm):
        """A label does not match the given term. This is recorded as an error with category MALFORMED_LABEL.

        :param malformed_label: the invalid label that was found
        :param valid_term: the term the label was found for; its ``name`` and ``identifier.value``
            attributes are used for the message, and it is stored as the term of the result
        :type valid_term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = valid_term
        self._message = f"Invalid label '{malformed_label}' found for {valid_term.name} ({valid_term.identifier.value})"
        self._category = Category.MALFORMED_LABEL
        self._error_level = ErrorLevel.ERROR
        return self

    def set_term(self, term:HpTerm):
        """Replace the term of the result to be built.

        :param term: the new term
        :type term: HpTerm
        :returns: a reference to self so this command can be used as part of a builder.
        :rtype: ValidationResultBuilder
        """
        self._term = term
        return self

    def build(self) -> ValidationResult:
        """
        :returns: a ValidationResult created from the values currently stored in this builder
        :rtype: ValidationResult
        """
        return ValidationResult(
            phenopacket_id=self._phenopacket_id,
            message=self._message,
            errorlevel=self._error_level,
            category=self._category,
            term=self._term,
        )

duplicate_term(redundant_term)

The same HPO term is listed more than once. This is recorded as a warning with category DUPLICATE.

Parameters:

Name Type Description Default
redundant_term HpTerm

HPO term that is listed multiple times

required

Returns:

Type Description
ValidationResultBuilder

a reference to self so this command can be used as part of a builder.

Source code in pyphetools/validation/validation_result.py
def duplicate_term(self, redundant_term:HpTerm):
    """The same HPO term is listed more than once. This is recorded as a warning with category DUPLICATE.

    :param redundant_term: HPO term that is listed multiple times
    :type redundant_term: HpTerm
    :returns: a reference to self so this command can be used as part of a builder.
    :rtype: ValidationResultBuilder
    """
    self._term = redundant_term
    self._message = f"<b>{redundant_term.label}</b> is listed multiple times"
    self._category = Category.DUPLICATE
    self._error_level = ErrorLevel.WARNING
    return self

observed_and_excluded_term(term)

The HPO term is annotated as observed and excluded in the same individual. This is an unfixable error.

Parameters:

Name Type Description Default
term HpTerm

HPO term that is annotated as observed and excluded

required

Returns:

Type Description
ValidationResultBuilder

a reference to self so this command can be used as part of a builder.

Source code in pyphetools/validation/validation_result.py
def observed_and_excluded_term(self, term:HpTerm):
    """The HPO term is annotated as observed and excluded in the same individual. This is an unfixable error.

    :param term: HPO term that is annotated as observed and excluded
    :type term: HpTerm
    :returns: a reference to self so this command can be used as part of a builder.
    :rtype: ValidationResultBuilder
    """
    self._term = term
    self._message = f"Term {term.label} ({term.id}) was annotated to be both observed and excluded."
    self._category = Category.OBSERVED_AND_EXCLUDED
    self._error_level = ErrorLevel.ERROR
    return self

redundant_term(ancestor_term, descendent_term)

The HPO term and one of its ancestors are both annotated as observed in the same individual.

Parameters:

Name Type Description Default
ancestor_term HpTerm

Ancestor HPO term that is annotated as observed

required
descendent_term HpTerm

Descendent HPO term that is annotated as observed

required

Returns:

Type Description
ValidationResultBuilder

a reference to self so this command can be used as part of a builder.

Source code in pyphetools/validation/validation_result.py
def redundant_term(self, ancestor_term:HpTerm, descendent_term:HpTerm):
    """The HPO term and one of its ancestors are both annotated as observed in the same individual.

    The ancestor is the term stored on the result; the category is REDUNDANT (warning).

    :param ancestor_term: Ancestor HPO term that is annotated as observed
    :type ancestor_term: HpTerm
    :param descendent_term: Descendent HPO term that is annotated as observed
    :type descendent_term: HpTerm
    :returns: a reference to self so this command can be used as part of a builder.
    :rtype: ValidationResultBuilder
    """
    self._term = ancestor_term
    self._message = f"<b>{ancestor_term.label}</b> is redundant because of <b>{descendent_term.label}</b>"
    self._category = Category.REDUNDANT
    self._error_level = ErrorLevel.WARNING
    return self