Skip to content

Commit

Permalink
add additional logs to executeHandlers
Browse files Browse the repository at this point in the history
Add additional logs so we can see if there's a condition where only
some of the handlers complete.
  • Loading branch information
chrisdickinson committed Oct 24, 2024
1 parent 9e657ef commit 79e2e9a
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions src/domain/interests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ export async function fetchByContentInterest(opts: FetchByContentInterest) {
}

export async function executeHandlers<T>(client: Client, handlers: Handler[], arg: T, defaultValue: T, currentChannel: string | null) {
const log = logger.child({ operation: 'executeHandlers', kind: (arg as any)?.kind })
if (!handlers.length) {
log.info('no handlers')
return
}
const db = await getDatabaseConnection()
Expand All @@ -160,14 +162,22 @@ export async function executeHandlers<T>(client: Client, handlers: Handler[], ar
const resolved = new Promise(resolve => runCompleted = resolve)

for (const handler of handlers) {
promises.push(xtp.extensionPoints.chat.handle(handler.userId, arg, {
log.info({ handler: handler.pluginName, userId: handler.userId }, 'executing handler')
const promise = xtp.extensionPoints.chat.handle(handler.userId, arg, {
bindingName: handler.pluginName,
default: defaultValue,
hostContext: new HostContext(client, handler, currentChannel, resolved as Promise<void>)
}).then(
v => [, Date.now() - start, v],
err => [err, Date.now() - start, null]
));
);

promise.then(
([err, elapsed]) => err
? log.error({ error: err, handler: handler.pluginName, userId: handler.userId, elapsed }, 'handler execution failed')
: log.info({ handler: handler.pluginName, userId: handler.userId, elapsed }, 'handler execution complete')
)
promises.push(promise)
}

const invocations: InvocationData = {
Expand All @@ -178,8 +188,10 @@ export async function executeHandlers<T>(client: Client, handlers: Handler[], ar
logs: [],
}

const results = await Promise.all(promises)
log.info('all handlers complete, storing results...')
let idx = 0;
for (const result of await Promise.all(promises)) {
for (const result of results) {
const [err, elapsed, value]: [Error | null, number, T | null] = result as any
let cost = 0
if (err) {
Expand Down Expand Up @@ -211,20 +223,26 @@ export async function executeHandlers<T>(client: Client, handlers: Handler[], ar
++idx;
}

await db.query(`
UPDATE "handlers"
SET
"ratelimiting_last_reset" = now(),
"ratelimiting_current_tokens" = updater."ct",
"brain" = "b",
"lifetime_cost" = "lifetime_cost" + "c"
FROM (
SELECT id, ct, c, b FROM UNNEST($1::uuid[], $2::int[], $3::int[], $4::jsonb[]) as x("id", "ct", "c", "b")
) updater where updater.id = handlers.id
`, [ids, tokens, costs, brains]);

await createInvocation(db, invocations)
runCompleted!()
try {
await db.query(`
UPDATE "handlers"
SET
"ratelimiting_last_reset" = now(),
"ratelimiting_current_tokens" = updater."ct",
"brain" = "b",
"lifetime_cost" = "lifetime_cost" + "c"
FROM (
SELECT id, ct, c, b FROM UNNEST($1::uuid[], $2::int[], $3::int[], $4::jsonb[]) as x("id", "ct", "c", "b")
) updater where updater.id = handlers.id
`, [ids, tokens, costs, brains]);
log.info('updated handlers')
await createInvocation(db, invocations)
log.info('recorded invocations')
} catch (err: any) {
log.error({ error: err }, 'caught error while updating database')
} finally {
runCompleted!()
}
}

async function registerHandler(db: any, opts: RegisterInterest) {
Expand Down

0 comments on commit 79e2e9a

Please sign in to comment.