From 2f0cb0ca26be4dcbf7bd80a36814d97d99426707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 8 Jan 2024 14:54:31 +0100 Subject: [PATCH] sync to opensearch --- pnpm-lock.yaml | 3 +++ .../package.json | 1 + .../src/activities.ts | 2 ++ .../src/activities/organizationEnrichment.ts | 25 +++++++++++++++++++ .../src/main.ts | 2 +- .../src/workflows/enrichOrganization.ts | 5 +++- 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 54e4b91add..9fd8ff44b7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1234,6 +1234,9 @@ importers: '@crowd/logging': specifier: file:../../../libs/logging version: file:services/libs/logging + '@crowd/opensearch': + specifier: file:../../../libs/opensearch + version: file:services/libs/opensearch '@crowd/redis': specifier: file:../../../libs/redis version: file:services/libs/redis diff --git a/services/apps/premium/organizations_enrichment_worker/package.json b/services/apps/premium/organizations_enrichment_worker/package.json index 08b7678ea7..262e0bb9e2 100644 --- a/services/apps/premium/organizations_enrichment_worker/package.json +++ b/services/apps/premium/organizations_enrichment_worker/package.json @@ -22,6 +22,7 @@ "@crowd/redis": "file:../../../libs/redis", "@crowd/feature-flags": "file:../../../libs/feature-flags", "@crowd/common_services": "file:../../../libs/common_services", + "@crowd/opensearch": "file:../../../libs/opensearch", "@crowd/types": "file:../../../libs/types", "@temporalio/client": "~1.8.6", "@temporalio/workflow": "~1.8.6", diff --git a/services/apps/premium/organizations_enrichment_worker/src/activities.ts b/services/apps/premium/organizations_enrichment_worker/src/activities.ts index f7db07f828..dc13762485 100644 --- a/services/apps/premium/organizations_enrichment_worker/src/activities.ts +++ b/services/apps/premium/organizations_enrichment_worker/src/activities.ts @@ -5,6 +5,7 @@ import { tryEnrichOrganization, getApplicableTenants, hasTenantOrganizationEnrichmentEnabled, + syncToOpensearch, } from './activities/organizationEnrichment' export { @@ -14,4 +15,5 @@ export { getTenantOrganizationsForEnrichment, tryEnrichOrganization, hasTenantOrganizationEnrichmentEnabled, + syncToOpensearch, } diff --git a/services/apps/premium/organizations_enrichment_worker/src/activities/organizationEnrichment.ts b/services/apps/premium/organizations_enrichment_worker/src/activities/organizationEnrichment.ts index c9ba457159..e9d5a9b933 100644 --- a/services/apps/premium/organizations_enrichment_worker/src/activities/organizationEnrichment.ts +++ b/services/apps/premium/organizations_enrichment_worker/src/activities/organizationEnrichment.ts @@ -18,9 +18,34 @@ import { } from '../repos/organization.repo' import { IPremiumTenantInfo, TenantRepository } from '../repos/tenant.repo' import { isFeatureEnabled } from '@crowd/feature-flags' +import { OrganizationSyncService } from '@crowd/opensearch' /* eslint-disable @typescript-eslint/no-explicit-any */ +const syncOrganizations = new OrganizationSyncService( + svc.postgres.writer, + svc.opensearch, + svc.log, + { + edition: process.env['CROWD_EDITION'], + }, +) + +export async function syncToOpensearch(organizationId: string): Promise { + const log = getChildLogger(syncToOpensearch.name, svc.log, { + organizationId, + }) + + try { + await syncOrganizations.syncOrganizations([organizationId]) + } catch (err) { + log.error(err, 'Error while syncing organization to OpenSearch!') + throw new Error(err) + } + + return null +} + export async function getApplicableTenants( page: number, lastId?: string, diff --git a/services/apps/premium/organizations_enrichment_worker/src/main.ts b/services/apps/premium/organizations_enrichment_worker/src/main.ts index 009408c007..d1e6bc4938 100644 --- a/services/apps/premium/organizations_enrichment_worker/src/main.ts +++ b/services/apps/premium/organizations_enrichment_worker/src/main.ts @@ -20,7 +20,7 @@ const options: Options = { enabled: true, }, opensearch: { - enabled: false, + enabled: true, }, } diff --git a/services/apps/premium/organizations_enrichment_worker/src/workflows/enrichOrganization.ts b/services/apps/premium/organizations_enrichment_worker/src/workflows/enrichOrganization.ts index 20be632d93..ab867b600c 100644 --- a/services/apps/premium/organizations_enrichment_worker/src/workflows/enrichOrganization.ts +++ b/services/apps/premium/organizations_enrichment_worker/src/workflows/enrichOrganization.ts @@ -2,7 +2,9 @@ import { proxyActivities } from '@temporalio/workflow' import * as activities from '../activities/organizationEnrichment' import { TenantPlans } from '@crowd/types' -const { tryEnrichOrganization, incrementTenantCredits } = proxyActivities({ +const { tryEnrichOrganization, incrementTenantCredits, syncToOpensearch } = proxyActivities< + typeof activities +>({ startToCloseTimeout: '75 seconds', }) @@ -16,6 +18,7 @@ export async function enrichOrganization(input: IEnrichOrganizationInput): Promi const wasEnriched = await tryEnrichOrganization(input.tenantId, input.organizationId) if (wasEnriched) { + await syncToOpensearch(input.organizationId) await incrementTenantCredits(input.tenantId, input.plan) } }