Skip to content

Commit

Permalink
chore: fix type correctness and remove many "type: ignore" directives (
Browse files Browse the repository at this point in the history
…#509)

Signed-off-by: Nicholas Allen <[email protected]>
  • Loading branch information
nicallen authored Oct 13, 2023
1 parent 023c6ac commit eaa1f22
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 141 deletions.
12 changes: 5 additions & 7 deletions src/macaron/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,12 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None

if analyzer_single_args.config_path:
# Get user config from yaml file
run_config = YamlLoader.load(analyzer_single_args.config_path)
if run_config is None:
# The error mypy raises here is "Statement is unreachable" for the log statement.
# This is because the return type of `YamlLoader.load` is Any (which is not correct).
# TODO: revisit this issue together with the ``Configuration`` class issue mentioned in
# https://github.com/oracle/macaron/pull/401#discussion_r1303695425.
logger.error("The input yaml config at %s is invalid.", analyzer_single_args.config_path) # type: ignore
loaded_config = YamlLoader.load(analyzer_single_args.config_path)
if loaded_config is None:
logger.error("The input yaml config at %s is invalid.", analyzer_single_args.config_path)
sys.exit(os.EX_DATAERR)
else:
run_config = loaded_config
else:
repo_path = analyzer_single_args.repo_path
purl = analyzer_single_args.package_url
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/database/table_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import string
from datetime import datetime
from pathlib import Path
from typing import Self
from typing import Any, Self

from packageurl import PackageURL
from sqlalchemy import Boolean, Column, Enum, ForeignKey, Integer, String, Table, UniqueConstraint
Expand Down Expand Up @@ -287,7 +287,7 @@ class Repository(ORMBase):
#: The path to the repo on the file system.
fs_path: Mapped[str] = mapped_column(String, nullable=False)

def __init__(self, *args, files: list[str] | None = None, **kwargs): # type: ignore[no-untyped-def]
def __init__(self, *args: Any, files: list[str] | None = None, **kwargs: Any):
"""Instantiate the repository and set files.
Parameters
Expand Down
49 changes: 35 additions & 14 deletions src/macaron/database/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,28 @@
https://github.com/sqlalchemy/sqlalchemy/wiki/Views
"""

from typing import Any

import sqlalchemy as sa
import sqlalchemy.event
from sqlalchemy import Connection
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement
from sqlalchemy.sql import TableClause, table
from sqlalchemy.schema import BaseDDLElement, DDLElement, MetaData, SchemaItem, Table
from sqlalchemy.sql import Select, TableClause, table


class CreateView(DDLElement):
"""CreateView."""

def __init__(self, name, selectable): # type: ignore
def __init__(self, name: str, selectable: Select[Any]):
self.name = name
self.selectable = selectable


class DropView(DDLElement):
"""DropView."""

def __init__(self, name): # type: ignore
def __init__(self, name: str):
self.name = name


Expand All @@ -40,25 +44,42 @@ def _drop_view(element, comp, **kw):
return "DROP VIEW %s" % (element.name)


def view_exists(ddl, target, connection, **kw): # type: ignore
def view_exists(
ddl: BaseDDLElement,
target: SchemaItem,
bind: Connection | None,
tables: list[Table] | None = None,
state: Any | None = None,
**kw: Any,
) -> bool:
"""View exists."""
return ddl.name in sa.inspect(connection).get_view_names()


def view_doesnt_exist(ddl, target, connection, **kw): # type: ignore
if isinstance(ddl, CreateView) or isinstance(ddl, DropView):
assert isinstance(bind, Connection)
return ddl.name in sa.inspect(bind).get_view_names()
return False


def view_doesnt_exist(
ddl: BaseDDLElement,
target: SchemaItem,
bind: Connection | None,
tables: list[Table] | None = None,
state: Any | None = None,
**kw: Any,
) -> bool:
"""Not view exists."""
return not view_exists(ddl, target, connection, **kw) # type: ignore
return not view_exists(ddl, target, bind, **kw)


def create_view(name, metadata, selectable) -> TableClause: # type: ignore
def create_view(name: str, metadata: MetaData, selectable: Select[Any]) -> TableClause:
"""Create a view."""
view = table(name)
view._columns._populate_separate_keys(col._make_proxy(view) for col in selectable.selected_columns)

sa.event.listen(
sqlalchemy.event.listen(
metadata,
"after_create",
CreateView(name, selectable).execute_if(callable_=view_doesnt_exist), # type: ignore
CreateView(name, selectable).execute_if(callable_=view_doesnt_exist),
)
sa.event.listen(metadata, "before_drop", DropView(name).execute_if(callable_=view_exists)) # type: ignore
sqlalchemy.event.listen(metadata, "before_drop", DropView(name).execute_if(callable_=view_exists))
return view
6 changes: 3 additions & 3 deletions src/macaron/dependency_analyzer/dependency_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,12 @@ def resolve_dependencies(main_ctx: Any, sbom_path: str) -> dict[str, DependencyI
def _resolve_more_dependencies(dependencies: dict[str, DependencyInfo]) -> None:
"""Utilise the Repo Finder to resolve the repositories of more dependencies."""
for item in dependencies.values():
if item.get("available") != SCMStatus.MISSING_SCM:
if item["available"] != SCMStatus.MISSING_SCM:
continue

item["url"] = find_repo(item.get("purl")) # type: ignore
item["url"] = find_repo(item["purl"])
if item["url"] == "":
logger.debug("Failed to find url for purl: %s", item.get("purl"))
logger.debug("Failed to find url for purl: %s", item["purl"])
else:
# TODO decide how to handle possible duplicates here
item["available"] = SCMStatus.AVAILABLE
Expand Down
6 changes: 3 additions & 3 deletions src/macaron/slsa_analyzer/analyze_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ def get_dict(self) -> dict:
sorted_on_id = []
for res in _sorted_on_id:
# res is CheckResult(TypedDict)
res: dict = dict(res.copy()) # type: ignore
res.pop("result_tables") # type: ignore
sorted_on_id.append(res)
res_copy: dict = dict(res.copy())
res_copy.pop("result_tables")
sorted_on_id.append(res_copy)
sorted_results = sorted(sorted_on_id, key=lambda item: item["result_type"], reverse=True)
check_summary = {
result_type.value: len(result_list) for result_type, result_list in self.get_check_summary().items()
Expand Down
4 changes: 2 additions & 2 deletions src/macaron/slsa_analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import NamedTuple
from typing import Any, NamedTuple

import sqlalchemy.exc
from git import InvalidGitRepositoryError
Expand Down Expand Up @@ -881,7 +881,7 @@ def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]:
class DuplicateCmpError(DuplicateError):
"""This class is used for duplicated software component errors."""

def __init__(self, *args, context: AnalyzeContext | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
def __init__(self, *args: Any, context: AnalyzeContext | None = None, **kwargs: Any) -> None:
"""Create a DuplicateCmpError instance.
Parameters
Expand Down
5 changes: 3 additions & 2 deletions src/macaron/slsa_analyzer/checks/check_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from enum import Enum
from typing import TypedDict

from sqlalchemy import Table
from sqlalchemy.orm import DeclarativeBase

from macaron.slsa_analyzer.provenance.expectations.expectation import Expectation


class CheckResultType(str, Enum):
"""This class contains the types of a check result."""
Expand Down Expand Up @@ -37,7 +38,7 @@ class CheckResult(TypedDict):
justification: list[str | dict[str, str]]
# human_readable_justification: str
# result_values: dict[str, str | float | int] | list[dict[str, str | float | int]]
result_tables: list[DeclarativeBase | Table]
result_tables: list[DeclarativeBase | Expectation]
# recommendation: str
result_type: CheckResultType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResu
)

if expectation.validate(provenance.payload):
check_result["result_tables"].append(expectation) # type: ignore[arg-type]
check_result["result_tables"].append(expectation)
check_result["justification"].append(
f"Successfully verified the expectation against the provenance {provenance.asset.url}."
)
Expand Down Expand Up @@ -108,7 +108,7 @@ def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResu
if expectation.validate(payload):
# We need to use typing.Protocol for multiple inheritance, however, the Expectation
# class uses inlined functions, which is not supported by Protocol.
check_result["result_tables"].append(expectation) # type: ignore[arg-type]
check_result["result_tables"].append(expectation)
check_result["justification"].append(
"Successfully verified the expectation against provenance."
)
Expand Down
6 changes: 3 additions & 3 deletions src/macaron/slsa_analyzer/git_service/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,18 @@ def get_workflow_runs(self, full_name: str, branch_name: str, created_after: str
)
"""
logger.debug("Query for runs data in repo %s", full_name)
query_params = {
query_params: dict = {
"page": page,
"per_page": defaults.getint("ci.github_actions", "max_items_num", fallback=100),
}

# We assume that workflow run only happens after the commit date.
# https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository
if branch_name:
query_params["branch"] = branch_name # type: ignore
query_params["branch"] = branch_name

if created_after:
query_params["created"] = f">={created_after}" # type: ignore
query_params["created"] = f">={created_after}"

encoded_params = construct_query(query_params)
url = f"{GhAPIClient._REPO_END_POINT}/{full_name}/actions/runs?" + encoded_params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class Expectation:
#: The kind of expectation, e.g., CUE.
expectation_type: Mapped[str] = mapped_column(nullable=False)

# mypy cannot resolve *args, **kwargs, unfortunately.
def __init__(self, *args, **kwargs) -> None: # type: ignore
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Create an instance provenance expectation."""
self._validator: ExpectationFn | None = None
super().__init__(*args, **kwargs)
Expand Down
Loading

0 comments on commit eaa1f22

Please sign in to comment.