diff --git a/consumer-server/package.json b/consumer-server/package.json index 65432984..7c28e810 100644 --- a/consumer-server/package.json +++ b/consumer-server/package.json @@ -19,7 +19,7 @@ "@aws-sdk/client-sqs": "^3.489.0", "@aws-sdk/lib-storage": "^3.499.0", "@dcl/platform-server-commons": "^0.0.4", - "@dcl/schemas": "^10.3.0", + "@dcl/schemas": "https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz", "@well-known-components/env-config-provider": "^1.2.0", "@well-known-components/fetch-component": "^2.0.2", "@well-known-components/http-server": "^2.1.0", diff --git a/consumer-server/src/controllers/handlers/reprocess-handler.ts b/consumer-server/src/controllers/handlers/reprocess-handler.ts index 4024cf2c..d90184d2 100644 --- a/consumer-server/src/controllers/handlers/reprocess-handler.ts +++ b/consumer-server/src/controllers/handlers/reprocess-handler.ts @@ -1,6 +1,7 @@ import { IHttpServerComponent } from '@well-known-components/interfaces' -import { HandlerContextWithPath } from '../../types' +import { HandlerContextWithPath, QueueMessage } from '../../types' import { InvalidRequestError } from '@dcl/platform-server-commons' +import { Events } from '@dcl/schemas' function validatePointers(pointers: string[]) { if (!pointers.length) { @@ -16,10 +17,10 @@ function validatePointers(pointers: string[]) { } export async function reprocessHandler( - context: Pick, 'components' | 'request'> + context: Pick, 'components' | 'request'> ): Promise { const { - components: { logs, sceneFetcher, queue }, + components: { logs, config, sceneFetcher, queue }, request } = context @@ -30,24 +31,23 @@ export async function reprocessHandler( validatePointers(pointers) + const catalystUrl = await config.requireString('CATALYST_URL') + try { const entities = await sceneFetcher.fetchByPointers(pointers) logger.info('Reprocessing pointers', { pointers: pointers.join(', '), entitiesAmount: entities.length }) for (const entity of entities) { - const message = { - entity: { - entityType: entity.type, - entityId: entity.id, - entityTimestamp: entity.timestamp, - metadata: { - scene: { - base: entity.metadata.scene.base - } - } - } + const message: QueueMessage = { + type: Events.Type.CATALYST_DEPLOYMENT, + subType: entity.type, + key: entity.id, + timestamp: entity.timestamp, + contentServerUrls: entity.contentServerUrls || [catalystUrl], + entity } + logger.debug('Publishing message to queue', { message: JSON.stringify(message) }) await queue.send(message) } diff --git a/consumer-server/src/logic/bundle-triggerer.ts b/consumer-server/src/logic/bundle-triggerer.ts index 853dc269..76616ce5 100644 --- a/consumer-server/src/logic/bundle-triggerer.ts +++ b/consumer-server/src/logic/bundle-triggerer.ts @@ -1,5 +1,4 @@ -import { AppComponents, BundleTriggererComponent } from '../types' -import { AuthLinkType } from '@dcl/schemas' +import { AppComponents, BundleTriggererComponent, QueueMessage } from '../types' import { Response } from '@well-known-components/interfaces' export async function createBundleTriggererComponent({ @@ -8,19 +7,10 @@ export async function createBundleTriggererComponent({ }: Pick): Promise { const abToken = await config.requireString('AB_TOKEN') - async function queueGeneration(entityId: string, lods: string[], abServer: string): Promise { + async function queueGeneration(message: QueueMessage, lods: string[], abServer: string): Promise { const body = JSON.stringify({ lods: lods.map((lod) => lod.replace('%2C', ',')), - entity: { - entityId, - authChain: [ - { - type: AuthLinkType.SIGNER, - payload: '0x0000000000000000000000000000000000000000', - signature: '' - } - ] - } + ...message }) const headers = { 'Content-Type': 'application/json', Authorization: abToken } diff --git a/consumer-server/src/logic/message-processor.ts b/consumer-server/src/logic/message-processor.ts index 9e655fd4..873975ac 100644 --- a/consumer-server/src/logic/message-processor.ts +++ b/consumer-server/src/logic/message-processor.ts @@ -31,7 +31,7 @@ export async function createMessageProcesorComponent({ async function reQueue(message: QueueMessage): Promise { const retry = (message._retry || 0) + 1 logger.info('Re-queuing message', { - entityId: message.entity.entityId, + entityId: message.entity.id, base: message.entity.metadata.scene.base, retry }) @@ -44,7 +44,7 @@ export async function createMessageProcesorComponent({ } function isInvalid(message: QueueMessage): boolean { - return message.entity.entityType !== 'scene' || !message.entity.metadata?.scene?.base || !message.entity.entityId + return message.entity.type !== 'scene' || !message.entity.metadata?.scene?.base || !message.entity.id } async function process(message: QueueMessage, receiptMessageHandle: string): Promise { @@ -59,7 +59,7 @@ export async function createMessageProcesorComponent({ return } - const entityId = message.entity.entityId + const entityId = message.entity.id const base = message.entity.metadata.scene.base if (RoadCoordinates.includes(base)) { logger.debug('Skipping process since it is a road', { @@ -70,31 +70,6 @@ export async function createMessageProcesorComponent({ return } - /* - const alreadyUploadedFiles = await storage.getFiles(`${base}/LOD/Sources/${message.entity.entityTimestamp.toString()}`) - - - if (!!alreadyUploadedFiles.length) { - const lastUploadDate = alreadyUploadedFiles.reduce((acc, file) => { - if (!file.lastModified) return acc - return file.lastModified > acc ? file.lastModified : acc - }, new Date(0)) - - const currentDate = new Date() - const diff = currentDate.getTime() - lastUploadDate.getTime() - const diffDays = diff / (1000 * 3600 * 24) - if (diffDays < 3) { - logger.debug('Skipping process since it was already processed within the last 3 days', { - entityId, - base, - lastUploadDate: lastUploadDate.toISOString(), - currentDate: currentDate.toISOString() - }) - await queue.deleteMessage(receiptMessageHandle) - return - } - }*/ - logger.info('Processing scene deployment', { entityId, base, @@ -148,7 +123,7 @@ export async function createMessageProcesorComponent({ const uploadedFiles = await storage.storeFiles( lodGenerationResult.lodsFiles, - `${base}/LOD/Sources/${message.entity.entityTimestamp.toString()}` + `${base}/LOD/Sources/${message.entity.timestamp.toString()}` ) logger.info('Publishing message to AssetBundle converter', { entityId, base }) @@ -158,7 +133,7 @@ export async function createMessageProcesorComponent({ await storage.deleteFailureDirectory(base) } catch (error: any) { logger.error('Unexpected failure while handling message from queue', { - entityId: message.entity.entityId, + entityId: message.entity.id, base: message.entity.metadata.scene.base, attempt: retry + 1, error: error.message @@ -175,7 +150,7 @@ export async function createMessageProcesorComponent({ await reQueue(message) } else { logger.warn('Max attempts reached, message will not be retried', { - entityId: message.entity.entityId, + entityId: message.entity.id, base: message.entity.metadata.scene.base, attempt: retry }) diff --git a/consumer-server/src/types.ts b/consumer-server/src/types.ts index c2ecb67c..2042ce34 100644 --- a/consumer-server/src/types.ts +++ b/consumer-server/src/types.ts @@ -10,6 +10,7 @@ import { metricDeclarations } from './metrics' import { Message } from '@aws-sdk/client-sqs' import { Response } from '@well-known-components/interfaces' +import { CatalystDeploymentEvent } from '@dcl/schemas' export type GlobalContext = { components: BaseComponents @@ -54,17 +55,7 @@ export type HandlerContextWithPath< export type Context = IHttpServerComponent.PathAwareContext -export type QueueMessage = { - entity: { - entityType: string - entityId: string - entityTimestamp: number - metadata: { - scene: { - base: string - } - } - } +export type QueueMessage = CatalystDeploymentEvent & { /** * This metadata property keeps track of the number of times the same message has been retried. * 'undefined' means that the message was not retried yet. diff --git a/consumer-server/yarn.lock b/consumer-server/yarn.lock index 165a47a5..9f70f781 100644 --- a/consumer-server/yarn.lock +++ b/consumer-server/yarn.lock @@ -1038,23 +1038,23 @@ "@well-known-components/interfaces" "^1.4.2" node-fetch "^2.7.0" -"@dcl/schemas@^10.3.0": - version "10.4.0" - resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-10.4.0.tgz#8cb19d6710635938c1746c4e4b8eed9a1bbce43b" - integrity sha512-gd69ProYNRxU4YqY7hruLlWdtfHfAuBdn10LIg0kPayvIraBX78emulVFy47r7qDRiqscRn6+1KLyIfMkXEBdA== +"@dcl/schemas@^9.0.0", "@dcl/schemas@^9.2.0": + version "9.15.0" + resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-9.15.0.tgz#81337faa396d21a2d1e704e5ec3cfd7b7b14343e" + integrity sha512-nip5rsOcJplNfBWeImwezuHLprM0gLW03kEeqGIvT9J6HnEBTtvIwkk9+NSt7hzFKEvWGI+C23vyNWbG3nU+SQ== dependencies: ajv "^8.11.0" ajv-errors "^3.0.0" ajv-keywords "^5.1.0" -"@dcl/schemas@^9.0.0", "@dcl/schemas@^9.2.0": - version "9.15.0" - resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-9.15.0.tgz#81337faa396d21a2d1e704e5ec3cfd7b7b14343e" - integrity sha512-nip5rsOcJplNfBWeImwezuHLprM0gLW03kEeqGIvT9J6HnEBTtvIwkk9+NSt7hzFKEvWGI+C23vyNWbG3nU+SQ== +"@dcl/schemas@https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz": + version "14.1.1-11298178941.commit-64eb184" + resolved "https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz#60ef6f3512e99b8a313372879fc0d1dc6a0cea39" dependencies: ajv "^8.11.0" ajv-errors "^3.0.0" ajv-keywords "^5.1.0" + mitt "^3.0.1" "@eslint-community/eslint-utils@^4.2.0": version "4.4.0" @@ -4362,7 +4362,7 @@ minimatch@^9.0.1: resolved "https://registry.yarnpkg.com/minipass/-/minipass-7.0.4.tgz#dbce03740f50a4786ba994c1fb908844d27b038c" integrity sha512-jYofLM5Dam9279rdkWzqHozUo4ybjdZmCsDHePy5V/PbBcVMiSZR97gmAy45aqi8CK1lG2ECd356FU86avfwUQ== -mitt@^3.0.0: +mitt@^3.0.0, mitt@^3.0.1: version "3.0.1" resolved "https://registry.yarnpkg.com/mitt/-/mitt-3.0.1.tgz#ea36cf0cc30403601ae074c8f77b7092cdab36d1" integrity sha512-vKivATfr97l2/QBCYAkXYDbrIWPM2IIKEl7YPhjCvKlG3kE2gm+uBo6nEXK3M5/Ffh/FLpKExzOQ3JJoJGFKBw==