diff --git a/application/cmd/cre_main.py b/application/cmd/cre_main.py index b3d40f7b..819675c6 100644 --- a/application/cmd/cre_main.py +++ b/application/cmd/cre_main.py @@ -68,23 +68,23 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node: db_link = collection.add_node(link.document) if cres: for cre in cres: - collection.add_link(cre=cre, node=linked_node, type=link.ltype) + collection.add_link(cre=cre, node=linked_node, ltype=link.ltype) for unlinked_standard in cre_less_nodes: # if anything in this collection.add_link( cre=cre, node=db.dbNodeFromNode(unlinked_standard), - type=link.ltype, + ltype=link.ltype, ) else: cres = collection.find_cres_of_node(linked_node) if cres: for cre in cres: - collection.add_link(cre=cre, node=db_link, type=link.ltype) + collection.add_link(cre=cre, node=db_link, ltype=link.ltype) for unlinked_node in cre_less_nodes: collection.add_link( cre=cre, node=db.dbNodeFromNode(unlinked_node), - type=link.ltype, + ltype=link.ltype, ) else: # if neither the root nor a linked node has a CRE, add both as unlinked nodes cre_less_nodes.append(link.document) @@ -96,13 +96,13 @@ def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node: # dbcre,_ = register_cre(link.document, collection) # CREs are idempotent c = collection.get_CREs(name=link.document.name)[0] dbcre = db.dbCREfromCRE(c) - collection.add_link(dbcre, linked_node, type=link.ltype) + collection.add_link(dbcre, linked_node, ltype=link.ltype) cres_added.append(dbcre) for unlinked_standard in cre_less_nodes: # if anything in this collection.add_link( cre=dbcre, node=db.dbNodeFromNode(unlinked_standard), - type=link.ltype, + ltype=link.ltype, ) cre_less_nodes = [] @@ -144,7 +144,7 @@ def register_cre(cre: defs.CRE, collection: db.Node_collection) -> Tuple[db.CRE, collection.add_link( cre=dbcre, node=register_node(node=link.document, collection=collection), - type=link.ltype, + ltype=link.ltype, ) return dbcre, existing @@ -460,6 +460,7 @@ def download_cre_from_upstream(creid: str): cre = defs.Document.from_dict(credict) if cre.id in imported_cres: return + register_cre(cre, collection) imported_cres[cre.id] = "" for link in cre.links: @@ -536,8 +537,6 @@ def run(args: argparse.Namespace) -> None: # pragma: no cover ) elif args.add and args.cre_loc and not args.from_spreadsheet: add_from_disk(cache_loc=args.cache_file, cre_loc=args.cre_loc) - elif args.print_graph: - print_graph() # elif args.review and args.osib_in: # review_osib_from_file( # file_loc=args.osib_in, cache=args.cache_file, cre_loc=args.cre_loc @@ -639,8 +638,6 @@ def run(args: argparse.Namespace) -> None: # pragma: no cover if args.generate_embeddings: generate_embeddings(args.cache_file) - if args.owasp_proj_meta: - owasp_metadata_to_cre(args.owasp_proj_meta) if args.populate_neo4j_db: populate_neo4j_db(args.cache_file) if args.start_worker: diff --git a/application/database/db.py b/application/database/db.py index 035249d8..7ecbe52b 100644 --- a/application/database/db.py +++ b/application/database/db.py @@ -684,9 +684,7 @@ def __init__(self) -> None: def with_graph(self) -> "Node_collection": logger.info("Loading CRE graph in memory, memory-heavy operation!") - self.graph = inmemory_graph.CRE_Graph.instance( - documents=self.__get_all_nodes_and_cres() - ) + self.graph = inmemory_graph.CRE_Graph.instance(documents=self.__get_all_nodes_and_cres()) return self def __get_external_links(self) -> List[Tuple[CRE, Node, str]]: @@ -1499,14 +1497,14 @@ def add_internal_link( self, higher: CRE, lower: CRE, - type: cre_defs.LinkTypes = cre_defs.LinkTypes.Same, + ltype: cre_defs.LinkTypes = cre_defs.LinkTypes.Same, ) -> None: """ adds a link between two CREs in the database, Args: higher (CRE): the higher level CRE that CONTAINS or is the SAME or is RELATED to lower lower (CRE): the lower level CRE that is CONTAINED or is the SAME or is RELATED to higher - type (cre_defs.LinkTypes, optional): the linktype + ltype (cre_defs.LinkTypes, optional): the linktype Defaults to cre_defs.LinkTypes.Same. """ if lower.id is None: @@ -1576,17 +1574,17 @@ def add_internal_link( ) if entry_exists: # logger.info( - # f"knew of internal link {lower.name} == {higher.name} of type {entry_exists.type}," - # f"updating to type {type.value}" + # f"knew of internal link {lower.name} == {higher.name} of ltype {entry_exists.ltype}," + # f"updating to ltype {ltype.value}" # ) - # entry_exists.type = type.value + # entry_exists.ltype = ltype.value # self.session.commit() return logger.info( "did not know of internal link" f" {higher.external_id}:{higher.name}" - f" -> {lower.external_id}:{lower.name} of type {type.value},adding" + f" -> {lower.external_id}:{lower.name} of type {ltype.value},adding" ) if not self.graph: logger.error("graph is null") @@ -1596,12 +1594,15 @@ def add_internal_link( higher_cre = CREfromDB(higher) lower_cre = CREfromDB(higher) - link_to = cre_defs.Link(document=lower_cre, ltype=type) - cycle = self.graph.adds_cycle(doc_from=higher_cre, link_to=link_to) + link_to = cre_defs.Link(document=lower_cre, ltype=ltype) + + if type(self.graph) != inmemory_graph.CRE_Graph: + raise ValueError("wtf?") + cycle = self.graph.introduces_cycle(doc_from=higher_cre, link_to=link_to) if not cycle: self.session.add( - InternalLinks(type=type.value, cre=lower.id, group=higher.id) + InternalLinks(type=ltype.value, cre=lower.id, group=higher.id) ) self.session.commit() if self.graph: @@ -1630,7 +1631,7 @@ def add_link( self, cre: CRE, node: Node, - type: cre_defs.LinkTypes = cre_defs.LinkTypes.Same, + ltype: cre_defs.LinkTypes = cre_defs.LinkTypes.Same, ) -> None: if cre.id is None: cre = ( @@ -1648,9 +1649,9 @@ def add_link( logger.debug( f"knew of link {node.name}:{node.section}" f"=={cre.name} of type {entry.type}," - f"updating type to {type.value}" + f"updating type to {ltype.value}" ) - entry.type = type.value + entry.type = ltype.value self.session.commit() return else: @@ -1659,11 +1660,11 @@ def add_link( f"{node.name}:{node.section}=={cre.id}){cre.name}" " ,adding" ) - self.session.add(Links(type=type.value, cre=cre.id, node=node.id)) + self.session.add(Links(type=ltype.value, cre=cre.id, node=node.id)) if self.graph: self.graph.add_graph_edge( doc_from=CREfromDB(cre), - link_to=cre_defs.Link(document=nodeFromDB(node),ltype=type.value), + link_to=cre_defs.Link(document=nodeFromDB(node),ltype=ltype.value), graph=self.graph.graph ) diff --git a/application/database/inmemory_graph.py b/application/database/inmemory_graph.py index 92c0793d..f54d7267 100644 --- a/application/database/inmemory_graph.py +++ b/application/database/inmemory_graph.py @@ -7,7 +7,7 @@ class CRE_Graph: graph: nx.Graph = None __parent_child_subgraph = None - __instance = None + __instance: "CRE_Graph" = None @classmethod def instance(cls, documents: List[defs.Document] = None) -> "CRE_Graph": @@ -23,7 +23,7 @@ def __init__(sel): def add_node(self, *args, **kwargs): return self.graph.add_node(*args, **kwargs) - def adds_cycle(self, doc_from: defs.Document, link_to: defs.Link): + def introduces_cycle(self, doc_from: defs.Document, link_to: defs.Link): try: existing_cycle = nx.find_cycle(self.graph) if existing_cycle: diff --git a/application/tests/db_test.py b/application/tests/db_test.py index 9be0a04e..5c3dfae3 100644 --- a/application/tests/db_test.py +++ b/application/tests/db_test.py @@ -63,9 +63,9 @@ def setUp(self) -> None: ) collection.session.add(dbcre) - collection.add_link(cre=dbcre, node=dbstandard, type=defs.LinkTypes.LinkedTo) + collection.add_link(cre=dbcre, node=dbstandard, ltype=defs.LinkTypes.LinkedTo) collection.add_internal_link( - lower=dbcre, higher=dbgroup, type=defs.LinkTypes.Contains + lower=dbcre, higher=dbgroup, ltype=defs.LinkTypes.Contains ) self.collection = collection @@ -190,7 +190,7 @@ def test_export(self) -> None: ) ) self.collection.add_link( - self.dbcre, self.collection.add_node(code0), type=defs.LinkTypes.LinkedTo + self.dbcre, self.collection.add_node(code0), ltype=defs.LinkTypes.LinkedTo ) self.collection.add_node(code1) self.collection.add_node(tool0) @@ -2098,7 +2098,7 @@ def test_all_cres_with_pagination(self): defs.Link(document=copy(nodes[i]), ltype=defs.LinkTypes.LinkedTo) ) collection.add_link( - cre=dbcres[i], node=dbnodes[i], type=defs.LinkTypes.LinkedTo + cre=dbcres[i], node=dbnodes[i], ltype=defs.LinkTypes.LinkedTo ) collection.session.commit() @@ -2136,7 +2136,7 @@ def test_all_cres_with_pagination(self): defs.Link(document=copy(nodes[i]), ltype=defs.LinkTypes.LinkedTo) ) collection.add_link( - cre=dbcres[i], node=dbnodes[i], type=defs.LinkTypes.LinkedTo + cre=dbcres[i], node=dbnodes[i], ltype=defs.LinkTypes.LinkedTo ) collection.session.commit() @@ -2169,17 +2169,17 @@ def test_get_cre_hierarchy(self) -> None: ) else: collection.add_link( - node=dbitem, cre=linked_item, type=link.ltype + node=dbitem, cre=linked_item, ltype=link.ltype ) else: linked_item = collection.add_node(link.document) if item.doctype == defs.Credoctypes.CRE: collection.add_link( - cre=dbitem, node=linked_item, type=link.ltype + cre=dbitem, node=linked_item, ltype=link.ltype ) else: collection.add_internal_link( - cre=linked_item, node=dbitem, type=link.ltype + cre=linked_item, node=dbitem, ltype=link.ltype ) cres = inputDocs[defs.Credoctypes.CRE] diff --git a/application/tests/spreadsheet_test.py b/application/tests/spreadsheet_test.py index 76054702..de656dba 100644 --- a/application/tests/spreadsheet_test.py +++ b/application/tests/spreadsheet_test.py @@ -81,13 +81,13 @@ def test_prepare_spreadsheet(self) -> None: ) else: collection.add_link( - node=dbitem, cre=linked_item, type=link.ltype + node=dbitem, cre=linked_item, ltype=link.ltype ) else: linked_item = collection.add_node(link.document) if item.doctype == defs.Credoctypes.CRE: collection.add_link( - cre=dbitem, node=linked_item, type=link.ltype + cre=dbitem, node=linked_item, ltype=link.ltype ) else: collection.add_internal_link( diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 0f79e68d..ad6d94c7 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -514,10 +514,10 @@ def test_smartlink(self) -> None: dasvs = collection.add_node(standards["ASVS"]) dcwe = collection.add_node(standards["cwe0"]) collection.add_internal_link( - higher=dca, lower=dcd, type=defs.LinkTypes.Contains + higher=dca, lower=dcd, ltype=defs.LinkTypes.Contains ) collection.add_internal_link( - higher=dcb, lower=dcd, type=defs.LinkTypes.Contains + higher=dcb, lower=dcd, ltype=defs.LinkTypes.Contains ) collection.add_link(dcb, dasvs)