Skip to content

Commit

Permalink
optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
themarolt committed Jan 12, 2025
1 parent 60475e1 commit df65d55
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 66 deletions.
2 changes: 1 addition & 1 deletion services/apps/data_sink_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever {
break
case DataSinkWorkerQueueMessageType.CREATE_AND_PROCESS_ACTIVITY_RESULT: {
const msg = message as CreateAndProcessActivityResultQueueMessage
await service.createAndProcessActivityResult(
await service.processActivityInMemoryResult(
msg.tenantId,
msg.segmentId,
msg.integrationId,
Expand Down
36 changes: 25 additions & 11 deletions services/apps/data_sink_worker/src/service/dataSink.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { addSeconds } from '@crowd/common'
import { addSeconds, generateUUIDv1 } from '@crowd/common'
import { DataSinkWorkerEmitter, SearchSyncWorkerEmitter } from '@crowd/common_services'
import { DbStore } from '@crowd/data-access-layer/src/database'
import { IResultData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.data'
Expand Down Expand Up @@ -42,6 +42,7 @@ export default class DataSinkService extends LoggerBase {

private async triggerResultError(
resultInfo: IResultData,
isCreated: boolean,
location: string,
message: string,
metadata?: unknown,
Expand All @@ -63,9 +64,15 @@ export default class DataSinkService extends LoggerBase {
// delay for #retries * 2 minutes
const until = addSeconds(new Date(), (resultInfo.retries + 1) * 2 * 60)
this.log.warn({ until: until.toISOString() }, 'Retrying result!')
await this.repo.delayResult(resultInfo.id, until, errorData)

await this.repo.delayResult(
resultInfo.id,
until,
errorData,
isCreated ? undefined : resultInfo,
)
} else {
await this.repo.markResultError(resultInfo.id, errorData)
await this.repo.markResultError(resultInfo.id, errorData, isCreated ? undefined : resultInfo)
}
}

Expand Down Expand Up @@ -98,7 +105,7 @@ export default class DataSinkService extends LoggerBase {
}
}

public async createAndProcessActivityResult(
public async processActivityInMemoryResult(
tenantId: string,
segmentId: string,
integrationId: string,
Expand All @@ -112,13 +119,15 @@ export default class DataSinkService extends LoggerBase {
segmentId,
}

const [integration, resultId] = await Promise.all([
integrationId ? this.repo.getIntegrationInfo(integrationId) : Promise.resolve(null),
this.repo.createResult(tenantId, integrationId, payload),
])
let integration

if (integrationId) {
integration = await this.repo.getIntegrationInfo(integrationId)
}

const id = generateUUIDv1()
const result: IResultData = {
id: resultId,
id,
tenantId,
integrationId,
data: payload,
Expand All @@ -132,7 +141,7 @@ export default class DataSinkService extends LoggerBase {
onboarding: false,
}

await this.processResult(resultId, result)
await this.processResult(id, result)
}

public async processResult(resultId: string, result?: IResultData): Promise<boolean> {
Expand Down Expand Up @@ -263,13 +272,18 @@ export default class DataSinkService extends LoggerBase {
type: data.type,
},
)
await this.repo.deleteResult(resultId)

if (!result) {
await this.repo.deleteResult(resultId)
}

return true
} catch (err) {
this.log.error(err, 'Error processing result.')
try {
await this.triggerResultError(
resultInfo,
result === undefined,
'process-result',
'Error processing result.',
undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,6 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
return result
}

public async createResult(
tenantId: string,
integrationId: string,
result: IIntegrationResult,
): Promise<string> {
const results = await this.db().one(
`
insert into integration.results(state, data, "tenantId", "integrationId")
values($(state), $(data), $(tenantId), $(integrationId))
returning id;
`,
{
tenantId,
integrationId,
state: IntegrationResultState.PENDING,
data: JSON.stringify(result),
},
)

return results.id
}

public async getOldResultsToProcessForTenant(
tenantId: string,
limit: number,
Expand Down Expand Up @@ -147,22 +125,44 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
}
}

public async markResultError(resultId: string, error: unknown): Promise<void> {
const result = await this.db().result(
`update integration.results
set state = $(state),
"processedAt" = now(),
error = $(error),
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
state: IntegrationResultState.ERROR,
error: JSON.stringify(error),
},
)
public async markResultError(
resultId: string,
error: unknown,
resultToCreate?: IResultData,
): Promise<void> {
if (resultToCreate) {
const result = await this.db().result(
`
insert into integration.results(state, data, "tenantId", "integrationId", error)
values($(state), $(data), $(tenantId), $(integrationId), $(error))
`,
{
tenantId: resultToCreate.tenantId,
integrationId: resultToCreate.integrationId,
state: IntegrationResultState.ERROR,
data: JSON.stringify(resultToCreate.data),
error: JSON.stringify(error),
},
)

this.checkUpdateRowCount(result.rowCount, 1)
this.checkUpdateRowCount(result.rowCount, 1)
} else {
const result = await this.db().result(
`update integration.results
set state = $(state),
"processedAt" = now(),
error = $(error),
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
state: IntegrationResultState.ERROR,
error: JSON.stringify(error),
},
)

this.checkUpdateRowCount(result.rowCount, 1)
}
}

public async deleteResult(resultId: string): Promise<void> {
Expand Down Expand Up @@ -266,24 +266,48 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
return result.map((r) => r.id)
}

public async delayResult(resultId: string, until: Date, error: unknown): Promise<void> {
const result = await this.db().result(
`update integration.results
set state = $(state),
error = $(error),
"delayedUntil" = $(until),
retries = coalesce(retries, 0) + 1,
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
until,
error: JSON.stringify(error),
state: IntegrationResultState.DELAYED,
},
)
public async delayResult(
resultId: string,
until: Date,
error: unknown,
resultToCreate?: IResultData,
): Promise<void> {
if (resultToCreate) {
const result = await this.db().result(
`
insert into integration.results(state, data, "tenantId", "integrationId", error, retries, "delayedUntil")
values($(state), $(data), $(tenantId), $(integrationId), $(error), $(retries), $(until))
`,
{
tenantId: resultToCreate.tenantId,
integrationId: resultToCreate.integrationId,
state: IntegrationResultState.DELAYED,
data: JSON.stringify(resultToCreate.data),
retries: 1,
error: JSON.stringify(error),
until: until,
},
)
this.checkUpdateRowCount(result.rowCount, 1)
} else {
const result = await this.db().result(
`update integration.results
set state = $(state),
error = $(error),
"delayedUntil" = $(until),
retries = coalesce(retries, 0) + 1,
"updatedAt" = now()
where id = $(resultId)`,
{
resultId,
until,
error: JSON.stringify(error),
state: IntegrationResultState.DELAYED,
},
)

this.checkUpdateRowCount(result.rowCount, 1)
this.checkUpdateRowCount(result.rowCount, 1)
}
}

public async getDelayedResults(limit: number): Promise<IDelayedResults[]> {
Expand Down

0 comments on commit df65d55

Please sign in to comment.