Skip to content

Commit

Permalink
feat: remove prior failure on successful conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega committed Apr 26, 2024
1 parent 323f7f8 commit 3768a9b
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 14 deletions.
103 changes: 103 additions & 0 deletions consumer-server/bin/get-failures.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { createDotEnvConfigComponent } from "@well-known-components/env-config-provider"
import {
GetObjectCommand,
ListObjectsV2Command,
ListObjectsV2Output,
ListObjectsV2Request,
S3Client,
} from "@aws-sdk/client-s3"
import { Readable } from "stream"
import { promises as fs } from 'fs';

const REGION = "us-east-1"

function parseKey(key: string): { pointer: string, file: string } {
const [_failures, pointer, file] = key.split("/")
return { pointer, file }
}

async function main() {
const config = await createDotEnvConfigComponent({
path: [".env.admin"],
})

const [user, secret, bucket, bucketEndpoint] = await Promise.all([
config.requireString("AWS_USER"),
config.requireString("AWS_SECRET"),
config.requireString("S3_BUCKET"),
config.getString("S3_BUCKET_ENDPOINT")
])

console.log('secrets read', { user, secret, bucket })

const s3Client = new S3Client({
region: REGION,
credentials: {
secretAccessKey: secret,
accessKeyId: user,
},
endpoint: bucketEndpoint,
})

let logContent = ""
let response: ListObjectsV2Output
do {
const params: ListObjectsV2Request = {
Bucket: bucket,
Prefix: "failures",
}

console.log('Listing objects')
const command = new ListObjectsV2Command(params)
response = await s3Client.send(command)

if (response.Contents) {
// download all objects
for (const content of response.Contents) {
const key = content.Key
let parsedKey: {
pointer: string;
file: string;
} | undefined = undefined

if (!key) continue

if (content.LastModified) {
const lastModified = new Date(content.LastModified)
const now = new Date()
const diff = now.getTime() - lastModified.getTime()
const days = diff / (1000 * 60 * 60 * 24)
console.log('Object modified days ago', days)
if (days > 2) continue
}

parsedKey = parseKey(key)

const getObjectCommand = new GetObjectCommand({
Bucket: bucket,
Key: key,
})

// log
console.log(`Downloading ${key}`)
const object = await s3Client.send(getObjectCommand)
let data: any[] = [];
for await (const chunk of object.Body as Readable) {
data.push(chunk);
}
const buffer = Buffer.concat(data)
const failureContent = new TextDecoder().decode(buffer)
console.log(`Adding ${key} to content`)
logContent += `${parsedKey.pointer}\t${failureContent}\n\n`
}
}

params.ContinuationToken = response.NextContinuationToken
} while (response.IsTruncated)


console.log('writing failures.log')
await fs.writeFile('failures.log', logContent)
}

main().catch(console.error)
7 changes: 4 additions & 3 deletions consumer-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
"test": "jest --forceExit --detectOpenHandles --verbose",
"lint:fix": "eslint --fix --ext .ts src",
"admin:lod0": "tsc --esModuleInterop ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 0",
"admin:lod1": "tsc ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 1",
"admin:lod2": "tsc ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 2",
"admin:lod3": "tsc ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 3"
"admin:lod1": "tsc --esModuleInterop ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 1",
"admin:lod2": "tsc --esModuleInterop ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 2",
"admin:lod3": "tsc --esModuleInterop ./bin/generate-zip.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/generate-zip.js 3",
"admin:get-failures": "tsc --esModuleInterop ./bin/get-failures.ts && node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict ./bin/get-failures.js"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.499.0",
Expand Down
30 changes: 28 additions & 2 deletions consumer-server/src/adapters/storage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3'
import { DeleteObjectsCommand, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import fs from 'fs/promises'
import mime from 'mime-types'
Expand Down Expand Up @@ -56,5 +56,31 @@ export async function createCloudStorageAdapter({ config }: Pick<AppComponents,
return uploadedFiles
}

return { storeFiles, getFiles }
async function deleteFailureDirectory(pointer: string): Promise<void> {
const listParams = {
Bucket: bucket,
Prefix: `failures/${pointer}`
}

const listedObjects = await s3.send(new ListObjectsV2Command(listParams))

if (!listedObjects.Contents || listedObjects.Contents.length === 0) return

const deleteParams = {
Bucket: bucket,
Delete: { Objects: [] as { Key: string }[] }
}

listedObjects.Contents.forEach(({ Key }) => {
if (Key) {
deleteParams.Delete.Objects.push({ Key })
}
})

await s3.send(new DeleteObjectsCommand(deleteParams))

if (listedObjects.IsTruncated) await deleteFailureDirectory(pointer)
}

return { storeFiles, getFiles, deleteFailureDirectory }
}
25 changes: 16 additions & 9 deletions consumer-server/src/logic/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export async function createMessageProcesorComponent({
const logger = logs.getLogger('message-procesor')
const abServers = (await config.requireString('AB_SERVERS')).split(';')

function isRelatedToAssetBundlePublish(errorMessage: string | undefined): boolean {
return !!errorMessage && abServers.some((abServer) => errorMessage.includes(abServer))
}

async function reQueue(message: QueueMessage): Promise<void> {
const retry = (message._retry || 0) + 1
logger.info('Re-queuing message', {
Expand Down Expand Up @@ -86,7 +90,6 @@ export async function createMessageProcesorComponent({

if (retry < 3) {
await reQueue(message)
metrics.increment('lod_generation_count', { status: 'retryable' }, 1)
} else {
logger.warn('Max attempts reached, moving to error bucket', {
entityId,
Expand Down Expand Up @@ -118,6 +121,7 @@ export async function createMessageProcesorComponent({
await Promise.all(abServers.map((abServer) => bundleTriggerer.queueGeneration(entityId, uploadedFiles, abServer)))
await queue.deleteMessage(receiptMessageHandle)
metrics.increment('lod_generation_count', { status: 'succeed' }, 1)
await storage.deleteFailureDirectory(base)
} catch (error: any) {
logger.error('Unexpected failure while handling message from queue', {
entityId: message.entity.entityId,
Expand All @@ -126,16 +130,19 @@ export async function createMessageProcesorComponent({
error: error.message
})

if (retry < 3) {
if (isRelatedToAssetBundlePublish(error?.message)) {
await reQueue(message)
metrics.increment('lod_generation_count', { status: 'retryable' }, 1)
} else {
logger.warn('Max attempts reached, message will not be retried', {
entityId: message.entity.entityId,
base: message.entity.metadata.scene.base,
attempt: retry
})
metrics.increment('lod_generation_count', { status: 'failed' }, 1)
if (retry < 3) {
await reQueue(message)
} else {
logger.warn('Max attempts reached, message will not be retried', {
entityId: message.entity.entityId,
base: message.entity.metadata.scene.base,
attempt: retry
})
metrics.increment('lod_generation_count', { status: 'failed' }, 1)
}
}

await queue.deleteMessage(receiptMessageHandle)
Expand Down
1 change: 1 addition & 0 deletions consumer-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export type MessageProcessorComponent = {
export type StorageComponent = {
storeFiles(filePaths: string[], prefix: string): Promise<string[]>
getFiles(prefix: string): Promise<string[]>
deleteFailureDirectory(pointer: string): Promise<void>
}

export type SceneFetcherComponent = {
Expand Down

0 comments on commit 3768a9b

Please sign in to comment.