Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega committed Oct 11, 2024
1 parent cef2e0c commit 437229d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 79 deletions.
2 changes: 1 addition & 1 deletion consumer-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"@aws-sdk/client-sqs": "^3.489.0",
"@aws-sdk/lib-storage": "^3.499.0",
"@dcl/platform-server-commons": "^0.0.4",
"@dcl/schemas": "^10.3.0",
"@dcl/schemas": "https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz",
"@well-known-components/env-config-provider": "^1.2.0",
"@well-known-components/fetch-component": "^2.0.2",
"@well-known-components/http-server": "^2.1.0",
Expand Down
28 changes: 14 additions & 14 deletions consumer-server/src/controllers/handlers/reprocess-handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'
import { HandlerContextWithPath } from '../../types'
import { HandlerContextWithPath, QueueMessage } from '../../types'
import { InvalidRequestError } from '@dcl/platform-server-commons'
import { Events } from '@dcl/schemas'

function validatePointers(pointers: string[]) {
if (!pointers.length) {
Expand All @@ -16,10 +17,10 @@ function validatePointers(pointers: string[]) {
}

export async function reprocessHandler(
context: Pick<HandlerContextWithPath<'logs' | 'sceneFetcher' | 'queue', '/reprocess'>, 'components' | 'request'>
context: Pick<HandlerContextWithPath<'logs' | 'config' | 'sceneFetcher' | 'queue', '/reprocess'>, 'components' | 'request'>
): Promise<IHttpServerComponent.IResponse> {
const {
components: { logs, sceneFetcher, queue },
components: { logs, config, sceneFetcher, queue },
request
} = context

Expand All @@ -30,24 +31,23 @@ export async function reprocessHandler(

validatePointers(pointers)

const catalystUrl = await config.requireString('CATALYST_URL')

try {
const entities = await sceneFetcher.fetchByPointers(pointers)

logger.info('Reprocessing pointers', { pointers: pointers.join(', '), entitiesAmount: entities.length })

for (const entity of entities) {
const message = {
entity: {
entityType: entity.type,
entityId: entity.id,
entityTimestamp: entity.timestamp,
metadata: {
scene: {
base: entity.metadata.scene.base
}
}
}
const message: QueueMessage = {
type: Events.Type.CATALYST_DEPLOYMENT,
subType: entity.type,
key: entity.id,
timestamp: entity.timestamp,
contentServerUrls: entity.contentServerUrls || [catalystUrl],
entity
}

logger.debug('Publishing message to queue', { message: JSON.stringify(message) })
await queue.send(message)
}
Expand Down
16 changes: 3 additions & 13 deletions consumer-server/src/logic/bundle-triggerer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { AppComponents, BundleTriggererComponent } from '../types'
import { AuthLinkType } from '@dcl/schemas'
import { AppComponents, BundleTriggererComponent, QueueMessage } from '../types'
import { Response } from '@well-known-components/interfaces'

export async function createBundleTriggererComponent({
Expand All @@ -8,19 +7,10 @@ export async function createBundleTriggererComponent({
}: Pick<AppComponents, 'fetcher' | 'config'>): Promise<BundleTriggererComponent> {
const abToken = await config.requireString('AB_TOKEN')

async function queueGeneration(entityId: string, lods: string[], abServer: string): Promise<Response> {
async function queueGeneration(message: QueueMessage, lods: string[], abServer: string): Promise<Response> {
const body = JSON.stringify({
lods: lods.map((lod) => lod.replace('%2C', ',')),
entity: {
entityId,
authChain: [
{
type: AuthLinkType.SIGNER,
payload: '0x0000000000000000000000000000000000000000',
signature: ''
}
]
}
...message
})
const headers = { 'Content-Type': 'application/json', Authorization: abToken }

Expand Down
37 changes: 6 additions & 31 deletions consumer-server/src/logic/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export async function createMessageProcesorComponent({
async function reQueue(message: QueueMessage): Promise<void> {
const retry = (message._retry || 0) + 1
logger.info('Re-queuing message', {
entityId: message.entity.entityId,
entityId: message.entity.id,
base: message.entity.metadata.scene.base,
retry
})
Expand All @@ -44,7 +44,7 @@ export async function createMessageProcesorComponent({
}

function isInvalid(message: QueueMessage): boolean {
return message.entity.entityType !== 'scene' || !message.entity.metadata?.scene?.base || !message.entity.entityId
return message.entity.type !== 'scene' || !message.entity.metadata?.scene?.base || !message.entity.id
}

async function process(message: QueueMessage, receiptMessageHandle: string): Promise<void> {
Expand All @@ -59,7 +59,7 @@ export async function createMessageProcesorComponent({
return
}

const entityId = message.entity.entityId
const entityId = message.entity.id
const base = message.entity.metadata.scene.base
if (RoadCoordinates.includes(base)) {
logger.debug('Skipping process since it is a road', {
Expand All @@ -70,31 +70,6 @@ export async function createMessageProcesorComponent({
return
}

/*
const alreadyUploadedFiles = await storage.getFiles(`${base}/LOD/Sources/${message.entity.entityTimestamp.toString()}`)
if (!!alreadyUploadedFiles.length) {
const lastUploadDate = alreadyUploadedFiles.reduce((acc, file) => {
if (!file.lastModified) return acc
return file.lastModified > acc ? file.lastModified : acc
}, new Date(0))
const currentDate = new Date()
const diff = currentDate.getTime() - lastUploadDate.getTime()
const diffDays = diff / (1000 * 3600 * 24)
if (diffDays < 3) {
logger.debug('Skipping process since it was already processed within the last 3 days', {
entityId,
base,
lastUploadDate: lastUploadDate.toISOString(),
currentDate: currentDate.toISOString()
})
await queue.deleteMessage(receiptMessageHandle)
return
}
}*/

logger.info('Processing scene deployment', {
entityId,
base,
Expand Down Expand Up @@ -148,7 +123,7 @@ export async function createMessageProcesorComponent({

const uploadedFiles = await storage.storeFiles(
lodGenerationResult.lodsFiles,
`${base}/LOD/Sources/${message.entity.entityTimestamp.toString()}`
`${base}/LOD/Sources/${message.entity.timestamp.toString()}`
)

logger.info('Publishing message to AssetBundle converter', { entityId, base })
Expand All @@ -158,7 +133,7 @@ export async function createMessageProcesorComponent({
await storage.deleteFailureDirectory(base)
} catch (error: any) {
logger.error('Unexpected failure while handling message from queue', {
entityId: message.entity.entityId,
entityId: message.entity.id,
base: message.entity.metadata.scene.base,
attempt: retry + 1,
error: error.message
Expand All @@ -175,7 +150,7 @@ export async function createMessageProcesorComponent({
await reQueue(message)
} else {
logger.warn('Max attempts reached, message will not be retried', {
entityId: message.entity.entityId,
entityId: message.entity.id,
base: message.entity.metadata.scene.base,
attempt: retry
})
Expand Down
13 changes: 2 additions & 11 deletions consumer-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { metricDeclarations } from './metrics'

import { Message } from '@aws-sdk/client-sqs'
import { Response } from '@well-known-components/interfaces'
import { CatalystDeploymentEvent } from '@dcl/schemas'

export type GlobalContext = {
components: BaseComponents
Expand Down Expand Up @@ -54,17 +55,7 @@ export type HandlerContextWithPath<

export type Context<Path extends string = any> = IHttpServerComponent.PathAwareContext<GlobalContext, Path>

export type QueueMessage = {
entity: {
entityType: string
entityId: string
entityTimestamp: number
metadata: {
scene: {
base: string
}
}
}
export type QueueMessage = CatalystDeploymentEvent & {
/**
* This metadata property keeps track of the number of times the same message has been retried.
* 'undefined' means that the message was not retried yet.
Expand Down
18 changes: 9 additions & 9 deletions consumer-server/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1038,23 +1038,23 @@
"@well-known-components/interfaces" "^1.4.2"
node-fetch "^2.7.0"

"@dcl/schemas@^10.3.0":
version "10.4.0"
resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-10.4.0.tgz#8cb19d6710635938c1746c4e4b8eed9a1bbce43b"
integrity sha512-gd69ProYNRxU4YqY7hruLlWdtfHfAuBdn10LIg0kPayvIraBX78emulVFy47r7qDRiqscRn6+1KLyIfMkXEBdA==
"@dcl/schemas@^9.0.0", "@dcl/schemas@^9.2.0":
version "9.15.0"
resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-9.15.0.tgz#81337faa396d21a2d1e704e5ec3cfd7b7b14343e"
integrity sha512-nip5rsOcJplNfBWeImwezuHLprM0gLW03kEeqGIvT9J6HnEBTtvIwkk9+NSt7hzFKEvWGI+C23vyNWbG3nU+SQ==
dependencies:
ajv "^8.11.0"
ajv-errors "^3.0.0"
ajv-keywords "^5.1.0"

"@dcl/schemas@^9.0.0", "@dcl/schemas@^9.2.0":
version "9.15.0"
resolved "https://registry.yarnpkg.com/@dcl/schemas/-/schemas-9.15.0.tgz#81337faa396d21a2d1e704e5ec3cfd7b7b14343e"
integrity sha512-nip5rsOcJplNfBWeImwezuHLprM0gLW03kEeqGIvT9J6HnEBTtvIwkk9+NSt7hzFKEvWGI+C23vyNWbG3nU+SQ==
"@dcl/schemas@https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz":
version "14.1.1-11298178941.commit-64eb184"
resolved "https://sdk-team-cdn.decentraland.org/@dcl/schemas/branch/feat/catalystdeployment-event-enhance/dcl-schemas-14.1.1-11298178941.commit-64eb184.tgz#60ef6f3512e99b8a313372879fc0d1dc6a0cea39"
dependencies:
ajv "^8.11.0"
ajv-errors "^3.0.0"
ajv-keywords "^5.1.0"
mitt "^3.0.1"

"@eslint-community/eslint-utils@^4.2.0":
version "4.4.0"
Expand Down Expand Up @@ -4362,7 +4362,7 @@ minimatch@^9.0.1:
resolved "https://registry.yarnpkg.com/minipass/-/minipass-7.0.4.tgz#dbce03740f50a4786ba994c1fb908844d27b038c"
integrity sha512-jYofLM5Dam9279rdkWzqHozUo4ybjdZmCsDHePy5V/PbBcVMiSZR97gmAy45aqi8CK1lG2ECd356FU86avfwUQ==

mitt@^3.0.0:
mitt@^3.0.0, mitt@^3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/mitt/-/mitt-3.0.1.tgz#ea36cf0cc30403601ae074c8f77b7092cdab36d1"
integrity sha512-vKivATfr97l2/QBCYAkXYDbrIWPM2IIKEl7YPhjCvKlG3kE2gm+uBo6nEXK3M5/Ffh/FLpKExzOQ3JJoJGFKBw==
Expand Down

0 comments on commit 437229d

Please sign in to comment.