Skip to content

Commit

Permalink
suggestions use memberMergeSuggestionsLastGeneratedAt instead
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav committed Jan 4, 2024
1 parent 1df07cb commit 40bce4e
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table "tenants" drop column "memberMergeSuggestionsLastGeneratedAt";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table "tenants"
add column "memberMergeSuggestionsLastGeneratedAt" timestamp with time zone null;
6 changes: 4 additions & 2 deletions services/apps/merge_suggestions_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import {
getMergeSuggestions,
addToMerge,
getMembers,
findTenantsLatestSuggestionCreatedAt,
findTenantsLatestMemberSuggestionGeneratedAt,
updateMemberMergeSuggestionsLastGeneratedAt,
} from './activities/member-merge-suggestions/getMergeSuggestions'

export {
getAllTenants,
getMergeSuggestions,
getMembers,
addToMerge,
findTenantsLatestSuggestionCreatedAt,
findTenantsLatestMemberSuggestionGeneratedAt,
updateMemberMergeSuggestionsLastGeneratedAt,
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,29 @@ export async function addToMerge(suggestions: IMemberMergeSuggestion[]): Promise
await memberMergeSuggestionsRepo.addToMerge(suggestions)
}

export async function findTenantsLatestSuggestionCreatedAt(tenantId: string): Promise<string> {
export async function findTenantsLatestMemberSuggestionGeneratedAt(
tenantId: string,
): Promise<string> {
const memberMergeSuggestionsRepo = new MemberMergeSuggestionsRepository(
svc.postgres.writer.connection(),
svc.log,
)
return memberMergeSuggestionsRepo.findTenantsLatestMemberSuggestionGeneratedAt(tenantId)
}

export async function updateMemberMergeSuggestionsLastGeneratedAt(tenantId: string): Promise<void> {
const memberMergeSuggestionsRepo = new MemberMergeSuggestionsRepository(
svc.postgres.writer.connection(),
svc.log,
)
return memberMergeSuggestionsRepo.findTenantsLatestSuggestionCreatedAt(tenantId)
await memberMergeSuggestionsRepo.updateMemberMergeSuggestionsLastGeneratedAt(tenantId)
}

export async function getMembers(
tenantId: string,
batchSize: number = 100,
afterMemberId?: string,
lastCreatedAt?: string,
lastGeneratedAt?: string,
): Promise<IMemberPartialAggregatesOpensearch[]> {
try {
const queryBody: IMemberQueryBody = {
Expand Down Expand Up @@ -248,11 +258,11 @@ export async function getMembers(
})
}

if (lastCreatedAt) {
if (lastGeneratedAt) {
queryBody.query.bool.filter.push({
range: {
date_createdAt: {
gt: new Date(lastCreatedAt).toISOString(),
gt: new Date(lastGeneratedAt).toISOString(),
},
},
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { DbConnection, DbTransaction } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IMemberMergeSuggestion } from '@crowd/types'
import { IMemberId, IMemberNoMerge, IMemberMergeSuggestionsLatestCreatedAt } from 'types'
import { IMemberId, IMemberMergeSuggestionsLatestGeneratedAt, IMemberNoMerge } from 'types'
import { chunkArray, removeDuplicateSuggestions } from 'utils'

class MemberMergeSuggestionsRepository {
Expand Down Expand Up @@ -34,19 +34,34 @@ class MemberMergeSuggestionsRepository {
}
}

async findTenantsLatestSuggestionCreatedAt(tenantId: string): Promise<string> {
async findTenantsLatestMemberSuggestionGeneratedAt(tenantId: string): Promise<string> {
try {
const result: IMemberMergeSuggestionsLatestCreatedAt = await this.connection.oneOrNone(
const result: IMemberMergeSuggestionsLatestGeneratedAt = await this.connection.oneOrNone(
`
select max(mtm."createdAt") as "latestCreatedAt"
from "memberToMerge" mtm
inner join members m on mtm."memberId" = m.id
where m."tenantId" = $(tenantId);`,
select "memberMergeSuggestionsLastGeneratedAt"
from tenants
where "id" = $(tenantId);`,
{
tenantId,
},
)
return result?.memberMergeSuggestionsLastGeneratedAt
} catch (err) {
throw new Error(err)
}
}

async updateMemberMergeSuggestionsLastGeneratedAt(tenantId: string): Promise<void> {
try {
await this.connection.any(
`
update tenants set "memberMergeSuggestionsLastGeneratedAt" = now()
where "id" = $(tenantId);
`,
{
tenantId,
},
)
return result?.latestCreatedAt
} catch (err) {
throw new Error(err)
}
Expand Down
4 changes: 2 additions & 2 deletions services/apps/merge_suggestions_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ export interface IMemberNoMerge {
noMergeId: string
}

export interface IMemberMergeSuggestionsLatestCreatedAt {
latestCreatedAt: string
export interface IMemberMergeSuggestionsLatestGeneratedAt {
memberMergeSuggestionsLastGeneratedAt: string
}

export interface IMemberId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ export async function generateMemberMergeSuggestions(
let result: IMemberPartialAggregatesOpensearch[]
let lastUuid: string

// get the latest createdAt of tenant's member suggestions, we'll only get members created after that for new suggestions
const lastCreatedAt = await activity.findTenantsLatestSuggestionCreatedAt(args.tenantId)
// get the latest generation time of tenant's member suggestions, we'll only get members created after that for new suggestions
const lastGeneratedAt = await activity.findTenantsLatestMemberSuggestionGeneratedAt(args.tenantId)

do {
result = await activity.getMembers(args.tenantId, PAGE_SIZE, lastUuid, lastCreatedAt)
result = await activity.getMembers(args.tenantId, PAGE_SIZE, lastUuid, lastGeneratedAt)

lastUuid = result.length > 0 ? result[result.length - 1]?.uuid_memberId : null

Expand All @@ -46,4 +46,6 @@ export async function generateMemberMergeSuggestions(
await activity.addToMerge(allMergeSuggestions)
}
} while (result.length > 0)

await activity.updateMemberMergeSuggestionsLastGeneratedAt(args.tenantId)
}

0 comments on commit 40bce4e

Please sign in to comment.