Skip to content

Commit

Permalink
port forward tunneling
Browse files Browse the repository at this point in the history
  • Loading branch information
sk-keeper committed Nov 7, 2023
1 parent e67325f commit fe6182b
Show file tree
Hide file tree
Showing 5 changed files with 761 additions and 706 deletions.
3 changes: 0 additions & 3 deletions keepercommander/commands/discoveryrotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1935,9 +1935,6 @@ def execute(self, params, **kwargs):

gateway_public_key_bytes = retrieve_gateway_public_key(gateway_uid, params, api, utils)

# TODO remove debug code
print("PUBLIC KEY FOUND: ", gateway_public_key_bytes)

record = params.record_cache.get(record_uid)
if not record:
print(f"{bcolors.FAIL}Record {record_uid} not found.{bcolors.ENDC}")
Expand Down
20 changes: 1 addition & 19 deletions keepercommander/commands/tunnel/port_forward/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def generate_secure_self_signed_cert(private_key_str): # type: (str) -> Tuple[
:param private_key_str: PEM-formatted private key as a string.
:return: Tuple containing the PEM-formatted certificate and private key
"""
# This is the code that generates a new private key
'''
# Generate an EC private key
private_key = ec.generate_private_key(
Expand All @@ -116,24 +117,6 @@ def generate_secure_self_signed_cert(private_key_str): # type: (str) -> Tuple[
password=None,
backend=default_backend()
)
#
# subject = issuer = x509.Name([
# x509.NameAttribute(NameOID.COMMON_NAME, u"localhost"),
# ])
# cert = (
# x509.CertificateBuilder()
# .subject_name(subject)
# .issuer_name(issuer)
# .public_key(private_key.public_key())
# .serial_number(x509.random_serial_number())
# .not_valid_before(datetime.datetime.utcnow())
# .not_valid_after(
# # Our certificate will be valid for 10 days
# datetime.datetime.utcnow() + datetime.timedelta(days=10)
# )
# .sign(private_key, hashes.SHA256(), default_backend())
# )
# cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')

# Define subject and issuer
subject = issuer = x509.Name([
Expand Down Expand Up @@ -804,7 +787,6 @@ async def incoming_forward(f_writer):

client_to_remote = asyncio.create_task(out_going_forward(forwarder_reader))
remote_to_client = asyncio.create_task(incoming_forward(forwarder_writer))

self.client_tasks.extend([client_to_remote, remote_to_client])
self.forwarder_event.set()
except Exception as e:
Expand Down
69 changes: 67 additions & 2 deletions unit-tests/pam-tunnel/test_pam_tunnel.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,79 @@
import datetime
import socket
import string
import unittest
from unittest import mock
from keepercommander.commands.tunnel.port_forward.endpoint import (generate_random_bytes, find_open_port)

from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec

from keepercommander.commands.tunnel.port_forward.endpoint import (generate_random_bytes, find_open_port,
verify_tls_certificate)


def generate_self_signed_cert(private_key):
# Generate a self-signed certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"localhost"),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
# Our certificate will be valid for 10 days
datetime.datetime.utcnow() + datetime.timedelta(days=10)
)
.sign(private_key, hashes.SHA256(), default_backend())
)
cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8')

return cert_pem


def new_private_key():
# Generate an EC private key
private_key = ec.generate_private_key(
ec.SECP256R1(), # Using P-256 curve
backend=default_backend()
)
# Serialize to PEM format
private_key_str = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
return private_key, private_key_str


class TestVerifyTLSCertificate(unittest.TestCase):
# TODO: Test that the TLS certificate is verified correctly when we figure it out
def setUp(self):
self.private_key, self.private_key_str = new_private_key()
self.public_cert = generate_self_signed_cert(self.private_key)

def test_verify_tls_certificate(self):
pass
# Test that the TLS certificate is verified correctly
public_key = self.private_key.public_key()
trusted = verify_tls_certificate(self.public_cert,
public_key.public_bytes(encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint))
self.assertTrue(trusted)

def test_failed_verify_tls_certificates(self):
# Test that the TLS certificate is verified correctly
new_private, private_key_str = new_private_key()
public_key = new_private.public_key()
trusted = verify_tls_certificate(self.public_cert,
public_key.public_bytes(encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint))
self.assertFalse(trusted)


class TestFindOpenPort(unittest.TestCase):
Expand Down
Loading

0 comments on commit fe6182b

Please sign in to comment.