From b08067a07db0603cac58a6e5d7ca19a3bd7a9405 Mon Sep 17 00:00:00 2001 From: Dain Nilsson Date: Mon, 7 Oct 2024 14:47:39 +0200 Subject: [PATCH 1/5] Refactor action signatures and add more types --- helper/helper/__init__.py | 16 ++-- helper/helper/base.py | 11 ++- helper/helper/device.py | 26 +++--- helper/helper/fido.py | 29 +++--- helper/helper/management.py | 57 ++++++++---- helper/helper/oath.py | 97 ++++++++++--------- helper/helper/piv.py | 163 ++++++++++++++++---------------- helper/helper/yubiotp.py | 76 ++++++++------- helper/poetry.lock | 179 ++++++++++++++++-------------------- helper/pyproject.toml | 2 +- 10 files changed, 334 insertions(+), 322 deletions(-) diff --git a/helper/helper/__init__.py b/helper/helper/__init__.py index aa8527d9d..791002296 100644 --- a/helper/helper/__init__.py +++ b/helper/helper/__init__.py @@ -17,7 +17,7 @@ from queue import Queue from threading import Thread, Event -from typing import Callable, Dict, List +from typing import Callable import json import logging @@ -78,14 +78,14 @@ def _handle_incoming(event, recv, error, cmd_queue): def process( - send: Callable[[Dict], None], - recv: Callable[[], Dict], - handler: Callable[[str, List, Dict, Event, Callable[[str], None]], RpcResponse], + send: Callable[[dict], None], + recv: Callable[[], dict], + handler: Callable[[str, list, dict, Event, Callable[[str], None]], RpcResponse], ) -> None: - def error(status: str, message: str, body: Dict = {}): + def error(status: str, message: str, body: dict = {}): send(dict(kind="error", status=status, message=message, body=body)) - def signal(status: str, body: Dict = {}): + def signal(status: str, body: dict = {}): send(dict(kind="signal", status=status, body=body)) def success(response: RpcResponse): @@ -121,8 +121,8 @@ def success(response: RpcResponse): def run_rpc( - send: Callable[[Dict], None], - recv: Callable[[], Dict], + send: Callable[[dict], None], + recv: Callable[[], dict], ) -> None: process(send, recv, RootNode()) diff --git a/helper/helper/base.py b/helper/helper/base.py index c894b10ff..3429e142e 100644 --- a/helper/helper/base.py +++ b/helper/helper/base.py @@ -15,6 +15,7 @@ from yubikit.core import InvalidPinError from functools import partial +import inspect import logging logger = logging.getLogger(__name__) @@ -127,7 +128,13 @@ def __call__(self, action, target, params, event, signal, traversed=None): action, target[1:], params, event, signal, traversed ) elif action in self.list_actions(): - response = self.get_action(action)(params, event, signal) + action_f = self.get_action(action) + args = inspect.signature(action_f).parameters + if "event" in args: + params["event"] = event + if "signal" in args: + params["signal"] = signal + response = action_f(**params) elif action in self.list_children(): traversed += [action] response = self.get_child(action)( @@ -224,7 +231,7 @@ def get_child(self, name): return self._child @action - def get(self, params, event, signal): + def get(self): return dict( data=self.get_data(), actions=self.list_actions(), diff --git a/helper/helper/device.py b/helper/helper/device.py index 1417dd860..de62709c0 100644 --- a/helper/helper/device.py +++ b/helper/helper/device.py @@ -55,7 +55,7 @@ from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from hashlib import sha256 from dataclasses import asdict -from typing import Mapping, Tuple +from typing import Mapping import os import sys @@ -113,19 +113,19 @@ def nfc(self): return self._readers @action - def diagnose(self, *ignored): + def diagnose(self): return dict(diagnostics=get_diagnostics()) @action(closes_child=False) - def logging(self, params, event, signal): - level = LOG_LEVEL[params["level"].upper()] - set_log_level(level) - logger.info(f"Log level set to: {level.name}") + def logging(self, level: str): + lvl = LOG_LEVEL[level.upper()] + set_log_level(lvl) + logger.info(f"Log level set to: {lvl.name}") return dict() @action(closes_child=False) - def qr(self, params, event, signal): - return dict(result=scan_qr(params.get("image"))) + def qr(self, image: str | None = None): + return dict(result=scan_qr(image)) def _id_from_fingerprint(fp): @@ -142,7 +142,7 @@ def __init__(self): self._reader_mapping = {} @action(closes_child=False) - def scan(self, *ignored): + def scan(self): return self.list_children() def list_children(self): @@ -173,7 +173,7 @@ def create_child(self, name): class _ScanDevices: def __init__(self): - self._state: Tuple[Mapping[PID, int], int] = ({}, 0) + self._state: tuple[Mapping[PID, int], int] = ({}, 0) self._caching = False def __call__(self): @@ -225,7 +225,7 @@ def close(self): super().close() @action(closes_child=False) - def scan(self, *ignored): + def scan(self): return self.get_data() def get_data(self): @@ -460,8 +460,8 @@ def _refresh_data(self): return dict(present=False, status="no-card") @action(closes_child=False) - def get(self, params, event, signal): - return super().get(params, event, signal) + def get(self): + return super().get() @child def ccid(self): diff --git a/helper/helper/fido.py b/helper/helper/fido.py index bfadb7bc2..2f2ac7f52 100644 --- a/helper/helper/fido.py +++ b/helper/helper/fido.py @@ -177,7 +177,7 @@ def _prepare_reset_usb(device, event, signal): raise TimeoutException() @action - def reset(self, params, event, signal): + def reset(self, event, signal): target = _ctap_id(self.ctap) device = self.ctap.device if isinstance(device, CtapPcscDevice): @@ -206,8 +206,7 @@ def reset(self, params, event, signal): return RpcResponse(dict(), ["device_info", "device_closed"]) @action(condition=lambda self: self._info.options["clientPin"]) - def unlock(self, params, event, signal): - pin = params.pop("pin") + def unlock(self, pin: str): permissions = ClientPin.PERMISSION(0) if CredentialManagement.is_supported(self._info): permissions |= ClientPin.PERMISSION.CREDENTIAL_MGMT @@ -227,18 +226,14 @@ def unlock(self, params, event, signal): return _handle_pin_error(e, self.client_pin) @action - def set_pin(self, params, event, signal): + def set_pin(self, new_pin: str, pin: str | None = None): has_pin = self.ctap.get_info().options["clientPin"] try: if has_pin: - self.client_pin.change_pin( - params.pop("pin"), - params.pop("new_pin"), - ) + assert pin # nosec + self.client_pin.change_pin(pin, new_pin) else: - self.client_pin.set_pin( - params.pop("new_pin"), - ) + self.client_pin.set_pin(new_pin) self._info = self.ctap.get_info() return RpcResponse(dict(), ["device_info"]) except CtapError as e: @@ -246,7 +241,7 @@ def set_pin(self, params, event, signal): return _handle_pin_error(e, self.client_pin) @action(condition=lambda self: Config.is_supported(self._info)) - def enable_ep_attestation(self, params, event, signal): + def enable_ep_attestation(self): if self._info.options["clientPin"] and not self._token: raise AuthRequiredException() config = Config(self.ctap, self.client_pin.protocol, self._token) @@ -344,7 +339,7 @@ def get_data(self): return self.data @action - def delete(self, params, event, signal): + def delete(self): self.credman.delete_cred(self.data["credential_id"]) self.refresh_rps() return dict() @@ -379,8 +374,7 @@ def create_child(self, name): return super().create_child(name) @action - def add(self, params, event, signal): - name = params.get("name", None) + def add(self, event, signal, name: str | None = None): enroller = self.bio.enroll() template_id = None while template_id is None: @@ -411,15 +405,14 @@ def get_data(self): return dict(template_id=self.template_id, name=self.name) @action - def rename(self, params, event, signal): - name = params.pop("name") + def rename(self, name: str): self.bio.set_name(self.template_id, name) self.name = name self.refresh() return dict() @action - def delete(self, params, event, signal): + def delete(self): self.bio.remove_enrollment(self.template_id) self.refresh() return dict() diff --git a/helper/helper/management.py b/helper/helper/management.py index 5474f8e21..cccfee3f2 100644 --- a/helper/helper/management.py +++ b/helper/helper/management.py @@ -13,11 +13,23 @@ # limitations under the License. from .base import RpcResponse, RpcNode, action -from yubikit.core import require_version, NotSupportedError, TRANSPORT, Connection +from yubikit.core import ( + require_version, + NotSupportedError, + TRANSPORT, + USB_INTERFACE, + Connection, +) from yubikit.core.smartcard import SmartCardConnection from yubikit.core.otp import OtpConnection from yubikit.core.fido import FidoConnection -from yubikit.management import ManagementSession, DeviceConfig, Mode, CAPABILITY +from yubikit.management import ( + ManagementSession, + DeviceConfig, + Mode, + CAPABILITY, + DEVICE_FLAG, +) from ykman.device import list_all_devices from dataclasses import asdict from time import sleep @@ -75,18 +87,26 @@ def _await_reboot(self, serial, usb_enabled): logger.warning("Timed out waiting for device") @action - def configure(self, params, event, signal): - reboot = params.pop("reboot", False) - cur_lock_code = bytes.fromhex(params.pop("cur_lock_code", "")) or None - new_lock_code = bytes.fromhex(params.pop("new_lock_code", "")) or None + def configure( + self, + reboot: bool = False, + cur_lock_code: str = "", + new_lock_code: str = "", + enabled_capabilities: dict = {}, + auto_eject_timeout: int | None = None, + challenge_response_timeout: int | None = None, + device_flags: int | None = None, + ): + cur_code = bytes.fromhex(cur_lock_code) or None + new_code = bytes.fromhex(new_lock_code) or None config = DeviceConfig( - params.pop("enabled_capabilities", {}), - params.pop("auto_eject_timeout", None), - params.pop("challenge_response_timeout", None), - params.pop("device_flags", None), + enabled_capabilities, + auto_eject_timeout, + challenge_response_timeout, + DEVICE_FLAG(device_flags) if device_flags else None, ) serial = self.session.read_device_info().serial - self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code) + self.session.write_device_config(config, reboot, cur_code, new_code) flags = ["device_info"] if reboot: enabled = config.enabled_capabilities.get(TRANSPORT.USB) @@ -95,17 +115,22 @@ def configure(self, params, event, signal): return RpcResponse(dict(), flags) @action - def set_mode(self, params, event, signal): + def set_mode( + self, + interfaces: int, + challenge_response_timeout: int = 0, + auto_eject_timeout: int | None = None, + ): self.session.set_mode( - Mode(params["interfaces"]), - params.pop("challenge_response_timeout", 0), - params.pop("auto_eject_timeout"), + Mode(USB_INTERFACE(interfaces)), + challenge_response_timeout, + auto_eject_timeout, ) return dict() @action( condition=lambda self: issubclass(self._connection_type, SmartCardConnection) ) - def device_reset(self, params, event, signal): + def device_reset(self): self.session.device_reset() return RpcResponse(dict(), ["device_info"]) diff --git a/helper/helper/oath.py b/helper/helper/oath.py index c56efbbfb..c54016af2 100644 --- a/helper/helper/oath.py +++ b/helper/helper/oath.py @@ -118,11 +118,11 @@ def get_data(self): ) @action - def derive(self, params, event, signal): - return dict(key=self.session.derive_key(params.pop("password"))) + def derive(self, password: str): + return dict(key=self.session.derive_key(password)) @action - def forget(self, params, event, signal): + def forget(self): keys = self._get_keys() del keys[self.session.device_id] keys.write() @@ -142,15 +142,13 @@ def _remember_key(self, key): else: return False - def _get_key(self, params): - has_key = "key" in params - has_pw = "password" in params - if has_key and has_pw: + def _get_key(self, key: str | None, password: str | None): + if key and password: raise ValueError("Only one of 'key' and 'password' can be provided.") - if has_pw: - return self.session.derive_key(params.pop("password")) - if has_key: - return decode_bytes(params.pop("key")) + if password: + return self.session.derive_key(password) + if key: + return decode_bytes(key) raise ValueError("One of 'key' and 'password' must be provided.") def _set_key_verifier(self, key): @@ -163,12 +161,16 @@ def _do_validate(self, key): self._set_key_verifier(key) @action - def validate(self, params, event, signal): - remember = params.pop("remember", False) - key = self._get_key(params) + def validate( + self, + key: str | None = None, + password: str | None = None, + remember: bool = False, + ): + access_key = self._get_key(key, password) if self.session.locked: try: - self._do_validate(key) + self._do_validate(access_key) valid = True except ApduError as e: if e.sw == SW.INCORRECT_PARAMETERS: @@ -177,34 +179,38 @@ def validate(self, params, event, signal): raise e elif self._key_verifier: salt, digest = self._key_verifier - verify = hmac.new(salt, key, "sha256").digest() + verify = hmac.new(salt, access_key, "sha256").digest() valid = hmac.compare_digest(digest, verify) else: valid = False if valid and remember: - remembered = self._remember_key(key) + remembered = self._remember_key(access_key) else: remembered = False return dict(valid=valid, remembered=remembered) @action - def set_key(self, params, event, signal): - remember = params.pop("remember", False) - key = self._get_key(params) - self.session.set_key(key) - self._set_key_verifier(key) - remember &= self._remember_key(key if remember else None) + def set_key( + self, + key: str | None = None, + password: str | None = None, + remember: bool = False, + ): + access_key = self._get_key(key, password) + self.session.set_key(access_key) + self._set_key_verifier(access_key) + remember &= self._remember_key(access_key if remember else None) return RpcResponse(dict(remembered=remember), ["device_info"]) @action(condition=lambda self: self.session.has_key) - def unset_key(self, params, event, signal): + def unset_key(self): self.session.unset_key() self._key_verifier = None self._remember_key(None) return dict() @action - def reset(self, params, event, signal): + def reset(self): self.session.reset() self._key_verifier = None self._remember_key(None) @@ -240,8 +246,7 @@ def create_child(self, name): return super().create_child(name) @action - def calculate_all(self, params, event, signal): - timestamp = params.pop("timestamp", None) + def calculate_all(self, timestamp: int | None = None): result = self.session.calculate_all(timestamp) return dict( entries=[ @@ -251,19 +256,22 @@ def calculate_all(self, params, event, signal): ) @action - def put(self, params, event, signal): - require_touch = params.pop("require_touch", False) - if "uri" in params: - data = CredentialData.parse_uri(params.pop("uri")) - if params: + def put( + self, + require_touch: bool = False, + **kwargs, + ): + if "uri" in kwargs: + data = CredentialData.parse_uri(kwargs.pop("uri")) + if kwargs: raise ValueError("Unsupported parameters present") else: data = CredentialData( - params.pop("name"), - OATH_TYPE[params.pop("oath_type").upper()], - HASH_ALGORITHM[params.pop("hash", "sha1".upper())], - decode_bytes(params.pop("secret")), - **params, + kwargs.pop("name"), + OATH_TYPE[kwargs.pop("oath_type").upper()], + HASH_ALGORITHM[kwargs.pop("hash", "sha1".upper())], + decode_bytes(kwargs.pop("secret")), + **kwargs, ) if data.get_id() in self._creds: @@ -322,32 +330,29 @@ def on_timeout(): timer.cancel() @action - def code(self, params, event, signal): - timestamp = params.pop("timestamp", None) + def code(self, signal, timestamp: int | None = None): code = self._do_with_touch( signal, lambda: self.session.calculate_code(self.credential, timestamp) ) return asdict(code) @action - def calculate(self, params, event, signal): - challenge = decode_bytes(params.pop("challenge")) + def calculate(self, signal, challenge: str): response = self._do_with_touch( - signal, lambda: self.session.calculate(self.credential.id, challenge) + signal, + lambda: self.session.calculate(self.credential.id, decode_bytes(challenge)), ) return dict(response=response) @action - def delete(self, params, event, signal): + def delete(self): self.session.delete_credential(self.credential.id) self.refresh() self.credential = None return dict() @action(condition=lambda self: self._require_version(5, 3, 1)) - def rename(self, params, event, signal): - name = params.pop("name") - issuer = params.pop("issuer", None) + def rename(self, name: str, issuer: str | None = None): try: new_id = self.session.rename_credential(self.credential.id, name, issuer) self.refresh() diff --git a/helper/helper/piv.py b/helper/helper/piv.py index 7045343b9..0ad166fcd 100644 --- a/helper/helper/piv.py +++ b/helper/helper/piv.py @@ -178,9 +178,7 @@ def _authenticate(self, key, signal): self._authenticated = True @action - def verify_pin(self, params, event, signal): - pin = params.pop("pin") - + def verify_pin(self, signal, pin: str): self.session.verify_pin(pin) key = None @@ -203,10 +201,9 @@ def verify_pin(self, params, event, signal): return dict(status=True, authenticated=self._authenticated) @action - def authenticate(self, params, event, signal): - key = bytes.fromhex(params.pop("key")) + def authenticate(self, signal, key: str): try: - self._authenticate(key, signal) + self._authenticate(bytes.fromhex(key), signal) return dict(status=True) except ApduError as e: if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: @@ -214,38 +211,41 @@ def authenticate(self, params, event, signal): raise @action(condition=lambda self: self._authenticated) - def set_key(self, params, event, signal): - key_type = MANAGEMENT_KEY_TYPE(params.pop("key_type", MANAGEMENT_KEY_TYPE.TDES)) - key = bytes.fromhex(params.pop("key")) - store_key = params.pop("store_key", False) - pivman_set_mgm_key(self.session, key, key_type, False, store_key) + def set_key( + self, + params, + key: str, + key_type: int = MANAGEMENT_KEY_TYPE.TDES, + store_key: bool = False, + ): + pivman_set_mgm_key( + self.session, + bytes.fromhex(key), + MANAGEMENT_KEY_TYPE(key_type), + False, + store_key, + ) self._pivman_data = get_pivman_data(self.session) return RpcResponse(dict(), ["device_info"]) @action - def change_pin(self, params, event, signal): - old_pin = params.pop("pin") - new_pin = params.pop("new_pin") + def change_pin(self, pin: str, new_pin: str): try: - pivman_change_pin(self.session, old_pin, new_pin) + pivman_change_pin(self.session, pin, new_pin) return RpcResponse(dict(), ["device_info"]) except Exception as e: _handle_pin_puk_error(e) @action - def change_puk(self, params, event, signal): - old_puk = params.pop("puk") - new_puk = params.pop("new_puk") + def change_puk(self, puk: str, new_puk: str): try: - self.session.change_puk(old_puk, new_puk) + self.session.change_puk(puk, new_puk) return RpcResponse(dict(), ["device_info"]) except Exception as e: _handle_pin_puk_error(e) @action - def unblock_pin(self, params, event, signal): - puk = params.pop("puk") - new_pin = params.pop("new_pin") + def unblock_pin(self, puk: str, new_pin: str): try: self.session.unblock_pin(puk, new_pin) return RpcResponse(dict(), ["device_info"]) @@ -253,7 +253,7 @@ def unblock_pin(self, params, event, signal): _handle_pin_puk_error(e) @action - def reset(self, params, event, signal): + def reset(self): self.session.reset() self._authenticated = False self._pivman_data = get_pivman_data(self.session) @@ -264,11 +264,9 @@ def slots(self): return SlotsNode(self.session) @action(closes_child=False) - def examine_file(self, params, event, signal): - data = bytes.fromhex(params.pop("data")) - password = params.pop("password", None) + def examine_file(self, data: str, password: str | None = None): try: - private_key, certs = _parse_file(data, password) + private_key, certs = _parse_file(bytes.fromhex(data), password) certificate = _choose_cert(certs) return dict( @@ -286,9 +284,9 @@ def examine_file(self, params, event, signal): return dict(status=False) @action(closes_child=False) - def validate_rfc4514(self, params, event, signal): + def validate_rfc4514(self, data: str): try: - parse_rfc4514_string(params.pop("data")) + parse_rfc4514_string(data) return dict(status=True) except ValueError: return dict(status=False) @@ -431,10 +429,7 @@ def get_data(self): ) @action(condition=lambda self: self.certificate or self.metadata) - def delete(self, params, event, signal): - delete_cert = params.pop("delete_cert", False) - delete_key = params.pop("delete_key", False) - + def delete(self, delete_cert: bool = False, delete_key: bool = False): if not delete_cert and not delete_key: raise ValueError("Missing delete option") @@ -448,19 +443,17 @@ def delete(self, params, event, signal): return dict() @action(condition=lambda self: self.metadata) - def move_key(self, params, event, signal): - destination = params.pop("destination") - overwrite_key = params.pop("overwrite_key") - include_certificate = params.pop("include_certificate") - + def move_key( + self, destination: str, overwrite_key: bool, include_certificate: bool + ): if include_certificate: source_object = self.session.get_object(OBJECT_ID.from_slot(self.slot)) - destination = SLOT(int(destination, base=16)) + dest = SLOT(int(destination, base=16)) if overwrite_key: - self.session.delete_key(destination) - self.session.move_key(self.slot, destination) + self.session.delete_key(dest) + self.session.move_key(self.slot, dest) if include_certificate: - self.session.put_object(OBJECT_ID.from_slot(destination), source_object) + self.session.put_object(OBJECT_ID.from_slot(dest), source_object) self.session.delete_certificate(self.slot) self.session.put_object(OBJECT_ID.CHUID, generate_chuid()) self.certificate = None @@ -468,12 +461,9 @@ def move_key(self, params, event, signal): return dict() @action - def import_file(self, params, event, signal): - data = bytes.fromhex(params.pop("data")) - password = params.pop("password", None) - + def import_file(self, data: str, password: str | None = None, **kwargs): try: - private_key, certs = _parse_file(data, password) + private_key, certs = _parse_file(bytes.fromhex(data), password) except InvalidPasswordError: logger.debug("Invalid or missing password", exc_info=True) raise ValueError("Wrong/Missing password") @@ -484,9 +474,9 @@ def import_file(self, params, event, signal): metadata = None if private_key: - pin_policy = PIN_POLICY(params.pop("pin_policy", PIN_POLICY.DEFAULT)) + pin_policy = PIN_POLICY(kwargs.pop("pin_policy", PIN_POLICY.DEFAULT)) touch_policy = TOUCH_POLICY( - params.pop("touch_policy", TOUCH_POLICY.DEFAULT) + kwargs.pop("touch_policy", TOUCH_POLICY.DEFAULT) ) self.session.put_key(self.slot, private_key, pin_policy, touch_policy) try: @@ -522,16 +512,23 @@ def import_file(self, params, event, signal): return RpcResponse(response, ["device_info"]) @action - def generate(self, params, event, signal): - key_type = KEY_TYPE(params.pop("key_type")) - pin_policy = PIN_POLICY(params.pop("pin_policy", PIN_POLICY.DEFAULT)) - touch_policy = TOUCH_POLICY(params.pop("touch_policy", TOUCH_POLICY.DEFAULT)) - subject = params.pop("subject") - generate_type = GENERATE_TYPE( - params.pop("generate_type", GENERATE_TYPE.CERTIFICATE) - ) + def generate( + self, + signal, + key_type: int, + pin_policy: int = PIN_POLICY.DEFAULT, + touch_policy: int = TOUCH_POLICY.DEFAULT, + generate_type: str = GENERATE_TYPE.CERTIFICATE, + subject: str | None = None, + pin: str | None = None, + **kwargs, + ): + generate_type = GENERATE_TYPE(generate_type) public_key = self.session.generate_key( - self.slot, key_type, pin_policy, touch_policy + self.slot, + KEY_TYPE(key_type), + PIN_POLICY(pin_policy), + TOUCH_POLICY(touch_policy), ) public_key_pem = public_key.public_bytes( encoding=Encoding.PEM, format=PublicFormat.SubjectPublicKeyInfo @@ -539,35 +536,37 @@ def generate(self, params, event, signal): if pin_policy != PIN_POLICY.NEVER: # TODO: Check if verified? - pin = params.pop("pin") self.session.verify_pin(pin) if touch_policy in (TOUCH_POLICY.ALWAYS, TOUCH_POLICY.CACHED): signal("touch") - if generate_type == GENERATE_TYPE.PUBLIC_KEY: - result = public_key_pem - elif generate_type == GENERATE_TYPE.CSR: - csr = generate_csr(self.session, self.slot, public_key, subject) - result = csr.public_bytes(encoding=Encoding.PEM).decode() - elif generate_type == GENERATE_TYPE.CERTIFICATE: - now = datetime.datetime.utcnow() - then = now + datetime.timedelta(days=365) - valid_from = params.pop("valid_from", now.strftime(_date_format)) - valid_to = params.pop("valid_to", then.strftime(_date_format)) - cert = generate_self_signed_certificate( - self.session, - self.slot, - public_key, - subject, - datetime.datetime.strptime(valid_from, _date_format), - datetime.datetime.strptime(valid_to, _date_format), - ) - result = cert.public_bytes(encoding=Encoding.PEM).decode() - self.session.put_certificate(self.slot, cert) - self.session.put_object(OBJECT_ID.CHUID, generate_chuid()) - else: - raise ValueError(f"Unsupported GENERATE_TYPE: {generate_type}") + match GENERATE_TYPE(generate_type): + case GENERATE_TYPE.PUBLIC_KEY: + result = public_key_pem + case GENERATE_TYPE.CSR: + assert subject # nosec + csr = generate_csr(self.session, self.slot, public_key, subject) + result = csr.public_bytes(encoding=Encoding.PEM).decode() + case GENERATE_TYPE.CERTIFICATE: + assert subject # nosec + now = datetime.datetime.utcnow() + then = now + datetime.timedelta(days=365) + valid_from = kwargs.pop("valid_from", now.strftime(_date_format)) + valid_to = kwargs.pop("valid_to", then.strftime(_date_format)) + cert = generate_self_signed_certificate( + self.session, + self.slot, + public_key, + subject, + datetime.datetime.strptime(valid_from, _date_format), + datetime.datetime.strptime(valid_to, _date_format), + ) + result = cert.public_bytes(encoding=Encoding.PEM).decode() + self.session.put_certificate(self.slot, cert) + self.session.put_object(OBJECT_ID.CHUID, generate_chuid()) + case other: + raise ValueError(f"Unsupported GENERATE_TYPE: {other}") self._refresh() diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index 5da9d057c..87ffacd1b 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -30,7 +30,6 @@ from yubikit.oath import parse_b32_key from ykman.scancodes import KEYBOARD_LAYOUT, encode -from typing import Dict import struct _FAIL_MSG = ( @@ -46,7 +45,7 @@ def __init__(self, connection, scp_params=None): def get_data(self): state = self.session.get_config_state() - data: Dict[str, bool] = {} + data: dict[str, bool] = {} try: data.update( slot1_configured=state.is_configured(SLOT.ONE), @@ -64,7 +63,7 @@ def get_data(self): return data @action - def swap(self, params, event, signal): + def swap(self): try: self.session.swap_slots() except CommandError: @@ -80,27 +79,33 @@ def two(self): return SlotNode(self.session, SLOT.TWO) @action(closes_child=False) - def serial_modhex(self, params, event, signal): - serial = params["serial"] + def serial_modhex(self, serial: int): return dict(encoded=modhex_encode(b"\xff\x00" + struct.pack(b">I", serial))) @action(closes_child=False) - def generate_static(self, params, event, signal): - layout, length = params["layout"], int(params["length"]) + def generate_static(self, length: int, layout: str): return dict(password=generate_static_pw(length, KEYBOARD_LAYOUT[layout])) @action(closes_child=False) - def keyboard_layouts(self, params, event, signal): + def keyboard_layouts(self): return {layout.name: [sc for sc in layout.value] for layout in KEYBOARD_LAYOUT} @action(closes_child=False) - def format_yubiotp_csv(self, params, even, signal): - serial = params["serial"] - public_id = modhex_decode(params["public_id"]) - private_id = bytes.fromhex(params["private_id"]) - key = bytes.fromhex(params["key"]) - - return dict(csv=format_csv(serial, public_id, private_id, key)) + def format_yubiotp_csv( + self, + serial: int, + public_id: str, + private_id: str, + key: str, + ): + return dict( + csv=format_csv( + serial, + modhex_decode(public_id), + bytes.fromhex(private_id), + bytes.fromhex(key), + ) + ) _CONFIG_TYPES = dict( @@ -121,7 +126,7 @@ def __init__(self, session, slot): def get_data(self): self._state = self.session.get_config_state() - data: Dict[str, bool] = {} + data: dict[str, bool] = {} try: data.update(is_configured=self._state.is_configured(self.slot)) data.update(is_touch_triggered=self._state.is_touch_triggered(self.slot)) @@ -149,19 +154,19 @@ def _can_calculate(self, slot): return False @action(condition=lambda self: self._maybe_configured(self.slot)) - def delete(self, params, event, signal): + def delete(self, curr_acc_code: str | None = None): try: - access_code = params.pop("curr_acc_code", None) - access_code = bytes.fromhex(access_code) if access_code else None + access_code = bytes.fromhex(curr_acc_code) if curr_acc_code else None self.session.delete_slot(self.slot, access_code) return dict() except CommandError: raise ValueError(_FAIL_MSG) @action(condition=lambda self: self._can_calculate(self.slot)) - def calculate(self, params, event, signal): - challenge = bytes.fromhex(params.pop("challenge")) - response = self.session.calculate_hmac_sha1(self.slot, challenge, event) + def calculate(self, event, challenge: str): + response = self.session.calculate_hmac_sha1( + self.slot, bytes.fromhex(challenge), event + ) return dict(response=response) def _apply_options(self, config, options): @@ -221,14 +226,11 @@ def _get_config(self, type, **kwargs): return config @action - def put(self, params, event, signal): - type = params.pop("type") - options = params.pop("options", {}) - access_code = params.pop("curr_acc_code", None) - access_code = bytes.fromhex(access_code) if access_code else None - args = params - - config = self._get_config(type, **args) + def put( + self, type: str, options: dict = {}, curr_acc_code: str | None = None, **kwargs + ): + access_code = bytes.fromhex(curr_acc_code) if curr_acc_code else None + config = self._get_config(type, **kwargs) self._apply_options(config, options) try: self.session.put_configuration( @@ -245,13 +247,19 @@ def put(self, params, event, signal): condition=lambda self: self._state.version >= (2, 2, 0) and self._maybe_configured(self.slot) ) - def update(self, params, event, signal): + def update( + self, + params, + acc_code: str | None = None, + curr_acc_code: str | None = None, + **kwargs + ): config = UpdateConfiguration() - self._apply_options(config, params) + self._apply_options(config, kwargs) self.session.update_configuration( self.slot, config, - params.pop("acc_code", None), - params.pop("cur_acc_code", None), + bytes.fromhex(acc_code) if acc_code else None, + bytes.fromhex(curr_acc_code) if curr_acc_code else None, ) return dict() diff --git a/helper/poetry.lock b/helper/poetry.lock index b27379c53..c34efcb39 100644 --- a/helper/poetry.lock +++ b/helper/poetry.lock @@ -141,52 +141,52 @@ markers = {main = "platform_system == \"Windows\"", dev = "sys_platform == \"win [[package]] name = "cryptography" -version = "43.0.3" +version = "44.0.0" description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers." optional = false -python-versions = ">=3.7" +python-versions = "!=3.9.0,!=3.9.1,>=3.7" groups = ["main"] files = [ - {file = "cryptography-43.0.3-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:bf7a1932ac4176486eab36a19ed4c0492da5d97123f1406cf15e41b05e787d2e"}, - {file = "cryptography-43.0.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63efa177ff54aec6e1c0aefaa1a241232dcd37413835a9b674b6e3f0ae2bfd3e"}, - {file = "cryptography-43.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e1ce50266f4f70bf41a2c6dc4358afadae90e2a1e5342d3c08883df1675374f"}, - {file = "cryptography-43.0.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:443c4a81bb10daed9a8f334365fe52542771f25aedaf889fd323a853ce7377d6"}, - {file = "cryptography-43.0.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:74f57f24754fe349223792466a709f8e0c093205ff0dca557af51072ff47ab18"}, - {file = "cryptography-43.0.3-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:9762ea51a8fc2a88b70cf2995e5675b38d93bf36bd67d91721c309df184f49bd"}, - {file = "cryptography-43.0.3-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:81ef806b1fef6b06dcebad789f988d3b37ccaee225695cf3e07648eee0fc6b73"}, - {file = "cryptography-43.0.3-cp37-abi3-win32.whl", hash = "sha256:cbeb489927bd7af4aa98d4b261af9a5bc025bd87f0e3547e11584be9e9427be2"}, - {file = "cryptography-43.0.3-cp37-abi3-win_amd64.whl", hash = "sha256:f46304d6f0c6ab8e52770addfa2fc41e6629495548862279641972b6215451cd"}, - {file = "cryptography-43.0.3-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:8ac43ae87929a5982f5948ceda07001ee5e83227fd69cf55b109144938d96984"}, - {file = "cryptography-43.0.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:846da004a5804145a5f441b8530b4bf35afbf7da70f82409f151695b127213d5"}, - {file = "cryptography-43.0.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f996e7268af62598f2fc1204afa98a3b5712313a55c4c9d434aef49cadc91d4"}, - {file = "cryptography-43.0.3-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:f7b178f11ed3664fd0e995a47ed2b5ff0a12d893e41dd0494f406d1cf555cab7"}, - {file = "cryptography-43.0.3-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:c2e6fc39c4ab499049df3bdf567f768a723a5e8464816e8f009f121a5a9f4405"}, - {file = "cryptography-43.0.3-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:e1be4655c7ef6e1bbe6b5d0403526601323420bcf414598955968c9ef3eb7d16"}, - {file = "cryptography-43.0.3-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:df6b6c6d742395dd77a23ea3728ab62f98379eff8fb61be2744d4679ab678f73"}, - {file = "cryptography-43.0.3-cp39-abi3-win32.whl", hash = "sha256:d56e96520b1020449bbace2b78b603442e7e378a9b3bd68de65c782db1507995"}, - {file = "cryptography-43.0.3-cp39-abi3-win_amd64.whl", hash = "sha256:0c580952eef9bf68c4747774cde7ec1d85a6e61de97281f2dba83c7d2c806362"}, - {file = "cryptography-43.0.3-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:d03b5621a135bffecad2c73e9f4deb1a0f977b9a8ffe6f8e002bf6c9d07b918c"}, - {file = "cryptography-43.0.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a2a431ee15799d6db9fe80c82b055bae5a752bef645bba795e8e52687c69efe3"}, - {file = "cryptography-43.0.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:281c945d0e28c92ca5e5930664c1cefd85efe80e5c0d2bc58dd63383fda29f83"}, - {file = "cryptography-43.0.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:f18c716be16bc1fea8e95def49edf46b82fccaa88587a45f8dc0ff6ab5d8e0a7"}, - {file = "cryptography-43.0.3-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4a02ded6cd4f0a5562a8887df8b3bd14e822a90f97ac5e544c162899bc467664"}, - {file = "cryptography-43.0.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:53a583b6637ab4c4e3591a15bc9db855b8d9dee9a669b550f311480acab6eb08"}, - {file = "cryptography-43.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1ec0bcf7e17c0c5669d881b1cd38c4972fade441b27bda1051665faaa89bdcaa"}, - {file = "cryptography-43.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:2ce6fae5bdad59577b44e4dfed356944fbf1d925269114c28be377692643b4ff"}, - {file = "cryptography-43.0.3.tar.gz", hash = "sha256:315b9001266a492a6ff443b61238f956b214dbec9910a081ba5b6646a055a805"}, + {file = "cryptography-44.0.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:84111ad4ff3f6253820e6d3e58be2cc2a00adb29335d4cacb5ab4d4d34f2a123"}, + {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b15492a11f9e1b62ba9d73c210e2416724633167de94607ec6069ef724fad092"}, + {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:831c3c4d0774e488fdc83a1923b49b9957d33287de923d58ebd3cec47a0ae43f"}, + {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:761817a3377ef15ac23cd7834715081791d4ec77f9297ee694ca1ee9c2c7e5eb"}, + {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3c672a53c0fb4725a29c303be906d3c1fa99c32f58abe008a82705f9ee96f40b"}, + {file = "cryptography-44.0.0-cp37-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:4ac4c9f37eba52cb6fbeaf5b59c152ea976726b865bd4cf87883a7e7006cc543"}, + {file = "cryptography-44.0.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:ed3534eb1090483c96178fcb0f8893719d96d5274dfde98aa6add34614e97c8e"}, + {file = "cryptography-44.0.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:f3f6fdfa89ee2d9d496e2c087cebef9d4fcbb0ad63c40e821b39f74bf48d9c5e"}, + {file = "cryptography-44.0.0-cp37-abi3-win32.whl", hash = "sha256:eb33480f1bad5b78233b0ad3e1b0be21e8ef1da745d8d2aecbb20671658b9053"}, + {file = "cryptography-44.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:abc998e0c0eee3c8a1904221d3f67dcfa76422b23620173e28c11d3e626c21bd"}, + {file = "cryptography-44.0.0-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:660cb7312a08bc38be15b696462fa7cc7cd85c3ed9c576e81f4dc4d8b2b31591"}, + {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1923cb251c04be85eec9fda837661c67c1049063305d6be5721643c22dd4e2b7"}, + {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:404fdc66ee5f83a1388be54300ae978b2efd538018de18556dde92575e05defc"}, + {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:c5eb858beed7835e5ad1faba59e865109f3e52b3783b9ac21e7e47dc5554e289"}, + {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f53c2c87e0fb4b0c00fa9571082a057e37690a8f12233306161c8f4b819960b7"}, + {file = "cryptography-44.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:9e6fc8a08e116fb7c7dd1f040074c9d7b51d74a8ea40d4df2fc7aa08b76b9e6c"}, + {file = "cryptography-44.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:d2436114e46b36d00f8b72ff57e598978b37399d2786fd39793c36c6d5cb1c64"}, + {file = "cryptography-44.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a01956ddfa0a6790d594f5b34fc1bfa6098aca434696a03cfdbe469b8ed79285"}, + {file = "cryptography-44.0.0-cp39-abi3-win32.whl", hash = "sha256:eca27345e1214d1b9f9490d200f9db5a874479be914199194e746c893788d417"}, + {file = "cryptography-44.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:708ee5f1bafe76d041b53a4f95eb28cdeb8d18da17e597d46d7833ee59b97ede"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:37d76e6863da3774cd9db5b409a9ecfd2c71c981c38788d3fcfaf177f447b731"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:f677e1268c4e23420c3acade68fac427fffcb8d19d7df95ed7ad17cdef8404f4"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:f5e7cb1e5e56ca0933b4873c0220a78b773b24d40d186b6738080b73d3d0a756"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:8b3e6eae66cf54701ee7d9c83c30ac0a1e3fa17be486033000f2a73a12ab507c"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:be4ce505894d15d5c5037167ffb7f0ae90b7be6f2a98f9a5c3442395501c32fa"}, + {file = "cryptography-44.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:62901fb618f74d7d81bf408c8719e9ec14d863086efe4185afd07c352aee1d2c"}, + {file = "cryptography-44.0.0.tar.gz", hash = "sha256:cd4e834f340b4293430701e772ec543b0fbe6c2dea510a5286fe0acabe153a02"}, ] [package.dependencies] cffi = {version = ">=1.12", markers = "platform_python_implementation != \"PyPy\""} [package.extras] -docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=1.1.1)"] -docstest = ["pyenchant (>=1.6.11)", "readme-renderer", "sphinxcontrib-spelling (>=4.0.1)"] -nox = ["nox"] -pep8test = ["check-sdist", "click", "mypy", "ruff"] -sdist = ["build"] +docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=3.0.0)"] +docstest = ["pyenchant (>=3)", "readme-renderer (>=30.0)", "sphinxcontrib-spelling (>=7.3.1)"] +nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2)"] +pep8test = ["check-sdist", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"] +sdist = ["build (>=1.0.0)"] ssh = ["bcrypt (>=3.1.5)"] -test = ["certifi", "cryptography-vectors (==43.0.3)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"] +test = ["certifi (>=2024)", "cryptography-vectors (==44.0.0)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"] test-randomorder = ["pytest-randomly"] [[package]] @@ -229,12 +229,12 @@ version = "8.5.0" description = "Read metadata from Python packages" optional = false python-versions = ">=3.8" -groups = ["main", "dev"] +groups = ["main"] +markers = "python_version < \"3.12\"" files = [ {file = "importlib_metadata-8.5.0-py3-none-any.whl", hash = "sha256:45e54197d28b7a7f1559e60b95e7c567032b602131fbd588f1497f47880aa68b"}, {file = "importlib_metadata-8.5.0.tar.gz", hash = "sha256:71522656f0abace1d072b9e5481a48f07c138e00f079c38c8f883823f9c26bd7"}, ] -markers = {main = "python_version < \"3.12\"", dev = "python_version < \"3.10\""} [package.dependencies] zipp = ">=3.20" @@ -248,30 +248,6 @@ perf = ["ipython"] test = ["flufl.flake8", "importlib-resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] type = ["pytest-mypy"] -[[package]] -name = "importlib-resources" -version = "6.4.5" -description = "Read resources from Python packages" -optional = false -python-versions = ">=3.8" -groups = ["main"] -markers = "python_version < \"3.9\"" -files = [ - {file = "importlib_resources-6.4.5-py3-none-any.whl", hash = "sha256:ac29d5f956f01d5e4bb63102a5a19957f1b9175e45649977264a1416783bb717"}, - {file = "importlib_resources-6.4.5.tar.gz", hash = "sha256:980862a1d16c9e147a59603677fa2aa5fd82b87f223b6cb870695bcfce830065"}, -] - -[package.dependencies] -zipp = {version = ">=3.1.0", markers = "python_version < \"3.10\""} - -[package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] -cover = ["pytest-cov"] -doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -enabler = ["pytest-enabler (>=2.2)"] -test = ["jaraco.test (>=5.4)", "pytest (>=6,!=8.1.*)", "zipp (>=3.17)"] -type = ["pytest-mypy"] - [[package]] name = "iniconfig" version = "2.0.0" @@ -364,19 +340,18 @@ trio = ["async_generator", "trio"] [[package]] name = "keyring" -version = "25.5.0" +version = "25.6.0" description = "Store and access your passwords safely." optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "keyring-25.5.0-py3-none-any.whl", hash = "sha256:e67f8ac32b04be4714b42fe84ce7dad9c40985b9ca827c592cc303e7c26d9741"}, - {file = "keyring-25.5.0.tar.gz", hash = "sha256:4c753b3ec91717fe713c4edd522d625889d8973a349b0e582622f49766de58e6"}, + {file = "keyring-25.6.0-py3-none-any.whl", hash = "sha256:552a3f7af126ece7ed5c89753650eec89c7eaae8617d0aa4d9ad2b75111266bd"}, + {file = "keyring-25.6.0.tar.gz", hash = "sha256:0b39998aa941431eb3d9b0d4b2460bc773b9df6fed7621c2dfb291a7e0187a66"}, ] [package.dependencies] -importlib-metadata = {version = ">=4.11.4", markers = "python_version < \"3.12\""} -importlib-resources = {version = "*", markers = "python_version < \"3.9\""} +importlib_metadata = {version = ">=4.11.4", markers = "python_version < \"3.12\""} "jaraco.classes" = "*" "jaraco.context" = "*" "jaraco.functools" = "*" @@ -686,7 +661,6 @@ files = [ [package.dependencies] altgraph = "*" -importlib-metadata = {version = ">=4.6", markers = "python_version < \"3.10\""} macholib = {version = ">=1.8", markers = "sys_platform == \"darwin\""} packaging = ">=22.0" pefile = {version = ">=2022.5.30,<2024.8.26 || >2024.8.26", markers = "sys_platform == \"win32\""} @@ -712,37 +686,38 @@ files = [ ] [package.dependencies] -importlib_metadata = {version = ">=4.6", markers = "python_version < \"3.10\""} packaging = ">=22.0" setuptools = ">=42.0.0" [[package]] name = "pyscard" -version = "2.1.1" +version = "2.2.0" description = "Smartcard module for Python." optional = false -python-versions = "*" +python-versions = ">=3.9" groups = ["main"] files = [ - {file = "pyscard-2.1.1-cp310-cp310-win32.whl", hash = "sha256:73738b401bd8ed56dc508e8c035fda876d3612f2fd6ebd1fd1ecb1f70c81280e"}, - {file = "pyscard-2.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:d8e57488f311299b60c1067bb1ad123f0367a356b4b011383ca4d592d365a81e"}, - {file = "pyscard-2.1.1-cp311-cp311-win32.whl", hash = "sha256:07a35f1c2d6521f82c0e5b793f0968cc0b3c2473667435414eb467cb3e5b3765"}, - {file = "pyscard-2.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:0fb446d519e9ebc437fabc0c797e869ccd9f0340f9d2d72d7edc628f22267ac3"}, - {file = "pyscard-2.1.1-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:df5fa9423b61a2528b91de7a171fe110b7bee7e1d36e1a7e479512014a9e90a2"}, - {file = "pyscard-2.1.1-cp312-cp312-win32.whl", hash = "sha256:bc1f2185e696261f6e29eb0d2beba0c86796e1b4f0b17cd69900c597b591be2a"}, - {file = "pyscard-2.1.1-cp312-cp312-win_amd64.whl", hash = "sha256:5294303aee527999accfe18aec5087223f3e0b164c323f6c62bdbde2d06fca79"}, - {file = "pyscard-2.1.1-cp37-cp37m-win32.whl", hash = "sha256:213cab68b10e3d66702a56d12b5f30a3096a27e25a39a168d292775509d34014"}, - {file = "pyscard-2.1.1-cp37-cp37m-win_amd64.whl", hash = "sha256:ee9925d8c69fd853f06fa637f6823463d6cf4353396f455c8e53028fdaa74342"}, - {file = "pyscard-2.1.1-cp38-cp38-win32.whl", hash = "sha256:9e2ee52e4420125af3bf67b7f4dcb80b7f7242e1482d247c13698ae1727a7d00"}, - {file = "pyscard-2.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:b9bdd5255a69b28e32a79152a66e8ac135d67eb4f81d0e2834a12e263a50f62b"}, - {file = "pyscard-2.1.1-cp39-cp39-win32.whl", hash = "sha256:486e2b52f4de1ecdfe2342fcabb8991d9e8ba713d33cb1a73ad9972fead846ff"}, - {file = "pyscard-2.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:68c0cbaec6468e2eac599ca90efa2b0427721f9f3ca8dbcf7c5ca3b996045984"}, - {file = "pyscard-2.1.1.tar.gz", hash = "sha256:f9b0dc3fad83ac72a9335af4d04b608edc9d01e2b90e0c38ed0ef1fd014c4414"}, + {file = "pyscard-2.2.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:05cfa9840b3f4b08769487e4d84b1432d9d913035a3726856329c2648444a6ba"}, + {file = "pyscard-2.2.0-cp310-cp310-win32.whl", hash = "sha256:c363a36a803cd3e6334546d661c0b1375c16ab42600bc4be18ade3ed70eae1a6"}, + {file = "pyscard-2.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:001e760f42d2f9b7b6aba6b83fca67314e925d6df1ded75689c5ef5377f6e701"}, + {file = "pyscard-2.2.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:51758a04e70d07b233a5d7ed58492007bbbebcb3f47130a638edbe021b82df86"}, + {file = "pyscard-2.2.0-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:a138707d8ef5c53da4e7a86d1e604e2cba42fe92249099a67374624503825326"}, + {file = "pyscard-2.2.0-cp311-cp311-win_amd64.whl", hash = "sha256:45ec2a19cbc40dc56c9831f9979c81617a3a89265b1430708e1c137a82ec5c83"}, + {file = "pyscard-2.2.0-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:9a3d47a6799efbc8e6124638730a86b705ddecfba92874aec6b79348e764fe8a"}, + {file = "pyscard-2.2.0-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:fe8625b7cb1be2af0fe5d90d6daabb26a65aeb7d601455446c6a06341a2f0814"}, + {file = "pyscard-2.2.0-cp312-cp312-win32.whl", hash = "sha256:b0b476691652bb641b175d7d345bb27639615c6a5bf140e63a057750aefc1bd9"}, + {file = "pyscard-2.2.0-cp312-cp312-win_amd64.whl", hash = "sha256:9ce06b7007147f7346114274dd20edd0399e9aedbdadcc9cf4d0c867bec1cae1"}, + {file = "pyscard-2.2.0-cp313-cp313-macosx_14_0_x86_64.whl", hash = "sha256:5296c705d2f5b6947f6929a63d55521a380125c0160679b8069a11107c8b94ec"}, + {file = "pyscard-2.2.0-cp313-cp313-win32.whl", hash = "sha256:6cdb99c1d51f625fe246df57b65fad6d834a094fa5efd36d0c40356916c19431"}, + {file = "pyscard-2.2.0-cp313-cp313-win_amd64.whl", hash = "sha256:ff6f2b8ef00b48fa5f1448f0c7d812f677ce98a9994e917500ecf00e3f31dc89"}, + {file = "pyscard-2.2.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:0f23309f4f98ae41c836090c3449d8fb7470c19d57adda2592803e1df668bece"}, + {file = "pyscard-2.2.0-cp39-cp39-win32.whl", hash = "sha256:5857d60b44bbb074c9b174111fe94af4179b882942e53292de16f0c798f50c5d"}, + {file = "pyscard-2.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:e4e664b1d3b87956104b87094f81a98bdd6ac40986b52d0c40451de1fc98ca49"}, + {file = "pyscard-2.2.0.tar.gz", hash = "sha256:6aa194d4bb295e78a97056dd1d32273cc69ddbe3c852aad60a8578f04017a1bf"}, ] [package.extras] gui = ["wxPython"] -pyro = ["Pyro"] [[package]] name = "pytest" @@ -828,25 +803,25 @@ jeepney = ">=0.6" [[package]] name = "setuptools" -version = "75.3.0" +version = "75.7.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["dev"] markers = "python_version < \"3.13\"" files = [ - {file = "setuptools-75.3.0-py3-none-any.whl", hash = "sha256:f2504966861356aa38616760c0f66568e535562374995367b4e69c7143cf6bcd"}, - {file = "setuptools-75.3.0.tar.gz", hash = "sha256:fba5dd4d766e97be1b1681d98712680ae8f2f26d7881245f2ce9e40714f1a686"}, + {file = "setuptools-75.7.0-py3-none-any.whl", hash = "sha256:84fb203f278ebcf5cd08f97d3fb96d3fbed4b629d500b29ad60d11e00769b183"}, + {file = "setuptools-75.7.0.tar.gz", hash = "sha256:886ff7b16cd342f1d1defc16fc98c9ce3fde69e087a4e1983d7ab634e5f41f4f"}, ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.5.2)"] -core = ["importlib-metadata (>=6)", "importlib-resources (>=5.10.2)", "jaraco.collections", "jaraco.functools", "jaraco.text (>=3.7)", "more-itertools", "more-itertools (>=8.8)", "packaging", "packaging (>=24)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"] +core = ["importlib_metadata (>=6)", "jaraco.collections", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] enabler = ["pytest-enabler (>=2.2)"] -test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test (>=5.5)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] -type = ["importlib-metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.12.*)", "pytest-mypy"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", "pytest-mypy"] [[package]] name = "tomli" @@ -937,16 +912,16 @@ pywin32 = {version = ">=223", markers = "sys_platform == \"win32\""} [[package]] name = "zipp" -version = "3.20.2" +version = "3.21.0" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false -python-versions = ">=3.8" -groups = ["main", "dev"] +python-versions = ">=3.9" +groups = ["main"] +markers = "python_version < \"3.12\"" files = [ - {file = "zipp-3.20.2-py3-none-any.whl", hash = "sha256:a817ac80d6cf4b23bf7f2828b7cabf326f15a001bea8b1f9b49631780ba28350"}, - {file = "zipp-3.20.2.tar.gz", hash = "sha256:bc9eb26f4506fda01b81bcde0ca78103b6e62f991b381fec825435c836edbc29"}, + {file = "zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931"}, + {file = "zipp-3.21.0.tar.gz", hash = "sha256:2c9958f6430a2040341a52eb608ed6dd93ef4392e02ffe219417c1b28b5dd1f4"}, ] -markers = {main = "python_version < \"3.12\"", dev = "python_version < \"3.10\""} [package.extras] check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] @@ -994,5 +969,5 @@ files = [ [metadata] lock-version = "2.1" -python-versions = ">=3.8, <4" -content-hash = "313b13179429184258c84a9ee3b30dddb28f6e3bb42170364db02cbbae331b7e" +python-versions = ">=3.10, <4" +content-hash = "a0a501798111829df4fc86ae4bc057c00fb57a1fe5a3492e24d42a814d6e4b19" diff --git a/helper/pyproject.toml b/helper/pyproject.toml index 527281b6e..640e8a5fa 100644 --- a/helper/pyproject.toml +++ b/helper/pyproject.toml @@ -3,7 +3,7 @@ name = "authenticator-helper" version = "0.1.0" description = "Yubico Authenticator Helper" authors = [{ name = "Dain Nilsson", email = "dain@yubico.com" }] -requires-python = ">=3.8, <4" +requires-python = ">=3.10, <4" dependencies = [ "yubikey-manager (>=5.5, <6)", "mss (>=9.0.1, <10)", From a0b548051d56a62f2950338889ae4c7d0f6666a5 Mon Sep 17 00:00:00 2001 From: Dain Nilsson Date: Mon, 7 Oct 2024 15:17:44 +0200 Subject: [PATCH 2/5] More match-statements --- helper/helper/__init__.py | 34 +++++++++++++++------------ helper/helper/yubiotp.py | 48 ++++++++++++++++----------------------- 2 files changed, 38 insertions(+), 44 deletions(-) diff --git a/helper/helper/__init__.py b/helper/helper/__init__.py index 791002296..dc84fa47e 100644 --- a/helper/helper/__init__.py +++ b/helper/helper/__init__.py @@ -52,20 +52,21 @@ def _handle_incoming(event, recv, error, cmd_queue): if not request: break try: - kind = request["kind"] - if kind == "signal": - # Cancel signals are handled here, the rest forwarded - if request["status"] == "cancel": - event.set() - else: - # Ignore other signals - logger.error("Unhandled signal: %r", request) - elif kind == "command": - cmd_queue.join() # Wait for existing command to complete - event.clear() # Reset event for next command - cmd_queue.put(request) - else: - error("invalid-command", "Unsupported request type") + match request["kind"]: + case "signal": + # Cancel signals are handled here, the rest forwarded + if request["status"] == "cancel": + logger.debug("Got cancel signal!") + event.set() + else: + # Ignore other signals + logger.error("Unhandled signal: %r", request) + case "command": + cmd_queue.join() # Wait for existing command to complete + event.clear() # Reset event for next command + cmd_queue.put(request) + case _: + error("invalid-command", "Unsupported request type") except KeyError as e: error("invalid-command", str(e)) except RpcException as e: @@ -171,7 +172,10 @@ def send(data): def recv(): line = b"" while not line.endswith(b"\n"): - chunk = sock.recv(1024) + try: + chunk = sock.recv(1024) + except ConnectionError: + return None if not chunk: return None line += chunk diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index 87ffacd1b..feb37bf33 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -19,12 +19,12 @@ from yubikit.yubiotp import ( YubiOtpSession, SLOT, + SlotConfiguration, UpdateConfiguration, HmacSha1SlotConfiguration, HotpSlotConfiguration, StaticPasswordSlotConfiguration, YubiOtpSlotConfiguration, - StaticTicketSlotConfiguration, ) from ykman.otp import generate_static_pw, format_csv from yubikit.oath import parse_b32_key @@ -108,15 +108,6 @@ def format_yubiotp_csv( ) -_CONFIG_TYPES = dict( - hmac_sha1=HmacSha1SlotConfiguration, - hotp=HotpSlotConfiguration, - static_password=StaticPasswordSlotConfiguration, - yubiotp=YubiOtpSlotConfiguration, - static_ticket=StaticTicketSlotConfiguration, -) - - class SlotNode(RpcNode): def __init__(self, session, slot): super().__init__() @@ -169,7 +160,8 @@ def calculate(self, event, challenge: str): ) return dict(response=response) - def _apply_options(self, config, options): + @staticmethod + def _apply_options(config, options) -> None: for option in ( "serial_api_visible", "serial_usb_visible", @@ -199,31 +191,29 @@ def _apply_options(self, config, options): token_id, *args = options.pop("token_id") config.token_id(bytes.fromhex(token_id), *args) - return config - - def _get_config(self, type, **kwargs): - config = None - - if type in _CONFIG_TYPES: - if type == "hmac_sha1": - config = _CONFIG_TYPES[type](bytes.fromhex(kwargs["key"])) - elif type == "hotp": - config = _CONFIG_TYPES[type](parse_b32_key(kwargs["key"])) - elif type == "static_password": - config = _CONFIG_TYPES[type]( + @staticmethod + def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration: + match cfg_type: + case "hmac_sha1": + return HmacSha1SlotConfiguration(bytes.fromhex(kwargs["key"])) + case "hotp": + return HotpSlotConfiguration(parse_b32_key(kwargs["key"])) + case "static_password": + return StaticPasswordSlotConfiguration( encode( kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]] ) ) - elif type == "yubiotp": - config = _CONFIG_TYPES[type]( + case "yubiotp": + return YubiOtpSlotConfiguration( fixed=modhex_decode(kwargs["public_id"]), uid=bytes.fromhex(kwargs["private_id"]), key=bytes.fromhex(kwargs["key"]), ) - else: - raise ValueError("No supported configuration type provided.") - return config + case unsupported: + raise ValueError( + f"Unsupported configuration type provided: {unsupported}" + ) @action def put( @@ -252,7 +242,7 @@ def update( params, acc_code: str | None = None, curr_acc_code: str | None = None, - **kwargs + **kwargs, ): config = UpdateConfiguration() self._apply_options(config, kwargs) From bc54b7dacecf700dc2ea068cf0e5ce1da423670b Mon Sep 17 00:00:00 2001 From: Dain Nilsson Date: Mon, 7 Oct 2024 16:12:33 +0200 Subject: [PATCH 3/5] Automatically deserialize bytes in actions --- helper/helper/base.py | 7 ++++++ helper/helper/management.py | 8 +++---- helper/helper/oath.py | 8 +++---- helper/helper/piv.py | 17 +++++++------ helper/helper/yubiotp.py | 48 ++++++++++++++++++------------------- 5 files changed, 46 insertions(+), 42 deletions(-) diff --git a/helper/helper/base.py b/helper/helper/base.py index 3429e142e..2739590bc 100644 --- a/helper/helper/base.py +++ b/helper/helper/base.py @@ -130,6 +130,13 @@ def __call__(self, action, target, params, event, signal, traversed=None): elif action in self.list_actions(): action_f = self.get_action(action) args = inspect.signature(action_f).parameters + # Decode any serialized bytes parameters + for key, param in args.items(): + if param.annotation in (bytes, bytes | None): + value = params.get(key, None) + if value is not None: + params[key] = decode_bytes(value) + # Add event and signal if requested if "event" in args: params["event"] = event if "signal" in args: diff --git a/helper/helper/management.py b/helper/helper/management.py index cccfee3f2..003151e8f 100644 --- a/helper/helper/management.py +++ b/helper/helper/management.py @@ -90,15 +90,13 @@ def _await_reboot(self, serial, usb_enabled): def configure( self, reboot: bool = False, - cur_lock_code: str = "", - new_lock_code: str = "", + cur_lock_code: bytes | None = None, + new_lock_code: bytes | None = None, enabled_capabilities: dict = {}, auto_eject_timeout: int | None = None, challenge_response_timeout: int | None = None, device_flags: int | None = None, ): - cur_code = bytes.fromhex(cur_lock_code) or None - new_code = bytes.fromhex(new_lock_code) or None config = DeviceConfig( enabled_capabilities, auto_eject_timeout, @@ -106,7 +104,7 @@ def configure( DEVICE_FLAG(device_flags) if device_flags else None, ) serial = self.session.read_device_info().serial - self.session.write_device_config(config, reboot, cur_code, new_code) + self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code) flags = ["device_info"] if reboot: enabled = config.enabled_capabilities.get(TRANSPORT.USB) diff --git a/helper/helper/oath.py b/helper/helper/oath.py index c54016af2..383440445 100644 --- a/helper/helper/oath.py +++ b/helper/helper/oath.py @@ -142,13 +142,13 @@ def _remember_key(self, key): else: return False - def _get_key(self, key: str | None, password: str | None): + def _get_key(self, key: bytes | None, password: str | None): if key and password: raise ValueError("Only one of 'key' and 'password' can be provided.") if password: return self.session.derive_key(password) if key: - return decode_bytes(key) + return key raise ValueError("One of 'key' and 'password' must be provided.") def _set_key_verifier(self, key): @@ -163,7 +163,7 @@ def _do_validate(self, key): @action def validate( self, - key: str | None = None, + key: bytes | None = None, password: str | None = None, remember: bool = False, ): @@ -192,7 +192,7 @@ def validate( @action def set_key( self, - key: str | None = None, + key: bytes | None = None, password: str | None = None, remember: bool = False, ): diff --git a/helper/helper/piv.py b/helper/helper/piv.py index 0ad166fcd..a86c933ae 100644 --- a/helper/helper/piv.py +++ b/helper/helper/piv.py @@ -201,9 +201,9 @@ def verify_pin(self, signal, pin: str): return dict(status=True, authenticated=self._authenticated) @action - def authenticate(self, signal, key: str): + def authenticate(self, signal, key: bytes): try: - self._authenticate(bytes.fromhex(key), signal) + self._authenticate(key, signal) return dict(status=True) except ApduError as e: if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: @@ -213,14 +213,13 @@ def authenticate(self, signal, key: str): @action(condition=lambda self: self._authenticated) def set_key( self, - params, - key: str, + key: bytes, key_type: int = MANAGEMENT_KEY_TYPE.TDES, store_key: bool = False, ): pivman_set_mgm_key( self.session, - bytes.fromhex(key), + key, MANAGEMENT_KEY_TYPE(key_type), False, store_key, @@ -264,9 +263,9 @@ def slots(self): return SlotsNode(self.session) @action(closes_child=False) - def examine_file(self, data: str, password: str | None = None): + def examine_file(self, data: bytes, password: str | None = None): try: - private_key, certs = _parse_file(bytes.fromhex(data), password) + private_key, certs = _parse_file(data, password) certificate = _choose_cert(certs) return dict( @@ -461,9 +460,9 @@ def move_key( return dict() @action - def import_file(self, data: str, password: str | None = None, **kwargs): + def import_file(self, data: bytes, password: str | None = None, **kwargs): try: - private_key, certs = _parse_file(bytes.fromhex(data), password) + private_key, certs = _parse_file(data, password) except InvalidPasswordError: logger.debug("Invalid or missing password", exc_info=True) raise ValueError("Wrong/Missing password") diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index feb37bf33..098400e50 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .base import RpcNode, action, child +from .base import RpcNode, action, child, decode_bytes from yubikit.core import NotSupportedError, CommandError from yubikit.core.otp import modhex_encode, modhex_decode @@ -95,15 +95,15 @@ def format_yubiotp_csv( self, serial: int, public_id: str, - private_id: str, - key: str, + private_id: bytes, + key: bytes, ): return dict( csv=format_csv( serial, modhex_decode(public_id), - bytes.fromhex(private_id), - bytes.fromhex(key), + private_id, + key, ) ) @@ -145,19 +145,16 @@ def _can_calculate(self, slot): return False @action(condition=lambda self: self._maybe_configured(self.slot)) - def delete(self, curr_acc_code: str | None = None): + def delete(self, curr_acc_code: bytes | None = None): try: - access_code = bytes.fromhex(curr_acc_code) if curr_acc_code else None - self.session.delete_slot(self.slot, access_code) + self.session.delete_slot(self.slot, curr_acc_code) return dict() except CommandError: raise ValueError(_FAIL_MSG) @action(condition=lambda self: self._can_calculate(self.slot)) - def calculate(self, event, challenge: str): - response = self.session.calculate_hmac_sha1( - self.slot, bytes.fromhex(challenge), event - ) + def calculate(self, event, challenge: bytes): + response = self.session.calculate_hmac_sha1(self.slot, challenge, event) return dict(response=response) @staticmethod @@ -189,13 +186,13 @@ def _apply_options(config, options) -> None: if "token_id" in options: token_id, *args = options.pop("token_id") - config.token_id(bytes.fromhex(token_id), *args) + config.token_id(decode_bytes(token_id), *args) @staticmethod def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration: match cfg_type: case "hmac_sha1": - return HmacSha1SlotConfiguration(bytes.fromhex(kwargs["key"])) + return HmacSha1SlotConfiguration(decode_bytes(kwargs["key"])) case "hotp": return HotpSlotConfiguration(parse_b32_key(kwargs["key"])) case "static_password": @@ -207,8 +204,8 @@ def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration: case "yubiotp": return YubiOtpSlotConfiguration( fixed=modhex_decode(kwargs["public_id"]), - uid=bytes.fromhex(kwargs["private_id"]), - key=bytes.fromhex(kwargs["key"]), + uid=decode_bytes(kwargs["private_id"]), + key=decode_bytes(kwargs["key"]), ) case unsupported: raise ValueError( @@ -217,17 +214,20 @@ def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration: @action def put( - self, type: str, options: dict = {}, curr_acc_code: str | None = None, **kwargs + self, + type: str, + options: dict = {}, + curr_acc_code: bytes | None = None, + **kwargs, ): - access_code = bytes.fromhex(curr_acc_code) if curr_acc_code else None config = self._get_config(type, **kwargs) self._apply_options(config, options) try: self.session.put_configuration( self.slot, config, - access_code, - access_code, + curr_acc_code, + curr_acc_code, ) return dict() except CommandError: @@ -240,8 +240,8 @@ def put( def update( self, params, - acc_code: str | None = None, - curr_acc_code: str | None = None, + acc_code: bytes | None = None, + curr_acc_code: bytes | None = None, **kwargs, ): config = UpdateConfiguration() @@ -249,7 +249,7 @@ def update( self.session.update_configuration( self.slot, config, - bytes.fromhex(acc_code) if acc_code else None, - bytes.fromhex(curr_acc_code) if curr_acc_code else None, + acc_code, + curr_acc_code, ) return dict() From 9af4fcdfaddb580c147c16b39d0b5fcbe92b27a6 Mon Sep 17 00:00:00 2001 From: Elias Bonnici Date: Wed, 8 Jan 2025 11:11:20 +0100 Subject: [PATCH 4/5] Add missing type annotations --- helper/helper/oath.py | 6 +++--- helper/helper/piv.py | 4 ++-- helper/helper/yubiotp.py | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/helper/helper/oath.py b/helper/helper/oath.py index 383440445..93dcaf9d7 100644 --- a/helper/helper/oath.py +++ b/helper/helper/oath.py @@ -128,7 +128,7 @@ def forget(self): keys.write() return dict() - def _remember_key(self, key): + def _remember_key(self, key: bytes | None): keys = self._get_keys() if key is None: if self.session.device_id in keys: @@ -151,12 +151,12 @@ def _get_key(self, key: bytes | None, password: str | None): return key raise ValueError("One of 'key' and 'password' must be provided.") - def _set_key_verifier(self, key): + def _set_key_verifier(self, key: bytes): salt = os.urandom(32) digest = hmac.new(salt, key, "sha256").digest() self._key_verifier = (salt, digest) - def _do_validate(self, key): + def _do_validate(self, key: bytes): self.session.validate(key) self._set_key_verifier(key) diff --git a/helper/helper/piv.py b/helper/helper/piv.py index a86c933ae..34e7dffd6 100644 --- a/helper/helper/piv.py +++ b/helper/helper/piv.py @@ -109,7 +109,7 @@ def __call__(self, *args, **kwargs): except InvalidPinError as e: raise InvalidPinException(cause=e) - def _get_object(self, object_id): + def _get_object(self, object_id: int): try: return self.session.get_object(object_id) except ApduError as e: @@ -153,7 +153,7 @@ def get_data(self): metadata=metadata, ) - def _authenticate(self, key, signal): + def _authenticate(self, key: bytes, signal): try: metadata = self.session.get_management_key_metadata() key_type = metadata.key_type diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index 098400e50..7948f7856 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -239,7 +239,6 @@ def put( ) def update( self, - params, acc_code: bytes | None = None, curr_acc_code: bytes | None = None, **kwargs, From 4eaee33d18c51bfdf0cbecc8719f81a110d9cbf7 Mon Sep 17 00:00:00 2001 From: Elias Bonnici Date: Wed, 8 Jan 2025 11:44:27 +0100 Subject: [PATCH 5/5] Make management lock codes nullable --- lib/android/management/state.dart | 4 ++-- lib/desktop/management/state.dart | 4 ++-- lib/management/state.dart | 4 +--- lib/management/views/management_screen.dart | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/android/management/state.dart b/lib/android/management/state.dart index 3d31cc9ba..e5631c43a 100755 --- a/lib/android/management/state.dart +++ b/lib/android/management/state.dart @@ -48,8 +48,8 @@ class _AndroidManagementStateNotifier extends ManagementStateNotifier { @override Future writeConfig(DeviceConfig config, - {String currentLockCode = '', - String newLockCode = '', + {String? currentLockCode, + String? newLockCode, bool reboot = false}) async { if (reboot) { state = const AsyncValue.loading(); diff --git a/lib/desktop/management/state.dart b/lib/desktop/management/state.dart index bb5b9b4ff..355c526ef 100755 --- a/lib/desktop/management/state.dart +++ b/lib/desktop/management/state.dart @@ -97,8 +97,8 @@ class _DesktopManagementStateNotifier extends ManagementStateNotifier { @override Future writeConfig(DeviceConfig config, - {String currentLockCode = '', - String newLockCode = '', + {String? currentLockCode, + String? newLockCode, bool reboot = false}) async { await _session.command('configure', target: _subpath, params: { ...config.toJson(), diff --git a/lib/management/state.dart b/lib/management/state.dart index 700ef6be9..2fb93259a 100755 --- a/lib/management/state.dart +++ b/lib/management/state.dart @@ -28,9 +28,7 @@ final managementStateProvider = AsyncNotifierProvider.autoDispose abstract class ManagementStateNotifier extends ApplicationStateNotifier { Future writeConfig(DeviceConfig config, - {String currentLockCode = '', - String newLockCode = '', - bool reboot = false}); + {String? currentLockCode, String? newLockCode, bool reboot = false}); Future setMode({ required int interfaces, diff --git a/lib/management/views/management_screen.dart b/lib/management/views/management_screen.dart index 8c7451829..14c3b4f81 100755 --- a/lib/management/views/management_screen.dart +++ b/lib/management/views/management_screen.dart @@ -308,7 +308,7 @@ class _ManagementScreenState extends ConsumerState { widget.deviceData.info.config .copyWith(enabledCapabilities: _enabled), reboot: reboot, - currentLockCode: _lockCodeController.text); + currentLockCode: isLocked ? _lockCodeController.text : null); if (!mounted) return; Navigator.pop(context); showMessage(context, l10n.s_config_updated);