Skip to content

Commit

Permalink
Reconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-sherman committed Nov 21, 2024
1 parent ecf324d commit 7952a0f
Showing 1 changed file with 51 additions and 59 deletions.
110 changes: 51 additions & 59 deletions packages-rs/drainpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ async fn main() -> anyhow::Result<()> {
}

env_logger::init();
let config = Config::from_env()?;
let store = drainpipe_store::Store::open(&config.store_location)?;

let monitor = tokio_metrics::TaskMonitor::new();

Expand All @@ -39,75 +37,46 @@ async fn main() -> anyhow::Result<()> {
});
}

let existing_cursor = store
.get_cursor()?
.map(|ts| {
Utc.timestamp_micros(ts as i64)
.earliest()
.ok_or(anyhow::anyhow!("Could not convert timestamp to Utc"))
})
.transpose()?;

let config = Config::from_env()?;
let store = drainpipe_store::Store::open(&config.store_location)?;
let endpoint = config
.jetstream_url
.clone()
.unwrap_or(DefaultJetstreamEndpoints::USEastOne.into());

let jetstream = JetstreamConnector::new(JetstreamConfig {
endpoint,
wanted_collections: vec!["fyi.unravel.frontpage.*".to_string()],
wanted_dids: vec![],
compression: JetstreamCompression::Zstd,
cursor: existing_cursor.map(|c| c - Duration::from_secs(10)),
})?;

let mut reconnect_attempts = 0;
let max_reconnect_attempts = 5;

loop {
let (receiver, _) = match timeout(Duration::from_secs(10), jetstream.connect()).await {
Ok(Ok(result)) => result,
Ok(Err(e)) => {
log::error!("WebSocket error: {}", e);
reconnect_attempts += 1;
if reconnect_attempts > max_reconnect_attempts {
log::error!("Maximum reconnect attempts reached, exiting.");
return Ok(());
}
continue;
}
Err(_) => {
log::error!("Failed to connect to WebSocket, retrying...");
reconnect_attempts += 1;
if reconnect_attempts > max_reconnect_attempts {
log::error!("Maximum reconnect attempts reached, exiting.");
return Ok(());
}
continue;
}
};

reconnect_attempts = 0;
let existing_cursor = store
.get_cursor()?
.map(|ts| {
Utc.timestamp_micros(ts as i64)
.earliest()
.ok_or(anyhow::anyhow!("Could not convert timestamp to Utc"))
})
.transpose()?;

let receiver = connect(JetstreamConfig {
endpoint: endpoint.clone(),
wanted_collections: vec!["fyi.unravel.frontpage.*".to_string()],
wanted_dids: vec![],
compression: JetstreamCompression::Zstd,
cursor: existing_cursor.map(|c| c - Duration::from_secs(10)),
})
.await?;

while let Ok(event) = receiver.recv_async().await {
monitor
.instrument(async {
if let JetstreamEvent::Commit(ref commit) = event {
println!("Received commit: {:?}", commit);
send_frontpage_commit(&config, commit)
.await
.and_then(|_| {
log::info!("Successfully sent frontpage commit");
Ok(())
})
.or_else(|e| {
log::error!("Error processing commit: {:?}", e);
store.record_dead_letter(&drainpipe_store::DeadLetter::new(
commit.info().time_us.to_string(),
serde_json::to_string(commit)?,
e.to_string(),
))
})?

send_frontpage_commit(&config, commit).await.or_else(|e| {
log::error!("Error processing commit: {:?}", e);
store.record_dead_letter(&drainpipe_store::DeadLetter::new(
commit.info().time_us.to_string(),
serde_json::to_string(commit)?,
e.to_string(),
))
})?
}

store.set_cursor(event.info().time_us)?;
Expand All @@ -121,6 +90,29 @@ async fn main() -> anyhow::Result<()> {
}
}

async fn connect(config: JetstreamConfig) -> anyhow::Result<flume::Receiver<JetstreamEvent>> {
let jetstream = JetstreamConnector::new(config)?;
let mut retry_delay_seconds = 1;

loop {
match timeout(Duration::from_secs(10), jetstream.connect()).await {
Ok(Ok((receiver, _))) => return Ok(receiver),
Ok(Err(e)) => {
log::error!("WebSocket error. Retrying... {}", e);
}
Err(e) => {
log::error!("Timed out after {e} connecting to WebSocket, retrying...");
}
}

// Exponential backoff
tokio::time::sleep(Duration::from_secs(retry_delay_seconds)).await;

// Cap the delay at 16s
retry_delay_seconds = std::cmp::min(retry_delay_seconds * 2, 16);
}
}

async fn send_frontpage_commit(
cfg: &Config,
commit: &jetstream::event::CommitEvent,
Expand Down

0 comments on commit 7952a0f

Please sign in to comment.