From eaa1f228e6ba802ff143debe4b6b01bd48bba602 Mon Sep 17 00:00:00 2001 From: Nicholas Allen Date: Fri, 13 Oct 2023 13:24:54 +1000 Subject: [PATCH] chore: fix type correctness and remove many "type: ignore" directives (#509) Signed-off-by: Nicholas Allen --- src/macaron/__main__.py | 12 +- src/macaron/database/table_definitions.py | 4 +- src/macaron/database/views.py | 49 +++++-- .../dependency_resolver.py | 6 +- src/macaron/slsa_analyzer/analyze_context.py | 6 +- src/macaron/slsa_analyzer/analyzer.py | 4 +- .../slsa_analyzer/checks/check_result.py | 5 +- .../checks/provenance_l3_content_check.py | 4 +- .../slsa_analyzer/git_service/api_client.py | 6 +- .../provenance/expectations/expectation.py | 3 +- src/macaron/slsa_analyzer/slsa_req.py | 124 +++++++++--------- src/macaron/util.py | 6 +- tests/slsa_analyzer/checks/test_registry.py | 68 +++++----- 13 files changed, 156 insertions(+), 141 deletions(-) diff --git a/src/macaron/__main__.py b/src/macaron/__main__.py index 001241882..cadcdc7de 100644 --- a/src/macaron/__main__.py +++ b/src/macaron/__main__.py @@ -78,14 +78,12 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None if analyzer_single_args.config_path: # Get user config from yaml file - run_config = YamlLoader.load(analyzer_single_args.config_path) - if run_config is None: - # The error mypy raises here is "Statement is unreachable" for the log statement. - # This is because the return type of `YamlLoader.load` is Any (which is not correct). - # TODO: revisit this issue together with the ``Configuration`` class issue mentioned in - # https://github.com/oracle/macaron/pull/401#discussion_r1303695425. - logger.error("The input yaml config at %s is invalid.", analyzer_single_args.config_path) # type: ignore + loaded_config = YamlLoader.load(analyzer_single_args.config_path) + if loaded_config is None: + logger.error("The input yaml config at %s is invalid.", analyzer_single_args.config_path) sys.exit(os.EX_DATAERR) + else: + run_config = loaded_config else: repo_path = analyzer_single_args.repo_path purl = analyzer_single_args.package_url diff --git a/src/macaron/database/table_definitions.py b/src/macaron/database/table_definitions.py index cab711938..16598b672 100644 --- a/src/macaron/database/table_definitions.py +++ b/src/macaron/database/table_definitions.py @@ -16,7 +16,7 @@ import string from datetime import datetime from pathlib import Path -from typing import Self +from typing import Any, Self from packageurl import PackageURL from sqlalchemy import Boolean, Column, Enum, ForeignKey, Integer, String, Table, UniqueConstraint @@ -287,7 +287,7 @@ class Repository(ORMBase): #: The path to the repo on the file system. fs_path: Mapped[str] = mapped_column(String, nullable=False) - def __init__(self, *args, files: list[str] | None = None, **kwargs): # type: ignore[no-untyped-def] + def __init__(self, *args: Any, files: list[str] | None = None, **kwargs: Any): """Instantiate the repository and set files. Parameters diff --git a/src/macaron/database/views.py b/src/macaron/database/views.py index fc1c6b523..2582232b4 100644 --- a/src/macaron/database/views.py +++ b/src/macaron/database/views.py @@ -9,16 +9,20 @@ https://github.com/sqlalchemy/sqlalchemy/wiki/Views """ +from typing import Any + import sqlalchemy as sa +import sqlalchemy.event +from sqlalchemy import Connection from sqlalchemy.ext import compiler -from sqlalchemy.schema import DDLElement -from sqlalchemy.sql import TableClause, table +from sqlalchemy.schema import BaseDDLElement, DDLElement, MetaData, SchemaItem, Table +from sqlalchemy.sql import Select, TableClause, table class CreateView(DDLElement): """CreateView.""" - def __init__(self, name, selectable): # type: ignore + def __init__(self, name: str, selectable: Select[Any]): self.name = name self.selectable = selectable @@ -26,7 +30,7 @@ def __init__(self, name, selectable): # type: ignore class DropView(DDLElement): """DropView.""" - def __init__(self, name): # type: ignore + def __init__(self, name: str): self.name = name @@ -40,25 +44,42 @@ def _drop_view(element, comp, **kw): return "DROP VIEW %s" % (element.name) -def view_exists(ddl, target, connection, **kw): # type: ignore +def view_exists( + ddl: BaseDDLElement, + target: SchemaItem, + bind: Connection | None, + tables: list[Table] | None = None, + state: Any | None = None, + **kw: Any, +) -> bool: """View exists.""" - return ddl.name in sa.inspect(connection).get_view_names() - - -def view_doesnt_exist(ddl, target, connection, **kw): # type: ignore + if isinstance(ddl, CreateView) or isinstance(ddl, DropView): + assert isinstance(bind, Connection) + return ddl.name in sa.inspect(bind).get_view_names() + return False + + +def view_doesnt_exist( + ddl: BaseDDLElement, + target: SchemaItem, + bind: Connection | None, + tables: list[Table] | None = None, + state: Any | None = None, + **kw: Any, +) -> bool: """Not view exists.""" - return not view_exists(ddl, target, connection, **kw) # type: ignore + return not view_exists(ddl, target, bind, **kw) -def create_view(name, metadata, selectable) -> TableClause: # type: ignore +def create_view(name: str, metadata: MetaData, selectable: Select[Any]) -> TableClause: """Create a view.""" view = table(name) view._columns._populate_separate_keys(col._make_proxy(view) for col in selectable.selected_columns) - sa.event.listen( + sqlalchemy.event.listen( metadata, "after_create", - CreateView(name, selectable).execute_if(callable_=view_doesnt_exist), # type: ignore + CreateView(name, selectable).execute_if(callable_=view_doesnt_exist), ) - sa.event.listen(metadata, "before_drop", DropView(name).execute_if(callable_=view_exists)) # type: ignore + sqlalchemy.event.listen(metadata, "before_drop", DropView(name).execute_if(callable_=view_exists)) return view diff --git a/src/macaron/dependency_analyzer/dependency_resolver.py b/src/macaron/dependency_analyzer/dependency_resolver.py index de28e04b3..3d3ac43d5 100644 --- a/src/macaron/dependency_analyzer/dependency_resolver.py +++ b/src/macaron/dependency_analyzer/dependency_resolver.py @@ -377,12 +377,12 @@ def resolve_dependencies(main_ctx: Any, sbom_path: str) -> dict[str, DependencyI def _resolve_more_dependencies(dependencies: dict[str, DependencyInfo]) -> None: """Utilise the Repo Finder to resolve the repositories of more dependencies.""" for item in dependencies.values(): - if item.get("available") != SCMStatus.MISSING_SCM: + if item["available"] != SCMStatus.MISSING_SCM: continue - item["url"] = find_repo(item.get("purl")) # type: ignore + item["url"] = find_repo(item["purl"]) if item["url"] == "": - logger.debug("Failed to find url for purl: %s", item.get("purl")) + logger.debug("Failed to find url for purl: %s", item["purl"]) else: # TODO decide how to handle possible duplicates here item["available"] = SCMStatus.AVAILABLE diff --git a/src/macaron/slsa_analyzer/analyze_context.py b/src/macaron/slsa_analyzer/analyze_context.py index 7148f9278..b6a32bcec 100644 --- a/src/macaron/slsa_analyzer/analyze_context.py +++ b/src/macaron/slsa_analyzer/analyze_context.py @@ -181,9 +181,9 @@ def get_dict(self) -> dict: sorted_on_id = [] for res in _sorted_on_id: # res is CheckResult(TypedDict) - res: dict = dict(res.copy()) # type: ignore - res.pop("result_tables") # type: ignore - sorted_on_id.append(res) + res_copy: dict = dict(res.copy()) + res_copy.pop("result_tables") + sorted_on_id.append(res_copy) sorted_results = sorted(sorted_on_id, key=lambda item: item["result_type"], reverse=True) check_summary = { result_type.value: len(result_list) for result_type, result_list in self.get_check_summary().items() diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 52641c3ae..2005f2af1 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -8,7 +8,7 @@ import sys from datetime import datetime, timezone from pathlib import Path -from typing import NamedTuple +from typing import Any, NamedTuple import sqlalchemy.exc from git import InvalidGitRepositoryError @@ -881,7 +881,7 @@ def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]: class DuplicateCmpError(DuplicateError): """This class is used for duplicated software component errors.""" - def __init__(self, *args, context: AnalyzeContext | None = None, **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__(self, *args: Any, context: AnalyzeContext | None = None, **kwargs: Any) -> None: """Create a DuplicateCmpError instance. Parameters diff --git a/src/macaron/slsa_analyzer/checks/check_result.py b/src/macaron/slsa_analyzer/checks/check_result.py index d36f07f0e..f772e162b 100644 --- a/src/macaron/slsa_analyzer/checks/check_result.py +++ b/src/macaron/slsa_analyzer/checks/check_result.py @@ -5,9 +5,10 @@ from enum import Enum from typing import TypedDict -from sqlalchemy import Table from sqlalchemy.orm import DeclarativeBase +from macaron.slsa_analyzer.provenance.expectations.expectation import Expectation + class CheckResultType(str, Enum): """This class contains the types of a check result.""" @@ -37,7 +38,7 @@ class CheckResult(TypedDict): justification: list[str | dict[str, str]] # human_readable_justification: str # result_values: dict[str, str | float | int] | list[dict[str, str | float | int]] - result_tables: list[DeclarativeBase | Table] + result_tables: list[DeclarativeBase | Expectation] # recommendation: str result_type: CheckResultType diff --git a/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py b/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py index 1d17286d5..ea8963b1c 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py @@ -78,7 +78,7 @@ def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResu ) if expectation.validate(provenance.payload): - check_result["result_tables"].append(expectation) # type: ignore[arg-type] + check_result["result_tables"].append(expectation) check_result["justification"].append( f"Successfully verified the expectation against the provenance {provenance.asset.url}." ) @@ -108,7 +108,7 @@ def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResu if expectation.validate(payload): # We need to use typing.Protocol for multiple inheritance, however, the Expectation # class uses inlined functions, which is not supported by Protocol. - check_result["result_tables"].append(expectation) # type: ignore[arg-type] + check_result["result_tables"].append(expectation) check_result["justification"].append( "Successfully verified the expectation against provenance." ) diff --git a/src/macaron/slsa_analyzer/git_service/api_client.py b/src/macaron/slsa_analyzer/git_service/api_client.py index 5ac0f0cd4..e00065f27 100644 --- a/src/macaron/slsa_analyzer/git_service/api_client.py +++ b/src/macaron/slsa_analyzer/git_service/api_client.py @@ -245,7 +245,7 @@ def get_workflow_runs(self, full_name: str, branch_name: str, created_after: str ) """ logger.debug("Query for runs data in repo %s", full_name) - query_params = { + query_params: dict = { "page": page, "per_page": defaults.getint("ci.github_actions", "max_items_num", fallback=100), } @@ -253,10 +253,10 @@ def get_workflow_runs(self, full_name: str, branch_name: str, created_after: str # We assume that workflow run only happens after the commit date. # https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository if branch_name: - query_params["branch"] = branch_name # type: ignore + query_params["branch"] = branch_name if created_after: - query_params["created"] = f">={created_after}" # type: ignore + query_params["created"] = f">={created_after}" encoded_params = construct_query(query_params) url = f"{GhAPIClient._REPO_END_POINT}/{full_name}/actions/runs?" + encoded_params diff --git a/src/macaron/slsa_analyzer/provenance/expectations/expectation.py b/src/macaron/slsa_analyzer/provenance/expectations/expectation.py index 158e628c3..9f9e8c797 100644 --- a/src/macaron/slsa_analyzer/provenance/expectations/expectation.py +++ b/src/macaron/slsa_analyzer/provenance/expectations/expectation.py @@ -36,8 +36,7 @@ class Expectation: #: The kind of expectation, e.g., CUE. expectation_type: Mapped[str] = mapped_column(nullable=False) - # mypy cannot resolve *args, **kwargs, unfortunately. - def __init__(self, *args, **kwargs) -> None: # type: ignore + def __init__(self, *args: Any, **kwargs: Any) -> None: """Create an instance provenance expectation.""" self._validator: ExpectationFn | None = None super().__init__(*args, **kwargs) diff --git a/src/macaron/slsa_analyzer/slsa_req.py b/src/macaron/slsa_analyzer/slsa_req.py index 5af535f7f..5ddb95aa1 100644 --- a/src/macaron/slsa_analyzer/slsa_req.py +++ b/src/macaron/slsa_analyzer/slsa_req.py @@ -71,23 +71,23 @@ class Category(Enum): # Contains the description, category and level each requirement correspond to. -BUILD_REQ_DESC = { - ReqName.VCS: [ +BUILD_REQ_DESC: dict[ReqName, tuple[str, Category, SLSALevels]] = { + ReqName.VCS: ( """ Every change to the source is tracked in a version control """, Category.SOURCE, SLSALevels.LEVEL2, - ], - ReqName.VERIFIED_HISTORY: [ + ), + ReqName.VERIFIED_HISTORY: ( """ Every change in the revision's history has at least one strongly authenticated actor identity (author, uploader, reviewer, etc.) and timestamp. """, Category.SOURCE, SLSALevels.LEVEL3, - ], - ReqName.RETAINED_INDEFINITELY: [ + ), + ReqName.RETAINED_INDEFINITELY: ( """ The revision and its change history are preserved indefinitely and cannot be deleted, except when subject to an established and transparent expectation for obliteration, @@ -95,16 +95,16 @@ class Category(Enum): """, Category.SOURCE, SLSALevels.LEVEL3, - ], - ReqName.TWO_PERSON_REVIEWED: [ + ), + ReqName.TWO_PERSON_REVIEWED: ( """ Every change in the revision's history was agreed to by two trusted persons prior to submission, and both of these trusted persons were strongly authenticated. """, Category.SOURCE, SLSALevels.LEVEL4, - ], - ReqName.SCRIPTED_BUILD: [ + ), + ReqName.SCRIPTED_BUILD: ( """ All build steps were fully defined in some sort of "build script". The only manual command, if any, was to invoke the build script. @@ -114,39 +114,39 @@ class Category(Enum): """, Category.BUILD, SLSALevels.LEVEL1, - ], - ReqName.BUILD_SERVICE: [ + ), + ReqName.BUILD_SERVICE: ( """ All build steps ran using some build service, not on a developer's workstation. Examples: GitHub Actions, Google Cloud Build, Travis CI. """, Category.BUILD, SLSALevels.LEVEL2, - ], - ReqName.BUILD_AS_CODE: [ + ), + ReqName.BUILD_AS_CODE: ( """ The build definition and configuration is defined in source control and is executed by the build service. """, Category.BUILD, SLSALevels.LEVEL3, - ], - ReqName.EPHEMERAL_ENVIRONMENT: [ + ), + ReqName.EPHEMERAL_ENVIRONMENT: ( """ The build service ensured that the build steps ran in an ephemeral environment, such as a container or VM, provisioned solely for this build, and not reused from a prior build. """, Category.BUILD, SLSALevels.LEVEL3, - ], - ReqName.ISOLATED: [ + ), + ReqName.ISOLATED: ( """ The build service ensured that the build steps ran in an isolated environment free of influence from other build instances, whether prior or concurrent. """, Category.BUILD, SLSALevels.LEVEL3, - ], - ReqName.PARAMETERLESS: [ + ), + ReqName.PARAMETERLESS: ( """ The build output cannot be affected by user parameters other than the build entry point and the top-level source location. In other words, the build is fully defined through the build script @@ -154,24 +154,24 @@ class Category(Enum): """, Category.BUILD, SLSALevels.LEVEL4, - ], - ReqName.HERMETIC: [ + ), + ReqName.HERMETIC: ( """ All transitive build steps, sources, and dependencies were fully declared up front with immutable references, and the build steps ran with no network access. """, Category.BUILD, SLSALevels.LEVEL4, - ], - ReqName.REPRODUCIBLE: [ + ), + ReqName.REPRODUCIBLE: ( """ Re-running the build steps with identical input artifacts results in bit-for-bit identical output. Builds that cannot meet this MUST provide a justification why the build cannot be made reproducible. """, Category.BUILD, SLSALevels.LEVEL4, - ], - ReqName.PROV_AVAILABLE: [ + ), + ReqName.PROV_AVAILABLE: ( """ The provenance is available to the consumer in a format that the consumer accepts. The format SHOULD be in-toto SLSA Provenance, but another format MAY be used if both @@ -179,16 +179,16 @@ class Category(Enum): """, Category.PROVENANCE, SLSALevels.LEVEL1, - ], - ReqName.PROV_AUTH: [ + ), + ReqName.PROV_AUTH: ( """ The provenance's authenticity and integrity can be verified by the consumer. This SHOULD be through a digital signature from a private key accessible only to the service generating the provenance. """, Category.PROVENANCE, SLSALevels.LEVEL2, - ], - ReqName.PROV_SERVICE_GEN: [ + ), + ReqName.PROV_SERVICE_GEN: ( """ The data in the provenance MUST be obtained from the build service (either because the generator is the build service or because the provenance generator reads @@ -197,120 +197,120 @@ class Category(Enum): """, Category.PROVENANCE, SLSALevels.LEVEL2, - ], - ReqName.PROV_NON_FALSIFIABLE: [ + ), + ReqName.PROV_NON_FALSIFIABLE: ( """ Provenance cannot be falsified by the build service's users. """, Category.PROVENANCE, SLSALevels.LEVEL3, - ], - ReqName.PROV_DEPENDENCIES_COMPLETE: [ + ), + ReqName.PROV_DEPENDENCIES_COMPLETE: ( """ Provenance records all build dependencies that were available while running the build steps. """, Category.PROVENANCE, SLSALevels.LEVEL4, - ], - ReqName.PROV_CONT_ARTI: [ + ), + ReqName.PROV_CONT_ARTI: ( """ The provenance MUST identify the output artifact via at least one cryptographic hash. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL1, - ], - ReqName.PROV_CONT_BUILDER: [ + ), + ReqName.PROV_CONT_BUILDER: ( """ The provenance identifies the entity that performed the build and generated the provenance. This represents the entity that the consumer must trust. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL1, - ], - ReqName.PROV_CONT_BUILD_INS: [ + ), + ReqName.PROV_CONT_BUILD_INS: ( """ The provenance identifies the top-level instructions used to execute the build. The identified instructions SHOULD be at the highest level available to the build. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL1, - ], - ReqName.PROV_CONT_SOURCE: [ + ), + ReqName.PROV_CONT_SOURCE: ( """ The provenance identifies the repository origin(s) for the source code used in the build. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL2, - ], - ReqName.PROV_CONT_ENTRY: [ + ), + ReqName.PROV_CONT_ENTRY: ( """ The provenance identifies the “entry point” of the build definition (see build-as-code) used to drive the build including what source repo the configuration was read from. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL3, - ], - ReqName.PROV_CONT_BUILD_PARAMS: [ + ), + ReqName.PROV_CONT_BUILD_PARAMS: ( """ The provenance includes all build parameters under a user's control. See Parameterless for details. (At L3, the parameters must be listed; at L4, they must be empty.) """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL3, - ], - ReqName.PROV_CONT_TRANSITIVE_DEPS: [ + ), + ReqName.PROV_CONT_TRANSITIVE_DEPS: ( """ The provenance includes all transitive dependencies listed in Provenance: Dependencies Complete requirement. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL4, - ], - ReqName.PROV_CONT_REPRODUCIBLE_INFO: [ + ), + ReqName.PROV_CONT_REPRODUCIBLE_INFO: ( """ The provenance includes a boolean indicating whether build is intended to be reproducible and, if so, all information necessary to reproduce the build. See Reproducible for more details. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL4, - ], - ReqName.PROV_CONT_META_DATA: [ + ), + ReqName.PROV_CONT_META_DATA: ( """ The provenance includes metadata to aid debugging and investigations. This SHOULD at least include start and end timestamps and a permalink to debug logs. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL1, - ], - ReqName.SECURITY: [ + ), + ReqName.SECURITY: ( """ The system meets some baseline security standard to prevent compromise. (Patching, vulnerability scanning, user isolation, transport security, secure boot, machine identity, etc.) """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL4, - ], - ReqName.ACCESS: [ + ), + ReqName.ACCESS: ( """ All physical and remote access must be rare, logged, and gated behind multi-party approval. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL4, - ], - ReqName.SUPERUSERS: [ + ), + ReqName.SUPERUSERS: ( """ Only a small number of platform admins may override the guarantees listed here. Doing so MUST require approval of a second platform admin. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL4, - ], - ReqName.EXPECTATION: [ + ), + ReqName.EXPECTATION: ( """ Check whether the SLSA provenance for the produced artifact conforms to the expectation. """, Category.PROVENANCE_CONTENT, SLSALevels.LEVEL3, - ], + ), } @@ -400,6 +400,6 @@ def get_requirements_dict() -> dict: desc = req_details[0] category = req_details[1] min_level = req_details[2] - result[req_name] = SLSAReq(req_name.value, desc, category, min_level) # type: ignore [arg-type] + result[req_name] = SLSAReq(req_name.value, desc, category, min_level) return result diff --git a/src/macaron/util.py b/src/macaron/util.py index 697e9dabc..526c60c0b 100644 --- a/src/macaron/util.py +++ b/src/macaron/util.py @@ -254,8 +254,10 @@ def get_if_exists(doc: JsonType, path: list[str | int]) -> JsonType | None: """Get a json dict value if it exists.""" while len(path) > 0: this = path.pop(0) - if isinstance(doc, (dict, list)) and this in doc: - doc = doc[this] # type: ignore + if isinstance(this, str) and isinstance(doc, dict) and this in doc: + doc = doc[this] + elif isinstance(this, int) and isinstance(doc, list) and 0 <= this < len(doc): + doc = doc[this] else: return None return doc diff --git a/tests/slsa_analyzer/checks/test_registry.py b/tests/slsa_analyzer/checks/test_registry.py index 179d12c8d..74d396c6b 100644 --- a/tests/slsa_analyzer/checks/test_registry.py +++ b/tests/slsa_analyzer/checks/test_registry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2022, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the tests for the Registry class.""" @@ -11,11 +11,19 @@ from hypothesis import given from hypothesis.strategies import SearchStrategy, binary, booleans, integers, lists, none, one_of, text, tuples +from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.base_check import BaseCheck -from macaron.slsa_analyzer.checks.check_result import CheckResultType +from macaron.slsa_analyzer.checks.check_result import CheckResult, CheckResultType from macaron.slsa_analyzer.registry import Registry +class MockCheck(BaseCheck): + """BaseCheck with no-op impl for abstract method""" + + def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResultType: + return CheckResultType.UNKNOWN + + # pylint: disable=protected-access class TestRegistry(TestCase): """This class tests the Registry module.""" @@ -32,13 +40,13 @@ def setUp(self) -> None: def test_exit_on_duplicated(self) -> None: """Test registering a duplicated check_id Check.""" with self.assertRaises(SystemExit): - self.REGISTRY.register(BaseCheck("mcn_duplicated_check_1", "")) # type: ignore - self.REGISTRY.register(BaseCheck("mcn_duplicated_check_1", "")) # type: ignore + self.REGISTRY.register(MockCheck("mcn_duplicated_check_1", "")) + self.REGISTRY.register(MockCheck("mcn_duplicated_check_1", "")) def test_exit_on_empty_check_id(self) -> None: """Test registering an empty check_id Check.""" with self.assertRaises(SystemExit): - self.REGISTRY.register(BaseCheck("", "")) # type: ignore + self.REGISTRY.register(MockCheck("", "")) @given(one_of(none(), text(), integers(), tuples(), binary(), booleans())) def test_exit_on_invalid_registered_check(self, check: SearchStrategy) -> None: @@ -48,22 +56,20 @@ def test_exit_on_invalid_registered_check(self, check: SearchStrategy) -> None: def test_add_successfully(self) -> None: """Test registering a Check correctly.""" - self.REGISTRY.register(BaseCheck("mcn_correct_check_1", "This check is a correct Check.")) # type: ignore + self.REGISTRY.register(MockCheck("mcn_correct_check_1", "This check is a correct Check.")) assert self.REGISTRY.get_all_checks_mapping().get("mcn_correct_check_1") def test_exit_on_registering_undefined_check(self) -> None: """Test registering a check which Macaron cannot resolve its module.""" with patch("inspect.getmodule", return_value=False): with self.assertRaises(SystemExit): - self.REGISTRY.register( - BaseCheck("mcn_undefined_check_1", "This check is an undefined Check.") # type: ignore - ) + self.REGISTRY.register(MockCheck("mcn_undefined_check_1", "This check is an undefined Check.")) @given(one_of(none(), text(), integers(), tuples(), binary(), booleans())) def test_exit_on_invalid_check_relationship(self, relationship: SearchStrategy) -> None: """Test registering a check with invalid parent status definition.""" with self.assertRaises(SystemExit): - check = BaseCheck( + check = MockCheck( "mcn_invalid_status_1", "Invalid_status_check_should_exit", [relationship] # type: ignore ) self.REGISTRY.register(check) @@ -71,10 +77,10 @@ def test_exit_on_invalid_check_relationship(self, relationship: SearchStrategy) def test_add_relationship_entry(self) -> None: """Test adding a check relationship entry.""" # Adding successfully - self.REGISTRY.register(BaseCheck("mcn_a_1", "Parent", [])) # type: ignore - self.REGISTRY.register(BaseCheck("mcn_b_1", "Child1", [("mcn_a_1", CheckResultType.PASSED)])) # type: ignore + self.REGISTRY.register(MockCheck("mcn_a_1", "Parent", [])) + self.REGISTRY.register(MockCheck("mcn_b_1", "Child1", [("mcn_a_1", CheckResultType.PASSED)])) self.REGISTRY.register( - BaseCheck( # type: ignore + MockCheck( "mcn_c_1", "Child2", [ @@ -83,7 +89,7 @@ def test_add_relationship_entry(self) -> None: ], ) ) - self.REGISTRY.register(BaseCheck("mcn_d_1", "Stand-alone", [])) # type: ignore + self.REGISTRY.register(MockCheck("mcn_d_1", "Stand-alone", [])) assert self.REGISTRY.get_all_checks_relationships() == { "mcn_a_1": { "mcn_b_1": CheckResultType.PASSED, @@ -95,16 +101,14 @@ def test_add_relationship_entry(self) -> None: # Cannot add a check that depends on itself with self.assertRaises(SystemExit): - self.REGISTRY.register( - BaseCheck("mcn_e_1", "Self-dependent-check", [("mcn_e_1", CheckResultType.PASSED)]) # type: ignore - ) + self.REGISTRY.register(MockCheck("mcn_e_1", "Self-dependent-check", [("mcn_e_1", CheckResultType.PASSED)])) # Add a check with duplicated relationships with self.assertRaises(SystemExit): self.REGISTRY.register( - BaseCheck( + MockCheck( "mcn_f_1", - "Existing-relationship", # type: ignore + "Existing-relationship", [ ("mcn_c_1", CheckResultType.PASSED), ("mcn_c_1", CheckResultType.FAILED), @@ -121,7 +125,7 @@ def test_add_relationship_entry(self) -> None: def test_exit_on_invalid_eval_reqs(self, eval_reqs: SearchStrategy) -> None: """Test registering a check with invalid eval reqs definition.""" with self.assertRaises(SystemExit): - check = BaseCheck("mcn_invalid_eval_reqs_1", "Invalid_eval_reqs_should_exit", [], eval_reqs) # type: ignore + check = MockCheck("mcn_invalid_eval_reqs_1", "Invalid_eval_reqs_should_exit", [], eval_reqs) # type: ignore self.REGISTRY.register(check) @given( @@ -133,36 +137,26 @@ def test_exit_on_invalid_eval_reqs(self, eval_reqs: SearchStrategy) -> None: def test_exit_on_invalid_status_on_skipped(self, status_on_skipped: SearchStrategy) -> None: """Test registering a check with invalid status_on_skipped instance variable.""" with self.assertRaises(SystemExit): - check = BaseCheck( + check = MockCheck( "mcn_invalid_eval_reqs_1", "Invalid_status_on_skipped", [], [], status_on_skipped # type: ignore ) self.REGISTRY.register(check) def test_add_graph_node_after_prepare(self) -> None: """Test calling Registry._add_node after calling prepare on the graph.""" - assert self.REGISTRY._add_node( - BaseCheck("mcn_before_1", "Check_registered_before_prepare", [], []) # type: ignore - ) + assert self.REGISTRY._add_node(MockCheck("mcn_before_1", "Check_registered_before_prepare", [], [])) Registry._graph.prepare() Registry._is_graph_ready = True - assert not self.REGISTRY._add_node( - BaseCheck("mcn_after_1", "Check_registered_after_prepare", [], []) # type: ignore - ) + assert not self.REGISTRY._add_node(MockCheck("mcn_after_1", "Check_registered_after_prepare", [], [])) def test_circular_dependencies(self) -> None: """Test registering checks with circular dependencies.""" - self.REGISTRY.register( - BaseCheck("mcn_a_1", "Depend_on_b", [("mcn_b_1", CheckResultType.PASSED)], []) # type: ignore - ) + self.REGISTRY.register(MockCheck("mcn_a_1", "Depend_on_b", [("mcn_b_1", CheckResultType.PASSED)], [])) - self.REGISTRY.register( - BaseCheck("mcn_b_1", "Depend_on_c", [("mcn_c_1", CheckResultType.PASSED)], []) # type: ignore - ) + self.REGISTRY.register(MockCheck("mcn_b_1", "Depend_on_c", [("mcn_c_1", CheckResultType.PASSED)], [])) - self.REGISTRY.register( - BaseCheck("mcn_c_1", "Depend_on_a", [("mcn_a_1", CheckResultType.PASSED)], []) # type: ignore - ) + self.REGISTRY.register(MockCheck("mcn_c_1", "Depend_on_a", [("mcn_a_1", CheckResultType.PASSED)], [])) assert not self.REGISTRY.prepare() @@ -180,7 +174,7 @@ def test_running_with_no_runners(self) -> None: def test_prepare_successfully(self) -> None: """Test registry is prepared successfully and ready for the analysis.""" - self.REGISTRY.register(BaseCheck("mcn_correct_check_1", "This check is a correct Check.")) # type: ignore + self.REGISTRY.register(MockCheck("mcn_correct_check_1", "This check is a correct Check.")) # For the prepare method to complete successfully, # the registry must have at least one check registered, # at least one runner initialized and no check circular