Skip to content

CdaTableImporter

Bases: CdaImporter[fetch_rows]

This class is the entry point for transforming CDA data into GA4GH Phenopackets. Client code only needs to initialize it with a CDA query, and it can return phenopackets with the :func:get_ga4gh_phenopackets. It also returns individual tables for that can be used for testing or visualizing data.

The CDA query determines the cohort that will be retrieved from CDA. This class then retrieves data for this cohort in form of pandas DataFrames and extracts data for phenopacket construction using the data in the tables

Parameters:

Name Type Description Default
disease_factory CdaDiseaseFactory

the component for mapping CDA table into Disease element of the Phenopacket Schema.

required
cache_dir Optional[str]

a str with path to the folder to store the cache files

None
use_cache bool

if True, cache/retrieve from cache

False
page_size

Number of pages to retrieve at once. Defaults to 10000 New CDA: https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#returning-a-matrix-of-results old: all of the functions previously used with, or chained onto Q()...run() have been replaced with the single function fetch_rows() new: `fetch_rows(table=, ) https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#parameters old: page_size, limit, and count parameters have been removed new: column_values always returns all unique values and their counts by default, however there are several new parameters old: system= new: data_source= can now take a list, as in data_source=["GDC", "PDC"] new: sort_by= sort results by any column new: force= For columns with an extremely large number of unique values, such as filename, the query will fail with a large data warning. You can override the warning with Force=True tables: ['diagnosis', 'file', 'researchsubject', 'somatic_mutation', 'specimen', 'subject', 'treatment']

required
Source code in src/oncoexporter/cda/cda_table_importer.py
class CdaTableImporter(CdaImporter[fetch_rows]):
    """This class is the entry point for transforming CDA data into GA4GH Phenopackets. Client code only needs
    to initialize it with a CDA query, and it can return phenopackets with the :func:`get_ga4gh_phenopackets`.
    It also returns individual tables for that can be used for testing or visualizing data.

    The CDA query determines the cohort that will be retrieved from CDA. This class then retrieves data
    for this cohort in form of pandas DataFrames and extracts data for phenopacket construction using the data
    in the tables

    :param disease_factory: the component for mapping CDA table into Disease element of the Phenopacket Schema.
    :param cache_dir: a `str` with path to the folder to store the cache files
    :param use_cache: if True, cache/retrieve from cache
    :param page_size: Number of pages to retrieve at once. Defaults to `10000`

    New CDA: 
    https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#returning-a-matrix-of-results
    old: all of the functions previously used with, or chained onto Q()...run() have been replaced with the single function fetch_rows()
    new: `fetch_rows(table=, )

    https://cda.readthedocs.io/en/latest/documentation/cdapython/code_update/#parameters
    old: page_size, limit, and count parameters have been removed
    new: column_values always returns all unique values and their counts by default, however there are several new parameters

    old: system=<data source>
    new: data_source=<data source> can now take a list, as in data_source=["GDC", "PDC"]

    new: sort_by=<column:asc/desc> sort results by any column

    new: force=<True/False> For columns with an extremely large number of unique values, such as filename, the query will fail with a large data warning. You can override the warning with Force=True

    tables: ['diagnosis', 'file', 'researchsubject', 'somatic_mutation', 'specimen', 'subject', 'treatment']
    """

    def __init__(self,
                 disease_factory: CdaDiseaseFactory,
                 use_cache: bool = False,
                 cache_dir: typing.Optional[str] = None,
                 #page_size: int = 10000,
                 gdc_timeout: int = 100000,
                 ):
        self._use_cache = use_cache
        #self._page_size = page_size # not in new CDA 

        self._individual_factory = CdaIndividualFactory()
        self._disease_factory = disease_factory
        self._specimen_factory = CdaBiosampleFactory()
        self._mutation_factory = CdaMutationFactory()
        self._gdc_mutation_service = GdcMutationService(timeout=gdc_timeout)

        if cache_dir is None:
            self._cache_dir = os.path.join(os.getcwd(), '.oncoexporter_cache')
            if not os.path.isdir(self._cache_dir):
                os.makedirs(self._cache_dir, exist_ok=True)
        else:
            if not os.path.isdir(cache_dir) or not os.access(cache_dir, os.W_OK):
                raise ValueError(f'`cache_dir` must be a writable directory: {cache_dir}')

    def _get_cda_df(self, callback_fxn, cache_name: str):
        fpath_cache = os.path.join(self._cache_dir, cache_name)
        if self._use_cache and os.path.isfile(fpath_cache):
            print(f"\tRetrieving dataframe {fpath_cache}")
            with open(fpath_cache, 'rb') as cachehandle:
                print(f"loading cached dataframe from {fpath_cache}")
                individual_df = pickle.load(cachehandle)
        else:
            print(f"\tcalling CDA function")
            individual_df = callback_fxn()
            if self._use_cache:
                print(f"Creating cached dataframe as {fpath_cache}")
                with open(fpath_cache, 'wb') as f:
                    pickle.dump(individual_df, f)
        return individual_df

    # not clear why this is _ but others are not...
    def _get_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """Retrieve the subject dataframe from CDA

        This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

        :raises: raises an exception if the query object was not properly initialized
        :returns: pandas DataFrame that corresponds to the CDA subject table.
        :rtype: pd.DataFrame
        """
        print("\nGetting subject df...")
        #callable = lambda: q.subject.run(page_size=self._page_size).get_all().to_dataframe()
        callable = lambda: fetch_rows( table='subject', **q )
        subject_df = self._get_cda_df(callable, f"{cohort_name}_individual_df.pkl")
        print("obtained subject_df")
        return subject_df

    def get_merged_diagnosis_research_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """
        Retrieve a merged dataframe from CDA corresponding to the diagnosis and research subject tables

        diagnosis table columns: diagnosis_id, age_at_diagnosis, grade, method_of_diagnosis, morphology, primary_diagnosis, stage

        researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, primary_diagnosis_site

        Need to combine in order to get primary_diagnosis_condition and primary_diagnosis_site with diagnosis table

        Original:
        diagnosis_callable = lambda: q.diagnosis.run(page_size=self._page_size).get_all().to_dataframe()
        diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl")
        print("obtained diagnosis_df")

        rsub_callable = lambda: q.researchsubject.run(page_size=self._page_size).get_all().to_dataframe()
        research_subject_df = self._get_cda_df(rsub_callable, f"{cohort_name}_research_subject_df.pkl")
        print("obtained research_subject_df")

        merged_df = pd.merge(diagnosis_df, research_subject_df, left_on='researchsubject_id', right_on='researchsubject_id',
                             suffixes=["_di", "_rs"])
        return merged_df

        """
        print("\nGetting merged diagnosis and researchsubject df...")
        diagnosis_callable = lambda: fetch_rows( table='diagnosis', **q , add_columns=['subject_id'])
        diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl")
        print("obtained diagnosis_df")
        #diagnosis_df.to_csv('diagnosis_df.txt', sep='\t')

        # tried link_to_table='diagnosis' but it doesn't add any columns
        rsub_callable = lambda: fetch_rows( table='researchsubject', **q , add_columns=['subject_id'])
        rsub_df = self._get_cda_df(rsub_callable, f"{cohort_name}_researchsubject_df.pkl") # call is repeated below in get_merged_subject_research_subject_df
        print("obtained researchsubject_df")
        #rsub_df.to_csv('rsub_diagnosis_df.txt', sep='\t')

        # merge tables
        diag_rsub_df = pd.merge(diagnosis_df, rsub_df, left_on='subject_id', right_on='subject_id') #, suffixes=["_di", "_rs"])
        print("obtained merged diagnosis researchsubject df")
        #diag_rsub_df.to_csv('diagnosis_rsub.txt', sep='\t')

        return diag_rsub_df

    def get_merged_subject_research_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """
        Retrieve a merged dataframe from CDA corresponding to the subject and research subject tables
        Need both subject_id and researchsubject_id to include variants in phenopacket 

        subject table columns: subject_id, cause_of_death, days_to_birth, days_to_death, ethnicity, race
                               sex, species, vital_status

        researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, 
                                        primary_diagnosis_site

        M. Sierk 4-11-24: 
            This is how Henry Schaeffer from CBIIT recommended getting the data source:

            I consolidate the data_source_id columns (as opposed to just deleting it which works also) or else 
            the merge expands each subject into many more rows. For example, for subject provenance I used:

            sub = fetch_rows(table='subject', provenance=True)
            sub['subject_data_source_id_concat'] = sub.groupby(['subject_id','subject_data_source'])['subject_data_source_id'].transform(lambda x: ','.join(x))
            sub = sub.drop(columns=['subject_data_source_id'], axis=1)
            sub = sub.drop_duplicates()

        """
        print("\nGetting merged subject and researchsubject df...")
        #subject_callable = lambda: q.subject.run(page_size=self._page_size).get_all().to_dataframe()

        subject_callable = lambda: fetch_rows( table='subject', **q ) # link_to_table='researchsubject' doesn't add anything
        subject_df = self._get_cda_df(subject_callable, f"{cohort_name}_subject_df.pkl")

        rsub_callable = lambda: fetch_rows( table='researchsubject', **q , add_columns=['subject_id']) # repeat from above
        rsub_df = self._get_cda_df(rsub_callable, f"{cohort_name}_researchsubject_df.pkl")

        # merge tables
        subject_rsub_df = pd.merge(subject_df, rsub_df, left_on='subject_id', right_on='subject_id') #, suffixes=["_di", "_rs"])
        print("obtained merged subject and researchsubject df")
        #subject_rsub_df.to_csv('subject_rsub_df.txt', sep='\t')


        #rsub_callable = lambda: q.researchsubject.run(page_size=self._page_size).get_all().to_dataframe()
        #rsub_callable = lambda: fetch_rows( table='researchsubject', **q , link_to_table='subject' )
        #research_subject_df = self._get_cda_df(rsub_callable, f"{cohort_name}_research_subject_df.pkl")
        #print("obtained research_subject_df")
        #merged_df = pd.merge(subject_df, research_subject_df, left_on='subject_id', right_on='subject_id',
                                #suffixes=["_subj", "_rs"])
        #research_subject_df.to_csv('researchsubject_df.txt', sep='\t')

        return subject_rsub_df

    def get_specimen_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        """Retrieve the subject dataframe from CDA

        This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

        :raises: raises an exception if the query object was not properly initialized
        :returns: pandas DataFrame that corresponds to the CDA subject table.
        :rtype: pd.DataFrame
        """
        print("\nGetting specimen df...")
        #specimen_callable = lambda: q.specimen.run(page_size=self._page_size).get_all().to_dataframe()
        specimen_callable = lambda: fetch_rows( table='specimen', **q, add_columns=['subject_id'] )
        specimen_df = self._get_cda_df(specimen_callable, f"{cohort_name}_specimen_df.pkl")
        #specimen_df.to_csv('specimen_df.txt', sep='\t')
        return specimen_df

    def get_treatment_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
        print("\nGetting treatment df...")
        #treatment_callable = lambda: q.treatment.run(page_size=self._page_size).get_all().to_dataframe()
        treatment_callable = lambda: fetch_rows( table='treatment', **q, add_columns=['subject_id'] )
        treatment_df = self._get_cda_df(treatment_callable, f"{cohort_name}_treatment_df.pkl")
        #treatment_df.to_csv('treatment_df.txt', sep='\t')
        return treatment_df

    def get_ga4gh_phenopackets(self, source: dict, **kwargs) -> typing.List[PPkt.Phenopacket]:
        """Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

        :returns: A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.
        :rtype: typing.List[PPkt.Phenopacket]

        New version of CDA: need to change Q to fetch_rows()
        """
        # no longer using Q objects
        # if not isinstance(source, Q):
        #    raise ValueError(f"query_obj argument must be Q.Q object, but instead was {type(source)}")

        if 'cohort_name' in kwargs:
            cohort_name = kwargs['cohort_name']
        else:
            # Format it as a string, for example: 'YYYY-MM-DD HH:MM:SS'
            ts = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
            cohort_name = f'cohort-{ts}'

        # Dictionary of phenopackets, keys are the phenopacket ids.
        ppackt_d = {}

        # First obtain the pandas DataFrames from the CDA tables with rows that correspond to the Query
        subject_df = self._get_subject_df(source, cohort_name)
        diag_rsub_df = self.get_merged_diagnosis_research_subject_df(source, cohort_name)
        specimen_df = self.get_specimen_df(source, cohort_name)
        treatment_df = self.get_treatment_df(source, cohort_name)
        subj_rsub_df = self.get_merged_subject_research_subject_df(source, cohort_name)

        # Now use the CdaFactory classes to transform the information from the DataFrames into
        # components of the GA4GH Phenopacket Schema
        # Add these components one at a time to Phenopacket objects.

        # treatment_factory = TODO

        print("\nConverting to Phenopackets...\n")

        # Retrieve GA4GH Individual messages
        for _, row in tqdm(subject_df.iterrows(),total=len(subject_df), desc= "individual dataframe"):
            try:
                individual_message = self._individual_factory.to_ga4gh(row=row)
            except ValueError as e:
                # TODO: decide how to handle depending on your paranoia
                #
                pass

            individual_id = individual_message.id
            ppackt = PPkt.Phenopacket()
            ppackt.id = f'{cohort_name}-{individual_id}'
            ppackt.subject.CopyFrom(individual_message)
            ppackt_d[individual_id] = ppackt

        # Retrieve GA4GH Disease messages
        for _, row in tqdm(diag_rsub_df.iterrows(), total= len(diag_rsub_df.index), desc="merged diagnosis and researchsubject dataframe"):
            disease_message = self._disease_factory.to_ga4gh(row)
            #print(row)
            pp = ppackt_d.get(row["subject_id"]) 

            # Do not add the disease if it is already in the phenopacket.
            if not any(disease.term.id == disease_message.term.id for disease in pp.diseases):
                pp.diseases.append(disease_message)

        # Retrieve GA4GH Biospecimen messages
        for idx, row in tqdm(specimen_df.iterrows(),total= len(specimen_df.index), desc="specimen/biosample dataframe"):
            biosample_message = self._specimen_factory.to_ga4gh(row)
            individual_id = row["subject_id"]
            if individual_id not in ppackt_d:
                raise ValueError(f"Attempt to enter unknown individual ID from biosample factory: \"{individual_id}\"")

            # convert CDA days_to_collection to PPKt time_of_collection
            #         days_to_collection: number of days from index date to sample collection date
            #         time_of_collection: Age of subject at time sample was collected 
            if not pd.isna(row['days_to_collection']):
                days_to_coll_iso = CdaFactory.days_to_iso(int(row["days_to_collection"]))
            # this should work if both are pd.Timedelta:
            # TODO: fix the code below!
            # time_of_collection = ppackt_d[individual_id]["iso8601duration"] + days_to_coll_iso # should it be 'Age' or 'iso8601duration'?
            # biosample_message["time_of_collection"] = time_of_collection

            ppackt_d.get(individual_id).biosamples.append(biosample_message)

        # Get variant data for each ResearchSubject in Subject
        # takes 45 minutes due to API calls to gdc
        for _, row in tqdm(subj_rsub_df.iterrows(), total=subj_rsub_df.shape[0], desc="subject researchsubject dataframe"):
            individual_id = row["subject_id"]
            # removing this loop for new CDA API.  Should work the same to go through df by row (MS 4-10-24)
            #  Note: as of 4-10-24, no way to access system/data source from result of fetch_rows (can specify in fetch_rows)
            #for rsub_subj in row["subject_identifier"]:
            #    if rsub_subj["system"] == "GDC":

            # have to strip off the leading name before first period
            # e.g. TCGA.TCGA-05-4250 -> TCGA-05-4250
            subj_id = re.sub("^[^.]+\.", "", row["subject_id"])
            #print(row["subject_id"], subj_id)

            variant_interpretations = self._gdc_mutation_service.fetch_variants(subj_id) # was rsub_subj['value']
            vital_status = self._gdc_mutation_service.fetch_vital_status(subj_id)
            if len(variant_interpretations) == 0:
                #print("No variants found")
                continue
            ppackt_d.get(individual_id).subject.vital_status.CopyFrom(vital_status)
            #else:
                #print("length variant_interpretations: {}".format(len(variant_interpretations)))

            # TODO: improve/enhance diagnosis term annotations
            diagnosis = PPkt.Diagnosis()
            diagnosis.disease.id = "NCIT:C3262"
            diagnosis.disease.label = "Neoplasm"

            for variant in variant_interpretations:
                genomic_interpretation = PPkt.GenomicInterpretation()
                genomic_interpretation.subject_or_biosample_id = row["subject_id"]
                genomic_interpretation.interpretation_status = PPkt.GenomicInterpretation.InterpretationStatus.UNKNOWN_STATUS
                genomic_interpretation.variant_interpretation.CopyFrom(variant)

                diagnosis.genomic_interpretations.append(genomic_interpretation)

            interpretation = PPkt.Interpretation()
            interpretation.id = f"{individual_id}-{row['researchsubject_id']}"
            interpretation.progress_status = PPkt.Interpretation.ProgressStatus.IN_PROGRESS 
            interpretation.diagnosis.CopyFrom(diagnosis)

            ppackt_d.get(individual_id).interpretations.append(interpretation)

        # TODO Treatment
        # make_cda_medicalaction
        for idx, row in tqdm(treatment_df.iterrows(), total=len(treatment_df.index), desc="Treatment DF"):
            individual_id = row["subject_id"]
            medical_action_message = make_cda_medicalaction(row)
            if individual_id not in ppackt_d:
                raise ValueError(f"Attempt to enter unknown individual ID from treatment factory: \"{individual_id}\"")
            ppackt_d.get(individual_id).medical_actions.append(medical_action_message)

        # When we get here, we have constructed GA4GH Phenopackets with Individual, Disease, Biospecimen, MedicalAction, and GenomicInterpretations
        return list(ppackt_d.values())

get_ga4gh_phenopackets(source, **kwargs)

Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

Returns:

Type Description
typing.List[PPkt.Phenopacket] New version of CDA: need to change Q to fetch_rows()

A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_ga4gh_phenopackets(self, source: dict, **kwargs) -> typing.List[PPkt.Phenopacket]:
    """Get a list of GA4GH phenopackets corresponding to the individuals returned by the query passed to the constructor.

    :returns: A list of GA4GH phenopackets corresponding to the individuals selected by the query passed to the constructor.
    :rtype: typing.List[PPkt.Phenopacket]

    New version of CDA: need to change Q to fetch_rows()
    """
    # no longer using Q objects
    # if not isinstance(source, Q):
    #    raise ValueError(f"query_obj argument must be Q.Q object, but instead was {type(source)}")

    if 'cohort_name' in kwargs:
        cohort_name = kwargs['cohort_name']
    else:
        # Format it as a string, for example: 'YYYY-MM-DD HH:MM:SS'
        ts = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        cohort_name = f'cohort-{ts}'

    # Dictionary of phenopackets, keys are the phenopacket ids.
    ppackt_d = {}

    # First obtain the pandas DataFrames from the CDA tables with rows that correspond to the Query
    subject_df = self._get_subject_df(source, cohort_name)
    diag_rsub_df = self.get_merged_diagnosis_research_subject_df(source, cohort_name)
    specimen_df = self.get_specimen_df(source, cohort_name)
    treatment_df = self.get_treatment_df(source, cohort_name)
    subj_rsub_df = self.get_merged_subject_research_subject_df(source, cohort_name)

    # Now use the CdaFactory classes to transform the information from the DataFrames into
    # components of the GA4GH Phenopacket Schema
    # Add these components one at a time to Phenopacket objects.

    # treatment_factory = TODO

    print("\nConverting to Phenopackets...\n")

    # Retrieve GA4GH Individual messages
    for _, row in tqdm(subject_df.iterrows(),total=len(subject_df), desc= "individual dataframe"):
        try:
            individual_message = self._individual_factory.to_ga4gh(row=row)
        except ValueError as e:
            # TODO: decide how to handle depending on your paranoia
            #
            pass

        individual_id = individual_message.id
        ppackt = PPkt.Phenopacket()
        ppackt.id = f'{cohort_name}-{individual_id}'
        ppackt.subject.CopyFrom(individual_message)
        ppackt_d[individual_id] = ppackt

    # Retrieve GA4GH Disease messages
    for _, row in tqdm(diag_rsub_df.iterrows(), total= len(diag_rsub_df.index), desc="merged diagnosis and researchsubject dataframe"):
        disease_message = self._disease_factory.to_ga4gh(row)
        #print(row)
        pp = ppackt_d.get(row["subject_id"]) 

        # Do not add the disease if it is already in the phenopacket.
        if not any(disease.term.id == disease_message.term.id for disease in pp.diseases):
            pp.diseases.append(disease_message)

    # Retrieve GA4GH Biospecimen messages
    for idx, row in tqdm(specimen_df.iterrows(),total= len(specimen_df.index), desc="specimen/biosample dataframe"):
        biosample_message = self._specimen_factory.to_ga4gh(row)
        individual_id = row["subject_id"]
        if individual_id not in ppackt_d:
            raise ValueError(f"Attempt to enter unknown individual ID from biosample factory: \"{individual_id}\"")

        # convert CDA days_to_collection to PPKt time_of_collection
        #         days_to_collection: number of days from index date to sample collection date
        #         time_of_collection: Age of subject at time sample was collected 
        if not pd.isna(row['days_to_collection']):
            days_to_coll_iso = CdaFactory.days_to_iso(int(row["days_to_collection"]))
        # this should work if both are pd.Timedelta:
        # TODO: fix the code below!
        # time_of_collection = ppackt_d[individual_id]["iso8601duration"] + days_to_coll_iso # should it be 'Age' or 'iso8601duration'?
        # biosample_message["time_of_collection"] = time_of_collection

        ppackt_d.get(individual_id).biosamples.append(biosample_message)

    # Get variant data for each ResearchSubject in Subject
    # takes 45 minutes due to API calls to gdc
    for _, row in tqdm(subj_rsub_df.iterrows(), total=subj_rsub_df.shape[0], desc="subject researchsubject dataframe"):
        individual_id = row["subject_id"]
        # removing this loop for new CDA API.  Should work the same to go through df by row (MS 4-10-24)
        #  Note: as of 4-10-24, no way to access system/data source from result of fetch_rows (can specify in fetch_rows)
        #for rsub_subj in row["subject_identifier"]:
        #    if rsub_subj["system"] == "GDC":

        # have to strip off the leading name before first period
        # e.g. TCGA.TCGA-05-4250 -> TCGA-05-4250
        subj_id = re.sub("^[^.]+\.", "", row["subject_id"])
        #print(row["subject_id"], subj_id)

        variant_interpretations = self._gdc_mutation_service.fetch_variants(subj_id) # was rsub_subj['value']
        vital_status = self._gdc_mutation_service.fetch_vital_status(subj_id)
        if len(variant_interpretations) == 0:
            #print("No variants found")
            continue
        ppackt_d.get(individual_id).subject.vital_status.CopyFrom(vital_status)
        #else:
            #print("length variant_interpretations: {}".format(len(variant_interpretations)))

        # TODO: improve/enhance diagnosis term annotations
        diagnosis = PPkt.Diagnosis()
        diagnosis.disease.id = "NCIT:C3262"
        diagnosis.disease.label = "Neoplasm"

        for variant in variant_interpretations:
            genomic_interpretation = PPkt.GenomicInterpretation()
            genomic_interpretation.subject_or_biosample_id = row["subject_id"]
            genomic_interpretation.interpretation_status = PPkt.GenomicInterpretation.InterpretationStatus.UNKNOWN_STATUS
            genomic_interpretation.variant_interpretation.CopyFrom(variant)

            diagnosis.genomic_interpretations.append(genomic_interpretation)

        interpretation = PPkt.Interpretation()
        interpretation.id = f"{individual_id}-{row['researchsubject_id']}"
        interpretation.progress_status = PPkt.Interpretation.ProgressStatus.IN_PROGRESS 
        interpretation.diagnosis.CopyFrom(diagnosis)

        ppackt_d.get(individual_id).interpretations.append(interpretation)

    # TODO Treatment
    # make_cda_medicalaction
    for idx, row in tqdm(treatment_df.iterrows(), total=len(treatment_df.index), desc="Treatment DF"):
        individual_id = row["subject_id"]
        medical_action_message = make_cda_medicalaction(row)
        if individual_id not in ppackt_d:
            raise ValueError(f"Attempt to enter unknown individual ID from treatment factory: \"{individual_id}\"")
        ppackt_d.get(individual_id).medical_actions.append(medical_action_message)

    # When we get here, we have constructed GA4GH Phenopackets with Individual, Disease, Biospecimen, MedicalAction, and GenomicInterpretations
    return list(ppackt_d.values())

get_merged_diagnosis_research_subject_df(q, cohort_name)

Retrieve a merged dataframe from CDA corresponding to the diagnosis and research subject tables

diagnosis table columns: diagnosis_id, age_at_diagnosis, grade, method_of_diagnosis, morphology, primary_diagnosis, stage

researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, primary_diagnosis_site

Need to combine in order to get primary_diagnosis_condition and primary_diagnosis_site with diagnosis table

Original: diagnosis_callable = lambda: q.diagnosis.run(page_size=self._page_size).get_all().to_dataframe() diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl") print("obtained diagnosis_df")

rsub_callable = lambda: q.researchsubject.run(page_size=self._page_size).get_all().to_dataframe() research_subject_df = self._get_cda_df(rsub_callable, f"{cohort_name}_research_subject_df.pkl") print("obtained research_subject_df")

merged_df = pd.merge(diagnosis_df, research_subject_df, left_on='researchsubject_id', right_on='researchsubject_id', suffixes=["_di", "_rs"]) return merged_df

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_merged_diagnosis_research_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
    """
    Retrieve a merged dataframe from CDA corresponding to the diagnosis and research subject tables

    diagnosis table columns: diagnosis_id, age_at_diagnosis, grade, method_of_diagnosis, morphology, primary_diagnosis, stage

    researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, primary_diagnosis_site

    Need to combine in order to get primary_diagnosis_condition and primary_diagnosis_site with diagnosis table

    Original:
    diagnosis_callable = lambda: q.diagnosis.run(page_size=self._page_size).get_all().to_dataframe()
    diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl")
    print("obtained diagnosis_df")

    rsub_callable = lambda: q.researchsubject.run(page_size=self._page_size).get_all().to_dataframe()
    research_subject_df = self._get_cda_df(rsub_callable, f"{cohort_name}_research_subject_df.pkl")
    print("obtained research_subject_df")

    merged_df = pd.merge(diagnosis_df, research_subject_df, left_on='researchsubject_id', right_on='researchsubject_id',
                         suffixes=["_di", "_rs"])
    return merged_df

    """
    print("\nGetting merged diagnosis and researchsubject df...")
    diagnosis_callable = lambda: fetch_rows( table='diagnosis', **q , add_columns=['subject_id'])
    diagnosis_df = self._get_cda_df(diagnosis_callable, f"{cohort_name}_diagnosis_df.pkl")
    print("obtained diagnosis_df")
    #diagnosis_df.to_csv('diagnosis_df.txt', sep='\t')

    # tried link_to_table='diagnosis' but it doesn't add any columns
    rsub_callable = lambda: fetch_rows( table='researchsubject', **q , add_columns=['subject_id'])
    rsub_df = self._get_cda_df(rsub_callable, f"{cohort_name}_researchsubject_df.pkl") # call is repeated below in get_merged_subject_research_subject_df
    print("obtained researchsubject_df")
    #rsub_df.to_csv('rsub_diagnosis_df.txt', sep='\t')

    # merge tables
    diag_rsub_df = pd.merge(diagnosis_df, rsub_df, left_on='subject_id', right_on='subject_id') #, suffixes=["_di", "_rs"])
    print("obtained merged diagnosis researchsubject df")
    #diag_rsub_df.to_csv('diagnosis_rsub.txt', sep='\t')

    return diag_rsub_df

get_merged_subject_research_subject_df(q, cohort_name)

Retrieve a merged dataframe from CDA corresponding to the subject and research subject tables Need both subject_id and researchsubject_id to include variants in phenopacket

subject table columns: subject_id, cause_of_death, days_to_birth, days_to_death, ethnicity, race sex, species, vital_status

researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, primary_diagnosis_site

M. Sierk 4-11-24: This is how Henry Schaeffer from CBIIT recommended getting the data source:

I consolidate the data_source_id columns (as opposed to just deleting it which works also) or else 
the merge expands each subject into many more rows. For example, for subject provenance I used:

sub = fetch_rows(table='subject', provenance=True)
sub['subject_data_source_id_concat'] = sub.groupby(['subject_id','subject_data_source'])['subject_data_source_id'].transform(lambda x: ','.join(x))
sub = sub.drop(columns=['subject_data_source_id'], axis=1)
sub = sub.drop_duplicates()
Source code in src/oncoexporter/cda/cda_table_importer.py
def get_merged_subject_research_subject_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
    """
    Retrieve a merged dataframe from CDA corresponding to the subject and research subject tables
    Need both subject_id and researchsubject_id to include variants in phenopacket 

    subject table columns: subject_id, cause_of_death, days_to_birth, days_to_death, ethnicity, race
                           sex, species, vital_status

    researchsubject table columns: researchsubject_id, member_of_research_project, primary_diagnosis_condition, 
                                    primary_diagnosis_site

    M. Sierk 4-11-24: 
        This is how Henry Schaeffer from CBIIT recommended getting the data source:

        I consolidate the data_source_id columns (as opposed to just deleting it which works also) or else 
        the merge expands each subject into many more rows. For example, for subject provenance I used:

        sub = fetch_rows(table='subject', provenance=True)
        sub['subject_data_source_id_concat'] = sub.groupby(['subject_id','subject_data_source'])['subject_data_source_id'].transform(lambda x: ','.join(x))
        sub = sub.drop(columns=['subject_data_source_id'], axis=1)
        sub = sub.drop_duplicates()

    """
    print("\nGetting merged subject and researchsubject df...")
    #subject_callable = lambda: q.subject.run(page_size=self._page_size).get_all().to_dataframe()

    subject_callable = lambda: fetch_rows( table='subject', **q ) # link_to_table='researchsubject' doesn't add anything
    subject_df = self._get_cda_df(subject_callable, f"{cohort_name}_subject_df.pkl")

    rsub_callable = lambda: fetch_rows( table='researchsubject', **q , add_columns=['subject_id']) # repeat from above
    rsub_df = self._get_cda_df(rsub_callable, f"{cohort_name}_researchsubject_df.pkl")

    # merge tables
    subject_rsub_df = pd.merge(subject_df, rsub_df, left_on='subject_id', right_on='subject_id') #, suffixes=["_di", "_rs"])
    print("obtained merged subject and researchsubject df")
    #subject_rsub_df.to_csv('subject_rsub_df.txt', sep='\t')


    #rsub_callable = lambda: q.researchsubject.run(page_size=self._page_size).get_all().to_dataframe()
    #rsub_callable = lambda: fetch_rows( table='researchsubject', **q , link_to_table='subject' )
    #research_subject_df = self._get_cda_df(rsub_callable, f"{cohort_name}_research_subject_df.pkl")
    #print("obtained research_subject_df")
    #merged_df = pd.merge(subject_df, research_subject_df, left_on='subject_id', right_on='subject_id',
                            #suffixes=["_subj", "_rs"])
    #research_subject_df.to_csv('researchsubject_df.txt', sep='\t')

    return subject_rsub_df

get_specimen_df(q, cohort_name)

Retrieve the subject dataframe from CDA

This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

Returns:

Type Description
pd.DataFrame

pandas DataFrame that corresponds to the CDA subject table.

Source code in src/oncoexporter/cda/cda_table_importer.py
def get_specimen_df(self, q: dict, cohort_name: str) -> pd.DataFrame:
    """Retrieve the subject dataframe from CDA

    This method uses the Query that was passed to the constructor to retrieve data from the CDA subject table

    :raises: raises an exception if the query object was not properly initialized
    :returns: pandas DataFrame that corresponds to the CDA subject table.
    :rtype: pd.DataFrame
    """
    print("\nGetting specimen df...")
    #specimen_callable = lambda: q.specimen.run(page_size=self._page_size).get_all().to_dataframe()
    specimen_callable = lambda: fetch_rows( table='specimen', **q, add_columns=['subject_id'] )
    specimen_df = self._get_cda_df(specimen_callable, f"{cohort_name}_specimen_df.pkl")
    #specimen_df.to_csv('specimen_df.txt', sep='\t')
    return specimen_df