Skip to content

Commit

Permalink
update to pydantic 2
Browse files Browse the repository at this point in the history
  • Loading branch information
johanlundberg committed Feb 22, 2024
1 parent a72454b commit 56c1224
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 43 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pydantic<2
pydantic
cryptography
pyOpenSSL
fido2>=1.0.0
4 changes: 2 additions & 2 deletions src/fido_mds/metadata_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def __init__(self, metadata_path: Optional[Path] = None):
if metadata_path is not None:
try:
with open(metadata_path, "r") as mdf:
self.metadata = FidoMD.parse_raw(mdf.read())
self.metadata = FidoMD.model_validate_json(mdf.read())
except IOError as e:
logger.error(f"Could not open file {mdf}: {e}")
else:
with resources.open_text("fido_mds.data", "metadata.json") as f:
self.metadata = FidoMD.parse_raw(f.read())
self.metadata = FidoMD.model_validate_json(f.read())

self._entry_cache: Dict[Union[str, UUID], Entry] = {}
self._other_cache: Dict[str, List[Union[str, int]]] = {}
Expand Down
8 changes: 4 additions & 4 deletions src/fido_mds/models/fido_mds.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum
from typing import List, Optional, Union

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, field_validator


def date2datetime(value: str) -> datetime:
Expand Down Expand Up @@ -209,7 +209,7 @@ class StatusReport(BaseModel):
url: Optional[str] = None

# validators
_effective_date_to_datetime = validator("effective_date", pre=True, allow_reuse=True)(date2datetime)
_effective_date_to_datetime = field_validator("effective_date", mode="before")(date2datetime)


class Entry(BaseModel):
Expand All @@ -223,7 +223,7 @@ class Entry(BaseModel):
aaid: Optional[str] = None

# validators
_time_of_last_status_change_to_datetime = validator("time_of_last_status_change", pre=True, allow_reuse=True)(
_time_of_last_status_change_to_datetime = field_validator("time_of_last_status_change", mode="before")(
date2datetime
)

Expand All @@ -235,4 +235,4 @@ class FidoMD(BaseModel):
entries: List["Entry"]

# validators
_next_update_to_datetime = validator("next_update", pre=True, allow_reuse=True)(date2datetime)
_next_update_to_datetime = field_validator("next_update", mode="before")(date2datetime)
70 changes: 34 additions & 36 deletions src/fido_mds/models/webauthn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,44 @@
from fido2.cose import ES256, PS256, RS1, RS256, CoseKey, EdDSA
from fido2.utils import websafe_decode
from fido2.webauthn import AttestationObject
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, ConfigDict, Field, field_validator

__author__ = 'lundberg'
__author__ = "lundberg"

from fido_mds.helpers import load_raw_cert


class AttestationFormat(str, Enum):
PACKED = 'packed'
FIDO_U2F = 'fido-u2f'
NONE = 'none'
ANDROID_KEY = 'android-key'
ANDROID_SAFETYNET = 'android-safetynet'
TPM = 'tpm'
APPLE = 'apple'
PACKED = "packed"
FIDO_U2F = "fido-u2f"
NONE = "none"
ANDROID_KEY = "android-key"
ANDROID_SAFETYNET = "android-safetynet"
TPM = "tpm"
APPLE = "apple"


class AttestationConfig(BaseModel):
class Config:
orm_mode = True
arbitrary_types_allowed = True
model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)


class AttestationStatementResponseHeader(AttestationConfig):
alg: str
x5c: List[Certificate] = Field(default=[])

@validator('x5c', pre=True)
@field_validator("x5c", mode="before")
def validate_x5c(cls, v: List[str]) -> List[Certificate]:
return [load_raw_cert(item) for item in v]


class AttestationStatementResponsePayload(AttestationConfig):
nonce: str
timestampMs: datetime
apk_package_name: str = Field(alias='apkPackageName')
apk_digest_sha256: str = Field(alias='apkDigestSha256')
cts_profile_match: bool = Field(alias='ctsProfileMatch')
apk_certificate_digest_sha256: List[str] = Field(alias='apkCertificateDigestSha256')
basic_integrity: bool = Field(alias='basicIntegrity')
apk_package_name: str = Field(alias="apkPackageName")
apk_digest_sha256: str = Field(alias="apkDigestSha256")
cts_profile_match: bool = Field(alias="ctsProfileMatch")
apk_certificate_digest_sha256: List[str] = Field(alias="apkCertificateDigestSha256")
basic_integrity: bool = Field(alias="basicIntegrity")


class AttestationStatementResponse(AttestationConfig):
Expand All @@ -60,21 +58,21 @@ class AttestationStatementResponse(AttestationConfig):


class AttestationStatement(AttestationConfig):
alg: Optional[int]
sig: Optional[bytes]
alg: Optional[int] = None
sig: Optional[bytes] = None
x5c: List[Certificate] = Field(default=[])
ver: Optional[str]
response: Optional[AttestationStatementResponse]
cert_info: Optional[bytes] = Field(alias='certInfo')
pub_area: Optional[bytes] = Field(alias='pubArea')
ver: Optional[str] = None
response: Optional[AttestationStatementResponse] = None
cert_info: Optional[bytes] = Field(alias="certInfo", default=None)
pub_area: Optional[bytes] = Field(alias="pubArea", default=None)

@validator('x5c', pre=True)
@field_validator("x5c", mode="before")
def validate_x5c(cls, v: List[bytes]) -> List[Certificate]:
return [x509.load_der_x509_certificate(item) for item in v]

@validator('response', pre=True)
@field_validator("response", mode="before")
def validate_response(cls, v: bytes) -> Optional[AttestationStatementResponse]:
header, payload, signature = v.decode(encoding='utf-8').split('.')
header, payload, signature = v.decode(encoding="utf-8").split(".")
return AttestationStatementResponse(
header=AttestationStatementResponseHeader.parse_raw(websafe_decode(header)),
payload=AttestationStatementResponsePayload.parse_raw(websafe_decode(payload)),
Expand All @@ -94,11 +92,11 @@ class CredentialData(AttestationConfig):
credential_id: bytes
public_key: Union[ES256, RS256, PS256, EdDSA, RS1]

@validator('aaguid', pre=True)
@field_validator("aaguid", mode="before")
def validate_aaguid(cls, v: bytes) -> UUID:
return UUID(bytes=v)

@validator('public_key', pre=True)
@field_validator("public_key", mode="before")
def validate_public_key(cls, v: Mapping[int, Any]) -> CoseKey:
return CoseKey.parse(v)

Expand All @@ -109,7 +107,7 @@ class AuthenticatorData(AttestationConfig):
counter: int
credential_data: CredentialData

@validator('flags', pre=True)
@field_validator("flags", mode="before")
def validate_flags(cls, v: int) -> AuthenticatorFlags:
# see https://www.w3.org/TR/webauthn/#table-authData
user_present = bool(v & 0x01)
Expand Down Expand Up @@ -147,16 +145,16 @@ def attestation_obj(self) -> AttestationObject:
@classmethod
def from_attestation_object(cls, data: AttestationObject) -> Attestation:
d = {
'fmt': data.fmt,
'att_statement': data.att_stmt,
'auth_data': data.auth_data,
'raw_attestation_obj': bytes(data),
"fmt": data.fmt,
"att_statement": data.att_stmt,
"auth_data": data.auth_data,
"raw_attestation_obj": bytes(data),
}
return cls.parse_obj(d)
return cls.model_validate(d)

@classmethod
def from_base64(cls, data: str) -> Attestation:
try:
return cls.from_attestation_object(AttestationObject(websafe_decode(data)))
except AttributeError as e:
raise AttributeError(f'Could not parse attestation: {e}')
raise AttributeError(f"Could not parse attestation: {e}")

0 comments on commit 56c1224

Please sign in to comment.