diff --git a/src/domain/interests.ts b/src/domain/interests.ts index 597dee0..5fb6891 100644 --- a/src/domain/interests.ts +++ b/src/domain/interests.ts @@ -144,7 +144,9 @@ export async function fetchByContentInterest(opts: FetchByContentInterest) { } export async function executeHandlers(client: Client, handlers: Handler[], arg: T, defaultValue: T, currentChannel: string | null) { + const log = logger.child({ operation: 'executeHandlers', kind: (arg as any)?.kind }) if (!handlers.length) { + log.info('no handlers') return } const db = await getDatabaseConnection() @@ -160,14 +162,22 @@ export async function executeHandlers(client: Client, handlers: Handler[], ar const resolved = new Promise(resolve => runCompleted = resolve) for (const handler of handlers) { - promises.push(xtp.extensionPoints.chat.handle(handler.userId, arg, { + log.info({ handler: handler.pluginName, userId: handler.userId }, 'executing handler') + const promise = xtp.extensionPoints.chat.handle(handler.userId, arg, { bindingName: handler.pluginName, default: defaultValue, hostContext: new HostContext(client, handler, currentChannel, resolved as Promise) }).then( v => [, Date.now() - start, v], err => [err, Date.now() - start, null] - )); + ); + + promise.then( + ([err, elapsed]) => err + ? log.error({ error: err, handler: handler.pluginName, userId: handler.userId, elapsed }, 'handler execution failed') + : log.info({ handler: handler.pluginName, userId: handler.userId, elapsed }, 'handler execution complete') + ) + promises.push(promise) } const invocations: InvocationData = { @@ -178,8 +188,10 @@ export async function executeHandlers(client: Client, handlers: Handler[], ar logs: [], } + const results = await Promise.all(promises) + log.info('all handlers complete, storing results...') let idx = 0; - for (const result of await Promise.all(promises)) { + for (const result of results) { const [err, elapsed, value]: [Error | null, number, T | null] = result as any let cost = 0 if (err) { @@ -211,20 +223,26 @@ export async function executeHandlers(client: Client, handlers: Handler[], ar ++idx; } - await db.query(` - UPDATE "handlers" - SET - "ratelimiting_last_reset" = now(), - "ratelimiting_current_tokens" = updater."ct", - "brain" = "b", - "lifetime_cost" = "lifetime_cost" + "c" - FROM ( - SELECT id, ct, c, b FROM UNNEST($1::uuid[], $2::int[], $3::int[], $4::jsonb[]) as x("id", "ct", "c", "b") - ) updater where updater.id = handlers.id - `, [ids, tokens, costs, brains]); - - await createInvocation(db, invocations) - runCompleted!() + try { + await db.query(` + UPDATE "handlers" + SET + "ratelimiting_last_reset" = now(), + "ratelimiting_current_tokens" = updater."ct", + "brain" = "b", + "lifetime_cost" = "lifetime_cost" + "c" + FROM ( + SELECT id, ct, c, b FROM UNNEST($1::uuid[], $2::int[], $3::int[], $4::jsonb[]) as x("id", "ct", "c", "b") + ) updater where updater.id = handlers.id + `, [ids, tokens, costs, brains]); + log.info('updated handlers') + await createInvocation(db, invocations) + log.info('recorded invocations') + } catch (err: any) { + log.error({ error: err }, 'caught error while updating database') + } finally { + runCompleted!() + } } async function registerHandler(db: any, opts: RegisterInterest) {