Skip to content

Commit

Permalink
worker: detect ws connection errors and retry
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Apr 24, 2024
1 parent 2deb898 commit 1d2dd39
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export class Worker {
throw new Error('--api-secret is required, or set LIVEKIT_API_SECRET env var');

const workerWS = async () => {
// const retries = 0;
let retries = 0;
while (!this.closed) {
const token = new AccessToken(this.opts.apiKey, this.opts.apiSecret);
token.addGrant({ agent: true });
Expand All @@ -181,27 +181,31 @@ export class Worker {
this.session = new WebSocket(url + 'agent', {
headers: { authorization: 'Bearer ' + jwt },
});
this.session.on('open', () => {
this.session!.removeAllListeners('close');

try {
await new Promise((resolve, reject) => {
this.session!.on('open', resolve);
this.session!.on('error', (error) => reject(error));
this.session!.on('close', (code) => reject(`WebSocket returned ${code}`));
});

this.runWS(this.session!);
});
return;
return;
} catch (e) {
if (this.closed) return;
if (retries >= this.opts.maxRetry) {
throw new Error(`failed to connect to LiveKit server after ${retries} attempts: ${e}`);
}

retries++;
const delay = Math.min(retries * 2, 10);

// TODO(nbsp): retries that actually work
// if (this.session.readyState !== WebSocket.OPEN) {
// if (this.closed) return;
// if (retries >= this.opts.maxRetry) {
// throw new Error(`failed to connect to LiveKit server after ${retries} attempts: ${e}`);
// }

// const delay = Math.min(retries * 2, 10);
// retries++;

// this.logger.warn(
// `failed to connect to LiveKit server, retrying in ${delay} seconds: ${e} (${retries}/${this.opts.maxRetry})`,
// );
// await new Promise((resolve) => setTimeout(resolve, delay));
// }
this.logger.warn(
`failed to connect to LiveKit server, retrying in ${delay} seconds: ${e} (${retries}/${this.opts.maxRetry})`,
);

await new Promise((resolve) => setTimeout(resolve, delay * 1000));
}
}
};

Expand Down Expand Up @@ -235,7 +239,8 @@ export class Worker {

ws.addEventListener('close', () => {
closingWS = true;
if (!this.closed) throw new Error('worker connection closed unexpectedly');
this.logger.error('worker connection closed unexpectedly');
this.close();
});

ws.addEventListener('message', (event) => {
Expand Down

0 comments on commit 1d2dd39

Please sign in to comment.