Skip to content

Commit

Permalink
Better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-sherman committed Nov 21, 2024
1 parent 668c090 commit 28face9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 82 deletions.
101 changes: 51 additions & 50 deletions packages-rs/drainpipe/src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use chrono::Utc;
use error::{ConfigValidationError, ConnectionError, JetstreamEventError};
use event::JetstreamEvent;
use futures_util::stream::StreamExt;
use tokio::{net::TcpStream, task::JoinHandle};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use url::Url;
use zstd::dict::DecoderDictionary;
Expand Down Expand Up @@ -198,15 +198,7 @@ impl JetstreamConnector {
///
/// A [JetstreamReceiver] is returned which can be used to respond to events. When all instances
/// of this receiver are dropped, the connection and task are automatically closed.
pub async fn connect(
&self,
) -> Result<
(
JetstreamReceiver,
JoinHandle<Result<(), JetstreamEventError>>,
),
ConnectionError,
> {
pub async fn connect(&self) -> Result<JetstreamReceiver, ConnectionError> {
// We validate the config again for good measure. Probably not necessary but it can't hurt.
self.config
.validate()
Expand All @@ -228,10 +220,9 @@ impl JetstreamConnector {

let dict = DecoderDictionary::copy(JETSTREAM_ZSTD_DICTIONARY);

// TODO: Internally creating and returning a tokio task might not be the best idea(?)
let handle = tokio::task::spawn(websocket_task(dict, ws_stream, send_channel));
tokio::task::spawn(websocket_task(dict, ws_stream, send_channel));

Ok((receive_channel, handle))
Ok(receive_channel)
}
}

Expand All @@ -245,45 +236,55 @@ async fn websocket_task(
// TODO: Use the write half to allow the user to change configuration settings on the fly.
let (_, mut read) = ws.split();
loop {
if let Some(Ok(message)) = read.next().await {
match message {
Message::Text(json) => {
let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(_) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection."
);
return Ok(());
}
match read.next().await {
None => {
log::error!("The WebSocket connection was closed unexpectedly.");
return Err(JetstreamEventError::WebSocketCloseFailure);
}

Some(Ok(Message::Text(json))) => {
let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(e) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection. {:?}", e
);
return Ok(());
}
Message::Binary(zstd_json) => {
let mut cursor = Cursor::new(zstd_json);
let mut decoder =
zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
.map_err(JetstreamEventError::CompressionDictionaryError)?;

let mut json = String::new();
decoder
.read_to_string(&mut json)
.map_err(JetstreamEventError::CompressionDecoderError)?;

let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(_) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection..."
);
return Ok(());
}
}

Some(Ok(Message::Binary(zstd_json))) => {
let mut cursor = Cursor::new(zstd_json);
let mut decoder =
zstd::stream::Decoder::with_prepared_dictionary(&mut cursor, &dictionary)
.map_err(JetstreamEventError::CompressionDictionaryError)?;

let mut json = String::new();
decoder
.read_to_string(&mut json)
.map_err(JetstreamEventError::CompressionDecoderError)?;

let event = serde_json::from_str::<JetstreamEvent>(&json)
.map_err(JetstreamEventError::ReceivedMalformedJSON)?;

if let Err(e) = send_channel.send(event) {
// We can assume that all receivers have been dropped, so we can close the
// connection and exit the task.
log::info!(
"All receivers for the Jetstream connection have been dropped, closing connection... {:?}", e
);
return Ok(());
}
_ => {}
}

unexpected => {
log::error!(
"Received an unexpected message type from Jetstream: {:?}",
unexpected
);
}
}
}
Expand Down
76 changes: 44 additions & 32 deletions packages-rs/drainpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ async fn main() -> anyhow::Result<()> {

let monitor = tokio_metrics::TaskMonitor::new();

{
let metrics_monitor = monitor.clone();
tokio::spawn(async move {
for interval in metrics_monitor.intervals() {
log::info!("{:?} per second", interval.instrumented_count as f64 / 5.0,);
tokio::time::sleep(Duration::from_millis(5000)).await;
}
});
}

let config = Config::from_env()?;
let store = drainpipe_store::Store::open(&config.store_location)?;
let endpoint = config
Expand All @@ -59,33 +49,55 @@ async fn main() -> anyhow::Result<()> {
wanted_collections: vec!["fyi.unravel.frontpage.*".to_string()],
wanted_dids: vec![],
compression: JetstreamCompression::Zstd,
// Connect 10 seconds before the most recently received cursor
cursor: existing_cursor.map(|c| c - Duration::from_secs(10)),
})
.await?;

while let Ok(event) = receiver.recv_async().await {
monitor
.instrument(async {
if let JetstreamEvent::Commit(ref commit) = event {
println!("Received commit: {:?}", commit);

send_frontpage_commit(&config, commit).await.or_else(|e| {
log::error!("Error processing commit: {:?}", e);
store.record_dead_letter(&drainpipe_store::DeadLetter::new(
commit.info().time_us.to_string(),
serde_json::to_string(commit)?,
e.to_string(),
))
})?
}

store.set_cursor(event.info().time_us)?;

Ok(()) as anyhow::Result<()>
})
.await?
let metric_logs_abort_handler = {
let metrics_monitor = monitor.clone();
tokio::spawn(async move {
for interval in metrics_monitor.intervals() {
log::info!("{:?} per second", interval.instrumented_count as f64 / 5.0,);
tokio::time::sleep(Duration::from_millis(5000)).await;
}
})
.abort_handle()
};

loop {
match receiver.recv_async().await {
Ok(event) => {
monitor
.instrument(async {
if let JetstreamEvent::Commit(ref commit) = event {
println!("Received commit: {:?}", commit);

send_frontpage_commit(&config, commit).await.or_else(|e| {
log::error!("Error processing commit: {:?}", e);
store.record_dead_letter(&drainpipe_store::DeadLetter::new(
commit.info().time_us.to_string(),
serde_json::to_string(commit)?,
e.to_string(),
))
})?
}

store.set_cursor(event.info().time_us)?;

Ok(()) as anyhow::Result<()>
})
.await?
}

Err(e) => {
log::error!("Error receiving event: {:?}", e);
break;
}
}
}

metric_logs_abort_handler.abort();
log::info!("WebSocket connection closed, attempting to reconnect...");
}
}
Expand All @@ -96,7 +108,7 @@ async fn connect(config: JetstreamConfig) -> anyhow::Result<flume::Receiver<Jets

loop {
match timeout(Duration::from_secs(10), jetstream.connect()).await {
Ok(Ok((receiver, _))) => return Ok(receiver),
Ok(Ok(receiver)) => return Ok(receiver),
Ok(Err(e)) => {
log::error!("WebSocket error. Retrying... {}", e);
}
Expand Down

0 comments on commit 28face9

Please sign in to comment.