diff --git a/node_normalizer/model/input.py b/node_normalizer/model/input.py index 78591b8..ad42683 100644 --- a/node_normalizer/model/input.py +++ b/node_normalizer/model/input.py @@ -41,6 +41,11 @@ class CurieList(BaseModel): title="Whether to return taxa for equivalent identifiers" ) + include_clique_leaders: bool = Field( + default=False, + title="Whether to return clique leaders for conflated identifiers" + ) + class Config: schema_extra = { "example": { diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index 5f92e2f..0c02cdf 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -500,6 +500,25 @@ async def get_info_content( async def get_eqids_and_types( app: FastAPI, canonical_nonan: List) -> Tuple[List, List]: + """ + Retrieve equivalent IDs and their corresponding types, along with ancestor types, by querying databases. + + This function processes a given list of canonical identifiers in batches and fetches their equivalent + IDs (`eqids`) from a database. For each identifier, it also retrieves its type information and computes + the ancestor types using a function. If no type information is found for a given identifier, a default + type is assigned, and an error is logged. The resulting data structures consolidate this information + to provide detailed insights into IDs and their associated types. + + :param app: An instance of the FastAPI application containing the database connections. + :type app: FastAPI + :param canonical_nonan: A list of canonical identifiers for which `eqids` and types need to be fetched. + :type canonical_nonan: List + :return: A tuple containing two lists: + 1. A list of equivalent IDs (`eqids`) for each input identifier. + 2. A list of lists containing ancestor types for each input identifier, starting with the most specific type. + :rtype: Tuple[List, List] + """ + if len(canonical_nonan) == 0: return [], [] batch_size = int(os.environ.get("EQ_BATCH_SIZE", 2500)) @@ -511,7 +530,7 @@ async def get_eqids_and_types( types_with_ancestors = [] for index, typ in enumerate(types): if not typ: - logging.error(f"No type information found for '{canonical_nonan[index]}' with eqids: {eqids[index]}, " + logger.error(f"No type information found for '{canonical_nonan[index]}' with eqids: {eqids[index]}, " f"replacing with {BIOLINK_NAMED_THING}") types_with_ancestors.append([BIOLINK_NAMED_THING]) else: @@ -532,6 +551,7 @@ async def get_normalized_nodes( include_descriptions: bool = False, include_individual_types: bool = True, include_taxa: bool = True, + include_clique_leaders: bool = False, ) -> Dict[str, Optional[str]]: """ Get value(s) for key(s) using redis MGET @@ -555,6 +575,8 @@ async def get_normalized_nodes( canonical_ids = await app.state.eq_id_to_id_db.mget(*upper_curies, encoding='utf-8') canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] info_contents = {} + clique_leaders_gene_protein = {} + clique_leaders_drug_chemical = {} # did we get some canonical ids if canonical_nonan: @@ -562,26 +584,34 @@ async def get_normalized_nodes( info_contents = await get_info_content(app, canonical_nonan) # Get the equivalent_ids and types - eqids, types = await get_eqids_and_types(app, canonical_nonan) + eqids, types_with_ancestors = await get_eqids_and_types(app, canonical_nonan) # are we looking for conflated values if conflate_gene_protein or conflate_chemical_drug: other_ids = [] if conflate_gene_protein: - other_ids.extend(await app.state.gene_protein_db.mget(*canonical_nonan, encoding='utf8')) + gene_protein_clique_leaders_strings = await app.state.gene_protein_db.mget(*canonical_nonan, encoding='utf8') + gene_protein_clique_leaders = [json.loads(oids) if oids else [] for oids in gene_protein_clique_leaders_strings] + other_ids.extend(gene_protein_clique_leaders) + if include_clique_leaders: + clique_leaders_gene_protein.update(zip(canonical_nonan, gene_protein_clique_leaders)) # logger.error(f"After conflate_gene_protein: {other_ids}") if conflate_chemical_drug: - other_ids.extend(await app.state.chemical_drug_db.mget(*canonical_nonan, encoding='utf8')) + drug_chemical_clique_leaders_strings = await app.state.chemical_drug_db.mget(*canonical_nonan, encoding='utf8') + drug_chemical_clique_leaders = [json.loads(oids) if oids else [] for oids in drug_chemical_clique_leaders_strings] + other_ids.extend(drug_chemical_clique_leaders) + if include_clique_leaders: + clique_leaders_drug_chemical.update(zip(canonical_nonan, drug_chemical_clique_leaders)) # logger.error(f"After conflate_chemical_drug: {other_ids}") # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, # they're not necessarily first. For instance if what came in and got canonicalized was a protein id # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. - other_ids = [json.loads(oids) if oids else [] for oids in other_ids] + # other_ids = [json.loads(oids) if oids else [] for oids in other_ids] # Until we added conflate_chemical_drug, canonical_nonan and other_ids would always have the same # length, so we could figure out mappings from one to the other just by doing: @@ -598,7 +628,9 @@ async def get_normalized_nodes( dereference_others[canon].extend(oids) all_other_ids = sum(other_ids, []) - eqids2, types2 = await get_eqids_and_types(app, all_other_ids) + # We don't care about direct types for conflated identifiers here -- if you want it, you can get it + # in clique_leaders_gene_protein. + eqids2, types_with_ancestors2 = await get_eqids_and_types(app, all_other_ids) # logger.error(f"other_ids = {other_ids}") # logger.error(f"dereference_others = {dereference_others}") @@ -608,9 +640,9 @@ async def get_normalized_nodes( final_types = [] deref_others_eqs = dict(zip(all_other_ids, eqids2)) - deref_others_typ = dict(zip(all_other_ids, types2)) + deref_others_typ = dict(zip(all_other_ids, types_with_ancestors2)) - zipped = zip(canonical_nonan, eqids, types) + zipped = zip(canonical_nonan, eqids, types_with_ancestors) for canonical_id, e, t in zipped: # here's where we replace the eqids, types @@ -619,7 +651,7 @@ async def get_normalized_nodes( t = [] for other in dereference_others[canonical_id]: - # logging.debug(f"e = {e}, other = {other}, deref_others_eqs = {deref_others_eqs}") + # logger.debug(f"e = {e}, other = {other}, deref_others_eqs = {deref_others_eqs}") e += deref_others_eqs[other] t += deref_others_typ[other] @@ -630,7 +662,7 @@ async def get_normalized_nodes( dereference_types = dict(zip(canonical_nonan, final_types)) else: dereference_ids = dict(zip(canonical_nonan, eqids)) - dereference_types = dict(zip(canonical_nonan, types)) + dereference_types = dict(zip(canonical_nonan, types_with_ancestors)) else: dereference_ids = dict() dereference_types = dict() @@ -638,20 +670,24 @@ async def get_normalized_nodes( # output the final result normal_nodes = { input_curie: await create_node(app, canonical_id, dereference_ids, dereference_types, info_contents, - include_descriptions=include_descriptions, - include_individual_types=include_individual_types, - include_taxa=include_taxa, - conflations={ - 'GeneProtein': conflate_gene_protein, - 'DrugChemical': conflate_chemical_drug, - }) + clique_leaders = { + 'GeneProtein': clique_leaders_gene_protein, + 'DrugChemical': clique_leaders_drug_chemical, + }, + include_descriptions=include_descriptions, + include_individual_types=include_individual_types, + include_taxa=include_taxa, + conflations={ + 'GeneProtein': conflate_gene_protein, + 'DrugChemical': conflate_chemical_drug, + }) for input_curie, canonical_id in zip(curies, canonical_ids) } end_time = time.time_ns() logger.info(f"Normalized {len(curies)} nodes in {(end_time - start_time)/1_000_000:.2f} ms with arguments " + f"(curies={curies}, conflate_gene_protein={conflate_gene_protein}, conflate_chemical_drug={conflate_chemical_drug}, " + - f"include_descriptions={include_descriptions}, include_individual_types={include_individual_types})") + f"include_descriptions={include_descriptions}, include_individual_types={include_individual_types}, include_clique_leaders={include_clique_leaders})") return normal_nodes @@ -680,7 +716,7 @@ async def get_info_content_attribute(app, canonical_nonan) -> dict: return new_attrib -async def create_node(app, canonical_id, equivalent_ids, types, info_contents, include_descriptions=True, +async def create_node(app, canonical_id, equivalent_ids, types_with_ancestors, info_contents, clique_leaders, include_descriptions=True, include_individual_types=False, include_taxa=False, conflations=None): """Construct the output format given the compressed redis data""" # It's possible that we didn't find a canonical_id @@ -693,16 +729,16 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i # If we have 'None' in the equivalent IDs, skip it so we don't confuse things further down the line. if None in equivalent_ids[canonical_id]: - logging.warning(f"Skipping None in canonical ID {canonical_id} among eqids: {equivalent_ids}") + logger.warning(f"Skipping None in canonical ID {canonical_id} among eqids: {equivalent_ids}") equivalent_ids[canonical_id] = [x for x in equivalent_ids[canonical_id] if x is not None] if not equivalent_ids[canonical_id]: - logging.warning(f"No non-None values found for ID {canonical_id} among filtered eqids: {equivalent_ids}") + logger.warning(f"No non-None values found for ID {canonical_id} among filtered eqids: {equivalent_ids}") return None # If we have 'None' in the canonical types, something went horribly wrong (specifically: we couldn't # find the type information for all the eqids for this clique). Return None. - if None in types[canonical_id]: - logging.error(f"No types found for canonical ID {canonical_id} among types: {types}") + if None in types_with_ancestors[canonical_id]: + logger.error(f"No types found for canonical ID {canonical_id} among types: {types_with_ancestors}") return None # OK, now we should have id's in the format [ {"i": "MONDO:12312", "l": "Scrofula"}, {},...] @@ -721,8 +757,7 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i identifiers_with_labels = eids else: # We have a conflation going on! To replicate Babel's behavior, we need to run the algorithem - # on the list of labels corresponding to the first - # So we need to run the algorithm on the first set of identifiers that have any + # on the list of labels corresponding to the first set of identifiers that have any # label whatsoever. identifiers_with_labels = [] curies_already_checked = set() @@ -752,7 +787,7 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i # need to reverse it in order to apply preferred_name_boost_prefixes for the most # specific type. possible_labels = [] - for typ in types[canonical_id][::-1]: + for typ in types_with_ancestors[canonical_id][::-1]: if typ in config['preferred_name_boost_prefixes']: # This is the most specific matching type, so we use this and then break. possible_labels = list(map(lambda ident: ident.get('l', ''), @@ -800,12 +835,14 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i # now need to reformat the identifier keys. It could be cleaner but we have to worry about if there is a label descriptions = [] + clique_leaders_output = [] node_taxa = set() node["equivalent_identifiers"] = [] for eqid in eids: eq_item = {"identifier": eqid["i"]} if "l" in eqid and eqid["l"]: eq_item["label"] = eqid["l"] + # if descriptions is enabled, add it to descriptions. if include_descriptions and "d" in eqid and len(eqid["d"]) > 0: desc = eqid["d"][0] @@ -821,6 +858,28 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i eq_item["type"] = eqid['types'][-1] node["equivalent_identifiers"].append(eq_item) + # print(f"Checking if {canonical_id} is in clique_leaders: {builtin_json.dumps(clique_leaders, indent=2)}") + if clique_leaders: + for conflation_type in clique_leaders: + if canonical_id in clique_leaders[conflation_type] and eqid["i"] in clique_leaders[conflation_type][canonical_id]: + clique_leader_output = { + "identifier": eqid["i"], + "conflation": conflation_type, + } + if "label" in eq_item: + clique_leader_output["label"] = eq_item["label"] + + # For description, taxa and type, we could read them from eq_item, but that + # is only set if the appropriate flag was turned on. For completeness, let's + # try picking them up if they've been passed to us at all. + if "d" in eqid and len(eqid["d"]) > 0: + clique_leader_output["description"] = eqid["d"] + if "t" in eqid and eqid["t"]: + clique_leader_output["taxa"] = eqid["t"] + if 'types' in eqid: + clique_leader_output["type"] = eqid['types'][-1] + clique_leaders_output.append(clique_leader_output) + if include_descriptions and descriptions: node["descriptions"] = descriptions node["id"]["description"] = descriptions[0] @@ -828,12 +887,16 @@ async def create_node(app, canonical_id, equivalent_ids, types, info_contents, i if include_taxa and node_taxa: node["taxa"] = sorted(node_taxa, key=get_numerical_curie_suffix) + # Add clique leaders if available. + if clique_leaders and clique_leaders_output: + node["clique_leaders"] = clique_leaders_output + # We need to remove `biolink:Entity` from the types returned. # (See explanation at https://github.com/NCATSTranslator/NodeNormalization/issues/173) - if 'biolink:Entity' in types[canonical_id]: - types[canonical_id].remove('biolink:Entity') + if 'biolink:Entity' in types_with_ancestors[canonical_id]: + types_with_ancestors[canonical_id].remove('biolink:Entity') - node['type'] = types[canonical_id] + node['type'] = types_with_ancestors[canonical_id] # add the info content to the node if we got one if info_contents[canonical_id] is not None: diff --git a/node_normalizer/server.py b/node_normalizer/server.py index 4f3bcc1..91788c0 100644 --- a/node_normalizer/server.py +++ b/node_normalizer/server.py @@ -282,6 +282,7 @@ async def get_normalized_node_handler( description: bool = fastapi.Query(False, description="Whether to return curie descriptions when possible"), individual_types: bool = fastapi.Query(False, description="Whether to return individual types for equivalent identifiers"), include_taxa: bool = fastapi.Query(True, description="Whether to return taxa for equivalent identifiers"), + include_clique_leaders: bool = fastapi.Query(False, description="Whether to return clique leaders for conflated identifiers"), ): """ Get value(s) for key(s) using redis MGET @@ -291,6 +292,7 @@ async def get_normalized_node_handler( include_descriptions=description, include_individual_types=individual_types, include_taxa=include_taxa, + include_clique_leaders=include_clique_leaders, ) # If curie contains at least one entry, then the only way normalized_nodes could be blank @@ -312,7 +314,7 @@ async def get_normalized_node_handler_post(curies: CurieList): """ normalized_nodes = await get_normalized_nodes(app, curies.curies, curies.conflate, curies.drug_chemical_conflate, curies.description, include_individual_types=curies.individual_types, - include_taxa=curies.include_taxa, + include_taxa=curies.include_taxa, include_clique_leaders=curies.include_clique_leaders, ) # If curies.curies contains at least one entry, then the only way normalized_nodes could be blank diff --git a/node_normalizer/set_id.py b/node_normalizer/set_id.py index dcd5013..c3f3515 100644 --- a/node_normalizer/set_id.py +++ b/node_normalizer/set_id.py @@ -41,7 +41,7 @@ async def generate_setid(app, curies, conflations) -> SetIDResponse: # We use get_normalized_nodes() to normalize all the CURIEs for us. normalization_results = await get_normalized_nodes( - app, curies, gene_protein_conflation, drug_chemical_conflation, include_descriptions=False, include_individual_types=False, include_taxa=False + app, curies, gene_protein_conflation, drug_chemical_conflation, include_descriptions=False, include_individual_types=False, include_taxa=False, include_clique_leaders=False ) # We prepare a set of sorted, deduplicated curies.