diff --git a/services/apps/data_sink_worker/src/queue/index.ts b/services/apps/data_sink_worker/src/queue/index.ts index cea052dcb..0b63f27c7 100644 --- a/services/apps/data_sink_worker/src/queue/index.ts +++ b/services/apps/data_sink_worker/src/queue/index.ts @@ -58,7 +58,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { break case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: { const msg = message as CreateAndProcessActivityResultQueueMessage - await service.createAndProcessActivityResult( + await service.processActivityInMemoryResult( msg.tenantId, msg.segmentId, msg.integrationId, diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index b7ff42235..0b4c64563 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -1,4 +1,4 @@ -import { addSeconds } from '@crowd/common' +import { addSeconds, generateUUIDv1 } from '@crowd/common' import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services' import { DbStore } from '@crowd/data-access-layer/src/database' import { IResultData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data' @@ -42,6 +42,7 @@ export default class DataSinkService extends LoggerBase { private async triggerResultError( resultInfo: IResultData, + isCreated: boolean, location: string, message: string, metadata?: unknown, @@ -63,9 +64,15 @@ export default class DataSinkService extends LoggerBase { // delay for #retries * 2 minutes const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60) this.log.warn({ until: until.toISOString() }, 'Retrying result!') - await this.repo.delayResult(resultInfo.id, until, errorData) + + await this.repo.delayResult( + resultInfo.id, + until, + errorData, + isCreated ? undefined : resultInfo, + ) } else { - await this.repo.markResultError(resultInfo.id, errorData) + await this.repo.markResultError(resultInfo.id, errorData, isCreated ? undefined : resultInfo) } } @@ -98,7 +105,7 @@ export default class DataSinkService extends LoggerBase { } } - public async createAndProcessActivityResult( + public async processActivityInMemoryResult( tenantId: string, segmentId: string, integrationId: string, @@ -112,13 +119,15 @@ export default class DataSinkService extends LoggerBase { segmentId, } - const [integration, resultId] = await Promise.all([ - integrationId ? this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null), - this.repo.createResult(tenantId, integrationId, payload), - ]) + let integration + + if (integrationId) { + integration = await this.repo.getIntegrationInfo(integrationId) + } + const id = generateUUIDv1() const result: IResultData = { - id: resultId, + id, tenantId, integrationId, data: payload, @@ -132,7 +141,7 @@ export default class DataSinkService extends LoggerBase { onboarding: false, } - await this.processResult(resultId, result) + await this.processResult(id, result) } public async processResult(resultId: string, result?: IResultData): Promise { @@ -263,13 +272,18 @@ export default class DataSinkService extends LoggerBase { type: data.type, }, ) - await this.repo.deleteResult(resultId) + + if (!result) { + await this.repo.deleteResult(resultId) + } + return true } catch (err) { this.log.error(err, 'Error processing result.') try { await this.triggerResultError( resultInfo, + result === undefined, 'process-result', 'Error processing result.', undefined, diff --git a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts index d54795dfe..0e6aeaae1 100644 --- a/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts @@ -46,28 +46,6 @@ export default class DataSinkRepository extends RepositoryBase { - const results = await this.db().one( - ` - insert into integration.results(state, data, "tenantId", "integrationId") - values($(state), $(data), $(tenantId), $(integrationId)) - returning id; - `, - { - tenantId, - integrationId, - state: IntegrationResultState.PENDING, - data: JSON.stringify(result), - }, - ) - - return results.id - } - public async getOldResultsToProcessForTenant( tenantId: string, limit: number, @@ -147,22 +125,44 @@ export default class DataSinkRepository extends RepositoryBase { - const result = await this.db().result( - `update integration.results - set state = $(state), - "processedAt" = now(), - error = $(error), - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - state: IntegrationResultState.ERROR, - error: JSON.stringify(error), - }, - ) + public async markResultError( + resultId: string, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error) + values($(state), $(data), $(tenantId), $(integrationId), $(error)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.ERROR, + data: JSON.stringify(resultToCreate.data), + error: JSON.stringify(error), + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + "processedAt" = now(), + error = $(error), + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + state: IntegrationResultState.ERROR, + error: JSON.stringify(error), + }, + ) + + this.checkUpdateRowCount(result.rowCount, 1) + } } public async deleteResult(resultId: string): Promise { @@ -266,24 +266,48 @@ export default class DataSinkRepository extends RepositoryBase r.id) } - public async delayResult(resultId: string, until: Date, error: unknown): Promise { - const result = await this.db().result( - `update integration.results - set state = $(state), - error = $(error), - "delayedUntil" = $(until), - retries = coalesce(retries, 0) + 1, - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - until, - error: JSON.stringify(error), - state: IntegrationResultState.DELAYED, - }, - ) + public async delayResult( + resultId: string, + until: Date, + error: unknown, + resultToCreate?: IResultData, + ): Promise { + if (resultToCreate) { + const result = await this.db().result( + ` + insert into integration.results(state, data, "tenantId", "integrationId", error, retries, "delayedUntil") + values($(state), $(data), $(tenantId), $(integrationId), $(error), $(retries), $(until)) + `, + { + tenantId: resultToCreate.tenantId, + integrationId: resultToCreate.integrationId, + state: IntegrationResultState.DELAYED, + data: JSON.stringify(resultToCreate.data), + retries: 1, + error: JSON.stringify(error), + until: until, + }, + ) + this.checkUpdateRowCount(result.rowCount, 1) + } else { + const result = await this.db().result( + `update integration.results + set state = $(state), + error = $(error), + "delayedUntil" = $(until), + retries = coalesce(retries, 0) + 1, + "updatedAt" = now() + where id = $(resultId)`, + { + resultId, + until, + error: JSON.stringify(error), + state: IntegrationResultState.DELAYED, + }, + ) - this.checkUpdateRowCount(result.rowCount, 1) + this.checkUpdateRowCount(result.rowCount, 1) + } } public async getDelayedResults(limit: number): Promise {