Skip to content

Commit

Permalink
Improvement/prevent creation of duplicate member suggestions (#2022)
Browse files Browse the repository at this point in the history
  • Loading branch information
epipav authored Jan 5, 2024
1 parent fcbb7d0 commit d392c1b
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 194 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the "memberMergeSuggestionsLastGeneratedAt" column from "tenants".
-- NOTE(review): presumably the rollback counterpart of the migration that adds this column; confirm.
alter table "tenants" drop column "memberMergeSuggestionsLastGeneratedAt";
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds a nullable "timestamp with time zone" column to "tenants".
-- NOTE(review): judging by the name, it records when member merge suggestions were last
-- generated for the tenant; existing rows will hold NULL until a value is written.
alter table "tenants"
add column "memberMergeSuggestionsLastGeneratedAt" timestamp with time zone null;
63 changes: 26 additions & 37 deletions backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,54 +239,43 @@ class MemberRepository {
}

static async findMembersWithMergeSuggestions(
{ limit = 20, offset = 0 },
{ limit = 20, offset = 0, memberId = undefined },
options: IRepositoryOptions,
) {
const currentTenant = SequelizeRepository.getCurrentTenant(options)
const segmentIds = SequelizeRepository.getSegmentIds(options)

const order = (await isFeatureEnabled(FeatureFlag.SEGMENTS, options))
? 'mtm."activityEstimate" desc, mtm.similarity desc, mtm."memberId", mtm."toMergeId"'
: 'mtm.similarity desc, mtm."activityEstimate" desc, mtm."memberId", mtm."toMergeId"'

const memberFilter = memberId
? ` and (mtm."memberId" = :memberId OR mtm."toMergeId" = :memberId)`
: ''

const mems = await options.database.sequelize.query(
`
SELECT
"membersToMerge".id,
"membersToMerge"."toMergeId",
"membersToMerge"."total_count",
"membersToMerge"."similarity",
"membersToMerge"."activityEstimate"
FROM
(
SELECT
mem.id,
mtm."toMergeId",
COUNT(*) OVER() as total_count,
mtm."similarity",
mtm."activityEstimate",
ROW_NUMBER() OVER (PARTITION BY Greatest(Hashtext(Concat(mem.id, mtm."toMergeId")), Hashtext(Concat(mtm."toMergeId", mem.id))) ORDER BY mem.id, mtm."toMergeId") as rn
FROM
members mem
INNER JOIN
"memberToMerge" mtm ON mem.id = mtm."memberId"
JOIN
"memberSegments" ms ON ms."memberId" = mem.id
WHERE
mem."tenantId" = :tenantId
AND ms."segmentId" IN (:segmentIds)
) as "membersToMerge"
WHERE
"membersToMerge".rn = 1
ORDER BY
"membersToMerge"."activityEstimate",
"membersToMerge"."similarity" DESC,
"membersToMerge".id,
"membersToMerge"."toMergeId"
LIMIT :limit OFFSET :offset;
with members_in_segments as (
select distinct mtm."memberId" from "memberToMerge" mtm
inner join "memberSegments" ms on mtm."memberId" = ms."memberId"
where ms."segmentId" in (:segmentIds)
${memberFilter}
)
select
mtm."memberId" as id,
mtm."toMergeId",
COUNT(*) OVER() as total_count,
mtm.similarity
from members_in_segments mis
inner join "memberToMerge" mtm on mtm."memberId" = mis."memberId"
order by ${order}
LIMIT :limit OFFSET :offset;
`,
{
replacements: {
tenantId: currentTenant.id,
segmentIds,
limit,
offset,
memberId,
},
type: QueryTypes.SELECT,
},
Expand All @@ -308,7 +297,7 @@ class MemberRepository {
members: [i, memberToMergeResults[idx]],
similarity: mems[idx].similarity,
}))
return { rows: result, count: mems[0].total_count / 2, limit, offset }
return { rows: result, count: mems[0].total_count, limit, offset }
}

return { rows: [{ members: [], similarity: 0 }], count: 0, limit, offset }
Expand Down
8 changes: 7 additions & 1 deletion backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1445,12 +1445,16 @@ class OrganizationRepository {
}

static async findOrganizationsWithMergeSuggestions(
{ limit = 20, offset = 0 },
{ limit = 20, offset = 0, organizationId = undefined },
options: IRepositoryOptions,
) {
const currentTenant = SequelizeRepository.getCurrentTenant(options)
const segmentIds = SequelizeRepository.getSegmentIds(options)

const organizationFilter = organizationId
? ` AND ("otm"."organizationId" = :organizationId OR "otm"."toMergeId" = :organizationId)`
: ''

const orgs = await options.database.sequelize.query(
`WITH
cte AS (
Expand All @@ -1473,6 +1477,7 @@ class OrganizationRepository {
WHERE org."tenantId" = :tenantId
AND os."segmentId" IN (:segmentIds)
AND (ma.id IS NULL OR ma.state = :mergeActionStatus)
${organizationFilter}
),
count_cte AS (
Expand Down Expand Up @@ -1510,6 +1515,7 @@ class OrganizationRepository {
offset,
mergeActionType: MergeActionType.ORG,
mergeActionStatus: MergeActionState.ERROR,
organizationId,
},
type: QueryTypes.SELECT,
},
Expand Down
91 changes: 0 additions & 91 deletions backend/src/services/__tests__/tenantService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,97 +24,6 @@ describe('TenantService tests', () => {
await SequelizeTestUtils.closeConnection(db)
})

// Checks that tenantService.findMembersToMerge reports a merge suggestion pair only once,
// even when the pair may be stored in both directions.
describe('findMembersToMerge', () => {
it('Should show the same merge suggestion once, with reverse order', async () => {
const mockIServiceOptions = await SequelizeTestUtils.getTestIServiceOptions(db)
const memberService = new MemberService(mockIServiceOptions)
const tenantService = new TenantService(mockIServiceOptions)

// Four distinct members, each on a different platform (Slack, Discord, GitHub, Twitter).
const memberToCreate1 = {
username: {
[PlatformType.SLACK]: {
username: 'member 1',
integrationId: generateUUIDv1(),
},
},
platform: PlatformType.SLACK,
email: '[email protected]',
joinedAt: '2020-05-27T15:13:30Z',
}

const memberToCreate2 = {
username: {
[PlatformType.DISCORD]: {
username: 'member 2',
integrationId: generateUUIDv1(),
},
},
platform: PlatformType.DISCORD,
email: '[email protected]',
joinedAt: '2020-05-26T15:13:30Z',
}

const memberToCreate3 = {
username: {
[PlatformType.GITHUB]: {
username: 'member 3',
integrationId: generateUUIDv1(),
},
},
platform: PlatformType.GITHUB,
email: '[email protected]',
joinedAt: '2020-05-25T15:13:30Z',
}

const memberToCreate4 = {
username: {
[PlatformType.TWITTER]: {
username: 'member 4',
integrationId: generateUUIDv1(),
},
},
platform: PlatformType.TWITTER,
email: '[email protected]',
joinedAt: '2020-05-24T15:13:30Z',
}

const member1 = await memberService.upsert(memberToCreate1)
let member2 = await memberService.upsert(memberToCreate2)
const member3 = await memberService.upsert(memberToCreate3)
let member4 = await memberService.upsert(memberToCreate4)

// Register two suggestions: (member1, member2) with similarity 1 and
// (member3, member4) with similarity 0.5.
await memberService.addToMerge([{ members: [member1.id, member2.id], similarity: 1 }])
await memberService.addToMerge([{ members: [member3.id, member4.id], similarity: 0.5 }])

// Re-read members 2 and 4 after addToMerge.
// NOTE(review): the refreshed values are not used by the assertions below; confirm they are needed.
member2 = await memberService.findById(member2.id)
member4 = await memberService.findById(member4.id)

const memberToMergeSuggestions = await tenantService.findMembersToMerge({})

// NOTE(review): the list below names three stored relations but the text says
// "4 combinations"; confirm which is intended.
// In the DB there should be:
// - Member 1 should have member 2 in toMerge
// - Member 3 should have member 4 in toMerge
// - Member 4 should have member 3 in toMerge
// - We should get these 4 combinations
// But this function should not return duplicates, so we should get
// only two pairs: [m2, m1] and [m4, m3]

// NOTE(review): count is asserted to be 1 although two rows are inspected below;
// confirm how findMembersToMerge derives count.
expect(memberToMergeSuggestions.count).toEqual(1)

// Members of each row are ordered by createdAt before comparing ids, so the check does
// not depend on the direction in which the pair was returned. Array.prototype.sort
// reorders rows[n].members in place.
expect(
memberToMergeSuggestions.rows[0].members
.sort((a, b) => (a.createdAt > b.createdAt ? 1 : -1))
.map((m) => m.id),
).toStrictEqual([member1.id, member2.id])

expect(
memberToMergeSuggestions.rows[1].members
.sort((a, b) => (a.createdAt > b.createdAt ? 1 : -1))
.map((m) => m.id),
).toStrictEqual([member3.id, member4.id])
})
})

describe('_findAndCountAllForEveryUser method', () => {
it('Should succesfully find all tenants without filtering by currentUser', async () => {
let tenants = await TenantService._findAndCountAllForEveryUser({ filter: {} })
Expand Down
6 changes: 4 additions & 2 deletions services/apps/merge_suggestions_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ import {
getMergeSuggestions,
addToMerge,
getMembers,
findTenantsLatestSuggestionCreatedAt,
findTenantsLatestMemberSuggestionGeneratedAt,
updateMemberMergeSuggestionsLastGeneratedAt,
} from './activities/member-merge-suggestions/getMergeSuggestions'

export {
getAllTenants,
getMergeSuggestions,
getMembers,
addToMerge,
findTenantsLatestSuggestionCreatedAt,
findTenantsLatestMemberSuggestionGeneratedAt,
updateMemberMergeSuggestionsLastGeneratedAt,
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { ITenantId } from 'types'
import { ITenant } from 'types'
import { svc } from '../main'
import TenantRepository from 'repo/tenant.repo'
import { isFeatureEnabled } from '@crowd/feature-flags'
import { FeatureFlag } from '@crowd/types'

export async function getAllTenants(): Promise<ITenantId[]> {
export async function getAllTenants(): Promise<ITenant[]> {
const tenantRepository = new TenantRepository(svc.postgres.writer.connection(), svc.log)
const tenants = await tenantRepository.getAllTenants()

Expand All @@ -15,6 +15,7 @@ export async function getAllTenants(): Promise<ITenantId[]> {
async () => {
return {
tenantId: tenant.tenantId,
plan: tenant.plan,
}
},
svc.unleash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ export async function getMergeSuggestions(

let hasFuzzySearch = false

for (const identity of member.nested_identities) {
// prevent processing more than 200 identities because of opensearch limits
for (const identity of member.nested_identities.slice(0, 200)) {
if (identity.string_username.length > 0) {
// weak identity search
identitiesPartialQuery.should[1].nested.query.bool.should.push({
Expand Down Expand Up @@ -165,13 +166,23 @@ export async function getMergeSuggestions(
],
}

const membersToMerge: ISimilarMemberOpensearch[] =
(
await svc.opensearch.client.search({
index: OpenSearchIndex.MEMBERS,
body: similarMembersQueryBody,
})
).body?.hits?.hits || []
let membersToMerge: ISimilarMemberOpensearch[]

try {
membersToMerge =
(
await svc.opensearch.client.search({
index: OpenSearchIndex.MEMBERS,
body: similarMembersQueryBody,
})
).body?.hits?.hits || []
} catch (e) {
svc.log.info(
{ error: e, query: identitiesPartialQuery },
'Error while searching for similar members!',
)
throw e
}

for (const memberToMerge of membersToMerge) {
mergeSuggestions.push({
Expand All @@ -193,19 +204,29 @@ export async function addToMerge(suggestions: IMemberMergeSuggestion[]): Promise
await memberMergeSuggestionsRepo.addToMerge(suggestions)
}

export async function findTenantsLatestSuggestionCreatedAt(tenantId: string): Promise<string> {
/**
 * Looks up, for one tenant, the value returned by the repository's
 * `findTenantsLatestMemberSuggestionGeneratedAt` query.
 *
 * The repository is built on the Postgres writer connection and the service logger.
 *
 * @param tenantId - Id of the tenant to look up.
 * @returns Whatever string the repository yields for the tenant.
 *   NOTE(review): presumably the timestamp of the last member-suggestion generation run; confirm against the repository.
 */
export async function findTenantsLatestMemberSuggestionGeneratedAt(
  tenantId: string,
): Promise<string> {
  const writerConnection = svc.postgres.writer.connection()
  const repo = new MemberMergeSuggestionsRepository(writerConnection, svc.log)

  return repo.findTenantsLatestMemberSuggestionGeneratedAt(tenantId)
}

/**
 * Asks the member-merge-suggestions repository to update the tenant's
 * "merge suggestions last generated at" marker.
 *
 * The repository is built on the Postgres writer connection and the service logger.
 * The previous body contained a leftover
 * `return memberMergeSuggestionsRepo.findTenantsLatestSuggestionCreatedAt(tenantId)`
 * ahead of the update call, which returned early (with a string promise from a
 * `Promise<void>` function) and left the update unreachable; that line is removed.
 *
 * @param tenantId - Id of the tenant whose marker should be updated.
 * @returns Resolves once the repository call has completed.
 */
export async function updateMemberMergeSuggestionsLastGeneratedAt(tenantId: string): Promise<void> {
  const memberMergeSuggestionsRepo = new MemberMergeSuggestionsRepository(
    svc.postgres.writer.connection(),
    svc.log,
  )
  await memberMergeSuggestionsRepo.updateMemberMergeSuggestionsLastGeneratedAt(tenantId)
}

export async function getMembers(
tenantId: string,
batchSize: number = 100,
afterMemberId?: string,
lastCreatedAt?: string,
lastGeneratedAt?: string,
): Promise<IMemberPartialAggregatesOpensearch[]> {
try {
const queryBody: IMemberQueryBody = {
Expand Down Expand Up @@ -248,11 +269,11 @@ export async function getMembers(
})
}

if (lastCreatedAt) {
if (lastGeneratedAt) {
queryBody.query.bool.filter.push({
range: {
date_createdAt: {
gt: new Date(lastCreatedAt).toISOString(),
gt: new Date(lastGeneratedAt).toISOString(),
},
},
})
Expand Down
Loading

0 comments on commit d392c1b

Please sign in to comment.