diff --git a/backend/.env.dist.local b/backend/.env.dist.local index c4bb933dbc..6081f91955 100755 --- a/backend/.env.dist.local +++ b/backend/.env.dist.local @@ -109,7 +109,7 @@ CROWD_STACKEXCHANGE_KEY= # Nango settings CROWD_NANGO_URL=http://localhost:3003 CROWD_NANGO_SECRET_KEY=424242 -CROWD_NANGO_INTEGRATIONS=reddit,linkedin,stackexchange,hubspot +CROWD_NANGO_INTEGRATIONS=reddit,linkedin,stackexchange # Cohere settings CROWD_COHERE_API_KEY= diff --git a/backend/config/custom-environment-variables.json b/backend/config/custom-environment-variables.json index 91e5fcf1a8..739611c36e 100644 --- a/backend/config/custom-environment-variables.json +++ b/backend/config/custom-environment-variables.json @@ -120,11 +120,6 @@ "stackexchange": { "key": "CROWD_STACKEXCHANGE_KEY" }, - "hubspot": { - "appId": "CROWD_HUBSPOT_APP_ID", - "clientId": "CROWD_HUBSPOT_CLIENT_ID", - "clientSecret": "CROWD_HUBSPOT_CLIENT_SECRET" - }, "reddit": { "clientId": "CROWD_REDDIT_CLIENT_ID", "clientSecret": "CROWD_REDDIT_CLIENT_SECRET" diff --git a/backend/config/default.json b/backend/config/default.json index 8f5753a3e7..aac59ef26d 100644 --- a/backend/config/default.json +++ b/backend/config/default.json @@ -46,11 +46,11 @@ "crowdAnalytics": { "isEnabled": "false" }, - "temporal": { - "automationsTaskQueue": "automations" - }, + "temporal": {}, "searchSyncApi": {}, "encryption": {}, "openStatusApi": {}, - "gitlab": {} + "gitlab": {}, + "jiraIssueReporter": {}, + "snowflake": {} } diff --git a/backend/id-openapi.yaml b/backend/id-openapi.yaml index ff7c816df8..6b8c7ae5be 100644 --- a/backend/id-openapi.yaml +++ b/backend/id-openapi.yaml @@ -103,7 +103,7 @@ paths: description: Organization ID source: type: string - enum: ['ui', 'email-domain', 'enrichment', 'hubspot', 'github'] + enum: ['ui', 'email-domain', 'enrichment', 'github'] example: 'ui' description: Data source. For manual updates, always use 'ui' title: @@ -188,7 +188,7 @@ paths: description: Organization ID source: type: string - enum: ['ui', 'email-domain', 'enrichment', 'hubspot', 'github'] + enum: ['ui', 'email-domain', 'enrichment', 'github'] example: 'ui' description: Data source. For manual updates, always use 'ui' title: diff --git a/backend/src/api/auth/authMe.ts b/backend/src/api/auth/authMe.ts index 2844b54f37..7046d6dcf6 100644 --- a/backend/src/api/auth/authMe.ts +++ b/backend/src/api/auth/authMe.ts @@ -1,7 +1,5 @@ import { Error403 } from '@crowd/common' -import AutomationRepository from '@/database/repositories/automationRepository' - export default async (req, res) => { if (!req.currentUser || !req.currentUser.id) { await req.responseHandler.error(req, res, new Error403(req.language)) @@ -10,17 +8,5 @@ export default async (req, res) => { const payload = req.currentUser - payload.tenants = await Promise.all( - payload.tenants.map(async (tenantUser) => { - tenantUser.tenant.dataValues = { - ...tenantUser.tenant.dataValues, - automationCount: - Number(await AutomationRepository.countAllActive(req.database, tenantUser.tenant.id)) || - 0, - } - return tenantUser - }), - ) - await req.responseHandler.success(req, res, payload) } diff --git a/backend/src/conf/configTypes.ts b/backend/src/conf/configTypes.ts index 47bf548c4b..fa5a108683 100644 --- a/backend/src/conf/configTypes.ts +++ b/backend/src/conf/configTypes.ts @@ -1,5 +1,3 @@ -import { ITemporalConfig } from '@crowd/temporal' - export enum ServiceType { API = 'api', JOB_GENERATOR = 'job-generator', @@ -101,11 +99,6 @@ export interface SlackConfiguration { appToken?: string } -export interface SlackNotifierConfiguration { - clientId: string - clientSecret: string -} - export interface GoogleConfiguration { clientId: string clientSecret: string @@ -179,10 +172,6 @@ export interface CrowdAnalyticsConfiguration { apiToken: string } -export interface IBackendTemporalConfig extends ITemporalConfig { - automationsTaskQueue: string -} - export interface EncryptionConfiguration { secretKey: string initVector: string diff --git a/backend/src/conf/index.ts b/backend/src/conf/index.ts index 727b4b37e9..d2fe5b5185 100644 --- a/backend/src/conf/index.ts +++ b/backend/src/conf/index.ts @@ -4,6 +4,7 @@ import { IDatabaseConfig } from '@crowd/data-access-layer/src/database' import { ISearchSyncApiConfig } from '@crowd/opensearch' import { IQueueClientConfig } from '@crowd/queue' import { IRedisConfiguration } from '@crowd/redis' +import { ITemporalConfig } from '@crowd/temporal' import { IGithubIssueReporterConfiguration, IJiraIssueReporterConfiguration } from '@crowd/types' import { @@ -21,7 +22,6 @@ import { GithubTokenConfiguration, GitlabConfiguration, GoogleConfiguration, - IBackendTemporalConfig, IOpenSearchConfig, IOpenStatusApiConfig, IRedditConfig, @@ -34,7 +34,6 @@ import { ServiceType, SlackAlertingConfiguration, SlackConfiguration, - SlackNotifierConfiguration, SnowflakeConfiguration, StackExchangeConfiguration, TenantMode, @@ -100,9 +99,6 @@ export const TWITTER_CONFIG: TwitterConfiguration = config.get('slack') -export const SLACK_NOTIFIER_CONFIG: SlackNotifierConfiguration = - config.get('slackNotifier') - export const GOOGLE_CONFIG: GoogleConfiguration = config.get('google') export const DISCORD_CONFIG: DiscordConfiguration = config.get('discord') @@ -144,8 +140,7 @@ export const INTEGRATION_PROCESSING_CONFIG: IntegrationProcessingConfiguration = export const CROWD_ANALYTICS_CONFIG: CrowdAnalyticsConfiguration = config.get('crowdAnalytics') -export const TEMPORAL_CONFIG: IBackendTemporalConfig = - config.get('temporal') +export const TEMPORAL_CONFIG: ITemporalConfig = config.get('temporal') export const SEARCH_SYNC_API_CONFIG: ISearchSyncApiConfig = config.get('searchSyncApi') diff --git a/backend/src/database/models/automation.ts b/backend/src/database/models/automation.ts deleted file mode 100644 index a2726ba29e..0000000000 --- a/backend/src/database/models/automation.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { DataTypes } from 'sequelize' - -export default (sequelize) => { - const automation = sequelize.define( - 'automation', - { - id: { - type: DataTypes.UUID, - defaultValue: DataTypes.UUIDV4, - primaryKey: true, - allowNull: false, - }, - type: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - name: { - type: DataTypes.STRING(255), - }, - tenantId: { - type: DataTypes.UUID, - allowNull: false, - }, - trigger: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - settings: { - type: DataTypes.JSONB, - allowNull: false, - }, - state: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - }, - { - indexes: [ - { - fields: ['type', 'tenantId', 'trigger', 'state'], - }, - ], - timestamps: true, - }, - ) - - automation.associate = (models) => { - models.automation.belongsTo(models.tenant, { - as: 'tenant', - foreignKey: { - allowNull: false, - }, - }) - - models.automation.belongsTo(models.user, { - as: 'createdBy', - }) - - models.automation.belongsTo(models.user, { - as: 'updatedBy', - }) - } - - return automation -} diff --git a/backend/src/database/models/automationExecution.ts b/backend/src/database/models/automationExecution.ts deleted file mode 100644 index 72b59291ce..0000000000 --- a/backend/src/database/models/automationExecution.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { DataTypes } from 'sequelize' - -export default (sequelize) => { - const automationExecution = sequelize.define( - 'automationExecution', - { - id: { - type: DataTypes.UUID, - defaultValue: DataTypes.UUIDV4, - primaryKey: true, - allowNull: false, - }, - automationId: { - type: DataTypes.UUID, - allowNull: false, - }, - type: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - tenantId: { - type: DataTypes.UUID, - allowNull: false, - }, - trigger: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - state: { - type: DataTypes.STRING(80), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - error: { - type: DataTypes.JSON, - allowNull: true, - }, - executedAt: { - type: DataTypes.DATE, - allowNull: false, - }, - eventId: { - type: DataTypes.STRING(255), - allowNull: false, - validate: { - notEmpty: true, - }, - }, - payload: { - type: DataTypes.JSON, - allowNull: false, - }, - }, - { - indexes: [ - { - fields: ['automationId'], - }, - ], - timestamps: false, - paranoid: false, - }, - ) - - automationExecution.associate = (models) => { - models.automationExecution.belongsTo(models.tenant, { - as: 'tenant', - foreignKey: { - allowNull: false, - }, - }) - - models.automationExecution.belongsTo(models.automation, { - as: 'automation', - foreignKey: { - allowNull: false, - }, - }) - } - - return automationExecution -} diff --git a/backend/src/database/models/index.ts b/backend/src/database/models/index.ts index 28816ce2aa..6806261462 100644 --- a/backend/src/database/models/index.ts +++ b/backend/src/database/models/index.ts @@ -131,8 +131,6 @@ async function models(queryTimeoutMilliseconds: number, databaseHostnameOverride require('./conversationSettings').default, require('./eagleEyeContent').default, require('./eagleEyeAction').default, - require('./automation').default, - require('./automationExecution').default, require('./organization').default, require('./memberAttributeSettings').default, require('./segment').default, diff --git a/backend/src/database/repositories/automationExecutionRepository.ts b/backend/src/database/repositories/automationExecutionRepository.ts deleted file mode 100644 index 9d38fe03ff..0000000000 --- a/backend/src/database/repositories/automationExecutionRepository.ts +++ /dev/null @@ -1,170 +0,0 @@ -/* eslint-disable class-methods-use-this */ - -/* eslint-disable @typescript-eslint/no-unused-vars */ -import { QueryTypes } from 'sequelize' - -import { AutomationExecutionState, PageData } from '@crowd/types' - -import { AutomationExecution, AutomationExecutionCriteria } from '../../types/automationTypes' - -import { IRepositoryOptions } from './IRepositoryOptions' -import { RepositoryBase } from './repositoryBase' -import { DbAutomationExecutionInsertData } from './types/automationTypes' - -export default class AutomationExecutionRepository extends RepositoryBase< - AutomationExecution, - string, - DbAutomationExecutionInsertData, - unknown, - AutomationExecutionCriteria -> { - public constructor(options: IRepositoryOptions) { - super(options, false) - } - - override async create(data: DbAutomationExecutionInsertData): Promise { - const transaction = this.transaction - - return this.database.automationExecution.create( - { - automationId: data.automationId, - type: data.type, - tenantId: data.tenantId, - trigger: data.trigger, - state: data.state, - error: data.error, - executedAt: data.executedAt, - eventId: data.eventId, - payload: data.payload, - }, - { transaction }, - ) - } - - override async findAndCountAll( - criteria: AutomationExecutionCriteria, - ): Promise> { - // get current tenant that was used to make a request - const currentTenant = this.currentTenant - - // get plain sequelize object to use with a raw query - const seq = this.seq - - // construct a query with pagination - const query = ` - select id, - "automationId", - state, - error, - "executedAt", - "eventId", - payload, - count(*) over () as "paginatedItemsCount" - from "automationExecutions" - where "tenantId" = :tenantId - and "automationId" = :automationId - order by "executedAt" desc - limit ${criteria.limit} offset ${criteria.offset} - - ` - - const results = await seq.query(query, { - replacements: { - tenantId: currentTenant.id, - automationId: criteria.automationId, - }, - type: QueryTypes.SELECT, - }) - - if (results.length === 0) { - return { - rows: [], - count: 0, - offset: criteria.offset, - limit: criteria.limit, - } - } - - const count = parseInt((results[0] as any).paginatedItemsCount, 10) - const rows: AutomationExecution[] = results.map((r) => { - const row = r as any - return { - id: row.id, - automationId: row.automationId, - executedAt: row.executedAt, - eventId: row.eventId, - payload: row.payload, - error: row.error, - state: row.state, - } - }) - - return { - rows, - count, - offset: criteria.offset, - limit: criteria.limit, - } - } - - public async hasAlreadyBeenTriggered(automationId: string, eventId: string): Promise { - const seq = this.seq - - const query = ` - select id - from "automationExecutions" - where "automationId" = :automationId - and "eventId" = :eventId - and state = '${AutomationExecutionState.SUCCESS}'; - ` - - const results = await seq.query(query, { - replacements: { - automationId, - eventId, - }, - type: QueryTypes.SELECT, - }) - - return results.length > 0 - } - - override async update(id: string, data: unknown): Promise { - throw new Error('Method not implemented.') - } - - override async destroy(id: string): Promise { - throw new Error('Method not implemented.') - } - - async destroyAllAutomation(automationIds: string[]): Promise { - const transaction = this.transaction - - const seq = this.seq - - const currentTenant = this.currentTenant - - const query = ` - delete - from "automationExecutions" - where "automationId" in (:automationIds) - and "tenantId" = :tenantId;` - - await seq.query(query, { - replacements: { - automationIds, - tenantId: currentTenant.id, - }, - type: QueryTypes.DELETE, - transaction, - }) - } - - override async destroyAll(ids: string[]): Promise { - throw new Error('Method not implemented.') - } - - override async findById(id: string): Promise { - throw new Error('Method not implemented.') - } -} diff --git a/backend/src/database/repositories/automationRepository.ts b/backend/src/database/repositories/automationRepository.ts deleted file mode 100644 index a09c79480d..0000000000 --- a/backend/src/database/repositories/automationRepository.ts +++ /dev/null @@ -1,295 +0,0 @@ -import Sequelize, { QueryTypes } from 'sequelize' - -import { Error404 } from '@crowd/common' -import { AutomationState, AutomationSyncTrigger, IAutomationData, PageData } from '@crowd/types' - -import { AutomationCriteria } from '../../types/automationTypes' - -import { IRepositoryOptions } from './IRepositoryOptions' -import AuditLogRepository from './auditLogRepository' -import { RepositoryBase } from './repositoryBase' -import { DbAutomationInsertData, DbAutomationUpdateData } from './types/automationTypes' - -const { Op } = Sequelize - -export default class AutomationRepository extends RepositoryBase< - IAutomationData, - string, - DbAutomationInsertData, - DbAutomationUpdateData, - AutomationCriteria -> { - public constructor(options: IRepositoryOptions) { - super(options, true) - } - - override async create(data: DbAutomationInsertData): Promise { - const currentUser = this.currentUser - - const tenant = this.currentTenant - - const transaction = this.transaction - - const record = await this.database.automation.create( - { - name: data.name, - type: data.type, - trigger: data.trigger, - settings: data.settings, - state: data.state, - tenantId: tenant.id, - createdById: currentUser.id, - updatedById: currentUser.id, - }, - { - transaction, - }, - ) - - await this.createAuditLog('automation', AuditLogRepository.CREATE, record, data) - - return this.findById(record.id) - } - - override async update(id, data: DbAutomationUpdateData): Promise { - const currentUser = this.currentUser - - const currentTenant = this.currentTenant - - const transaction = this.transaction - - let record = await this.database.automation.findOne({ - where: { - id, - tenantId: currentTenant.id, - }, - transaction, - }) - - if (!record) { - throw new Error404() - } - - record = await record.update( - { - name: data.name, - trigger: data.trigger, - settings: data.settings, - state: data.state, - updatedById: currentUser.id, - }, - { - transaction, - }, - ) - - await this.createAuditLog('automation', AuditLogRepository.UPDATE, record, data) - - return this.findById(record.id) - } - - override async destroyAll(ids: string[]): Promise { - const transaction = this.transaction - - const currentTenant = this.currentTenant - - const records = await this.database.automation.findAll({ - where: { - id: { - [Op.in]: ids, - }, - tenantId: currentTenant.id, - }, - transaction, - }) - - if (ids.some((id) => records.find((r) => r.id === id) === undefined)) { - throw new Error404() - } - - await Promise.all( - records.flatMap((r) => [ - r.destroy({ transaction }), - this.createAuditLog('automation', AuditLogRepository.DELETE, r, r), - ]), - ) - } - - override async findById(id: string): Promise { - const results = await this.findAndCountAll({ - id, - offset: 0, - limit: 1, - }) - - if (results.count === 1) { - return results.rows[0] - } - - if (results.count === 0) { - throw new Error404() - } - - throw new Error('More than one row returned when fetching by automation unique ID!') - } - - override async findAndCountAll(criteria: AutomationCriteria): Promise> { - // get current tenant that was used to make a request - const currentTenant = this.currentTenant - - // we need transaction if there is one set because some records were perhaps created/updated in the same transaction - const transaction = this.transaction - - // get plain sequelize object to use with a raw query - const seq = this.seq - - // build a where condition based on tenant and other criteria passed as parameter - const conditions = ['a."tenantId" = :tenantId'] - const parameters: any = { - tenantId: currentTenant.id, - } - - if (criteria.id) { - conditions.push('a.id = :id') - parameters.id = criteria.id - } - - if (criteria.state) { - conditions.push('a.state = :state') - parameters.state = criteria.state - } - - if (criteria.type) { - conditions.push('a.type = :type') - parameters.type = criteria.type - } - - if (criteria.trigger) { - conditions.push('a.trigger = :trigger') - parameters.trigger = criteria.trigger - } - - const conditionsString = conditions.join(' and ') - - const query = ` - -- common table expression (CTE) to prepare the last execution information for each automationId - with latest_executions as (select distinct on ("automationId") "automationId", "executedAt", state, error - from "automationExecutions" - order by "automationId", "executedAt" desc) - select a.id, - a.name, - a.type, - a."tenantId", - a.trigger, - a.settings, - a.state, - a."createdAt", - a."updatedAt", - le."executedAt" as "lastExecutionAt", - le.state as "lastExecutionState", - le.error as "lastExecutionError", - count(*) over () as "paginatedItemsCount" - from automations a - left join latest_executions le on a.id = le."automationId" - where ${conditionsString} - order by "updatedAt" desc - ${this.getPaginationString(criteria)} - ` - // fetch all automations for a tenant - // and include the latest execution data if available - const results = await seq.query(query, { - replacements: parameters, - type: QueryTypes.SELECT, - transaction, - }) - - if (results.length === 0) { - return { - rows: [], - count: 0, - offset: criteria.offset, - limit: criteria.limit, - } - } - - const count = parseInt((results[0] as any).paginatedItemsCount, 10) - const rows: IAutomationData[] = results.map((r) => { - const row = r as any - return { - id: row.id, - name: row.name, - type: row.type, - tenantId: row.tenantId, - trigger: row.trigger, - settings: row.settings, - state: row.state, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - lastExecutionAt: row.lastExecutionAt, - lastExecutionState: row.lastExecutionState, - lastExecutionError: row.lastExecutionError, - } - }) - - return { - rows, - count, - offset: criteria.offset, - limit: criteria.limit, - } - } - - static async countAllActive(database: any, tenantId: string): Promise { - const automationCount = await database.automation.count({ - where: { - tenantId, - state: AutomationState.ACTIVE, - }, - useMaster: true, - }) - - return automationCount - } - - public async findSyncAutomations( - tenantId: string, - platform: string, - ): Promise { - const seq = this.seq - - const transaction = this.transaction - - const pageSize = 10 - const syncAutomations: IAutomationData[] = [] - - let results - let offset - - do { - offset = results ? pageSize + offset : 0 - - results = await seq.query( - `select * from automations - where type = :platform and "tenantId" = :tenantId and trigger in (:syncAutomationTriggers) - limit :limit offset :offset`, - { - replacements: { - tenantId, - platform, - syncAutomationTriggers: [ - AutomationSyncTrigger.MEMBER_ATTRIBUTES_MATCH, - AutomationSyncTrigger.ORGANIZATION_ATTRIBUTES_MATCH, - ], - limit: pageSize, - offset, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - syncAutomations.push(...results) - } while (results.length > 0) - - return syncAutomations - } -} diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index 4617d53e16..ecd49e3ff2 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -16,12 +16,8 @@ import SequelizeFilterUtils from '../utils/sequelizeFilterUtils' import { IRepositoryOptions } from './IRepositoryOptions' import AuditLogRepository from './auditLogRepository' -import AutomationExecutionRepository from './automationExecutionRepository' -import AutomationRepository from './automationRepository' import QueryParser from './filters/queryParser' import { QueryOutput } from './filters/queryTypes' -import MemberSyncRemoteRepository from './memberSyncRemoteRepository' -import OrganizationSyncRemoteRepository from './organizationSyncRemoteRepository' import SequelizeRepository from './sequelizeRepository' const { Op } = Sequelize @@ -172,30 +168,6 @@ class IntegrationRepository { }, ) - // delete syncRemote rows coming from integration - await new MemberSyncRemoteRepository({ ...options, transaction }).destroyAllIntegration([ - record.id, - ]) - await new OrganizationSyncRemoteRepository({ ...options, transaction }).destroyAllIntegration([ - record.id, - ]) - - // destroy existing automations for outgoing integrations - const syncAutomationIds = ( - await new AutomationRepository({ ...options, transaction }).findSyncAutomations( - currentTenant.id, - record.platform, - ) - ).map((a) => a.id) - - if (syncAutomationIds.length > 0) { - await new AutomationExecutionRepository({ ...options, transaction }).destroyAllAutomation( - syncAutomationIds, - ) - } - - await new AutomationRepository({ ...options, transaction }).destroyAll(syncAutomationIds) - await this._createAuditLog(AuditLogRepository.DELETE, record, record, options) } diff --git a/backend/src/database/repositories/memberRepository.ts b/backend/src/database/repositories/memberRepository.ts index 632fbe033d..d6b6381932 100644 --- a/backend/src/database/repositories/memberRepository.ts +++ b/backend/src/database/repositories/memberRepository.ts @@ -67,7 +67,6 @@ import { SegmentProjectGroupNestedData, SegmentProjectNestedData, SegmentType, - SyncStatus, } from '@crowd/types' import { KUBE_MODE, SERVICE } from '@/conf' @@ -85,7 +84,6 @@ import { IRepositoryOptions } from './IRepositoryOptions' import AuditLogRepository from './auditLogRepository' import MemberAttributeSettingsRepository from './memberAttributeSettingsRepository' import MemberSegmentAffiliationRepository from './memberSegmentAffiliationRepository' -import MemberSyncRemoteRepository from './memberSyncRemoteRepository' import OrganizationRepository from './organizationRepository' import SegmentRepository from './segmentRepository' import SequelizeRepository from './sequelizeRepository' @@ -2678,20 +2676,6 @@ class MemberRepository { output.affiliations = await this.getAffiliations(record.id, options) - const manualSyncRemote = await new MemberSyncRemoteRepository(options).findMemberManualSync( - record.id, - ) - - for (const syncRemote of manualSyncRemote) { - if (output.attributes?.syncRemote) { - output.attributes.syncRemote[syncRemote.platform] = syncRemote.status === SyncStatus.ACTIVE - } else { - output.attributes.syncRemote = { - [syncRemote.platform]: syncRemote.status === SyncStatus.ACTIVE, - } - } - } - return output } diff --git a/backend/src/database/repositories/memberSyncRemoteRepository.ts b/backend/src/database/repositories/memberSyncRemoteRepository.ts deleted file mode 100644 index 861068fc07..0000000000 --- a/backend/src/database/repositories/memberSyncRemoteRepository.ts +++ /dev/null @@ -1,250 +0,0 @@ -import { QueryTypes } from 'sequelize' - -import { generateUUIDv1 as uuid } from '@crowd/common' -import { IMemberSyncRemoteData, SyncStatus } from '@crowd/types' - -import { IRepositoryOptions } from './IRepositoryOptions' -import { RepositoryBase } from './repositoryBase' -import SequelizeRepository from './sequelizeRepository' - -class MemberSyncRemoteRepository extends RepositoryBase< - IMemberSyncRemoteData, - string, - IMemberSyncRemoteData, - unknown, - unknown -> { - public constructor(options: IRepositoryOptions) { - super(options, true) - } - - async stopSyncingAutomation(automationId: string) { - await this.options.database.sequelize.query( - `update "membersSyncRemote" set status = :status where "syncFrom" = :automationId - `, - { - replacements: { - status: SyncStatus.STOPPED, - automationId, - }, - type: QueryTypes.UPDATE, - }, - ) - } - - async findRemoteSync(integrationId: string, memberId: string, syncFrom: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "membersSyncRemote" - WHERE "integrationId" = :integrationId and "memberId" = :memberId and "syncFrom" = :syncFrom; - `, - { - replacements: { - integrationId, - memberId, - syncFrom, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } - - async startManualSync(id: string, sourceId: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - await this.options.database.sequelize.query( - `update "membersSyncRemote" set status = :status, "sourceId" = :sourceId where "id" = :id - `, - { - replacements: { - status: SyncStatus.ACTIVE, - id, - sourceId: sourceId || null, - }, - type: QueryTypes.UPDATE, - transaction, - }, - ) - } - - async stopMemberManualSync(memberId: string) { - await this.options.database.sequelize.query( - `update "membersSyncRemote" set status = :status where "memberId" = :memberId and "syncFrom" = :manualSync - `, - { - replacements: { - status: SyncStatus.STOPPED, - memberId, - manualSync: 'manual', - }, - type: QueryTypes.UPDATE, - }, - ) - } - - async destroyAllAutomation(automationIds: string[]): Promise { - const transaction = this.transaction - - const seq = this.seq - - const query = ` - delete - from "membersSyncRemote" - where "syncFrom" in (:automationIds);` - - await seq.query(query, { - replacements: { - automationIds, - }, - type: QueryTypes.DELETE, - transaction, - }) - } - - async destroyAllIntegration(integrationIds: string[]): Promise { - const transaction = this.transaction - - const seq = this.seq - - const query = ` - delete - from "membersSyncRemote" - where "integrationId" in (:integrationIds);` - - await seq.query(query, { - replacements: { - integrationIds, - }, - type: QueryTypes.DELETE, - transaction, - }) - } - - async markMemberForSyncing(data: IMemberSyncRemoteData): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const existingSyncRemote = await this.findByMemberId(data.memberId) - - if (existingSyncRemote) { - data.sourceId = existingSyncRemote.sourceId - } - - const existingManualSyncRemote = await this.findRemoteSync( - data.integrationId, - data.memberId, - data.syncFrom, - ) - - if (existingManualSyncRemote) { - await this.startManualSync(existingManualSyncRemote.id, data.sourceId) - return existingManualSyncRemote - } - - const memberSyncRemoteInserted = await this.options.database.sequelize.query( - `insert into "membersSyncRemote" ("id", "memberId", "sourceId", "integrationId", "syncFrom", "metaData", "lastSyncedAt", "status") - values - (:id, :memberId, :sourceId, :integrationId, :syncFrom, :metaData, :lastSyncedAt, :status) - returning "id" - `, - { - replacements: { - id: uuid(), - memberId: data.memberId, - integrationId: data.integrationId, - syncFrom: data.syncFrom, - metaData: data.metaData, - lastSyncedAt: data.lastSyncedAt || null, - sourceId: data.sourceId || null, - status: SyncStatus.ACTIVE, - }, - type: QueryTypes.INSERT, - transaction, - }, - ) - - const memberSyncRemote = await this.findById(memberSyncRemoteInserted[0][0].id) - return memberSyncRemote - } - - async findMemberManualSync(memberId: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `select i.platform, msr.status from "membersSyncRemote" msr - inner join integrations i on msr."integrationId" = i.id - where msr."syncFrom" = :syncFrom and msr."memberId" = :memberId; - `, - { - replacements: { - memberId, - syncFrom: 'manual', - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - return records - } - - async findByMemberId(memberId: string): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "membersSyncRemote" - WHERE "memberId" = :memberId - and "sourceId" is not null - limit 1; - `, - { - replacements: { - memberId, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } - - async findById(id: string): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "membersSyncRemote" - WHERE id = :id; - `, - { - replacements: { - id, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } -} - -export default MemberSyncRemoteRepository diff --git a/backend/src/database/repositories/organizationSyncRemoteRepository.ts b/backend/src/database/repositories/organizationSyncRemoteRepository.ts deleted file mode 100644 index ef2eb3b306..0000000000 --- a/backend/src/database/repositories/organizationSyncRemoteRepository.ts +++ /dev/null @@ -1,252 +0,0 @@ -import { QueryTypes } from 'sequelize' - -import { generateUUIDv1 as uuid } from '@crowd/common' -import { IOrganizationSyncRemoteData, SyncStatus } from '@crowd/types' - -import { IRepositoryOptions } from './IRepositoryOptions' -import { RepositoryBase } from './repositoryBase' -import SequelizeRepository from './sequelizeRepository' - -class OrganizationSyncRemoteRepository extends RepositoryBase< - IOrganizationSyncRemoteData, - string, - IOrganizationSyncRemoteData, - unknown, - unknown -> { - public constructor(options: IRepositoryOptions) { - super(options, true) - } - - async stopSyncingAutomation(automationId: string) { - await this.options.database.sequelize.query( - `update "organizationsSyncRemote" set status = :status where "syncFrom" = :automationId - `, - { - replacements: { - status: SyncStatus.STOPPED, - automationId, - }, - type: QueryTypes.UPDATE, - }, - ) - } - - async stopOrganizationManualSync(organizationId: string) { - await this.options.database.sequelize.query( - `update "organizationsSyncRemote" set status = :status where "organizationId" = :organizationId and "syncFrom" = :manualSync - `, - { - replacements: { - status: SyncStatus.STOPPED, - organizationId, - manualSync: 'manual', - }, - type: QueryTypes.UPDATE, - }, - ) - } - - async startManualSync(id: string, sourceId: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - await this.options.database.sequelize.query( - `update "organizationsSyncRemote" set status = :status, "sourceId" = :sourceId where "id" = :id - `, - { - replacements: { - status: SyncStatus.ACTIVE, - id, - sourceId: sourceId || null, - }, - type: QueryTypes.UPDATE, - transaction, - }, - ) - } - - async findRemoteSync(integrationId: string, organizationId: string, syncFrom: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "organizationsSyncRemote" - WHERE "integrationId" = :integrationId and "organizationId" = :organizationId and "syncFrom" = :syncFrom; - `, - { - replacements: { - integrationId, - organizationId, - syncFrom, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } - - async markOrganizationForSyncing( - data: IOrganizationSyncRemoteData, - ): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const existingSyncRemote = await this.findByOrganizationId(data.organizationId) - - if (existingSyncRemote) { - data.sourceId = existingSyncRemote.sourceId - } - - const existingManualSyncRemote = await this.findRemoteSync( - data.integrationId, - data.organizationId, - data.syncFrom, - ) - - if (existingManualSyncRemote) { - await this.startManualSync(existingManualSyncRemote.id, data.sourceId) - return existingManualSyncRemote - } - - const organizationSyncRemoteInserted = await this.options.database.sequelize.query( - `insert into "organizationsSyncRemote" ("id", "organizationId", "sourceId", "integrationId", "syncFrom", "metaData", "lastSyncedAt", "status") - VALUES - (:id, :organizationId, :sourceId, :integrationId, :syncFrom, :metaData, :lastSyncedAt, :status) - returning "id" - `, - { - replacements: { - id: uuid(), - organizationId: data.organizationId, - integrationId: data.integrationId, - syncFrom: data.syncFrom, - metaData: data.metaData, - lastSyncedAt: data.lastSyncedAt || null, - sourceId: data.sourceId || null, - status: SyncStatus.ACTIVE, - }, - type: QueryTypes.INSERT, - transaction, - }, - ) - - const organizationSyncRemote = await this.findById(organizationSyncRemoteInserted[0][0].id) - return organizationSyncRemote - } - - async destroyAllAutomation(automationIds: string[]): Promise { - const transaction = this.transaction - - const seq = this.seq - - const query = ` - delete - from "organizationsSyncRemote" - where "syncFrom" in (:automationIds);` - - await seq.query(query, { - replacements: { - automationIds, - }, - type: QueryTypes.DELETE, - transaction, - }) - } - - async destroyAllIntegration(integrationIds: string[]): Promise { - const transaction = this.transaction - - const seq = this.seq - - const query = ` - delete - from "organizationsSyncRemote" - where "integrationId" in (:integrationIds);` - - await seq.query(query, { - replacements: { - integrationIds, - }, - type: QueryTypes.DELETE, - transaction, - }) - } - - async findOrganizationManualSync(organizationId: string) { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `select i.platform, osr.status from "organizationsSyncRemote" osr - inner join integrations i on osr."integrationId" = i.id - where osr."syncFrom" = :syncFrom and osr."organizationId" = :organizationId; - `, - { - replacements: { - organizationId, - syncFrom: 'manual', - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - return records - } - - async findByOrganizationId(organizationId: string): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "organizationsSyncRemote" - WHERE "organizationId" = :organizationId - and "sourceId" is not null - limit 1; - `, - { - replacements: { - organizationId, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } - - async findById(id: string): Promise { - const transaction = SequelizeRepository.getTransaction(this.options) - - const records = await this.options.database.sequelize.query( - `SELECT * - FROM "organizationsSyncRemote" - WHERE id = :id; - `, - { - replacements: { - id, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - if (records.length === 0) { - return null - } - - return records[0] - } -} - -export default OrganizationSyncRemoteRepository diff --git a/backend/src/database/repositories/types/automationTypes.ts b/backend/src/database/repositories/types/automationTypes.ts deleted file mode 100644 index 3a9da8ced7..0000000000 --- a/backend/src/database/repositories/types/automationTypes.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { - AutomationExecutionState, - AutomationSettings, - AutomationState, - AutomationSyncTrigger, - AutomationTrigger, - AutomationType, -} from '@crowd/types' - -export interface DbAutomationInsertData { - name: string - type: AutomationType - trigger: AutomationTrigger | AutomationSyncTrigger - settings: AutomationSettings - state: AutomationState -} - -export interface DbAutomationUpdateData { - name: string - trigger: AutomationTrigger - settings: AutomationSettings - state: AutomationState -} - -export interface DbAutomationExecutionInsertData { - automationId: string - type: AutomationType - tenantId: string - trigger: AutomationTrigger | AutomationSyncTrigger - state: AutomationExecutionState - error: any | null - executedAt: Date - eventId: string - payload: any -} diff --git a/backend/src/security/permissions.ts b/backend/src/security/permissions.ts index 6594004e60..3927e4432a 100644 --- a/backend/src/security/permissions.ts +++ b/backend/src/security/permissions.ts @@ -111,22 +111,6 @@ class Permissions { id: 'activityAutocomplete', allowedRoles: [roles.admin, roles.projectAdmin, roles.readonly], }, - automationCreate: { - id: 'automationCreate', - allowedRoles: [roles.admin, roles.projectAdmin], - }, - automationUpdate: { - id: 'automationUpdate', - allowedRoles: [roles.admin, roles.projectAdmin], - }, - automationDestroy: { - id: 'automationDestroy', - allowedRoles: [roles.admin, roles.projectAdmin], - }, - automationRead: { - id: 'automationRead', - allowedRoles: [roles.admin, roles.projectAdmin, roles.readonly], - }, tagImport: { id: 'tagImport', allowedRoles: [roles.admin, roles.projectAdmin], diff --git a/backend/src/serverless/utils/queueService.ts b/backend/src/serverless/utils/queueService.ts index edd732002a..30b42239c8 100644 --- a/backend/src/serverless/utils/queueService.ts +++ b/backend/src/serverless/utils/queueService.ts @@ -2,7 +2,6 @@ import { DataSinkWorkerEmitter, IntegrationRunWorkerEmitter, IntegrationStreamWorkerEmitter, - IntegrationSyncWorkerEmitter, QueuePriorityContextLoader, SearchSyncWorkerEmitter, } from '@crowd/common_services' @@ -93,20 +92,6 @@ export const getSearchSyncWorkerEmitter = async (): Promise => { - if (integrationSyncWorkerEmitter) return integrationSyncWorkerEmitter - - integrationSyncWorkerEmitter = new IntegrationSyncWorkerEmitter( - QUEUE_CLIENT(), - await REDIS_CLIENT(), - await QUEUE_PRIORITY_LOADER(), - log, - ) - await integrationSyncWorkerEmitter.init() - return integrationSyncWorkerEmitter -} - let dataSinkWorkerEmitter: DataSinkWorkerEmitter export const getDataSinkWorkerEmitter = async (): Promise => { if (dataSinkWorkerEmitter) return dataSinkWorkerEmitter diff --git a/backend/src/services/activityService.ts b/backend/src/services/activityService.ts index 2b4807682c..e92f1c12ce 100644 --- a/backend/src/services/activityService.ts +++ b/backend/src/services/activityService.ts @@ -19,20 +19,13 @@ import { import { optionsQx } from '@crowd/data-access-layer/src/queryExecutor' import { ActivityDisplayService } from '@crowd/integrations' import { LoggerBase, logExecutionTime } from '@crowd/logging' -import { WorkflowIdReusePolicy } from '@crowd/temporal' -import { - IMemberIdentity, - IntegrationResultType, - PlatformType, - SegmentData, - TemporalWorkflowId, -} from '@crowd/types' +import { IMemberIdentity, IntegrationResultType, PlatformType, SegmentData } from '@crowd/types' import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' import OrganizationRepository from '@/database/repositories/organizationRepository' import { getDataSinkWorkerEmitter } from '@/serverless/utils/queueService' -import { GITHUB_CONFIG, IS_DEV_ENV, IS_TEST_ENV, TEMPORAL_CONFIG } from '../conf' +import { GITHUB_CONFIG, IS_DEV_ENV, IS_TEST_ENV } from '../conf' import ActivityRepository from '../database/repositories/activityRepository' import MemberRepository from '../database/repositories/memberRepository' import SegmentRepository from '../database/repositories/segmentRepository' @@ -251,42 +244,6 @@ export default class ActivityService extends LoggerBase { }) } - if (!existing && fireCrowdWebhooks) { - try { - const handle = await this.options.temporal.workflow.start( - 'processNewActivityAutomation', - { - workflowId: `${TemporalWorkflowId.NEW_ACTIVITY_AUTOMATION}/${record.id}`, - taskQueue: TEMPORAL_CONFIG.automationsTaskQueue, - workflowIdReusePolicy: - WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, - retry: { - maximumAttempts: 100, - }, - args: [ - { - tenantId: this.options.currentTenant.id, - activityId: record.id, - }, - ], - searchAttributes: { - TenantId: [this.options.currentTenant.id], - }, - }, - ) - this.log.info( - { workflowId: handle.workflowId }, - 'Started temporal workflow to process new activity automation!', - ) - } catch (err) { - this.log.error( - err, - { activityId: record.id }, - 'Error triggering new activity automation!', - ) - } - } - if (!fireCrowdWebhooks) { this.log.info('Ignoring outgoing webhooks because of fireCrowdWebhooks!') } diff --git a/backend/src/services/auth/passportStrategies/slackStrategy.ts b/backend/src/services/auth/passportStrategies/slackStrategy.ts index 7e12b1c5d8..7f727116ce 100644 --- a/backend/src/services/auth/passportStrategies/slackStrategy.ts +++ b/backend/src/services/auth/passportStrategies/slackStrategy.ts @@ -3,7 +3,7 @@ import SlackStrategy from 'passport-slack' import { PlatformType } from '@crowd/types' -import { API_CONFIG, SLACK_CONFIG, SLACK_NOTIFIER_CONFIG } from '../../../conf' +import { API_CONFIG, SLACK_CONFIG } from '../../../conf' export function getSlackStrategy(): SlackStrategy { return new SlackStrategy.Strategy( @@ -41,27 +41,3 @@ export function getSlackStrategy(): SlackStrategy { }, ) } -export function getSlackNotifierStrategy(): SlackStrategy { - return new SlackStrategy.Strategy( - { - clientID: SLACK_NOTIFIER_CONFIG.clientId, - clientSecret: SLACK_NOTIFIER_CONFIG.clientSecret, - callbackURL: `${API_CONFIG.url}/tenant/automation/slack/callback`, - skipUserProfile: true, - }, - (req, accessToken, webhookData, profile, done) => { - if (!done) { - throw new TypeError( - 'Missing req in verifyCallback; did you enable passReqToCallback in your strategy?', - ) - } - return done(null, { - accessToken: webhookData.access_token, - url: webhookData.incoming_webhook.url, - configurationUrl: webhookData.incoming_webhook.url, - channelId: webhookData.incoming_webhook.url, - channelName: webhookData.incoming_webhook.channel, - }) - }, - ) -} diff --git a/backend/src/services/automationExecutionService.ts b/backend/src/services/automationExecutionService.ts deleted file mode 100644 index cc8ad274b1..0000000000 --- a/backend/src/services/automationExecutionService.ts +++ /dev/null @@ -1,78 +0,0 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ - -/* eslint-disable class-methods-use-this */ -import { PageData } from '@crowd/types' - -import AutomationExecutionRepository from '../database/repositories/automationExecutionRepository' -import SequelizeRepository from '../database/repositories/sequelizeRepository' -import { - AutomationExecution, - AutomationExecutionCriteria, - CreateAutomationExecutionRequest, -} from '../types/automationTypes' - -import { IServiceOptions } from './IServiceOptions' -import { ServiceBase } from './serviceBase' - -export default class AutomationExecutionService extends ServiceBase< - AutomationExecution, - string, - CreateAutomationExecutionRequest, - unknown, - AutomationExecutionCriteria -> { - public constructor(options: IServiceOptions) { - super(options) - } - - /** - * Method used by service that is processing automations as they are triggered - * @param data {CreateAutomationExecutionRequest} all the necessary data to log a new automation execution - */ - override async create(data: CreateAutomationExecutionRequest): Promise { - const transaction = await SequelizeRepository.createTransaction(this.options) - - try { - const record = await new AutomationExecutionRepository(this.options).create({ - automationId: data.automation.id, - type: data.automation.type, - tenantId: data.automation.tenantId, - trigger: data.automation.trigger, - error: data.error !== undefined ? data.error : null, - executedAt: new Date(), - state: data.state, - eventId: data.eventId, - payload: data.payload, - }) - await SequelizeRepository.commitTransaction(transaction) - - return record - } catch (error) { - await SequelizeRepository.rollbackTransaction(transaction) - throw error - } - } - - /** - * Method used to fetch all automation executions. - * @param criteria {AutomationExecutionCriteria} filters to be used when returning automation executions - * @returns {PageData>} - */ - override async findAndCountAll( - criteria: AutomationExecutionCriteria, - ): Promise> { - return new AutomationExecutionRepository(this.options).findAndCountAll(criteria) - } - - override async update(id: string, data: unknown): Promise { - throw new Error('Method not implemented.') - } - - override async destroyAll(ids: string[]): Promise { - throw new Error('Method not implemented.') - } - - override async findById(id: string): Promise { - throw new Error('Method not implemented.') - } -} diff --git a/backend/src/services/automationService.ts b/backend/src/services/automationService.ts deleted file mode 100644 index b2635af2c5..0000000000 --- a/backend/src/services/automationService.ts +++ /dev/null @@ -1,197 +0,0 @@ -import { Error404 } from '@crowd/common' -import { - AutomationState, - AutomationSyncTrigger, - AutomationType, - IAutomationData, - PageData, - PlatformType, -} from '@crowd/types' - -import AutomationExecutionRepository from '@/database/repositories/automationExecutionRepository' -import IntegrationRepository from '@/database/repositories/integrationRepository' -import MemberSyncRemoteRepository from '@/database/repositories/memberSyncRemoteRepository' -import OrganizationSyncRemoteRepository from '@/database/repositories/organizationSyncRemoteRepository' -import { getIntegrationSyncWorkerEmitter } from '@/serverless/utils/queueService' - -import AutomationRepository from '../database/repositories/automationRepository' -import SequelizeRepository from '../database/repositories/sequelizeRepository' -import { - AutomationCriteria, - CreateAutomationRequest, - UpdateAutomationRequest, -} from '../types/automationTypes' - -import { IServiceOptions } from './IServiceOptions' -import { ServiceBase } from './serviceBase' - -export default class AutomationService extends ServiceBase< - IAutomationData, - string, - CreateAutomationRequest, - UpdateAutomationRequest, - AutomationCriteria -> { - public constructor(options: IServiceOptions) { - super(options) - } - - /** - * Creates a new active automation - * @param req {CreateAutomationRequest} data used to create a new automation - * @returns {IAutomationData} object for frontend to use - */ - override async create(req: CreateAutomationRequest): Promise { - const txOptions = await this.getTxRepositoryOptions() - - try { - // create an automation - const result = await new AutomationRepository(txOptions).create({ - ...req, - state: AutomationState.ACTIVE, - }) - - // check automation type, if hubspot trigger an automation onboard - if (req.type === AutomationType.HUBSPOT) { - let integration - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - }) - } catch (err) { - this.options.log.error(err, 'Error while fetching HubSpot integration from DB!') - throw new Error404() - } - - // enable sync remote for integration - integration = await IntegrationRepository.update( - integration.id, - { - settings: { - ...integration.settings, - syncRemoteEnabled: true, - }, - }, - txOptions, - ) - - const integrationSyncWorkerEmitter = await getIntegrationSyncWorkerEmitter() - await integrationSyncWorkerEmitter.triggerOnboardAutomation( - this.options.currentTenant.id, - integration.id, - result.id, - req.trigger as AutomationSyncTrigger, - ) - } - - await SequelizeRepository.commitTransaction(txOptions.transaction) - - return result - } catch (error) { - await SequelizeRepository.rollbackTransaction(txOptions.transaction) - throw error - } - } - - /** - * Updates an existing automation. - * Also used to change automation state - to enable or disable an automation. - * It updates all the columns at once so all the properties in the request parameter - * have to be filled. - * @param id of the existing automation that is being updated - * @param req {UpdateAutomationRequest} data used to update an existing automation - * @returns {IAutomationData} object for frontend to use - */ - override async update(id: string, req: UpdateAutomationRequest): Promise { - const txOptions = await this.getTxRepositoryOptions() - - try { - // update an existing automation including its state - const result = await new AutomationRepository(txOptions).update(id, req) - await SequelizeRepository.commitTransaction(txOptions.transaction) - // check automation type, if hubspot trigger an automation onboard - if (result.type === AutomationType.HUBSPOT) { - let integration - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - }) - } catch (err) { - this.options.log.error(err, 'Error while fetching HubSpot integration from DB!') - throw new Error404() - } - - if ( - result.trigger === AutomationSyncTrigger.MEMBER_ATTRIBUTES_MATCH || - result.trigger === AutomationSyncTrigger.ORGANIZATION_ATTRIBUTES_MATCH - ) { - if (result.state === AutomationState.ACTIVE) { - const integrationSyncWorkerEmitter = await getIntegrationSyncWorkerEmitter() - await integrationSyncWorkerEmitter.triggerOnboardAutomation( - this.options.currentTenant.id, - integration.id, - result.id, - result.trigger as AutomationSyncTrigger, - ) - } else if (result.trigger === AutomationSyncTrigger.MEMBER_ATTRIBUTES_MATCH) { - // disable memberSyncRemote for given automationId - const syncRepo = new MemberSyncRemoteRepository(this.options) - await syncRepo.stopSyncingAutomation(result.id) - } else if (result.trigger === AutomationSyncTrigger.ORGANIZATION_ATTRIBUTES_MATCH) { - // disable organizationSyncRemote for given automationId - const syncRepo = new OrganizationSyncRemoteRepository(this.options) - await syncRepo.stopSyncingAutomation(result.id) - } - } - } - return result - } catch (error) { - await SequelizeRepository.rollbackTransaction(txOptions.transaction) - throw error - } - } - - /** - * Method used to fetch all tenants automation with filters available in the criteria parameter - * @param criteria {AutomationCriteria} filters to be used when returning automations - * @returns {PageData>} - */ - override async findAndCountAll(criteria: AutomationCriteria): Promise> { - return new AutomationRepository(this.options).findAndCountAll(criteria) - } - - /** - * Method used to fetch a single automation by its id - * @param id automation id - * @returns {IAutomationData} - */ - override async findById(id: string): Promise { - return new AutomationRepository(this.options).findById(id) - } - - /** - * Deletes existing automations by id - * @param ids automation unique IDs to be deleted - */ - override async destroyAll(ids: string[]): Promise { - const txOptions = await this.getTxRepositoryOptions() - - try { - // delete automation executions - await new AutomationExecutionRepository(txOptions).destroyAllAutomation(ids) - - // delete syncRemote rows coming from automations - await new MemberSyncRemoteRepository(txOptions).destroyAllAutomation(ids) - await new OrganizationSyncRemoteRepository(txOptions).destroyAllAutomation(ids) - - const result = await new AutomationRepository(txOptions).destroyAll(ids) - await SequelizeRepository.commitTransaction(txOptions.transaction) - return result - } catch (error) { - await SequelizeRepository.rollbackTransaction(txOptions.transaction) - throw error - } - } -} diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 33e9e638d6..47b19dc31d 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -5,28 +5,12 @@ import lodash from 'lodash' import moment from 'moment' import { EDITION, Error400, Error404 } from '@crowd/common' -import { MemberField, findMemberById } from '@crowd/data-access-layer/src/members' -import { - HubspotEndpoint, - HubspotEntity, - HubspotFieldMapperFactory, - IHubspotManualSyncPayload, - IHubspotOnboardingSettings, - IHubspotProperty, - IHubspotTokenInfo, - IProcessStreamContext, - getHubspotLists, - getHubspotProperties, - getHubspotTokenInfo, -} from '@crowd/integrations' import { RedisCache } from '@crowd/redis' import { Edition, PlatformType } from '@crowd/types' import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions' import GitlabReposRepository from '@/database/repositories/gitlabReposRepository' import IntegrationProgressRepository from '@/database/repositories/integrationProgressRepository' -import MemberSyncRemoteRepository from '@/database/repositories/memberSyncRemoteRepository' -import OrganizationSyncRemoteRepository from '@/database/repositories/organizationSyncRemoteRepository' import { IntegrationProgress } from '@/serverless/integrations/types/regularTypes' import { fetchAllGitlabGroups, @@ -49,13 +33,10 @@ import { GITLAB_CONFIG, IS_TEST_ENV, KUBE_MODE, - NANGO_CONFIG, } from '../conf/index' import GithubReposRepository from '../database/repositories/githubReposRepository' import IntegrationRepository from '../database/repositories/integrationRepository' -import MemberAttributeSettingsRepository from '../database/repositories/memberAttributeSettingsRepository' import SequelizeRepository from '../database/repositories/sequelizeRepository' -import TenantRepository from '../database/repositories/tenantRepository' import telemetryTrack from '../segment/telemetryTrack' import track from '../segment/track' import { ILinkedInOrganization } from '../serverless/integrations/types/linkedinTypes' @@ -65,15 +46,10 @@ import { } from '../serverless/integrations/usecases/github/rest/getRemoteStats' import { getOrganizations } from '../serverless/integrations/usecases/linkedin/getOrganizations' import getToken from '../serverless/integrations/usecases/nango/getToken' -import { - getIntegrationRunWorkerEmitter, - getIntegrationSyncWorkerEmitter, -} from '../serverless/utils/queueService' +import { getIntegrationRunWorkerEmitter } from '../serverless/utils/queueService' import { encryptData } from '../utils/crypto' import { IServiceOptions } from './IServiceOptions' -import OrganizationService from './organizationService' -import SearchSyncService from './searchSyncService' const discordToken = DISCORD_CONFIG.token || DISCORD_CONFIG.token2 @@ -755,509 +731,6 @@ export default class IntegrationService { throw new Error404(this.options.language, 'errors.linkedin.cantOnboardWrongStatus') } - async hubspotStopSyncMember(payload: IHubspotManualSyncPayload) { - if (!payload.memberId) { - throw new Error('memberId is required in the payload while syncing member to hubspot!') - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - try { - const qx = SequelizeRepository.getQueryExecutor(this.options, transaction) - const member = await findMemberById(qx, payload.memberId, [MemberField.ID]) - - const memberSyncRemoteRepository = new MemberSyncRemoteRepository({ - ...this.options, - transaction, - }) - await memberSyncRemoteRepository.stopMemberManualSync(member.id) - - await SequelizeRepository.commitTransaction(transaction) - } catch (err) { - this.options.log.error(err, 'Error while stopping hubspot member sync!') - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - } - - async hubspotSyncMember(payload: IHubspotManualSyncPayload) { - if (!payload.memberId) { - throw new Error('memberId is required in the payload while syncing member to hubspot!') - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - let integration - let member: { id: string } - let memberSyncRemote - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - transaction, - }) - - const qx = SequelizeRepository.getQueryExecutor(this.options, transaction) - member = await findMemberById(qx, payload.memberId, [MemberField.ID]) - - const memberSyncRemoteRepo = new MemberSyncRemoteRepository({ ...this.options, transaction }) - - memberSyncRemote = await memberSyncRemoteRepo.markMemberForSyncing({ - integrationId: integration.id, - memberId: member.id, - metaData: null, - syncFrom: 'manual', - lastSyncedAt: null, - }) - - integration = await this.createOrUpdate( - { - platform: PlatformType.HUBSPOT, - settings: { - ...integration.settings, - syncRemoteEnabled: true, - }, - }, - transaction, - ) - - await SequelizeRepository.commitTransaction(transaction) - } catch (err) { - this.options.log.error(err, 'Error while starting Hubspot member sync!') - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - - const integrationSyncWorkerEmitter = await getIntegrationSyncWorkerEmitter() - await integrationSyncWorkerEmitter.triggerSyncMember( - this.options.currentTenant.id, - integration.id, - payload.memberId, - memberSyncRemote.id, - ) - - const searchSyncService = new SearchSyncService(this.options) - - // send it to opensearch because in member.update we bypass while passing transactions - await searchSyncService.triggerMemberSync(this.options.currentTenant.id, member.id, { - withAggs: true, - }) - } - - async hubspotStopSyncOrganization(payload: IHubspotManualSyncPayload) { - if (!payload.organizationId) { - throw new Error( - 'organizationId is required in the payload while stopping organization sync to hubspot!', - ) - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - try { - const organizationService = new OrganizationService(this.options) - - const organization = await organizationService.findById(payload.organizationId) - - const organizationSyncRemoteRepository = new OrganizationSyncRemoteRepository({ - ...this.options, - transaction, - }) - await organizationSyncRemoteRepository.stopOrganizationManualSync(organization.id) - } catch (err) { - this.options.log.error(err, 'Error while stopping Hubspot organization sync!') - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - } - - async hubspotSyncOrganization(payload: IHubspotManualSyncPayload) { - if (!payload.organizationId) { - throw new Error( - 'organizationId is required in the payload while syncing organization to hubspot!', - ) - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - let integration - let organization - let organizationSyncRemote - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - transaction, - }) - - const organizationService = new OrganizationService(this.options) - - organization = await organizationService.findById(payload.organizationId) - - const organizationSyncRemoteRepo = new OrganizationSyncRemoteRepository({ - ...this.options, - transaction, - }) - - organizationSyncRemote = await organizationSyncRemoteRepo.markOrganizationForSyncing({ - integrationId: integration.id, - organizationId: organization.id, - metaData: null, - syncFrom: 'manual', - lastSyncedAt: null, - }) - - integration = await this.createOrUpdate( - { - platform: PlatformType.HUBSPOT, - settings: { - ...integration.settings, - syncRemoteEnabled: true, - }, - }, - transaction, - ) - - await SequelizeRepository.commitTransaction(transaction) - - const integrationSyncWorkerEmitter = await getIntegrationSyncWorkerEmitter() - await integrationSyncWorkerEmitter.triggerSyncOrganization( - this.options.currentTenant.id, - integration.id, - payload.organizationId, - organizationSyncRemote.id, - ) - } catch (err) { - this.options.log.error(err, 'Error while starting Hubspot organization sync!') - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - } - - async hubspotOnboard(onboardSettings: IHubspotOnboardingSettings) { - if (onboardSettings.enabledFor.length === 0) { - throw new Error400(this.options.language, 'errors.hubspot.missingEnabledEntities') - } - - if ( - !onboardSettings.attributesMapping.members && - !onboardSettings.attributesMapping.organizations - ) { - throw new Error400(this.options.language, 'errors.hubspot.missingAttributesMapping') - } - - if ( - onboardSettings.enabledFor.includes(HubspotEntity.MEMBERS) && - !onboardSettings.attributesMapping.members - ) { - throw new Error400(this.options.language, 'errors.hubspot.missingAttributesMapping') - } - - if ( - onboardSettings.enabledFor.includes(HubspotEntity.ORGANIZATIONS) && - !onboardSettings.attributesMapping.organizations - ) { - throw new Error400(this.options.language, 'errors.hubspot.missingAttributesMapping') - } - - const tenantId = this.options.currentTenant.id - - let integration - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - }) - } catch (err) { - this.options.log.error(err, 'Error while fetching HubSpot integration from DB!') - throw new Error404() - } - - const memberAttributeSettings = ( - await MemberAttributeSettingsRepository.findAndCountAll({}, this.options) - ).rows - - const platforms = (await TenantRepository.getAvailablePlatforms(tenantId, this.options)).map( - (p) => p.platform, - ) - - const hubspotId = integration.settings.hubspotId - - const memberMapper = HubspotFieldMapperFactory.getFieldMapper( - HubspotEntity.MEMBERS, - hubspotId, - memberAttributeSettings, - platforms, - ) - const organizationMapper = HubspotFieldMapperFactory.getFieldMapper( - HubspotEntity.ORGANIZATIONS, - hubspotId, - ) - - // validate members - if (onboardSettings.attributesMapping.members) { - for (const field of Object.keys(onboardSettings.attributesMapping.members)) { - const hubspotProperty: IHubspotProperty = - integration.settings.hubspotProperties.members.find( - (p) => p.name === onboardSettings.attributesMapping.members[field], - ) - if (!memberMapper.isFieldMappableToHubspotType(field, hubspotProperty.type)) { - throw new Error( - `Member field ${field} has incompatible type with hubspot property ${hubspotProperty.name}`, - ) - } - } - } - - // validate organizations - if (onboardSettings.attributesMapping.organizations) { - for (const field of Object.keys(onboardSettings.attributesMapping.organizations)) { - const hubspotProperty: IHubspotProperty = - integration.settings.hubspotProperties.organizations.find( - (p) => p.name === onboardSettings.attributesMapping.organizations[field], - ) - if (!organizationMapper.isFieldMappableToHubspotType(field, hubspotProperty.type)) { - throw new Error( - `Organization field ${field} has incompatible type with hubspot property ${hubspotProperty.name}`, - ) - } - } - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - // save attribute mapping and enabledFor - try { - integration = await this.createOrUpdate( - { - platform: PlatformType.HUBSPOT, - settings: { - ...integration.settings, - attributesMapping: onboardSettings.attributesMapping, - enabledFor: onboardSettings.enabledFor, - crowdAttributes: memberAttributeSettings, - platforms, - }, - status: 'in-progress', - }, - transaction, - ) - await SequelizeRepository.commitTransaction(transaction) - } catch (err) { - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - - // Send queue message that starts the hubspot integration - const emitter = await getIntegrationRunWorkerEmitter() - await emitter.triggerIntegrationRun( - integration.tenantId, - integration.platform, - integration.id, - true, - ) - } - - async hubspotGetLists() { - const tenantId = this.options.currentTenant.id - const nangoId = `${tenantId}-${PlatformType.HUBSPOT}` - - let token: string - try { - token = await getToken(nangoId, PlatformType.HUBSPOT, this.options.log) - } catch (err) { - this.options.log.error(err, 'Error while verifying HubSpot tenant token in Nango!') - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - if (!token) { - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - const context = { - log: this.options.log, - serviceSettings: { - nangoId, - nangoUrl: NANGO_CONFIG.url, - nangoSecretKey: NANGO_CONFIG.secretKey, - }, - } as IProcessStreamContext - - const memberLists = await getHubspotLists(nangoId, context) - - return { - members: memberLists, - organizations: [], // hubspot doesn't support company lists yet - } - } - - async hubspotGetMappableFields() { - const memberAttributeSettings = ( - await MemberAttributeSettingsRepository.findAndCountAll({}, this.options) - ).rows - - const identities = await TenantRepository.getAvailablePlatforms( - this.options.currentTenant.id, - this.options, - ) - - // hubspotId is not used while getting the typemap, we can send it null - const memberMapper = HubspotFieldMapperFactory.getFieldMapper( - HubspotEntity.MEMBERS, - null, - memberAttributeSettings, - identities.map((i) => i.platform), - ) - const organizationMapper = HubspotFieldMapperFactory.getFieldMapper( - HubspotEntity.ORGANIZATIONS, - null, - ) - - return { - members: memberMapper.getTypeMap(), - organizations: organizationMapper.getTypeMap(), - } - } - - async hubspotUpdateProperties(): Promise { - const tenantId = this.options.currentTenant.id - const nangoId = `${tenantId}-${PlatformType.HUBSPOT}` - - let integration - - try { - integration = await IntegrationRepository.findByPlatform(PlatformType.HUBSPOT, { - ...this.options, - }) - } catch (err) { - this.options.log.error(err, 'Error while fetching HubSpot integration from DB!') - throw new Error404() - } - - let token: string - try { - token = await getToken(nangoId, PlatformType.HUBSPOT, this.options.log) - } catch (err) { - this.options.log.error(err, 'Error while verifying HubSpot tenant token in Nango!') - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - if (!token) { - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - - const context = { - log: this.options.log, - serviceSettings: { - nangoId, - nangoUrl: NANGO_CONFIG.url, - nangoSecretKey: NANGO_CONFIG.secretKey, - }, - } as IProcessStreamContext - - const hubspotMemberProperties = await getHubspotProperties( - nangoId, - HubspotEndpoint.CONTACTS, - context, - ) - const hubspotOrganizationProperties = await getHubspotProperties( - nangoId, - HubspotEndpoint.COMPANIES, - context, - ) - - try { - integration = await this.createOrUpdate( - { - platform: PlatformType.HUBSPOT, - settings: { - ...integration.settings, - updateMemberAttributes: true, - hubspotProperties: { - [HubspotEntity.MEMBERS]: hubspotMemberProperties, - [HubspotEntity.ORGANIZATIONS]: hubspotOrganizationProperties, - }, - }, - }, - transaction, - ) - await SequelizeRepository.commitTransaction(transaction) - } catch (err) { - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - - return integration.settings.hubspotProperties - } - - async hubspotConnect() { - const tenantId = this.options.currentTenant.id - const nangoId = `${tenantId}-${PlatformType.HUBSPOT}` - - let token: string - try { - token = await getToken(nangoId, PlatformType.HUBSPOT, this.options.log) - } catch (err) { - this.options.log.error(err, 'Error while verifying HubSpot tenant token in Nango!') - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - if (!token) { - throw new Error400(this.options.language, 'errors.noNangoToken.message') - } - - const transaction = await SequelizeRepository.createTransaction(this.options) - let integration - - const context = { - log: this.options.log, - serviceSettings: { - nangoId, - nangoUrl: NANGO_CONFIG.url, - nangoSecretKey: NANGO_CONFIG.secretKey, - }, - } as IProcessStreamContext - - const hubspotMemberProperties: IHubspotProperty[] = await getHubspotProperties( - nangoId, - HubspotEndpoint.CONTACTS, - context, - ) - - const hubspotOrganizationProperties: IHubspotProperty[] = await getHubspotProperties( - nangoId, - HubspotEndpoint.COMPANIES, - context, - ) - - const hubspotInfo: IHubspotTokenInfo = await getHubspotTokenInfo(nangoId, context) - - try { - integration = await this.createOrUpdate( - { - platform: PlatformType.HUBSPOT, - settings: { - updateMemberAttributes: true, - hubspotProperties: { - [HubspotEntity.MEMBERS]: hubspotMemberProperties, - [HubspotEntity.ORGANIZATIONS]: hubspotOrganizationProperties, - }, - hubspotId: hubspotInfo.hub_id, - }, - status: 'pending-action', - }, - transaction, - ) - await SequelizeRepository.commitTransaction(transaction) - } catch (err) { - await SequelizeRepository.rollbackTransaction(transaction) - throw err - } - - return integration - } - async linkedinConnect() { const tenantId = this.options.currentTenant.id const nangoId = `${tenantId}-${PlatformType.LINKEDIN}` diff --git a/backend/src/services/memberService.ts b/backend/src/services/memberService.ts index 279f79b780..c7d09d85cb 100644 --- a/backend/src/services/memberService.ts +++ b/backend/src/services/memberService.ts @@ -27,7 +27,6 @@ import { QueryExecutor, optionsQx } from '@crowd/data-access-layer/src/queryExec // import { getActivityCountOfMemberIdentities } from '@crowd/data-access-layer' import { fetchManySegments } from '@crowd/data-access-layer/src/segments' import { LoggerBase } from '@crowd/logging' -import { WorkflowIdReusePolicy } from '@crowd/temporal' import { IMemberIdentity, IMemberRoleWithOrganization, @@ -43,10 +42,8 @@ import { MergeActionType, OrganizationIdentityType, SyncMode, - TemporalWorkflowId, } from '@crowd/types' -import { TEMPORAL_CONFIG } from '@/conf' import MemberOrganizationRepository from '@/database/repositories/memberOrganizationRepository' import { MergeActionsRepository } from '@/database/repositories/mergeActionsRepository' import OrganizationRepository from '@/database/repositories/organizationRepository' @@ -539,35 +536,6 @@ export default class MemberService extends LoggerBase { await searchSyncService.triggerMemberSync(this.options.currentTenant.id, record.id) } - if (!existing && fireCrowdWebhooks) { - try { - const handle = await this.options.temporal.workflow.start('processNewMemberAutomation', { - workflowId: `${TemporalWorkflowId.NEW_MEMBER_AUTOMATION}/${record.id}`, - taskQueue: TEMPORAL_CONFIG.automationsTaskQueue, - workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, - retry: { - maximumAttempts: 100, - }, - - args: [ - { - tenantId: this.options.currentTenant.id, - memberId: record.id, - }, - ], - searchAttributes: { - TenantId: [this.options.currentTenant.id], - }, - }) - this.log.info( - { workflowId: handle.workflowId }, - 'Started temporal workflow to process new member automation!', - ) - } catch (err) { - logger.error(err, `Error triggering new member automation - ${record.id}!`) - } - } - if (!fireCrowdWebhooks) { this.log.info('Ignoring outgoing webhooks because of fireCrowdWebhooks!') } diff --git a/backend/src/types/automationTypes.ts b/backend/src/types/automationTypes.ts deleted file mode 100644 index 9278820077..0000000000 --- a/backend/src/types/automationTypes.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { - AutomationExecutionState, - AutomationSettings, - AutomationState, - AutomationSyncTrigger, - AutomationTrigger, - AutomationType, - IAutomationData, - SearchCriteria, -} from '@crowd/types' - -/** - * This data is used to create a new automation - */ -export interface CreateAutomationRequest { - name: string - type: AutomationType - trigger: AutomationTrigger | AutomationSyncTrigger - settings: AutomationSettings -} - -/** - * This data is used to update an existing automation - */ -export interface UpdateAutomationRequest { - name: string - trigger: AutomationTrigger - settings: AutomationSettings - state: AutomationState -} - -/** - * What filters we have available to list all automations - */ -export interface AutomationCriteria extends SearchCriteria { - id?: string - type?: AutomationType - trigger?: AutomationTrigger - state?: AutomationState -} - -export interface CreateAutomationExecutionRequest { - automation: IAutomationData - eventId: string - payload: any - state: AutomationExecutionState - error?: any -} - -/** - * Data about specific automation execution that was processed when a trigger was detected - */ -export interface AutomationExecution { - id: string - automationId: string - state: AutomationExecutionState - error: any | null - executedAt: string - eventId: string - payload: any -} - -/** - * What filters we have available to list all automations - */ -export interface AutomationExecutionCriteria extends SearchCriteria { - automationId: string -} diff --git a/frontend/src/modules/admin/modules/integration/components/integration-list-item.vue b/frontend/src/modules/admin/modules/integration/components/integration-list-item.vue index 47b2596757..ed694956e7 100644 --- a/frontend/src/modules/admin/modules/integration/components/integration-list-item.vue +++ b/frontend/src/modules/admin/modules/integration/components/integration-list-item.vue @@ -47,6 +47,7 @@
{{ props.config.name }} integration failed to connect due to an API error.
+ + -
- +
+