Skip to content

Commit

Permalink
feat: /reprocess endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega committed Feb 7, 2024
1 parent e55b2f3 commit 38c6ceb
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 98 deletions.
28 changes: 0 additions & 28 deletions consumer-server/src/adapters/memory-queue.ts

This file was deleted.

29 changes: 29 additions & 0 deletions consumer-server/src/adapters/sqs-mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Message } from '@aws-sdk/client-sqs'
import { randomUUID } from 'node:crypto'

import { QueueComponent, QueueMessage } from '../types'

export function createInMemoryQueue(): QueueComponent {
const queue: Map<string, Message> = new Map()

async function send(message: QueueMessage): Promise<void> {
const receiptHandle = randomUUID().toString()
queue.set(receiptHandle, {
MessageId: randomUUID().toString(),
ReceiptHandle: receiptHandle,
Body: JSON.stringify({ Message: JSON.stringify(message) })
})

return
}

async function receiveSingleMessage(): Promise<Message[]> {
return queue.size > 0 ? [queue.values().next().value] : []
}

async function deleteMessage(receiptHandle: string): Promise<void> {
queue.delete(receiptHandle)
}

return { send, receiveSingleMessage, deleteMessage }
}
4 changes: 2 additions & 2 deletions consumer-server/src/adapters/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export async function createSqsAdapter(endpoint: string): Promise<QueueComponent
async function send(message: QueueMessage): Promise<void> {
const sendCommand = new SendMessageCommand({
QueueUrl: endpoint,
MessageBody: JSON.stringify(message)
MessageBody: JSON.stringify({ Message: JSON.stringify(message) })
})
await client.send(sendCommand)
}
Expand All @@ -30,7 +30,7 @@ export async function createSqsAdapter(endpoint: string): Promise<QueueComponent
return Messages
}

async function deleteMessage(receiptHandle: string) {
async function deleteMessage(receiptHandle: string): Promise<void> {
const deleteCommand = new DeleteMessageCommand({
QueueUrl: endpoint,
ReceiptHandle: receiptHandle
Expand Down
5 changes: 4 additions & 1 deletion consumer-server/src/adapters/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import fs from 'fs/promises'

import { AppComponents, StorageComponent } from '../types'

export async function createCloudStorageAdapter({ config, logs }: Pick<AppComponents, 'config' | 'logs'>): Promise<StorageComponent> {
export async function createCloudStorageAdapter({
config,
logs
}: Pick<AppComponents, 'config' | 'logs'>): Promise<StorageComponent> {
const logger = logs.getLogger('storage')
const bucket = await config.getString('BUCKET')
const region = (await config.getString('AWS_REGION')) || 'us-east-1'
Expand Down
10 changes: 5 additions & 5 deletions consumer-server/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import { metricDeclarations } from './metrics'
import { createSqsAdapter } from './adapters/sqs'
import { createMessagesConsumerComponent } from './logic/message-consumer'
import { buildLicense } from './utils/license-builder'
import { createMemoryQueueAdapter } from './adapters/memory-queue'
import { createLodGeneratorComponent } from './logic/lod-generator'
import { createMessageHandlerComponent } from './logic/message-handler'
import { createCloudStorageAdapter } from './adapters/storage'
import { createEntityFetcherComponent } from './logic/entity-fetcher'
import { createEntityFetcherComponent } from './logic/scene-fetcher'
import { createInMemoryQueue } from './adapters/sqs-mock'

export async function initComponents(): Promise<AppComponents> {
const config = await createDotEnvConfigComponent(
Expand All @@ -32,9 +32,9 @@ export async function initComponents(): Promise<AppComponents> {

await instrumentHttpServerWithMetrics({ metrics, server, config })

const entityFetcher = await createEntityFetcherComponent({ logs, fetcher })
const sceneFetcher = await createEntityFetcherComponent({ logs, fetcher })
const sqsEndpoint = await config.getString('QUEUE_URL')
const queue = !sqsEndpoint ? createMemoryQueueAdapter({ logs }) : await createSqsAdapter(sqsEndpoint)
const queue = sqsEndpoint ? await createSqsAdapter(sqsEndpoint) : createInMemoryQueue()
const lodGenerator = createLodGeneratorComponent()
const storage = await createCloudStorageAdapter({ logs, config })
const messageHandler = createMessageHandlerComponent({ logs, lodGenerator, storage })
Expand All @@ -55,6 +55,6 @@ export async function initComponents(): Promise<AppComponents> {
messageHandler,
storage,
fetcher,
entityFetcher
sceneFetcher
}
}
1 change: 0 additions & 1 deletion consumer-server/src/controllers/handlers/health-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { IHttpServerComponent } from '@well-known-components/interfaces'

import { HandlerContextWithPath } from '../../types'

export async function healthHandler(
Expand Down
22 changes: 17 additions & 5 deletions consumer-server/src/controllers/handlers/reprocess-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import { IHttpServerComponent } from '@well-known-components/interfaces'
import { HandlerContextWithPath } from '../../types'

export async function reprocessHandler(
context: HandlerContextWithPath<'logs' | 'entityFetcher' | 'queue', '/reprocess'>
context: Pick<HandlerContextWithPath<'logs' | 'sceneFetcher' | 'queue', '/reprocess'>, 'components' | 'request'>
): Promise<IHttpServerComponent.IResponse> {
const {
components: { logs, entityFetcher, queue }
components: { logs, sceneFetcher, queue },
request
} = context

const logger = logs.getLogger('reprocess-handler')

const body = await context.request.json()
const body = await request.json()
const pointers = (body.pointers as string[]) || []

if (!pointers.length) {
Expand All @@ -20,12 +21,23 @@ export async function reprocessHandler(
}
}

const entities = await entityFetcher.fetchEntities(pointers)
const entities = await sceneFetcher.fetchByPointers(pointers)

logger.info('Reprocessing pointers', { pointers: pointers.join(', '), entitiesAmount: entities.length })

for (const entity of entities) {
const message = { ...entity, content: [], pointers: [], spawnPoints: [] }
const message = {
entity: {
entityType: entity.type,
entityId: entity.id,
entityTimestamp: entity.timestamp,
metadata: {
scene: {
base: entity.metadata.scene.base
}
}
}
}
logger.debug('Publishing message to queue', { message: JSON.stringify(message) })
await queue.send(message)
}
Expand Down
1 change: 1 addition & 0 deletions consumer-server/src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Router } from '@well-known-components/http-server'

import { GlobalContext } from '../types'
import { healthHandler } from './handlers/health-handler'
import { reprocessHandler } from './handlers/reprocess-handler'
Expand Down
19 changes: 16 additions & 3 deletions consumer-server/src/logic/message-consumer.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { AppComponents, QueueWorker } from '../types'
import { sleep } from '../utils/timer'

export async function createMessagesConsumerComponent({
logs,
queue,
messageHandler
}: Pick<AppComponents, 'logs' | 'queue' | 'messageHandler'>): Promise<QueueWorker> {
const logger = logs.getLogger('messages-consumer')
let isRunning = false

async function start() {
logger.info('Starting to listen messages from queue')
while (true) {
isRunning = true
while (isRunning) {
const messages = await queue.receiveSingleMessage()

if (messages.length === 0) {
await sleep(20 * 1000)
continue
}

for (const message of messages) {
const { MessageId, Body, ReceiptHandle } = message

try {
const parsedMessage: { Message: string } = JSON.parse(Body!)
logger.debug('Message received', {
Expand All @@ -33,7 +41,12 @@ export async function createMessagesConsumerComponent({
}
}

async function stop() {
isRunning = false
}

return {
start
start,
stop
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { createCatalystClient } from 'dcl-catalyst-client'

import { AppComponents, EntityFetcherComponent } from '../types'
import { AppComponents, SceneFetcherComponent } from '../types'

export async function createEntityFetcherComponent({
logs,
fetcher
}: Pick<AppComponents, 'logs' | 'fetcher'>): Promise<EntityFetcherComponent> {
}: Pick<AppComponents, 'logs' | 'fetcher'>): Promise<SceneFetcherComponent> {
const logger = logs.getLogger('entity-fetcher')
const contentClient = await createCatalystClient({ url: 'https://peer.decentraland.org', fetcher }).getContentClient()

async function fetchEntities(entityIds: string[]) {
async function fetchByPointers(scenePointers: string[]) {
try {
const entities = await contentClient.fetchEntitiesByIds(entityIds)
const entities = await contentClient.fetchEntitiesByPointers(scenePointers)
return entities
} catch (error: any) {
logger.error('Failed while fetching entity', { error })
return null
}
}

return { fetchEntities }
return { fetchByPointers }
}
6 changes: 3 additions & 3 deletions consumer-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type BaseComponents = {
messageHandler: MessageHandlerComponent
storage: StorageComponent
fetcher: IFetchComponent
entityFetcher: EntityFetcherComponent
sceneFetcher: SceneFetcherComponent
}

// components used in runtime
Expand Down Expand Up @@ -85,6 +85,6 @@ export type StorageComponent = {
storeFiles(filePaths: string[], basePointer: string, entityTimestamp: string): Promise<boolean>
}

export type EntityFetcherComponent = {
fetchEntities(entityIds: string[]): Promise<any>
export type SceneFetcherComponent = {
fetchByPointers(scenePointers: string[]): Promise<any>
}
21 changes: 18 additions & 3 deletions consumer-server/test/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { createLocalFetchCompoment, createRunner } from '@well-known-components/
import { createInMemoryStorage } from '@dcl/catalyst-storage'

import { main } from '../src/service'
import { TestComponents } from '../src/types'
import { SceneFetcherComponent, QueueComponent, QueueWorker, TestComponents } from '../src/types'
import { initComponents as originalInitComponents } from '../src/components'

import { createTestMetricsComponent } from '@well-known-components/metrics'
Expand Down Expand Up @@ -40,12 +40,27 @@ async function initComponents(): Promise<TestComponents> {

const inMemoryStorage = createInMemoryStorage()
const storage = createInMemoryStorageAdapter(inMemoryStorage)
const queue: QueueComponent = {
deleteMessage: jest.fn(),
receiveSingleMessage: jest.fn(),
send: jest.fn()
}
const messageConsumer: QueueWorker = {
start: jest.fn(),
stop: jest.fn()
}

const sceneFetcher: SceneFetcherComponent = {
fetchByPointers: jest.fn()
}

console.log('Test components')
return {
...components,
localFetch: await createLocalFetchCompoment(config),
storage,
metrics
metrics,
queue,
messageConsumer,
sceneFetcher
}
}
Loading

0 comments on commit 38c6ceb

Please sign in to comment.