From 4fbdd9191d419d91e97118159d1e3cfa67336b3d Mon Sep 17 00:00:00 2001 From: Indrajit Raychaudhuri Date: Mon, 13 Jan 2025 19:34:56 -0600 Subject: [PATCH 1/3] dhcp: T7052: Refactor kea dhcp op-mode functions to vyos.kea Relocate the kea dhcp op-mode functions to kea helper functions in vyos.kea. This allows the functions to be reused by other scripts, not just op-mode wrappers. This moves the source of truth for the op-mode commands to the actual running kea instance, rather than VyOS config path. Also, apply some minor code cleanup and make some of the mappings consistent across the functions. --- python/vyos/kea.py | 131 ++++++++++++++++++++++- src/op_mode/dhcp.py | 249 +++++++++++++++----------------------------- 2 files changed, 212 insertions(+), 168 deletions(-) diff --git a/python/vyos/kea.py b/python/vyos/kea.py index addfdba496..32a118f68a 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -1,4 +1,4 @@ -# Copyright 2023-2024 VyOS maintainers and contributors +# Copyright 2023-2025 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,6 +17,9 @@ import os import socket +from datetime import datetime +from datetime import timezone + from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr @@ -57,6 +60,27 @@ kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket' +def _format_hex_string(in_str): + out_str = "" + # if input is divisible by 2, add : every 2 chars + if len(in_str) > 0 and len(in_str) % 2 == 0: + out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) + else: + out_str = in_str + + return out_str + +def _find_list_of_dict_index(lst, key='ip', value=''): + """ + Find the index entry of list of dict matching the dict value + Exampe: + % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] + % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') + % 1 + """ + idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) + return idx + def kea_parse_options(config): options = [] @@ -347,6 +371,10 @@ def kea_get_active_config(inet): return config +def kea_get_dhcp_pools(config, inet): + shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + return [network['name'] for network in shared_networks] if shared_networks else [] + def kea_get_pool_from_subnet_id(config, inet, subnet_id): shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') @@ -362,3 +390,104 @@ def kea_get_pool_from_subnet_id(config, inet, subnet_id): return network['name'] return None + + +def kea_get_static_mappings(config, inet, pools=[]) -> list: + """ + Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration + :return list + """ + shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + + mappings = [] + + if shared_networks: + for network in shared_networks: + if f'subnet{inet}' not in network: + continue + + for p in pools: + if network['name'] == p: + for subnet in network[f'subnet{inet}']: + if 'reservations' in subnet: + for reservation in subnet['reservations']: + mapping = {'pool': p, 'subnet': subnet['subnet']} + mapping.update(reservation) + # rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address' + mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None)) + # rename 'hw-address' to 'mac' + mapping['mac'] = mapping.pop('hw-address', None) + mappings.append(mapping) + + return mappings + + +def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list: + """ + Get DHCP server leases from active Kea DHCPv4 or DHCPv6 configuration + :return list + """ + leases = kea_get_leases(inet) + + data = [] + for lease in leases: + lifetime = lease['valid-lft'] + expiry = (lease['cltt'] + lifetime) + + lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc) + lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None + + data_lease = {} + data_lease['ip'] = lease['ip-address'] + lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} + data_lease['state'] = lease_state_long[lease['state']] + data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-' + data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None + data_lease['origin'] = 'local' # TODO: Determine remote in HA + # remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client` + data_lease['hostname'] = lease.get('hostname', '-').rstrip('.') + + if inet == '4': + data_lease['mac'] = lease['hw-address'] + data_lease['start'] = lease['start_timestamp'].timestamp() + + if inet == '6': + data_lease['last_communication'] = lease['start_timestamp'].timestamp() + data_lease['duid'] = _format_hex_string(lease['duid']) + data_lease['type'] = lease['type'] + + if lease['type'] == 'IA_PD': + prefix_len = lease['prefix-len'] + data_lease['ip'] += f'/{prefix_len}' + + data_lease['remaining'] = '-' + + if lease['valid-lft'] > 0: + data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc) + + if data_lease['remaining'].days >= 0: + # substraction gives us a timedelta object which can't be formatted with strftime + # so we use str(), split gets rid of the microseconds + data_lease['remaining'] = str(data_lease['remaining']).split('.')[0] + + # Do not add old leases + if ( + data_lease['remaining'] + and data_lease['pool'] in pools + and data_lease['state'] != 'free' + and (not state or state == 'all' or data_lease['state'] in state) + ): + data.append(data_lease) + + # deduplicate + checked = [] + for entry in data: + addr = entry.get('ip') + if addr not in checked: + checked.append(addr) + else: + idx = _find_list_of_dict_index(data, key='ip', value=addr) + if idx is not None: + data.pop(idx) + + return data diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 45de86cab7..fde89c7068 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2022-2024 VyOS maintainers and contributors +# Copyright (C) 2022-2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -19,7 +19,6 @@ import typing from datetime import datetime -from datetime import timezone from glob import glob from ipaddress import ip_address from tabulate import tabulate @@ -30,11 +29,13 @@ from vyos.configquery import ConfigTreeQuery from vyos.kea import kea_get_active_config +from vyos.kea import kea_get_dhcp_pools from vyos.kea import kea_get_leases -from vyos.kea import kea_get_pool_from_subnet_id +from vyos.kea import kea_get_server_leases +from vyos.kea import kea_get_static_mappings from vyos.kea import kea_delete_lease -from vyos.utils.process import is_systemd_service_running from vyos.utils.process import call +from vyos.utils.process import is_systemd_service_running time_string = "%a %b %d %H:%M:%S %Z %Y" @@ -52,115 +53,18 @@ def _utc_to_local(utc_dt): return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) -def _format_hex_string(in_str): - out_str = "" - # if input is divisible by 2, add : every 2 chars - if len(in_str) > 0 and len(in_str) % 2 == 0: - out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) - else: - out_str = in_str - - return out_str - - -def _find_list_of_dict_index(lst, key='ip', value=''): - """ - Find the index entry of list of dict matching the dict value - Exampe: - % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] - % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') - % 1 - """ - idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) - return idx - - -def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list: - """ - Get DHCP server leases - :return list - """ +def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=[], origin=None) -> list: inet_suffix = '6' if family == 'inet6' else '4' - try: - leases = kea_get_leases(inet_suffix) - except Exception: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server lease information') + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - - try: - active_config = kea_get_active_config(inet_suffix) - except Exception: - raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') - - data = [] - for lease in leases: - lifetime = lease['valid-lft'] - expiry = (lease['cltt'] + lifetime) - - lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc) - lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None - - data_lease = {} - data_lease['ip'] = lease['ip-address'] - lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} - data_lease['state'] = lease_state_long[lease['state']] - data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-' - data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None - data_lease['origin'] = 'local' # TODO: Determine remote in HA - data_lease['hostname'] = lease.get('hostname', '-') - # remove trailing dot to ensure consistency for `vyos-hostsd-client` - if data_lease['hostname'] and data_lease['hostname'][-1] == '.': - data_lease['hostname'] = data_lease['hostname'][:-1] - - if family == 'inet': - data_lease['mac'] = lease['hw-address'] - data_lease['start'] = lease['start_timestamp'].timestamp() - - if family == 'inet6': - data_lease['last_communication'] = lease['start_timestamp'].timestamp() - data_lease['duid'] = _format_hex_string(lease['duid']) - data_lease['type'] = lease['type'] - - if lease['type'] == 'IA_PD': - prefix_len = lease['prefix-len'] - data_lease['ip'] += f'/{prefix_len}' - - data_lease['remaining'] = '-' - - if lease['valid-lft'] > 0: - data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc) - - if data_lease['remaining'].days >= 0: - # substraction gives us a timedelta object which can't be formatted with strftime - # so we use str(), split gets rid of the microseconds - data_lease['remaining'] = str(data_lease['remaining']).split('.')[0] - - # Do not add old leases - if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free': - if not state or state == 'all' or data_lease['state'] in state: - data.append(data_lease) - - # deduplicate - checked = [] - for entry in data: - addr = entry.get('ip') - if addr not in checked: - checked.append(addr) - else: - idx = _find_list_of_dict_index(data, key='ip', value=addr) - if idx is not None: - data.pop(idx) + mappings = kea_get_server_leases(config, inet_suffix, pools, state, origin) if sorted: if sorted == 'ip': - data.sort(key = lambda x:ip_address(x['ip'])) + mappings.sort(key = lambda x:ip_address(x['ip'])) else: - data.sort(key = lambda x:x[sorted]) - return data + mappings.sort(key = lambda x:x[sorted]) + return mappings def _get_formatted_server_leases(raw_data, family='inet'): @@ -204,12 +108,6 @@ def _get_formatted_server_leases(raw_data, family='inet'): return output -def _get_dhcp_pools(family='inet') -> list: - v = 'v6' if family == 'inet6' else '' - pools = config.list_nodes(f'service dhcp{v}-server shared-network-name') - return pools - - def _get_pool_size(pool, family='inet'): v = 'v6' if family == 'inet6' else '' base = f'service dhcp{v}-server shared-network-name {pool}' @@ -229,21 +127,17 @@ def _get_pool_size(pool, family='inet'): return size -def _get_raw_pool_statistics(family='inet', pool=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] +def _get_raw_pool_statistics(config, family='inet', pool=None): + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) - v = 'v6' if family == 'inet6' else '' stats = [] - for p in pool: - subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet') + for p in pools: size = _get_pool_size(family=family, pool=p) - leases = len(_get_raw_server_leases(family=family, pool=p)) + leases = len(_get_raw_server_leases(config, family=family, pool=p)) use_percentage = round(leases / size * 100) if size != 0 else 0 pool_stats = {'pool': p, 'size': size, 'leases': leases, - 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet} + 'available': (size - leases), 'use_percentage': use_percentage} stats.append(pool_stats) return stats @@ -263,59 +157,46 @@ def _get_formatted_pool_statistics(pool_data, family='inet'): output = tabulate(data_entries, headers, numalign='left') return output -def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None): - if pool is None: - pool = _get_dhcp_pools(family=family) - else: - pool = [pool] - v = 'v6' if family == 'inet6' else '' - mappings = [] - for p in pool: - pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p], - get_first_key=True) - if 'subnet' in pool_config: - for subnet, subnet_config in pool_config['subnet'].items(): - if 'static-mapping' in subnet_config: - for name, mapping_config in subnet_config['static-mapping'].items(): - mapping = {'pool': p, 'subnet': subnet, 'name': name} - mapping.update(mapping_config) - mappings.append(mapping) +def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None): + + inet_suffix = '6' if family == 'inet6' else '4' + pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) + + mappings = kea_get_static_mappings(config, inet_suffix, pools) if sorted: if sorted == 'ip': - if family == 'inet6': - mappings.sort(key = lambda x:ip_address(x['ipv6-address'])) - else: - mappings.sort(key = lambda x:ip_address(x['ip-address'])) + mappings.sort(key = lambda x:ip_address(x['ip'])) else: mappings.sort(key = lambda x:x[sorted]) return mappings + def _get_formatted_server_static_mappings(raw_data, family='inet'): data_entries = [] if family == 'inet': for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') - name = entry.get('name') - ip_addr = entry.get('ip-address', 'N/A') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) elif family == 'inet6': for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') - name = entry.get('name') - ip_addr = entry.get('ipv6-address', 'N/A') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) - headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description'] + headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description'] output = tabulate(data_entries, headers, numalign='left') return output @@ -357,7 +238,24 @@ def _wrapper(*args, **kwargs): @_verify def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): - pool_data = _get_raw_pool_statistics(family=family, pool=pool) + + v = 'v6' if family == 'inet6' else '' + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + + pool_data = _get_raw_pool_statistics(active_config, family=family, pool=pool) if raw: return pool_data else: @@ -368,23 +266,31 @@ def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str], state: typing.Optional[ArgState], origin: typing.Optional[ArgOrigin] ): - # if dhcp server is down, inactive leases may still be shown as active, so warn the user. - v = '6' if family == 'inet6' else '4' - if not is_systemd_service_running(f'kea-dhcp{v}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): - raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') + inet_suffix = '6' if family == 'inet6' else '4' - if state and state not in lease_valid_states: - raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: + raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet if sorted and sorted not in sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin) + if state and state not in lease_valid_states: + raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') + + lease_data = _get_raw_server_leases(config=active_config, family=family, pool=pool, sorted=sorted, state=state, origin=origin) if raw: return lease_data else: @@ -394,13 +300,25 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str]): v = 'v6' if family == 'inet6' else '' - if pool and pool not in _get_dhcp_pools(family=family): + inet_suffix = '6' if family == 'inet6' else '4' + + if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): + Warning('DHCP server is configured but not started. Data may be stale.') + + try: + active_config = kea_get_active_config(inet_suffix) + except Exception: + raise vyos.opmode.DataUnavailable('Cannot fetch DHCP server configuration') + + active_pools = kea_get_dhcp_pools(active_config, inet_suffix) + + if pool and active_pools and pool not in active_pools: raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') if sorted and sorted not in mapping_sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted) + static_mappings = _get_raw_server_static_mappings(config=active_config, family=family, pool=pool, sorted=sorted) if raw: return static_mappings else: @@ -408,10 +326,7 @@ def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optio def _lease_valid(inet, address): leases = kea_get_leases(inet) - for lease in leases: - if address == lease['ip-address']: - return True - return False + return any(lease['ip-address'] == address for lease in leases) @_verify def clear_dhcp_server_lease(family: ArgFamily, address: str): From a31d5b661bf2d37c72a21e434d8ce16b22a597b9 Mon Sep 17 00:00:00 2001 From: Indrajit Raychaudhuri Date: Wed, 15 Jan 2025 23:47:03 -0600 Subject: [PATCH 2/3] dhcp: T7052: Rename and simplify functions for consistency --- op-mode-definitions/dhcp.xml.in | 8 ++--- src/op_mode/dhcp.py | 58 ++++++++++++++------------------- 2 files changed, 29 insertions(+), 37 deletions(-) diff --git a/op-mode-definitions/dhcp.xml.in b/op-mode-definitions/dhcp.xml.in index 63b1f62bb0..4ee66a90c5 100644 --- a/op-mode-definitions/dhcp.xml.in +++ b/op-mode-definitions/dhcp.xml.in @@ -140,7 +140,7 @@ Show DHCP server statistics - ${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet + ${vyos_op_scripts_dir}/dhcp.py show_server_pool_statistics --family inet @@ -149,7 +149,7 @@ service dhcp-server shared-network-name - ${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet --pool $6 + ${vyos_op_scripts_dir}/dhcp.py show_server_pool_statistics --family inet --pool $6 @@ -232,7 +232,7 @@ Show DHCPv6 server statistics - ${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet6 + ${vyos_op_scripts_dir}/dhcp.py show_server_pool_statistics --family inet6 @@ -241,7 +241,7 @@ service dhcpv6-server shared-network-name - ${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet6 --pool $6 + ${vyos_op_scripts_dir}/dhcp.py show_server_pool_statistics --family inet6 --pool $6 diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index fde89c7068..0717fc3339 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -45,6 +45,8 @@ sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] mapping_sort_valid = ['mac', 'ip', 'pool', 'duid'] +stale_warn_msg = 'DHCP server is configured but not started. Data may be stale.' + ArgFamily = typing.Literal['inet', 'inet6'] ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] ArgOrigin = typing.Literal['local', 'remote'] @@ -127,7 +129,7 @@ def _get_pool_size(pool, family='inet'): return size -def _get_raw_pool_statistics(config, family='inet', pool=None): +def _get_raw_server_pool_statistics(config, family='inet', pool=None): inet_suffix = '6' if family == 'inet6' else '4' pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) @@ -142,7 +144,7 @@ def _get_raw_pool_statistics(config, family='inet', pool=None): return stats -def _get_formatted_pool_statistics(pool_data, family='inet'): +def _get_formatted_server_pool_statistics(pool_data, family='inet'): data_entries = [] for entry in pool_data: pool = entry.get('pool') @@ -175,32 +177,22 @@ def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=Non def _get_formatted_server_static_mappings(raw_data, family='inet'): data_entries = [] - if family == 'inet': - for entry in raw_data: - pool = entry.get('pool') - subnet = entry.get('subnet') - hostname = entry.get('hostname') - ip_addr = entry.get('ip', 'N/A') - mac_addr = entry.get('mac', 'N/A') - duid = entry.get('duid', 'N/A') - description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) - elif family == 'inet6': - for entry in raw_data: - pool = entry.get('pool') - subnet = entry.get('subnet') - hostname = entry.get('hostname') - ip_addr = entry.get('ip', 'N/A') - mac_addr = entry.get('mac', 'N/A') - duid = entry.get('duid', 'N/A') - description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) + + for entry in raw_data: + pool = entry.get('pool') + subnet = entry.get('subnet') + hostname = entry.get('hostname') + ip_addr = entry.get('ip', 'N/A') + mac_addr = entry.get('mac', 'N/A') + duid = entry.get('duid', 'N/A') + description = entry.get('description', 'N/A') + data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description'] output = tabulate(data_entries, headers, numalign='left') return output -def _verify(func): +def _verify_server(func): """Decorator checks if DHCP(v6) config exists""" from functools import wraps @@ -236,14 +228,14 @@ def _wrapper(*args, **kwargs): return func(*args, **kwargs) return _wrapper -@_verify -def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): +@_verify_server +def show_server_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): v = 'v6' if family == 'inet6' else '' inet_suffix = '6' if family == 'inet6' else '4' if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') + Warning(stale_warn_msg) try: active_config = kea_get_active_config(inet_suffix) @@ -255,14 +247,14 @@ def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str if pool and active_pools and pool not in active_pools: raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') - pool_data = _get_raw_pool_statistics(active_config, family=family, pool=pool) + pool_data = _get_raw_server_pool_statistics(active_config, family=family, pool=pool) if raw: return pool_data else: - return _get_formatted_pool_statistics(pool_data, family=family) + return _get_formatted_server_pool_statistics(pool_data, family=family) -@_verify +@_verify_server def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str], state: typing.Optional[ArgState], origin: typing.Optional[ArgOrigin] ): @@ -271,7 +263,7 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], inet_suffix = '6' if family == 'inet6' else '4' if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') + Warning(stale_warn_msg) try: active_config = kea_get_active_config(inet_suffix) @@ -296,14 +288,14 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], else: return _get_formatted_server_leases(lease_data, family=family) -@_verify +@_verify_server def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str]): v = 'v6' if family == 'inet6' else '' inet_suffix = '6' if family == 'inet6' else '4' if not is_systemd_service_running(f'kea-dhcp{inet_suffix}-server.service'): - Warning('DHCP server is configured but not started. Data may be stale.') + Warning(stale_warn_msg) try: active_config = kea_get_active_config(inet_suffix) @@ -328,7 +320,7 @@ def _lease_valid(inet, address): leases = kea_get_leases(inet) return any(lease['ip-address'] == address for lease in leases) -@_verify +@_verify_server def clear_dhcp_server_lease(family: ArgFamily, address: str): v = 'v6' if family == 'inet6' else '' inet = '6' if family == 'inet6' else '4' From 5f06e1cb23e3094eadf560b1008fca0080fed70a Mon Sep 17 00:00:00 2001 From: Indrajit Raychaudhuri Date: Thu, 16 Jan 2025 16:47:14 -0600 Subject: [PATCH 3/3] ruff: T6583: Reformat to comply with code style --- python/vyos/kea.py | 148 +++++++++++++++++++---------- src/op_mode/dhcp.py | 221 +++++++++++++++++++++++++++++++++----------- 2 files changed, 267 insertions(+), 102 deletions(-) diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 32a118f68a..951c836939 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -43,7 +43,7 @@ 'time_offset': 'time-offset', 'wpad_url': 'wpad-url', 'ipv6_only_preferred': 'v6-only-preferred', - 'captive_portal': 'v4-captive-portal' + 'captive_portal': 'v4-captive-portal', } kea6_options = { @@ -55,21 +55,23 @@ 'nisplus_domain': 'nisp-domain-name', 'nisplus_server': 'nisp-servers', 'sntp_server': 'sntp-servers', - 'captive_portal': 'v6-captive-portal' + 'captive_portal': 'v6-captive-portal', } kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket' + def _format_hex_string(in_str): - out_str = "" + out_str = '' # if input is divisible by 2, add : every 2 chars if len(in_str) > 0 and len(in_str) % 2 == 0: - out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) + out_str = ':'.join(a + b for a, b in zip(in_str[::2], in_str[1::2])) else: out_str = in_str return out_str + def _find_list_of_dict_index(lst, key='ip', value=''): """ Find the index entry of list of dict matching the dict value @@ -81,6 +83,7 @@ def _find_list_of_dict_index(lst, key='ip', value=''): idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) return idx + def kea_parse_options(config): options = [] @@ -88,14 +91,21 @@ def kea_parse_options(config): if node not in config: continue - value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] + value = ( + ', '.join(config[node]) if isinstance(config[node], list) else config[node] + ) options.append({'name': option_name, 'data': value}) if 'client_prefix_length' in config: - options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])}) + options.append( + { + 'name': 'subnet-mask', + 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length']), + } + ) if 'ip_forwarding' in config: - options.append({'name': 'ip-forwarding', 'data': "true"}) + options.append({'name': 'ip-forwarding', 'data': 'true'}) if 'static_route' in config: default_route = '' @@ -103,31 +113,41 @@ def kea_parse_options(config): if 'default_router' in config: default_route = isc_static_route('0.0.0.0/0', config['default_router']) - routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()] - - options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])}) - options.append({'name': 'windows-static-route', 'data': ", ".join(routes)}) + routes = [ + isc_static_route(route, route_options['next_hop']) + for route, route_options in config['static_route'].items() + ] + + options.append( + { + 'name': 'rfc3442-static-route', + 'data': ', '.join( + routes if not default_route else routes + [default_route] + ), + } + ) + options.append({'name': 'windows-static-route', 'data': ', '.join(routes)}) if 'time_zone' in config: - with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f: - tz_string = f.read().split(b"\n")[-2].decode("utf-8") + with open('/usr/share/zoneinfo/' + config['time_zone'], 'rb') as f: + tz_string = f.read().split(b'\n')[-2].decode('utf-8') options.append({'name': 'pcode', 'data': tz_string}) options.append({'name': 'tcode', 'data': config['time_zone']}) - unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller') + unifi_controller = dict_search_args( + config, 'vendor_option', 'ubiquiti', 'unifi_controller' + ) if unifi_controller: - options.append({ - 'name': 'unifi-controller', - 'data': unifi_controller, - 'space': 'ubnt' - }) + options.append( + {'name': 'unifi-controller', 'data': unifi_controller, 'space': 'ubnt'} + ) return options + def kea_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} - options = [] if 'option' in config: out['option-data'] = kea_parse_options(config['option']) @@ -149,9 +169,7 @@ def kea_parse_subnet(subnet, config): pools = [] for num, range_config in config['range'].items(): start, stop = range_config['start'], range_config['stop'] - pool = { - 'pool': f'{start} - {stop}' - } + pool = {'pool': f'{start} - {stop}'} if 'option' in range_config: pool['option-data'] = kea_parse_options(range_config['option']) @@ -188,16 +206,21 @@ def kea_parse_subnet(subnet, config): reservation['option-data'] = kea_parse_options(host_config['option']) if 'bootfile_name' in host_config['option']: - reservation['boot-file-name'] = host_config['option']['bootfile_name'] + reservation['boot-file-name'] = host_config['option'][ + 'bootfile_name' + ] if 'bootfile_server' in host_config['option']: - reservation['next-server'] = host_config['option']['bootfile_server'] + reservation['next-server'] = host_config['option'][ + 'bootfile_server' + ] reservations.append(reservation) out['reservations'] = reservations return out + def kea6_parse_options(config): options = [] @@ -205,7 +228,9 @@ def kea6_parse_options(config): if node not in config: continue - value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] + value = ( + ', '.join(config[node]) if isinstance(config[node], list) else config[node] + ) options.append({'name': option_name, 'data': value}) if 'sip_server' in config: @@ -221,17 +246,20 @@ def kea6_parse_options(config): hosts.append(server) if addrs: - options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)}) + options.append({'name': 'sip-server-addr', 'data': ', '.join(addrs)}) if hosts: - options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)}) + options.append({'name': 'sip-server-dns', 'data': ', '.join(hosts)}) cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server') if cisco_tftp: - options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp}) + options.append( + {'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp} + ) return options + def kea6_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} @@ -269,12 +297,14 @@ def kea6_parse_subnet(subnet, config): pd_pool = { 'prefix': prefix, 'prefix-len': int(pd_conf['prefix_length']), - 'delegated-len': int(pd_conf['delegated_length']) + 'delegated-len': int(pd_conf['delegated_length']), } if 'excluded_prefix' in pd_conf: pd_pool['excluded-prefix'] = pd_conf['excluded_prefix'] - pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length']) + pd_pool['excluded-prefix-len'] = int( + pd_conf['excluded_prefix_length'] + ) pd_pools.append(pd_pool) @@ -294,9 +324,7 @@ def kea6_parse_subnet(subnet, config): if 'disable' in host_config: continue - reservation = { - 'hostname': host - } + reservation = {'hostname': host} if 'mac' in host_config: reservation['hw-address'] = host_config['mac'] @@ -305,10 +333,10 @@ def kea6_parse_subnet(subnet, config): reservation['duid'] = host_config['duid'] if 'ipv6_address' in host_config: - reservation['ip-addresses'] = [ host_config['ipv6_address'] ] + reservation['ip-addresses'] = [host_config['ipv6_address']] if 'ipv6_prefix' in host_config: - reservation['prefixes'] = [ host_config['ipv6_prefix'] ] + reservation['prefixes'] = [host_config['ipv6_prefix']] if 'option' in host_config: reservation['option-data'] = kea6_parse_options(host_config['option']) @@ -319,6 +347,7 @@ def kea6_parse_subnet(subnet, config): return out + def _ctrl_socket_command(inet, command, args=None): path = kea_ctrl_socket.format(inet=inet) @@ -345,6 +374,7 @@ def _ctrl_socket_command(inet, command, args=None): return json.loads(result.decode('utf-8')) + def kea_get_leases(inet): leases = _ctrl_socket_command(inet, f'lease{inet}-get-all') @@ -353,6 +383,7 @@ def kea_get_leases(inet): return leases['arguments']['leases'] + def kea_delete_lease(inet, ip_address): args = {'ip-address': ip_address} @@ -363,6 +394,7 @@ def kea_delete_lease(inet, ip_address): return False + def kea_get_active_config(inet): config = _ctrl_socket_command(inet, 'config-get') @@ -371,12 +403,18 @@ def kea_get_active_config(inet): return config + def kea_get_dhcp_pools(config, inet): - shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + shared_networks = dict_search_args( + config, 'arguments', f'Dhcp{inet}', 'shared-networks' + ) return [network['name'] for network in shared_networks] if shared_networks else [] + def kea_get_pool_from_subnet_id(config, inet, subnet_id): - shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + shared_networks = dict_search_args( + config, 'arguments', f'Dhcp{inet}', 'shared-networks' + ) if not shared_networks: return None @@ -397,7 +435,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list: Get DHCP static mapping from active Kea DHCPv4 or DHCPv6 configuration :return list """ - shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') + shared_networks = dict_search_args( + config, 'arguments', f'Dhcp{inet}', 'shared-networks' + ) mappings = [] @@ -414,7 +454,9 @@ def kea_get_static_mappings(config, inet, pools=[]) -> list: mapping = {'pool': p, 'subnet': subnet['subnet']} mapping.update(reservation) # rename 'ip(v6)-address' to 'ip', inet6 has 'ipv6-address' and inet has 'ip-address' - mapping['ip'] = mapping.pop('ipv6-address', mapping.pop('ip-address', None)) + mapping['ip'] = mapping.pop( + 'ipv6-address', mapping.pop('ip-address', None) + ) # rename 'hw-address' to 'mac' mapping['mac'] = mapping.pop('hw-address', None) mappings.append(mapping) @@ -432,18 +474,28 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list data = [] for lease in leases: lifetime = lease['valid-lft'] - expiry = (lease['cltt'] + lifetime) + expiry = lease['cltt'] + lifetime - lease['start_timestamp'] = datetime.fromtimestamp(expiry - lifetime, timezone.utc) - lease['expire_timestamp'] = datetime.fromtimestamp(expiry, timezone.utc) if expiry else None + lease['start_timestamp'] = datetime.fromtimestamp( + expiry - lifetime, timezone.utc + ) + lease['expire_timestamp'] = ( + datetime.fromtimestamp(expiry, timezone.utc) if expiry else None + ) data_lease = {} data_lease['ip'] = lease['ip-address'] lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} data_lease['state'] = lease_state_long[lease['state']] - data_lease['pool'] = kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) if config else '-' - data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None - data_lease['origin'] = 'local' # TODO: Determine remote in HA + data_lease['pool'] = ( + kea_get_pool_from_subnet_id(config, inet, lease['subnet-id']) + if config + else '-' + ) + data_lease['end'] = ( + lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None + ) + data_lease['origin'] = 'local' # TODO: Determine remote in HA # remove trailing dot in 'hostname' to ensure consistency for `vyos-hostsd-client` data_lease['hostname'] = lease.get('hostname', '-').rstrip('.') @@ -463,7 +515,9 @@ def kea_get_server_leases(config, inet, pools=[], state=[], origin=None) -> list data_lease['remaining'] = '-' if lease['valid-lft'] > 0: - data_lease['remaining'] = lease['expire_timestamp'] - datetime.now(timezone.utc) + data_lease['remaining'] = lease['expire_timestamp'] - datetime.now( + timezone.utc + ) if data_lease['remaining'].days >= 0: # substraction gives us a timedelta object which can't be formatted with strftime diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 0717fc3339..b3d7d4dd3c 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -37,25 +37,59 @@ from vyos.utils.process import call from vyos.utils.process import is_systemd_service_running -time_string = "%a %b %d %H:%M:%S %Z %Y" +time_string = '%a %b %d %H:%M:%S %Z %Y' config = ConfigTreeQuery() -lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] -sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state'] -sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] +lease_valid_states = [ + 'all', + 'active', + 'free', + 'expired', + 'released', + 'abandoned', + 'reset', + 'backup', +] +sort_valid_inet = [ + 'end', + 'mac', + 'hostname', + 'ip', + 'pool', + 'remaining', + 'start', + 'state', +] +sort_valid_inet6 = [ + 'end', + 'duid', + 'ip', + 'last_communication', + 'pool', + 'remaining', + 'state', + 'type', +] mapping_sort_valid = ['mac', 'ip', 'pool', 'duid'] stale_warn_msg = 'DHCP server is configured but not started. Data may be stale.' ArgFamily = typing.Literal['inet', 'inet6'] -ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] +ArgState = typing.Literal[ + 'all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup' +] ArgOrigin = typing.Literal['local', 'remote'] + def _utc_to_local(utc_dt): - return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) + return datetime.fromtimestamp( + (datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds() + ) -def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state=[], origin=None) -> list: +def _get_raw_server_leases( + config, family='inet', pool=None, sorted=None, state=[], origin=None +) -> list: inet_suffix = '6' if family == 'inet6' else '4' pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) @@ -63,9 +97,9 @@ def _get_raw_server_leases(config, family='inet', pool=None, sorted=None, state= if sorted: if sorted == 'ip': - mappings.sort(key = lambda x:ip_address(x['ip'])) + mappings.sort(key=lambda x: ip_address(x['ip'])) else: - mappings.sort(key = lambda x:x[sorted]) + mappings.sort(key=lambda x: x[sorted]) return mappings @@ -77,34 +111,55 @@ def _get_formatted_server_leases(raw_data, family='inet'): hw_addr = lease.get('mac') state = lease.get('state') start = lease.get('start') - start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') + start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') end = lease.get('end') - end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-' + end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-' remain = lease.get('remaining') pool = lease.get('pool') hostname = lease.get('hostname') origin = lease.get('origin') - data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin]) - - headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool', - 'Hostname', 'Origin'] + data_entries.append( + [ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin] + ) + + headers = [ + 'IP Address', + 'MAC address', + 'State', + 'Lease start', + 'Lease expiration', + 'Remaining', + 'Pool', + 'Hostname', + 'Origin', + ] if family == 'inet6': for lease in raw_data: ipaddr = lease.get('ip') state = lease.get('state') start = lease.get('last_communication') - start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') + start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') end = lease.get('end') - end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') + end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') remain = lease.get('remaining') lease_type = lease.get('type') pool = lease.get('pool') host_identifier = lease.get('duid') - data_entries.append([ipaddr, state, start, end, remain, lease_type, pool, host_identifier]) - - headers = ['IPv6 address', 'State', 'Last communication', 'Lease expiration', 'Remaining', 'Type', 'Pool', - 'DUID'] + data_entries.append( + [ipaddr, state, start, end, remain, lease_type, pool, host_identifier] + ) + + headers = [ + 'IPv6 address', + 'State', + 'Last communication', + 'Lease expiration', + 'Remaining', + 'Type', + 'Pool', + 'DUID', + ] output = tabulate(data_entries, headers, numalign='left') return output @@ -138,8 +193,13 @@ def _get_raw_server_pool_statistics(config, family='inet', pool=None): size = _get_pool_size(family=family, pool=p) leases = len(_get_raw_server_leases(config, family=family, pool=p)) use_percentage = round(leases / size * 100) if size != 0 else 0 - pool_stats = {'pool': p, 'size': size, 'leases': leases, - 'available': (size - leases), 'use_percentage': use_percentage} + pool_stats = { + 'pool': p, + 'size': size, + 'leases': leases, + 'available': (size - leases), + 'use_percentage': use_percentage, + } stats.append(pool_stats) return stats @@ -155,13 +215,12 @@ def _get_formatted_server_pool_statistics(pool_data, family='inet'): use_percentage = f'{use_percentage}%' data_entries.append([pool, size, leases, available, use_percentage]) - headers = ['Pool', 'Size','Leases', 'Available', 'Usage'] + headers = ['Pool', 'Size', 'Leases', 'Available', 'Usage'] output = tabulate(data_entries, headers, numalign='left') return output def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=None): - inet_suffix = '6' if family == 'inet6' else '4' pools = [pool] if pool else kea_get_dhcp_pools(config, inet_suffix) @@ -169,9 +228,9 @@ def _get_raw_server_static_mappings(config, family='inet', pool=None, sorted=Non if sorted: if sorted == 'ip': - mappings.sort(key = lambda x:ip_address(x['ip'])) + mappings.sort(key=lambda x: ip_address(x['ip'])) else: - mappings.sort(key = lambda x:x[sorted]) + mappings.sort(key=lambda x: x[sorted]) return mappings @@ -186,12 +245,23 @@ def _get_formatted_server_static_mappings(raw_data, family='inet'): mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') - data_entries.append([pool, subnet, hostname, ip_addr, mac_addr, duid, description]) - - headers = ['Pool', 'Subnet', 'Hostname', 'IP Address', 'MAC Address', 'DUID', 'Description'] + data_entries.append( + [pool, subnet, hostname, ip_addr, mac_addr, duid, description] + ) + + headers = [ + 'Pool', + 'Subnet', + 'Hostname', + 'IP Address', + 'MAC Address', + 'DUID', + 'Description', + ] output = tabulate(data_entries, headers, numalign='left') return output + def _verify_server(func): """Decorator checks if DHCP(v6) config exists""" from functools import wraps @@ -206,8 +276,10 @@ def _wrapper(*args, **kwargs): if not config.exists(f'service dhcp{v}-server'): raise vyos.opmode.UnconfiguredSubsystem(unconf_message) return func(*args, **kwargs) + return _wrapper + def _verify_client(func): """Decorator checks if interface is configured as DHCP client""" from functools import wraps @@ -226,11 +298,14 @@ def _wrapper(*args, **kwargs): if not config.exists(f'interfaces {interface_path} address dhcp{v}'): raise vyos.opmode.UnconfiguredObject(unconf_message) return func(*args, **kwargs) + return _wrapper -@_verify_server -def show_server_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): +@_verify_server +def show_server_pool_statistics( + raw: bool, family: ArgFamily, pool: typing.Optional[str] +): v = 'v6' if family == 'inet6' else '' inet_suffix = '6' if family == 'inet6' else '4' @@ -255,10 +330,14 @@ def show_server_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optio @_verify_server -def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], - sorted: typing.Optional[str], state: typing.Optional[ArgState], - origin: typing.Optional[ArgOrigin] ): - +def show_server_leases( + raw: bool, + family: ArgFamily, + pool: typing.Optional[str], + sorted: typing.Optional[str], + state: typing.Optional[ArgState], + origin: typing.Optional[ArgOrigin], +): v = 'v6' if family == 'inet6' else '' inet_suffix = '6' if family == 'inet6' else '4' @@ -282,15 +361,27 @@ def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], if state and state not in lease_valid_states: raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') - lease_data = _get_raw_server_leases(config=active_config, family=family, pool=pool, sorted=sorted, state=state, origin=origin) + lease_data = _get_raw_server_leases( + config=active_config, + family=family, + pool=pool, + sorted=sorted, + state=state, + origin=origin, + ) if raw: return lease_data else: return _get_formatted_server_leases(lease_data, family=family) + @_verify_server -def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], - sorted: typing.Optional[str]): +def show_server_static_mappings( + raw: bool, + family: ArgFamily, + pool: typing.Optional[str], + sorted: typing.Optional[str], +): v = 'v6' if family == 'inet6' else '' inet_suffix = '6' if family == 'inet6' else '4' @@ -310,16 +401,20 @@ def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optio if sorted and sorted not in mapping_sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') - static_mappings = _get_raw_server_static_mappings(config=active_config, family=family, pool=pool, sorted=sorted) + static_mappings = _get_raw_server_static_mappings( + config=active_config, family=family, pool=pool, sorted=sorted + ) if raw: return static_mappings else: return _get_formatted_server_static_mappings(static_mappings, family=family) + def _lease_valid(inet, address): leases = kea_get_leases(inet) return any(lease['ip-address'] == address for lease in leases) + @_verify_server def clear_dhcp_server_lease(family: ArgFamily, address: str): v = 'v6' if family == 'inet6' else '' @@ -335,6 +430,7 @@ def clear_dhcp_server_lease(family: ArgFamily, address: str): print(f'Lease "{address}" has been cleared') + def _get_raw_client_leases(family='inet', interface=None): from time import mktime from datetime import datetime @@ -363,21 +459,28 @@ def _get_raw_client_leases(family='inet', interface=None): # format this makes less sense for an API and also the expiry # timestamp is provided in UNIX time. Convert string (e.g. Sun Jul # 30 18:13:44 CEST 2023) to UNIX time (1690733624) - tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))}) + tmp.update( + { + 'last_update': int( + mktime(datetime.strptime(line, time_string).timetuple()) + ) + } + ) continue k, v = line.split('=') - tmp.update({k : v.replace("'", "")}) + tmp.update({k: v.replace("'", '')}) if 'interface' in tmp: vrf = get_interface_vrf(tmp['interface']) if vrf: - tmp.update({'vrf' : vrf}) + tmp.update({'vrf': vrf}) lease_data.append(tmp) return lease_data + def _get_formatted_client_leases(lease_data, family): from time import localtime from time import strftime @@ -388,30 +491,34 @@ def _get_formatted_client_leases(lease_data, family): for lease in lease_data: if not lease.get('new_ip_address'): continue - data_entries.append(["Interface", lease['interface']]) + data_entries.append(['Interface', lease['interface']]) if 'new_ip_address' in lease: - tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]' - data_entries.append(["IP address", lease['new_ip_address'], tmp]) + tmp = ( + '[Active]' + if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) + else '[Inactive]' + ) + data_entries.append(['IP address', lease['new_ip_address'], tmp]) if 'new_subnet_mask' in lease: - data_entries.append(["Subnet Mask", lease['new_subnet_mask']]) + data_entries.append(['Subnet Mask', lease['new_subnet_mask']]) if 'new_domain_name' in lease: - data_entries.append(["Domain Name", lease['new_domain_name']]) + data_entries.append(['Domain Name', lease['new_domain_name']]) if 'new_routers' in lease: - data_entries.append(["Router", lease['new_routers']]) + data_entries.append(['Router', lease['new_routers']]) if 'new_domain_name_servers' in lease: - data_entries.append(["Name Server", lease['new_domain_name_servers']]) + data_entries.append(['Name Server', lease['new_domain_name_servers']]) if 'new_dhcp_server_identifier' in lease: - data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']]) + data_entries.append(['DHCP Server', lease['new_dhcp_server_identifier']]) if 'new_dhcp_lease_time' in lease: - data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']]) + data_entries.append(['DHCP Server', lease['new_dhcp_lease_time']]) if 'vrf' in lease: - data_entries.append(["VRF", lease['vrf']]) + data_entries.append(['VRF', lease['vrf']]) if 'last_update' in lease: tmp = strftime(time_string, localtime(int(lease['last_update']))) - data_entries.append(["Last Update", tmp]) + data_entries.append(['Last Update', tmp]) if 'new_expiry' in lease: tmp = strftime(time_string, localtime(int(lease['new_expiry']))) - data_entries.append(["Expiry", tmp]) + data_entries.append(['Expiry', tmp]) # Add empty marker data_entries.append(['']) @@ -420,6 +527,7 @@ def _get_formatted_client_leases(lease_data, family): return output + def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]): lease_data = _get_raw_client_leases(family=family, interface=interface) if raw: @@ -427,6 +535,7 @@ def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[ else: return _get_formatted_client_leases(lease_data, family=family) + @_verify_client def renew_client_lease(raw: bool, family: ArgFamily, interface: str): if not raw: @@ -437,6 +546,7 @@ def renew_client_lease(raw: bool, family: ArgFamily, interface: str): else: call(f'systemctl restart dhclient@{interface}.service') + @_verify_client def release_client_lease(raw: bool, family: ArgFamily, interface: str): if not raw: @@ -447,6 +557,7 @@ def release_client_lease(raw: bool, family: ArgFamily, interface: str): else: call(f'systemctl stop dhclient@{interface}.service') + if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__])