Skip to content

Commit

Permalink
Merge branch 'develop' into fix-token-generation-readmodel-checker
Browse files Browse the repository at this point in the history
  • Loading branch information
galales authored Jan 14, 2025
2 parents d27e1a1 + 3f6b0ee commit 0f2a913
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
TokenGenerationStatesApiClient,
TokenGenerationStatesConsumerClient,
unsafeBrandId,
TokenGenerationStatesClientKidPK,
} from "pagopa-interop-models";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { Logger } from "pagopa-interop-commons";
Expand All @@ -34,7 +35,7 @@ import {
deleteClientEntryFromTokenGenerationStates,
deleteEntriesFromTokenGenStatesByClientIdKidV1,
deleteEntriesFromTokenGenStatesByClientIdV1,
deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1,
deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1,
extractAgreementIdFromAgreementPK,
extractKidFromTokenGenStatesEntryPK,
readConsumerClientsInTokenGenStatesV1,
Expand Down Expand Up @@ -305,6 +306,7 @@ export async function handleMessageV1(
);
}
})
// eslint-disable-next-line sonarjs/cognitive-complexity
.with({ type: "ClientPurposeAdded" }, async (msg) => {
const clientId = unsafeBrandId<ClientId>(msg.data.clientId);

Expand All @@ -328,7 +330,10 @@ export async function handleMessageV1(
return Promise.resolve();
}

const purposeIds = [...clientEntry.clientPurposesIds, purposeId];
// Deduplicate in case of retry and reprocess
const purposeIds = Array.from(
new Set([...clientEntry.clientPurposesIds, purposeId])
);
await setClientPurposeIdsInPlatformStatesEntry(
{
primaryKey: pk,
Expand Down Expand Up @@ -360,7 +365,8 @@ export async function handleMessageV1(

for (const entry of tokenGenStatesConsumerClients) {
const addedTokenGenStatesConsumerClient = await match(
clientEntry.clientPurposesIds.length
// Count without the current purpose
purposeIds.length - 1
)
.with(0, async () => {
const newTokenGenStatesConsumerClient =
Expand All @@ -380,11 +386,16 @@ export async function handleMessageV1(
dynamoDBClient,
logger
);
await deleteClientEntryFromTokenGenerationStates(
entry.PK,
dynamoDBClient,
logger
);
if (
TokenGenerationStatesClientKidPK.safeParse(entry.PK).success
) {
// Remove only partial entries (to avoid deleting complete entries after retry)
await deleteClientEntryFromTokenGenerationStates(
entry.PK,
dynamoDBClient,
logger
);
}
return newTokenGenStatesConsumerClient;
})
.with(P.number.gt(0), async () => {
Expand Down Expand Up @@ -482,7 +493,7 @@ export async function handleMessageV1(

// token-generation-states
if (updatedPurposeIds.length > 0) {
await deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1(
await deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1(
GSIPK_clientId_purposeId,
dynamoDBClient,
logger
Expand Down
145 changes: 16 additions & 129 deletions packages/authorization-platformstate-writer/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ export const readPlatformClientEntry = async (
}
};

const readTokenGenStatesConsumerClientsByGSIPKClientPurposeV1 = async (
const readTokenGenStatesConsumerClientsByClientPurposeV1 = async (
GSIPK_clientId_purposeId: GSIPKClientIdPurposeId,
dynamoDBClient: DynamoDBClient,
exclusiveStartKey?: Record<string, AttributeValue>
Expand Down Expand Up @@ -381,7 +381,7 @@ export const deleteEntriesFromTokenGenStatesByClientIdPurposeIdV2 = async (
);
};

export const deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1 = async (
export const deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1 = async (
GSIPK_clientId_purposeId: GSIPKClientIdPurposeId,
dynamoDBClient: DynamoDBClient,
logger: Logger
Expand All @@ -391,7 +391,7 @@ export const deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1 = async (
dynamoDBClient: DynamoDBClient,
exclusiveStartKey?: Record<string, AttributeValue>
): Promise<void> => {
const res = await readTokenGenStatesConsumerClientsByGSIPKClientPurposeV1(
const res = await readTokenGenStatesConsumerClientsByClientPurposeV1(
GSIPK_clientId_purposeId,
dynamoDBClient,
exclusiveStartKey
Expand Down Expand Up @@ -426,7 +426,7 @@ export const convertEntriesToClientKidInTokenGenerationStates = async (
dynamoDBClient: DynamoDBClient,
exclusiveStartKey?: Record<string, AttributeValue>
): Promise<TokenGenStatesConsumerClientGSIClientPurpose[]> => {
const res = await readTokenGenStatesConsumerClientsByGSIPKClientPurposeV1(
const res = await readTokenGenStatesConsumerClientsByClientPurposeV1(
GSIPK_clientId_purposeId,
dynamoDBClient,
exclusiveStartKey
Expand All @@ -448,14 +448,21 @@ export const convertEntriesToClientKidInTokenGenerationStates = async (
};

// write the new one
await writeTokenGenStatesConsumerClient(newEntry, dynamoDBClient, logger);

// delete the old one
await deleteClientEntryFromTokenGenerationStates(
entry.PK,
await upsertTokenGenStatesConsumerClient(
newEntry,
dynamoDBClient,
logger
);

if (TokenGenerationStatesClientKidPurposePK.safeParse(entry.PK).success) {
// Remove only complete entries (to avoid deleting partial entries after retry)
// delete the old one
await deleteClientEntryFromTokenGenerationStates(
entry.PK,
dynamoDBClient,
logger
);
}
}

if (!res.lastEvaluatedKey) {
Expand Down Expand Up @@ -730,126 +737,6 @@ export const upsertTokenGenStatesConsumerClient = async (
);
};

export const writeTokenGenStatesConsumerClient = async (
tokenGenStatesConsumerClient: TokenGenerationStatesConsumerClient,
dynamoDBClient: DynamoDBClient,
logger: Logger
): Promise<void> => {
const input: PutItemInput = {
ConditionExpression: "attribute_not_exists(PK)",
Item: {
PK: {
S: tokenGenStatesConsumerClient.PK,
},
...(tokenGenStatesConsumerClient.descriptorState
? {
descriptorState: {
S: tokenGenStatesConsumerClient.descriptorState,
},
}
: {}),
...(tokenGenStatesConsumerClient.descriptorAudience
? {
descriptorAudience: {
L: tokenGenStatesConsumerClient.descriptorAudience.map(
(item) => ({
S: item,
})
),
},
}
: {}),
...(tokenGenStatesConsumerClient.descriptorVoucherLifespan
? {
descriptorVoucherLifespan: {
N: tokenGenStatesConsumerClient.descriptorVoucherLifespan.toString(),
},
}
: {}),
updatedAt: {
S: tokenGenStatesConsumerClient.updatedAt,
},
consumerId: {
S: tokenGenStatesConsumerClient.consumerId,
},
...(tokenGenStatesConsumerClient.agreementId
? {
agreementId: {
S: tokenGenStatesConsumerClient.agreementId,
},
}
: {}),
...(tokenGenStatesConsumerClient.purposeVersionId
? {
purposeVersionId: {
S: tokenGenStatesConsumerClient.purposeVersionId,
},
}
: {}),
...(tokenGenStatesConsumerClient.GSIPK_consumerId_eserviceId
? {
GSIPK_consumerId_eserviceId: {
S: tokenGenStatesConsumerClient.GSIPK_consumerId_eserviceId,
},
}
: {}),
clientKind: {
S: tokenGenStatesConsumerClient.clientKind,
},
publicKey: {
S: tokenGenStatesConsumerClient.publicKey,
},
GSIPK_clientId: {
S: tokenGenStatesConsumerClient.GSIPK_clientId,
},
GSIPK_clientId_kid: {
S: tokenGenStatesConsumerClient.GSIPK_clientId_kid,
},
...(tokenGenStatesConsumerClient.GSIPK_clientId_purposeId
? {
GSIPK_clientId_purposeId: {
S: tokenGenStatesConsumerClient.GSIPK_clientId_purposeId,
},
}
: {}),
...(tokenGenStatesConsumerClient.agreementState
? {
agreementState: {
S: tokenGenStatesConsumerClient.agreementState,
},
}
: {}),
...(tokenGenStatesConsumerClient.GSIPK_eserviceId_descriptorId
? {
GSIPK_eserviceId_descriptorId: {
S: tokenGenStatesConsumerClient.GSIPK_eserviceId_descriptorId,
},
}
: {}),
...(tokenGenStatesConsumerClient.GSIPK_purposeId
? {
GSIPK_purposeId: {
S: tokenGenStatesConsumerClient.GSIPK_purposeId,
},
}
: {}),
...(tokenGenStatesConsumerClient.purposeState
? {
purposeState: {
S: tokenGenStatesConsumerClient.purposeState,
},
}
: {}),
},
TableName: config.tokenGenerationReadModelTableNameTokenGeneration,
};
const command = new PutItemCommand(input);
await dynamoDBClient.send(command);
logger.info(
`Token-generation-states. Written consumer client ${tokenGenStatesConsumerClient.PK}`
);
};

export const clientKindToTokenGenerationStatesClientKind = (
kind: ClientKind
): ClientKindTokenGenStates =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ import {
writePlatformClientEntry,
deleteClientEntryFromTokenGenerationStates,
readPlatformClientEntry,
deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1,
deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1,
upsertTokenGenStatesConsumerClient,
upsertTokenGenStatesApiClient,
deleteEntriesFromTokenGenStatesByClientIdV2,
Expand Down Expand Up @@ -601,7 +601,7 @@ describe("utils", () => {
expect(res).toEqual(clientEntry1);
});

it("deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1", async () => {
it("deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1", async () => {
const GSIPK_clientId_purposeId = makeGSIPKClientIdPurposeId({
clientId: generateId(),
purposeId: generateId(),
Expand Down Expand Up @@ -631,7 +631,7 @@ describe("utils", () => {
dynamoDBClient
);

await deleteEntriesFromTokenGenStatesByGSIPKClientIdPurposeIdV1(
await deleteEntriesFromTokenGenStatesByClientIdPurposeIdV1(
GSIPK_clientId_purposeId,
dynamoDBClient,
genericLogger
Expand Down
5 changes: 4 additions & 1 deletion packages/kafka-iam-auth/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,10 @@ const initCustomConsumer = async ({
return Promise.resolve(false);
},
},
...batchConsumerConfig,
maxWaitTimeInMs: batchConsumerConfig?.maxWaitKafkaBatchMillis,
minBytes: batchConsumerConfig?.minBytes,
maxBytes: batchConsumerConfig?.maxBytes,
sessionTimeout: batchConsumerConfig?.sessionTimeoutMillis,
});

if (config.resetConsumerOffsets) {
Expand Down

0 comments on commit 0f2a913

Please sign in to comment.