From 756718f5d27f7d9a8ca82af0b1102592511dd210 Mon Sep 17 00:00:00 2001 From: Sergey Kolupaev Date: Fri, 1 Dec 2023 18:55:26 -0800 Subject: [PATCH] Release 16.9.28 --- .github/workflows/test-with-pytest.yml | 2 +- keepercommander/__init__.py | 2 +- keepercommander/commands/base.py | 7 +- requirements.txt | 2 +- setup.cfg | 2 +- unit-tests/pam-tunnel/test_pam_tunnel.py | 293 ++++++++++--------- unit-tests/pam-tunnel/test_private_tunnel.py | 26 +- unit-tests/pam-tunnel/test_public_tunnel.py | 19 +- 8 files changed, 178 insertions(+), 175 deletions(-) diff --git a/.github/workflows/test-with-pytest.yml b/.github/workflows/test-with-pytest.yml index 5914956bc..4ff899c48 100644 --- a/.github/workflows/test-with-pytest.yml +++ b/.github/workflows/test-with-pytest.yml @@ -9,7 +9,7 @@ jobs: test-with-pytest: strategy: matrix: - python-version: ['3.6', '3.10'] + python-version: ['3.6', '3.11'] runs-on: ubuntu-20.04 diff --git a/keepercommander/__init__.py b/keepercommander/__init__.py index ef367215c..759cf5b48 100644 --- a/keepercommander/__init__.py +++ b/keepercommander/__init__.py @@ -10,4 +10,4 @@ # Contact: ops@keepersecurity.com # -__version__ = '16.9.27' +__version__ = '16.9.28' diff --git a/keepercommander/commands/base.py b/keepercommander/commands/base.py index f4e8fc430..228434ffe 100644 --- a/keepercommander/commands/base.py +++ b/keepercommander/commands/base.py @@ -113,9 +113,10 @@ def register_commands(commands, aliases, command_info): commands['2fa'] = TwoFaCommand() command_info['2fa'] = '2FA management' - from . import discoveryrotation - discoveryrotation.register_commands(commands) - discoveryrotation.register_command_info(aliases, command_info) + if sys.version_info.major >= 3 and sys.version_info.minor >= 8: + from . import discoveryrotation + discoveryrotation.register_commands(commands) + discoveryrotation.register_command_info(aliases, command_info) def register_enterprise_commands(commands, aliases, command_info): diff --git a/requirements.txt b/requirements.txt index 32ad548bb..9a6a7386c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,4 @@ tabulate pyqrcode keeper-secrets-manager-core>=16.6.0 websockets -aiortc \ No newline at end of file +aiortc; python_version >= '3.8' \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 81319ee51..6421e209e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -42,7 +42,7 @@ install_requires = tabulate keeper-secrets-manager-core>=16.2.0 websockets - aiortc + aiortc; python_version >= '3.8' [options.extras_require] test = diff --git a/unit-tests/pam-tunnel/test_pam_tunnel.py b/unit-tests/pam-tunnel/test_pam_tunnel.py index 0aec4e0be..03097c7cf 100644 --- a/unit-tests/pam-tunnel/test_pam_tunnel.py +++ b/unit-tests/pam-tunnel/test_pam_tunnel.py @@ -1,155 +1,156 @@ -import datetime -import socket -import string +import sys import unittest from unittest import mock -from cryptography import x509 -from cryptography.hazmat._oid import NameOID -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization, hashes -from cryptography.hazmat.primitives.asymmetric import ec - -from keepercommander.commands.tunnel.port_forward.endpoint import (generate_random_bytes, find_open_port) - - -def generate_self_signed_cert(private_key): - # Generate a self-signed certificate - subject = issuer = x509.Name([ - x509.NameAttribute(NameOID.COMMON_NAME, u"localhost"), - ]) - cert = ( - x509.CertificateBuilder() - .subject_name(subject) - .issuer_name(issuer) - .public_key(private_key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.datetime.utcnow()) - .not_valid_after( - # Our certificate will be valid for 10 days - datetime.datetime.utcnow() + datetime.timedelta(days=10) +if sys.version_info >= (3, 15): + import datetime + import socket + import string + from cryptography import x509 + from cryptography.hazmat._oid import NameOID + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.asymmetric import ec + + from keepercommander.commands.tunnel.port_forward.endpoint import (generate_random_bytes, find_open_port) + + def generate_self_signed_cert(private_key): + # Generate a self-signed certificate + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, u"localhost"), + ]) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow()) + .not_valid_after( + # Our certificate will be valid for 10 days + datetime.datetime.utcnow() + datetime.timedelta(days=10) + ) + .sign(private_key, hashes.SHA256(), default_backend()) ) - .sign(private_key, hashes.SHA256(), default_backend()) - ) - cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8') - - return cert_pem - - -def new_private_key(): - # Generate an EC private key - private_key = ec.generate_private_key( - ec.SECP256R1(), # Using P-256 curve - backend=default_backend() - ) - # Serialize to PEM format - private_key_str = private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - ).decode('utf-8') - return private_key, private_key_str - - -class TestFindOpenPort(unittest.TestCase): - def mock_bind(self, address): - # Mock the behavior of socket.socket.bind - port = address[1] - if port in self.in_use_ports: - raise OSError("Address already in use") - else: - print(f"Port {port} bound successfully.") - - def test_preferred_port(self): - # Test that the function returns the preferred port if it's available - preferred_port = 50000 - open_port = find_open_port([], preferred_port=preferred_port) - self.assertEqual(open_port, preferred_port) - - def test_preferred_port_unavailable(self): - # Mock the bind method to simulate that port 80 is in use - with mock.patch('socket.socket.bind', side_effect=OSError("Address already in use")): - preferred_port = 80 - open_port = find_open_port([], preferred_port=preferred_port) - self.assertIsNone(open_port) + cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8') - def test_range(self): - # Test that the function returns a port within the specified range - start_port = 50000 - end_port = 50010 - open_port = find_open_port([], start_port=start_port, end_port=end_port) - self.assertTrue(start_port <= open_port <= end_port) - - def test_no_available_ports(self): - # Setup - self.in_use_ports = set(range(50000, 50011)) # All these ports are in use - - # Patch - with mock.patch.object(socket.socket, 'bind', side_effect=self.mock_bind): - # Test - open_port = find_open_port([], start_port=50000, end_port=50010) - self.assertIsNone(open_port) + return cert_pem - def test_invalid_range(self): - # Test that the function returns None if the range is invalid - open_port = find_open_port([], start_port=50010, end_port=50000) - self.assertIsNone(open_port) - def test_socket_exception(self): - # Test that the function handles exceptions other than OSError gracefully - with mock.patch('socket.socket.bind', side_effect=Exception("Test exception")): - open_port = find_open_port([], start_port=49152, end_port=49153, host='localhost') + def new_private_key(): + # Generate an EC private key + private_key = ec.generate_private_key( + ec.SECP256R1(), # Using P-256 curve + backend=default_backend() + ) + # Serialize to PEM format + private_key_str = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + return private_key, private_key_str + + + class TestFindOpenPort(unittest.TestCase): + def mock_bind(self, address): + # Mock the behavior of socket.socket.bind + port = address[1] + if port in self.in_use_ports: + raise OSError("Address already in use") + else: + print(f"Port {port} bound successfully.") + + def test_preferred_port(self): + # Test that the function returns the preferred port if it's available + preferred_port = 50000 + open_port = find_open_port([], preferred_port=preferred_port) + self.assertEqual(open_port, preferred_port) + + def test_preferred_port_unavailable(self): + # Mock the bind method to simulate that port 80 is in use + with mock.patch('socket.socket.bind', side_effect=OSError("Address already in use")): + preferred_port = 80 + open_port = find_open_port([], preferred_port=preferred_port) + self.assertIsNone(open_port) + + def test_range(self): + # Test that the function returns a port within the specified range + start_port = 50000 + end_port = 50010 + open_port = find_open_port([], start_port=start_port, end_port=end_port) + self.assertTrue(start_port <= open_port <= end_port) + + def test_no_available_ports(self): + # Setup + self.in_use_ports = set(range(50000, 50011)) # All these ports are in use + + # Patch + with mock.patch.object(socket.socket, 'bind', side_effect=self.mock_bind): + # Test + open_port = find_open_port([], start_port=50000, end_port=50010) + self.assertIsNone(open_port) + + def test_invalid_range(self): + # Test that the function returns None if the range is invalid + open_port = find_open_port([], start_port=50010, end_port=50000) self.assertIsNone(open_port) - def test_tried_ports(self): - # Setup - self.in_use_ports = {50000, 50001} # These ports are in use - - # Patch - with mock.patch.object(socket.socket, 'bind', side_effect=self.mock_bind): - # Test - open_port = find_open_port([50000, 50001], start_port=50000, end_port=50002) - self.assertEqual(open_port, 50002) - - -class TestGenerateRandomBytes(unittest.TestCase): - - def test_default_length(self): - # Test that the default length of the returned bytes is 32 - random_bytes = generate_random_bytes() - self.assertEqual(len(random_bytes), 32, f'Length 32 failed found {len(random_bytes)} in ' - f'{random_bytes}') - - def test_custom_length(self): - # Test custom lengths - for length in [1, 10, 20, 50, 100]: - random_bytes = generate_random_bytes(length) - self.assertEqual(len(random_bytes), length, f'Length {length} failed found {len(random_bytes)} in ' - f'{random_bytes}') - - def test_content(self): - # Test that the returned bytes only contain printable characters - for length in [1, 10, 20, 50, 100]: - random_bytes = generate_random_bytes(length) - self.assertTrue(all(byte in string.printable.encode('utf-8') for byte in random_bytes)) - - def test_zero_length(self): - # Test that a zero length returns an empty bytes object - random_bytes = generate_random_bytes(0) - self.assertEqual(random_bytes, b'') - - def test_negative_length(self): - # Test that a negative length raises a ValueError - with self.assertRaises(ValueError): - generate_random_bytes(-1) - - def test_type(self): - # Test that the return type is bytes - random_bytes = generate_random_bytes() - self.assertIsInstance(random_bytes, bytes) - - def test_uniqueness(self): - # Test that multiple calls return different values - random_bytes1 = generate_random_bytes() - random_bytes2 = generate_random_bytes() - self.assertNotEqual(random_bytes1, random_bytes2) + def test_socket_exception(self): + # Test that the function handles exceptions other than OSError gracefully + with mock.patch('socket.socket.bind', side_effect=Exception("Test exception")): + open_port = find_open_port([], start_port=49152, end_port=49153, host='localhost') + self.assertIsNone(open_port) + + def test_tried_ports(self): + # Setup + self.in_use_ports = {50000, 50001} # These ports are in use + + # Patch + with mock.patch.object(socket.socket, 'bind', side_effect=self.mock_bind): + # Test + open_port = find_open_port([50000, 50001], start_port=50000, end_port=50002) + self.assertEqual(open_port, 50002) + + + class TestGenerateRandomBytes(unittest.TestCase): + + def test_default_length(self): + # Test that the default length of the returned bytes is 32 + random_bytes = generate_random_bytes() + self.assertEqual(len(random_bytes), 32, f'Length 32 failed found {len(random_bytes)} in ' + f'{random_bytes}') + + def test_custom_length(self): + # Test custom lengths + for length in [1, 10, 20, 50, 100]: + random_bytes = generate_random_bytes(length) + self.assertEqual(len(random_bytes), length, f'Length {length} failed found {len(random_bytes)} in ' + f'{random_bytes}') + + def test_content(self): + # Test that the returned bytes only contain printable characters + for length in [1, 10, 20, 50, 100]: + random_bytes = generate_random_bytes(length) + self.assertTrue(all(byte in string.printable.encode('utf-8') for byte in random_bytes)) + + def test_zero_length(self): + # Test that a zero length returns an empty bytes object + random_bytes = generate_random_bytes(0) + self.assertEqual(random_bytes, b'') + + def test_negative_length(self): + # Test that a negative length raises a ValueError + with self.assertRaises(ValueError): + generate_random_bytes(-1) + + def test_type(self): + # Test that the return type is bytes + random_bytes = generate_random_bytes() + self.assertIsInstance(random_bytes, bytes) + + def test_uniqueness(self): + # Test that multiple calls return different values + random_bytes1 = generate_random_bytes() + random_bytes2 = generate_random_bytes() + self.assertNotEqual(random_bytes1, random_bytes2) diff --git a/unit-tests/pam-tunnel/test_private_tunnel.py b/unit-tests/pam-tunnel/test_private_tunnel.py index a3c5c51cc..7a89b4841 100644 --- a/unit-tests/pam-tunnel/test_private_tunnel.py +++ b/unit-tests/pam-tunnel/test_private_tunnel.py @@ -1,21 +1,21 @@ -import asyncio -import logging -import socket import sys import unittest - -from aiortc import RTCDataChannel -from cryptography.utils import int_to_bytes -from keepercommander import utils -from keepercommander.commands.tunnel.port_forward.endpoint import (PrivateTunnelEntrance, ControlMessage, - CONTROL_MESSAGE_NO_LENGTH, CONNECTION_NO_LENGTH, - ConnectionNotFoundException, - TERMINATOR, DATA_LENGTH) -from test_pam_tunnel import new_private_key from unittest import mock +if sys.version_info >= (3, 15): + import asyncio + import logging + import socket + + from aiortc import RTCDataChannel + from cryptography.utils import int_to_bytes + from keepercommander import utils + from keepercommander.commands.tunnel.port_forward.endpoint import (PrivateTunnelEntrance, ControlMessage, + CONTROL_MESSAGE_NO_LENGTH, CONNECTION_NO_LENGTH, + ConnectionNotFoundException, + TERMINATOR, DATA_LENGTH) + from test_pam_tunnel import new_private_key -if sys.version_info >= (3, 11): # Only define the class if Python version is 3.8 or higher class TestPrivateTunnelEntrance(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): diff --git a/unit-tests/pam-tunnel/test_public_tunnel.py b/unit-tests/pam-tunnel/test_public_tunnel.py index ed170293a..06f09d132 100644 --- a/unit-tests/pam-tunnel/test_public_tunnel.py +++ b/unit-tests/pam-tunnel/test_public_tunnel.py @@ -1,17 +1,18 @@ import sys import unittest -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from keeper_secrets_manager_core.utils import base64_to_bytes -from keepercommander.commands.tunnel.port_forward.tunnel import ITunnel -from keepercommander.commands.tunnel.port_forward.endpoint import (ControlMessage, CONTROL_MESSAGE_NO_LENGTH, - DATA_LENGTH, CONNECTION_NO_LENGTH, TunnelProtocol, - TERMINATOR, find_server_public_key) from unittest import mock +if sys.version_info >= (3, 15): + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + from keeper_secrets_manager_core.utils import base64_to_bytes + from keepercommander.commands.tunnel.port_forward.tunnel import ITunnel + from keepercommander.commands.tunnel.port_forward.endpoint import (ControlMessage, CONTROL_MESSAGE_NO_LENGTH, + DATA_LENGTH, CONNECTION_NO_LENGTH, TunnelProtocol, + TERMINATOR, find_server_public_key) + -if sys.version_info >= (3, 11): # Only define the class if Python version is 3.8 or higher def make_private_key(): private_key = ec.generate_private_key(