Skip to content

Commit

Permalink
add create secret step
Browse files Browse the repository at this point in the history
  • Loading branch information
kayman-mk committed Jan 16, 2025
1 parent fa0040b commit f426f20
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 26 deletions.
141 changes: 129 additions & 12 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 57 additions & 4 deletions src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
from uuid import uuid4

from aws_lambda_powertools import Logger
from cachetools import cached

Check warning on line 4 in src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (cachetools)
from mypy_boto3_secretsmanager.client import SecretsManagerClient
from mypy_boto3_secretsmanager.type_defs import DescribeSecretResponseTypeDef
from pydantic import ValidationError

from rds_proxy_password_rotatation.services import SecretsManagerService
from rds_proxy_password_rotatation.model import DatabaseCredentials, PasswordStage
from rds_proxy_password_rotatation.services import PasswordService


class AwsSecretsManagerService(SecretsManagerService):
class AwsSecretsManagerService(PasswordService):
def __init__(self, secretsmanager_client: SecretsManagerClient, logger: Logger):
self.client = secretsmanager_client
self.logger = logger

def is_rotation_enabled(self, secret_id: str) -> bool:
metadata = self.client.describe_secret(SecretId=secret_id)
metadata = self.__get_secret_metadata(secret_id)

return 'RotationEnabled' in metadata and metadata['RotationEnabled']

def ensure_valid_secret_state(self, secret_id: str, token: str) -> bool:
metadata = self.client.describe_secret(SecretId=secret_id)
metadata = self.__get_secret_metadata(secret_id)
versions = metadata['VersionIdsToStages']

if token not in versions:
Expand All @@ -29,3 +35,50 @@ def ensure_valid_secret_state(self, secret_id: str, token: str) -> bool:
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, secret_id))
else:
return True

def get_credential(self, secret_id: str, stage: PasswordStage, token: str = None) -> DatabaseCredentials | None:
stage_string = AwsSecretsManagerService.__get_stage_string(stage)

try:
secret = self.client.get_secret_value(SecretId=secret_id, VersionId=token, VersionStage=stage_string)

return DatabaseCredentials.model_validate_json(secret['SecretString'])
except ValidationError as e:
self.logger.error(f"Failed to parse secret value for secret {secret_id} (stage: {stage_string}, token: {token})")

raise e
except self.client.exceptions.ResourceNotFoundException as e:

Check failure on line 50 in src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py

View workflow job for this annotation

GitHub Actions / build

Ruff (F841)

src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py:50:68: F841 Local variable `e` is assigned to but never used
self.logger.error(f"Failed to retrieve secret value for secret {secret_id} (stage: {stage_string}, token: {token})")

return None

def set_new_pending_password(self,secret_id: str, stage: PasswordStage, token: str, credential: DatabaseCredentials):
if token is None:
token = str(uuid4())

pending_credential = credential.model_copy()
pending_credential.password = self.client.get_random_password(ExcludeCharacters=':/@"\'\\')['RandomPassword']

self.client.put_secret_value(
SecretId=secret_id,
ClientRequestToken=token,
SecretString=pending_credential.model_dump_json(),
VersionStages=[AwsSecretsManagerService.__get_stage_string(stage)])

self.logger.info(f'new pending secret created: {secret_id} and version {token}')

@cached
def __get_secret_metadata(self, secret_id: str) -> DescribeSecretResponseTypeDef:
return self.client.describe_secret(SecretId=secret_id)

@staticmethod

Check warning on line 74 in src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (staticmethod)
def __get_stage_string(stage: PasswordStage) -> str:
match stage:
case PasswordStage.CURRENT:
return "AWSCURRENT"
case PasswordStage.PENDING:
return "AWSPENDING"
case PasswordStage.PREVIOUS:
return "AWSPREVIOUS"

Check warning on line 82 in src/rds_proxy_password_rotatation/adapter/aws_secrets_manager.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (AWSPREVIOUS)
case _:
raise ValueError(f"Invalid stage: {stage}")
16 changes: 16 additions & 0 deletions src/rds_proxy_password_rotatation/model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from enum import Enum

from pydantic import BaseModel
from pydantic.dataclasses import dataclass

Check warning on line 4 in src/rds_proxy_password_rotatation/model.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (dataclasses)

Check warning on line 4 in src/rds_proxy_password_rotatation/model.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (dataclass)


class RotationStep(Enum):
CREATE_SECRET = "create_secret"
Expand All @@ -10,3 +13,16 @@ class RotationStep(Enum):
"""Test the new secret version"""
FINISH_SECRET = "finish_secret"
"""Finish the rotation"""


class PasswordStage(Enum):
CURRENT = "CURRENT"
PENDING = "PENDING"
PREVIOUS = "PREVIOUS"


@dataclass(frozen=True)

Check warning on line 24 in src/rds_proxy_password_rotatation/model.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (dataclass)
class DatabaseCredentials(BaseModel, extra='allow'):
username: str
password: str

56 changes: 50 additions & 6 deletions src/rds_proxy_password_rotatation/password_rotation_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,30 @@
from aws_lambda_powertools import Logger

from rds_proxy_password_rotatation.model import RotationStep
from rds_proxy_password_rotatation.services import SecretsManagerService
from rds_proxy_password_rotatation.services import PasswordService


class PasswordRotationResult(Enum):
NOTHING_TO_ROTATE = "nothing_to_rotate"
STEP_EXECUTED = "step_executed"


class PasswordRotationApplication:
def __init__(self, secrets_manager: SecretsManagerService, logger: Logger):
self.secrets_manager = secrets_manager
def __init__(self, password_service: PasswordService, logger: Logger):
self.password_service = password_service
self.logger = logger

def rotate_secret(self, step: RotationStep, secret_id: str, token: str) -> PasswordRotationResult:
if not self.secrets_manager.is_rotation_enabled(secret_id):
if not self.password_service.is_rotation_enabled(secret_id):
self.logger.warning("Rotation is not enabled for the secret %s", secret_id)
return PasswordRotationResult.NOTHING_TO_ROTATE

if not self.secrets_manager.ensure_valid_secret_state(secret_id, token):
if not self.password_service.ensure_valid_secret_state(secret_id, token):
return PasswordRotationResult.NOTHING_TO_ROTATE

match step:
case RotationStep.CREATE_SECRET:
pass
self.__create_secret(secret_id, token)
case RotationStep.SET_SECRET:
pass
case RotationStep.TEST_SECRET:
Expand All @@ -34,3 +35,46 @@ def rotate_secret(self, step: RotationStep, secret_id: str, token: str) -> Passw
pass
case _:
raise ValueError(f"Invalid rotation step: {step}")

return PasswordRotationResult.STEP_EXECUTED

def __create_secret(self, secret_id: str, token: str):
"""
Creates a new version of the secret with the password to rotate to unless a version tagged with AWSPENDING
already exists.
"""

credentials_to_rotate = self.password_service.get_credential(secret_id, 'AWSCURRENT')

current_username = credentials_to_rotate.username
new_username = PasswordRotationApplication.__get_other_username(current_username)
is_multi_user_rotation = current_username != new_username

if is_multi_user_rotation:
# we rotate the previous user's password, so the current user is still valid
credentials_to_rotate = self.password_service.get_credential(secret_id, 'AWSPREVIOUS')

Check warning on line 55 in src/rds_proxy_password_rotatation/password_rotation_application.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (AWSPREVIOUS)

pending_credentials = self.password_service.get_credential(secret_id, 'AWSPENDING', token)

if pending_credentials and pending_credentials.username == credentials_to_rotate['username']:
return

self.password_service.set_new_pending_password(secret_id, 'AWSPENDING', token, credentials_to_rotate)

@staticmethod

Check warning on line 64 in src/rds_proxy_password_rotatation/password_rotation_application.py

View workflow job for this annotation

GitHub Actions / default / cspell

Unknown word (staticmethod)
def __get_other_username(username: str) -> str:
"""
Returns the other username in a multi-user rotation strategy based on the given username. For single-user rotation,
it returns the same username.
Use '1' and '2' as suffixes to indicate multi-user rotation.
"""

if username.endswith('1'):
new_username = username[:len(username) - 1] + '2'
elif username.endswith('2'):
new_username = username[:len(username) - 1] + '1'
else:
new_username = username

return new_username
Loading

0 comments on commit f426f20

Please sign in to comment.