diff --git a/packages-rs/drainpipe/src/main.rs b/packages-rs/drainpipe/src/main.rs index dc0b433..6022d2b 100644 --- a/packages-rs/drainpipe/src/main.rs +++ b/packages-rs/drainpipe/src/main.rs @@ -24,8 +24,6 @@ async fn main() -> anyhow::Result<()> { } env_logger::init(); - let config = Config::from_env()?; - let store = drainpipe_store::Store::open(&config.store_location)?; let monitor = tokio_metrics::TaskMonitor::new(); @@ -39,75 +37,46 @@ async fn main() -> anyhow::Result<()> { }); } - let existing_cursor = store - .get_cursor()? - .map(|ts| { - Utc.timestamp_micros(ts as i64) - .earliest() - .ok_or(anyhow::anyhow!("Could not convert timestamp to Utc")) - }) - .transpose()?; - + let config = Config::from_env()?; + let store = drainpipe_store::Store::open(&config.store_location)?; let endpoint = config .jetstream_url .clone() .unwrap_or(DefaultJetstreamEndpoints::USEastOne.into()); - let jetstream = JetstreamConnector::new(JetstreamConfig { - endpoint, - wanted_collections: vec!["fyi.unravel.frontpage.*".to_string()], - wanted_dids: vec![], - compression: JetstreamCompression::Zstd, - cursor: existing_cursor.map(|c| c - Duration::from_secs(10)), - })?; - - let mut reconnect_attempts = 0; - let max_reconnect_attempts = 5; - loop { - let (receiver, _) = match timeout(Duration::from_secs(10), jetstream.connect()).await { - Ok(Ok(result)) => result, - Ok(Err(e)) => { - log::error!("WebSocket error: {}", e); - reconnect_attempts += 1; - if reconnect_attempts > max_reconnect_attempts { - log::error!("Maximum reconnect attempts reached, exiting."); - return Ok(()); - } - continue; - } - Err(_) => { - log::error!("Failed to connect to WebSocket, retrying..."); - reconnect_attempts += 1; - if reconnect_attempts > max_reconnect_attempts { - log::error!("Maximum reconnect attempts reached, exiting."); - return Ok(()); - } - continue; - } - }; - - reconnect_attempts = 0; + let existing_cursor = store + .get_cursor()? + .map(|ts| { + Utc.timestamp_micros(ts as i64) + .earliest() + .ok_or(anyhow::anyhow!("Could not convert timestamp to Utc")) + }) + .transpose()?; + + let receiver = connect(JetstreamConfig { + endpoint: endpoint.clone(), + wanted_collections: vec!["fyi.unravel.frontpage.*".to_string()], + wanted_dids: vec![], + compression: JetstreamCompression::Zstd, + cursor: existing_cursor.map(|c| c - Duration::from_secs(10)), + }) + .await?; while let Ok(event) = receiver.recv_async().await { monitor .instrument(async { if let JetstreamEvent::Commit(ref commit) = event { println!("Received commit: {:?}", commit); - send_frontpage_commit(&config, commit) - .await - .and_then(|_| { - log::info!("Successfully sent frontpage commit"); - Ok(()) - }) - .or_else(|e| { - log::error!("Error processing commit: {:?}", e); - store.record_dead_letter(&drainpipe_store::DeadLetter::new( - commit.info().time_us.to_string(), - serde_json::to_string(commit)?, - e.to_string(), - )) - })? + + send_frontpage_commit(&config, commit).await.or_else(|e| { + log::error!("Error processing commit: {:?}", e); + store.record_dead_letter(&drainpipe_store::DeadLetter::new( + commit.info().time_us.to_string(), + serde_json::to_string(commit)?, + e.to_string(), + )) + })? } store.set_cursor(event.info().time_us)?; @@ -121,6 +90,29 @@ async fn main() -> anyhow::Result<()> { } } +async fn connect(config: JetstreamConfig) -> anyhow::Result> { + let jetstream = JetstreamConnector::new(config)?; + let mut retry_delay_seconds = 1; + + loop { + match timeout(Duration::from_secs(10), jetstream.connect()).await { + Ok(Ok((receiver, _))) => return Ok(receiver), + Ok(Err(e)) => { + log::error!("WebSocket error. Retrying... {}", e); + } + Err(e) => { + log::error!("Timed out after {e} connecting to WebSocket, retrying..."); + } + } + + // Exponential backoff + tokio::time::sleep(Duration::from_secs(retry_delay_seconds)).await; + + // Cap the delay at 16s + retry_delay_seconds = std::cmp::min(retry_delay_seconds * 2, 16); + } +} + async fn send_frontpage_commit( cfg: &Config, commit: &jetstream::event::CommitEvent,