Skip to content

CdaTableImporter

Bases: CdaImporter[fetch_rows]

This class is the entry point for transforming CDA data into GA4GH Phenopackets. Client code only needs to initialize it with a CDA query, and it can return phenopackets with the :func:get_ga4gh_phenopackets. It also returns individual tables for that can be used for testing or visualizing data.

The CDA query determines the cohort that will be retrieved from CDA. This class then retrieves data for this cohort in form of pandas DataFrames and extracts data for phenopacket construction using the data in the tables

Parameters:

Name Type Description Default
disease_factory CdaDiseaseFactory

the component for mapping CDA table into Disease element of the Phenopacket Schema.

required
cache_dir Optional[str]

a str with path to the folder to store the cache files

None
use_cache bool

if True, cache/retrieve from cache

False
page_size

Number of pages to retrieve at once. Defaults to 10000 New CDA: https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#returning-a-matrix-of-results old: all of the functions previously used with, or chained onto Q()...run() have been replaced with the single function fetch_rows() new: `fetch_rows(table=, ) https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#parameters old: page_size, limit, and count parameters have been removed new: column_values always returns all unique values and their counts by default, however there are several new parameters old: system= new: data_source= can now take a list, as in data_source=["GDC", "PDC"] new: sort_by= sort results by any column new: force= For columns with an extremely large number of unique values, such as filename, the query will fail with a large data warning. You can override the warning with Force=True tables: ['diagnosis', 'file', 'researchsubject', 'somatic_mutation', 'specimen', 'subject', 'treatment']

required
Source code in src/oncoexporter/cda/cda_table_importer.py
class CdaTableImporter(CdaImporter[fetch_rows]):
    """This class is the entry point for transforming CDA data into GA4GH Phenopackets. Client code only needs
    to initialize it with a CDA query, and it can return phenopackets with the :func:`get_ga4gh_phenopackets`.
    It also returns individual tables for that can be used for testing or visualizing data.

    The CDA query determines the cohort that will be retrieved from CDA. This class then retrieves data
    for this cohort in form of pandas DataFrames and extracts data for phenopacket construction using the data
    in the tables

    :param disease_factory: the component for mapping CDA table into Disease element of the Phenopacket Schema.
    :param cache_dir: a `str` with path to the folder to store the cache files
    :param use_cache: if True, cache/retrieve from cache
    :param page_size: Number of pages to retrieve at once. Defaults to `10000`

    New CDA: 
    https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#returning-a-matrix-of-results
    old: all of the functions previously used with, or chained onto Q()...run() have been replaced with the single function fetch_rows()
    new: `fetch_rows(table=, )

    https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#parameters
    old: page_size, limit, and count parameters have been removed
    new: column_values always returns all unique values and their counts by default, however there are several new parameters

    old: system=<data source>
    new: data_source=<data source> can now take a list, as in data_source=["GDC", "PDC"]

    new: sort_by=<column:asc/desc> sort results by any column

    new: force=<True/False> For columns with an extremely large number of unique values, such as filename, the query will fail with a large data warning. 
    You can override the warning with Force=True

    tables: ['diagnosis', 'file', 'researchsubject', 'somatic_mutation', 'specimen', 'subject', 'treatment']
    """

    def __init__(self,
                 disease_factory: CdaDiseaseFactory,
                 use_cache: bool = False,
                 cache_dir: typing.Optional[str] = None,
                 #page_size: int = 10000,
                 gdc_timeout: int = 100000,
                 ):
        self._use_cache = use_cache
        #self._page_size = page_size # not in new CDA 

        self._individual_factory = CdaIndividualFactory()
        self._disease_factory = disease_factory # why is disease_factory set up differently than the others?
        self._specimen_factory = CdaBiosampleFactory()
        self._mutation_factory = CdaMutationFactory()
        self._gdc_service = GdcService(timeout=gdc_timeout)

        if cache_dir is None:
            self._cache_dir = os.path.join(os.getcwd(), '.oncoexporter_cache')
            if not os.path.isdir(self._cache_dir):
                os.makedirs(self._cache_dir, exist_ok=True)
        else:
            if not os.path.isdir(cache_dir) or not os.access(cache_dir, os.W_OK):
                raise ValueError(f'`cache_dir` must be a writable directory: {cache_dir}')

    def _get_cda_df(self, callback_fxn, cache_name: str):
        fpath_cache = os.path.join(self._cache_dir, cache_name)
        if self._use_cache and os.path.isfile(fpath_cache):
            print(f"\tRetrieving dataframe {fpath_cache}")
            with open(fpath_cache, 'rb') as cachehandle:
                print(f"loading cached dataframe from {fpath_cache}")
                individual_df = pickle.load(cachehandle)
        else:
            print(f"\tcalling CDA function")
            individual_df = callback_fxn()
            if self._use_cache:
                print(f"Creating cached dataframe as {fpath_cache}")
                with open(fpath_cache, 'wb') as f:
                    pickle.dump(individual_df, f)
        return individual_df
    def get_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """Retrieve the subject dataframe from CDA

        This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

        :raises: raises an exception if the query object was not properly initialized
        :returns: pandas DataFrame that corresponds to the CDA subject table.
        :rtype: pd.DataFrame
        """
        print("\nGetting subject df...")
        callable = lambda: fetch_rows( table='subject', **q, provenance=True )
        subject_df = self._get_cda_df(callable, f"{cohort_name}_individual_df.pkl")
        subject_df = subject_df.drop(columns=['subject_data_source_id'], axis=1)
        subject_df = subject_df.drop_duplicates()
        print("obtained subject_df")

        return subject_df

    def get_researchsubject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:

        print("\nGetting researchsubject df...")
        # tried link_to_table='diagnosis' but it doesn't add any columns
        # research = fetch_rows(table='researchsubject', provenance=True)
        rsub_callable = lambda: fetch_rows( table='researchsubject', **q , add_columns=['subject_id'])
        rsub_df = self._get_cda_df(rsub_callable, f"{cohort_name}_researchsubject_df.pkl") 
        print("obtained researchsubject_df")
        #rsub_df.to_csv('rsub_diagnosis_df.txt', sep='\t')

        return rsub_df

    def get_diagnosis_df(self, q: dict, cohort_name: str) -> pd.DataFrame:

        print("\nGetting diagnosis df...")
        # diag = fetch_rows(table='diagnosis', add_columns=['researchsubject_id'])
        diagnosis_callable = lambda: fetch_rows( table='diagnosis', **q , add_columns=['subject_id'])
        diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl")
        print("obtained diagnosis_df")
        #diagnosis_df.to_csv('diagnosis_df.txt', sep='\t')

        return diagnosis_df

    def get_specimen_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """Retrieve the subject dataframe from CDA

        This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

        :raises: raises an exception if the query object was not properly initialized
        :returns: pandas DataFrame that corresponds to the CDA subject table.
        :rtype: pd.DataFrame
        """
        print("\nGetting specimen df...")
        #specimen_callable = lambda: q.specimen.run(page_size=self._page_size).get_all().to_dataframe()
        specimen_callable = lambda: fetch_rows( table='specimen', **q, add_columns=['subject_id'] )
        specimen_df = self._get_cda_df(specimen_callable, f"{cohort_name}_specimen_df.pkl")
        #specimen_df.to_csv('specimen_df.txt', sep='\t')
        return specimen_df

    def get_treatment_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        print("\nGetting treatment df...")
        #treatment_callable = lambda: q.treatment.run(page_size=self._page_size).get_all().to_dataframe()
        treatment_callable = lambda: fetch_rows( table='treatment', **q, add_columns=['subject_id'] )
        treatment_df = self._get_cda_df(treatment_callable, f"{cohort_name}_treatment_df.pkl")
        #treatment_df.to_csv('treatment_df.txt', sep='\t')
        return treatment_df

    def get_ga4gh_phenopackets(self, source: dict, **kwargs) -> typing.List[PPkt.Phenopacket]:
        """Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

        :returns: A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.
        :rtype: typing.List[PPkt.Phenopacket]

        New version of CDA: need to change Q to fetch_rows()
        """
        # no longer using Q objects
        # if not isinstance(source, Q):
        #    raise ValueError(f"query_obj argument must be Q.Q object, but instead was {type(source)}")

        if 'cohort_name' in kwargs:
            cohort_name = kwargs['cohort_name']
        else:
            # Format timestamp as a string, for example: 'YYYY-MM-DD HH:MM:SS'
            ts = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            cohort_name = f'cohort-{ts}'

        # Dictionary of phenopackets, keys are the phenopacket ids.
        ppackt_d = {}

        # First obtain the pandas DataFrames from the CDA tables with rows that correspond to the Query
        # (MLS 6/18/24) rewriting this to get subject, researchsubject, diagnosis, specimen, and treatment dataframes,
        # then merge them here to avoid getting researchsubject and subject dataframes multiple times.

        subject_df = self.get_subject_df(source, cohort_name)
        rsub_df = self.get_researchsubject_df(source, cohort_name)
        diagnosis_df = self.get_diagnosis_df(source, cohort_name)
        specimen_df = self.get_specimen_df(source, cohort_name)
        treatment_df = self.get_treatment_df(source, cohort_name)

        # merge dfs:
        # can actually just make one merged df with subject_id, researchsubject_id, primary_diagnosis_condition,
        # primary_diagnosis_site, primary_diagnosis, age_at_diagnosis, stage
        # loop through it to generate:
        #  - disease_factory
        #  - vital_status
        #  - variants

        sub_rsub_diag_df = subject_df.merge(rsub_df, on='subject_id', how='outer')
        sub_rsub_diag_df = sub_rsub_diag_df.merge(diagnosis_df, on='subject_id', how='outer')

        print("merged subject-researchsubject-diagnosis df")
        print(sub_rsub_diag_df.shape)

        # Now use the CdaFactory classes to transform the information from the DataFrames into
        # components of the GA4GH Phenopacket Schema
        # Add these components one at a time to Phenopacket objects.

        # treatment_factory = TODO

        print("\nConverting to Phenopackets...\n")

        '''
        required fields:
        - 'stage'                           # getting from GDC 
        - 'primary_diagnosis_condition'     # diagnosis mapper 
        - 'primary_diagnosis_site'          # uberon mapper
        - 'primary_diagnosis'               # diagnosis mapper
        - 'age_at_diagnosis'                # disease term mapper
        '''

        # Retrieve GA4GH Individual messages
        for _, row in tqdm(subject_df.iterrows(), total=len(subject_df), desc= "individual dataframe"):
            try:
                individual_message = self._individual_factory.to_ga4gh(row=row)
            except ValueError as e:
                # TODO: decide how to handle depending on your paranoia
                #
                pass

            individual_id = individual_message.id
            ppackt = PPkt.Phenopacket()
            ppackt.id = f'{cohort_name}-{individual_id}'
            ppackt.subject.CopyFrom(individual_message)
            ppackt_d[individual_id] = ppackt

        # get stage dictionary, map to subject ID
        print("Retrieving stage info from GDC...")
        stage_dict = self._gdc_service.fetch_stage_dict()
        # remove initial data source label: TCGA.TCGA-4J-AA1J > TCGA-4J-AA1J
        sub_rsub_diag_df['subject_id_short'] = sub_rsub_diag_df["subject_id"].str.extract(r'^[^\.]+\.(.+)', expand=False)
        sub_rsub_diag_df['stage'] = sub_rsub_diag_df['subject_id_short'].map(stage_dict).fillna(sub_rsub_diag_df['stage'])

        sub_rsub_diag_df['primary_diagnosis'] = sub_rsub_diag_df['primary_diagnosis'].fillna('') # remove nans (not sure why they are there)
        #sub_rsub_diag_df.to_csv('sub_rsub_diag_df.txt', sep='\t')


        # Retrieve GA4GH Disease messages 
        for _, row in tqdm(sub_rsub_diag_df.iterrows(), total=len(sub_rsub_diag_df.index), desc="creating disease messsages"):
            #print(list(row))
            #print("\n", row["subject_id"])

            # Retrieve GA4GH Disease messages
            disease_message = self._disease_factory.to_ga4gh(row)
            pp = ppackt_d.get(row["subject_id"]) 

            # Do not add the disease if it is already in the phenopacket.
            if not any(disease.term.id == disease_message.term.id for disease in pp.diseases):
                pp.diseases.append(disease_message)

            # need to check if we have the age_at_diagnosis in the phenopacket message
            for disease in pp.diseases:
                if not disease.HasField("onset") and disease.term.id == disease_message.term.id and disease_message.HasField("onset"):
                    disease.onset.CopyFrom(disease_message.onset)

            # get vital status from GDC - probably not needed (should be same as subject_df obtained from CDA)
            # vital_status = self._gdc_service.fetch_vital_status(subj_id)
            # ppackt_d.get(individual_id).subject.vital_status.CopyFrom(vital_status)         

        # Get variant data 
        # takes ~45 minutes due to API calls to GDC
        sub_rsub_diag_GDC_df = sub_rsub_diag_df[sub_rsub_diag_df['subject_data_source'] == 'GDC']
        for _, row in tqdm(sub_rsub_diag_GDC_df.iterrows(), total=len(sub_rsub_diag_df.index), desc="getting variants from GDC"):

            individual_id = row["subject_id"]

            #for rsub_subj in row["subject_identifier"]:
            #if row["subject_data_source"] == "GDC":
            print("GDC variants...")
            # have to strip off the leading name before first period
            # e.g. TCGA.TCGA-05-4250 -> TCGA-05-4250
            subj_id = re.sub("^[^.]+\.", "", individual_id)
            #print(row["subject_id"], subj_id)

            # get variants
            variant_interpretations = self._gdc_service.fetch_variants(subj_id) # was rsub_subj['value']
            if len(variant_interpretations) == 0:
                #print("No variants found")
                continue
            #else:
                #print("length variant_interpretations: {}".format(len(variant_interpretations)))   

            # TODO: improve/enhance diagnosis term annotations
            diagnosis = PPkt.Diagnosis()
            diagnosis.disease.id = "NCIT:C3262"
            diagnosis.disease.label = "Neoplasm"

            for variant in variant_interpretations:
                genomic_interpretation = PPkt.GenomicInterpretation()
                genomic_interpretation.subject_or_biosample_id = individual_id
                genomic_interpretation.interpretation_status = PPkt.GenomicInterpretation.InterpretationStatus.UNKNOWN_STATUS
                genomic_interpretation.variant_interpretation.CopyFrom(variant)

                diagnosis.genomic_interpretations.append(genomic_interpretation)

            interpretation = PPkt.Interpretation()
            interpretation.id = f"{individual_id}-{row['researchsubject_id']}"
            interpretation.progress_status = PPkt.Interpretation.ProgressStatus.IN_PROGRESS 
            interpretation.diagnosis.CopyFrom(diagnosis)

            ppackt_d.get(individual_id).interpretations.append(interpretation)


        # Retrieve GA4GH Biospecimen messages
        for idx, row in tqdm(specimen_df.iterrows(),total=len(specimen_df.index), desc="specimen/biosample dataframe"):
            biosample_message = self._specimen_factory.to_ga4gh(row)
            individual_id = row["subject_id"]
            if individual_id not in ppackt_d:
                raise ValueError(f"Attempt to enter unknown individual ID from biosample factory: \"{individual_id}\"")

            # convert CDA days_to_collection to PPKt time_of_collection
            #         days_to_collection: number of days from index date to sample collection date
            #         time_of_collection: Age of subject at time sample was collected 
            if not pd.isna(row['days_to_collection']):
                days_to_coll_iso = CdaFactory.days_to_iso(int(row["days_to_collection"]))
            # this should work if both are pd.Timedelta:
            # TODO: fix the code below!
            # time_of_collection = ppackt_d[individual_id]["iso8601duration"] + days_to_coll_iso # should it be 'Age' or 'iso8601duration'?
            # biosample_message["time_of_collection"] = time_of_collection

            ppackt_d.get(individual_id).biosamples.append(biosample_message)

        # TODO Treatment
        # make_cda_medicalaction
        for idx, row in tqdm(treatment_df.iterrows(), total=len(treatment_df.index), desc="Treatment DF"):
            individual_id = row["subject_id"]
            medical_action_message = make_cda_medicalaction(row)
            if individual_id not in ppackt_d:
                raise ValueError(f"Attempt to enter unknown individual ID from treatment factory: \"{individual_id}\"")
            ppackt_d.get(individual_id).medical_actions.append(medical_action_message)

        # When we get here, we have constructed GA4GH Phenopackets with Individual, Disease, Biospecimen, MedicalAction, and GenomicInterpretations
        return list(ppackt_d.values())

get_ga4gh_phenopackets(source, **kwargs)

Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

Returns:

Type Description
typing.List[PPkt.Phenopacket] New version of CDA: need to change Q to fetch_rows()

A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_ga4gh_phenopackets(self, source: dict, **kwargs) -> typing.List[PPkt.Phenopacket]:
    """Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

    :returns: A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.
    :rtype: typing.List[PPkt.Phenopacket]

    New version of CDA: need to change Q to fetch_rows()
    """
    # no longer using Q objects
    # if not isinstance(source, Q):
    #    raise ValueError(f"query_obj argument must be Q.Q object, but instead was {type(source)}")

    if 'cohort_name' in kwargs:
        cohort_name = kwargs['cohort_name']
    else:
        # Format timestamp as a string, for example: 'YYYY-MM-DD HH:MM:SS'
        ts = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        cohort_name = f'cohort-{ts}'

    # Dictionary of phenopackets, keys are the phenopacket ids.
    ppackt_d = {}

    # First obtain the pandas DataFrames from the CDA tables with rows that correspond to the Query
    # (MLS 6/18/24) rewriting this to get subject, researchsubject, diagnosis, specimen, and treatment dataframes,
    # then merge them here to avoid getting researchsubject and subject dataframes multiple times.

    subject_df = self.get_subject_df(source, cohort_name)
    rsub_df = self.get_researchsubject_df(source, cohort_name)
    diagnosis_df = self.get_diagnosis_df(source, cohort_name)
    specimen_df = self.get_specimen_df(source, cohort_name)
    treatment_df = self.get_treatment_df(source, cohort_name)

    # merge dfs:
    # can actually just make one merged df with subject_id, researchsubject_id, primary_diagnosis_condition,
    # primary_diagnosis_site, primary_diagnosis, age_at_diagnosis, stage
    # loop through it to generate:
    #  - disease_factory
    #  - vital_status
    #  - variants

    sub_rsub_diag_df = subject_df.merge(rsub_df, on='subject_id', how='outer')
    sub_rsub_diag_df = sub_rsub_diag_df.merge(diagnosis_df, on='subject_id', how='outer')

    print("merged subject-researchsubject-diagnosis df")
    print(sub_rsub_diag_df.shape)

    # Now use the CdaFactory classes to transform the information from the DataFrames into
    # components of the GA4GH Phenopacket Schema
    # Add these components one at a time to Phenopacket objects.

    # treatment_factory = TODO

    print("\nConverting to Phenopackets...\n")

    '''
    required fields:
    - 'stage'                           # getting from GDC 
    - 'primary_diagnosis_condition'     # diagnosis mapper 
    - 'primary_diagnosis_site'          # uberon mapper
    - 'primary_diagnosis'               # diagnosis mapper
    - 'age_at_diagnosis'                # disease term mapper
    '''

    # Retrieve GA4GH Individual messages
    for _, row in tqdm(subject_df.iterrows(), total=len(subject_df), desc= "individual dataframe"):
        try:
            individual_message = self._individual_factory.to_ga4gh(row=row)
        except ValueError as e:
            # TODO: decide how to handle depending on your paranoia
            #
            pass

        individual_id = individual_message.id
        ppackt = PPkt.Phenopacket()
        ppackt.id = f'{cohort_name}-{individual_id}'
        ppackt.subject.CopyFrom(individual_message)
        ppackt_d[individual_id] = ppackt

    # get stage dictionary, map to subject ID
    print("Retrieving stage info from GDC...")
    stage_dict = self._gdc_service.fetch_stage_dict()
    # remove initial data source label: TCGA.TCGA-4J-AA1J > TCGA-4J-AA1J
    sub_rsub_diag_df['subject_id_short'] = sub_rsub_diag_df["subject_id"].str.extract(r'^[^\.]+\.(.+)', expand=False)
    sub_rsub_diag_df['stage'] = sub_rsub_diag_df['subject_id_short'].map(stage_dict).fillna(sub_rsub_diag_df['stage'])

    sub_rsub_diag_df['primary_diagnosis'] = sub_rsub_diag_df['primary_diagnosis'].fillna('') # remove nans (not sure why they are there)
    #sub_rsub_diag_df.to_csv('sub_rsub_diag_df.txt', sep='\t')


    # Retrieve GA4GH Disease messages 
    for _, row in tqdm(sub_rsub_diag_df.iterrows(), total=len(sub_rsub_diag_df.index), desc="creating disease messsages"):
        #print(list(row))
        #print("\n", row["subject_id"])

        # Retrieve GA4GH Disease messages
        disease_message = self._disease_factory.to_ga4gh(row)
        pp = ppackt_d.get(row["subject_id"]) 

        # Do not add the disease if it is already in the phenopacket.
        if not any(disease.term.id == disease_message.term.id for disease in pp.diseases):
            pp.diseases.append(disease_message)

        # need to check if we have the age_at_diagnosis in the phenopacket message
        for disease in pp.diseases:
            if not disease.HasField("onset") and disease.term.id == disease_message.term.id and disease_message.HasField("onset"):
                disease.onset.CopyFrom(disease_message.onset)

        # get vital status from GDC - probably not needed (should be same as subject_df obtained from CDA)
        # vital_status = self._gdc_service.fetch_vital_status(subj_id)
        # ppackt_d.get(individual_id).subject.vital_status.CopyFrom(vital_status)         

    # Get variant data 
    # takes ~45 minutes due to API calls to GDC
    sub_rsub_diag_GDC_df = sub_rsub_diag_df[sub_rsub_diag_df['subject_data_source'] == 'GDC']
    for _, row in tqdm(sub_rsub_diag_GDC_df.iterrows(), total=len(sub_rsub_diag_df.index), desc="getting variants from GDC"):

        individual_id = row["subject_id"]

        #for rsub_subj in row["subject_identifier"]:
        #if row["subject_data_source"] == "GDC":
        print("GDC variants...")
        # have to strip off the leading name before first period
        # e.g. TCGA.TCGA-05-4250 -> TCGA-05-4250
        subj_id = re.sub("^[^.]+\.", "", individual_id)
        #print(row["subject_id"], subj_id)

        # get variants
        variant_interpretations = self._gdc_service.fetch_variants(subj_id) # was rsub_subj['value']
        if len(variant_interpretations) == 0:
            #print("No variants found")
            continue
        #else:
            #print("length variant_interpretations: {}".format(len(variant_interpretations)))   

        # TODO: improve/enhance diagnosis term annotations
        diagnosis = PPkt.Diagnosis()
        diagnosis.disease.id = "NCIT:C3262"
        diagnosis.disease.label = "Neoplasm"

        for variant in variant_interpretations:
            genomic_interpretation = PPkt.GenomicInterpretation()
            genomic_interpretation.subject_or_biosample_id = individual_id
            genomic_interpretation.interpretation_status = PPkt.GenomicInterpretation.InterpretationStatus.UNKNOWN_STATUS
            genomic_interpretation.variant_interpretation.CopyFrom(variant)

            diagnosis.genomic_interpretations.append(genomic_interpretation)

        interpretation = PPkt.Interpretation()
        interpretation.id = f"{individual_id}-{row['researchsubject_id']}"
        interpretation.progress_status = PPkt.Interpretation.ProgressStatus.IN_PROGRESS 
        interpretation.diagnosis.CopyFrom(diagnosis)

        ppackt_d.get(individual_id).interpretations.append(interpretation)


    # Retrieve GA4GH Biospecimen messages
    for idx, row in tqdm(specimen_df.iterrows(),total=len(specimen_df.index), desc="specimen/biosample dataframe"):
        biosample_message = self._specimen_factory.to_ga4gh(row)
        individual_id = row["subject_id"]
        if individual_id not in ppackt_d:
            raise ValueError(f"Attempt to enter unknown individual ID from biosample factory: \"{individual_id}\"")

        # convert CDA days_to_collection to PPKt time_of_collection
        #         days_to_collection: number of days from index date to sample collection date
        #         time_of_collection: Age of subject at time sample was collected 
        if not pd.isna(row['days_to_collection']):
            days_to_coll_iso = CdaFactory.days_to_iso(int(row["days_to_collection"]))
        # this should work if both are pd.Timedelta:
        # TODO: fix the code below!
        # time_of_collection = ppackt_d[individual_id]["iso8601duration"] + days_to_coll_iso # should it be 'Age' or 'iso8601duration'?
        # biosample_message["time_of_collection"] = time_of_collection

        ppackt_d.get(individual_id).biosamples.append(biosample_message)

    # TODO Treatment
    # make_cda_medicalaction
    for idx, row in tqdm(treatment_df.iterrows(), total=len(treatment_df.index), desc="Treatment DF"):
        individual_id = row["subject_id"]
        medical_action_message = make_cda_medicalaction(row)
        if individual_id not in ppackt_d:
            raise ValueError(f"Attempt to enter unknown individual ID from treatment factory: \"{individual_id}\"")
        ppackt_d.get(individual_id).medical_actions.append(medical_action_message)

    # When we get here, we have constructed GA4GH Phenopackets with Individual, Disease, Biospecimen, MedicalAction, and GenomicInterpretations
    return list(ppackt_d.values())

get_specimen_df(q, cohort_name)

Retrieve the subject dataframe from CDA

This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

Returns:

Type Description
pd.DataFrame

pandas DataFrame that corresponds to the CDA subject table.

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_specimen_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
    """Retrieve the subject dataframe from CDA

    This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

    :raises: raises an exception if the query object was not properly initialized
    :returns: pandas DataFrame that corresponds to the CDA subject table.
    :rtype: pd.DataFrame
    """
    print("\nGetting specimen df...")
    #specimen_callable = lambda: q.specimen.run(page_size=self._page_size).get_all().to_dataframe()
    specimen_callable = lambda: fetch_rows( table='specimen', **q, add_columns=['subject_id'] )
    specimen_df = self._get_cda_df(specimen_callable, f"{cohort_name}_specimen_df.pkl")
    #specimen_df.to_csv('specimen_df.txt', sep='\t')
    return specimen_df

get_subject_df(q, cohort_name)

Retrieve the subject dataframe from CDA

This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

Returns:

Type Description
pd.DataFrame

pandas DataFrame that corresponds to the CDA subject table.

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
    """Retrieve the subject dataframe from CDA

    This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

    :raises: raises an exception if the query object was not properly initialized
    :returns: pandas DataFrame that corresponds to the CDA subject table.
    :rtype: pd.DataFrame
    """
    print("\nGetting subject df...")
    callable = lambda: fetch_rows( table='subject', **q, provenance=True )
    subject_df = self._get_cda_df(callable, f"{cohort_name}_individual_df.pkl")
    subject_df = subject_df.drop(columns=['subject_data_source_id'], axis=1)
    subject_df = subject_df.drop_duplicates()
    print("obtained subject_df")

    return subject_df