diff --git a/Cargo.lock b/Cargo.lock index 37dbccd..969dc6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "atrium-api" -version = "0.24.7" +version = "0.24.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee68ddf7cde9eb121eed3a28b138f6a9b4c4a90ab0c5c2e38bc2817af0b06da3" +checksum = "1c4e5d077f7941ec5484964fba8697b7571a964b0a714e02ae7bc7332833c36b" dependencies = [ "atrium-xrpc", "chrono", @@ -133,9 +133,9 @@ dependencies = [ [[package]] name = "atrium-xrpc" -version = "0.11.6" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "737eea1de2eb174bbfe720619cb25a22c30b9640ae0d3b78386cedf007712963" +checksum = "f223b98be2acdd7afe5b867744aee8258413ed09993099de0a036b247db0ec4c" dependencies = [ "http", "serde", @@ -431,24 +431,19 @@ version = "0.2.0" dependencies = [ "anyhow", "atrium-api", - "bincode", "chrono", "dotenv-flow", "drainpipe-store", "env_logger", - "flume", "futures-util", + "jetstream", "log", "reqwest", "serde", "serde_json", "sled", - "thiserror 2.0.3", "tokio", "tokio-metrics", - "tokio-tungstenite", - "url", - "zstd", ] [[package]] @@ -466,8 +461,10 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "jetstream", "log", "serde", + "serde_json", "sled", ] @@ -1045,6 +1042,24 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jetstream" +version = "0.1.0" +dependencies = [ + "atrium-api", + "chrono", + "flume", + "futures-util", + "log", + "serde", + "serde_json", + "thiserror 2.0.3", + "tokio", + "tokio-tungstenite", + "url", + "zstd", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -1665,9 +1680,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -2139,9 +2154,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", diff --git a/Cargo.toml b/Cargo.toml index 83b2ed7..70b21b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "packages-rs/drainpipe", "packages-rs/drainpipe-cli", "packages-rs/drainpipe-store", + "packages-rs/jetstream", ] resolver = "2" diff --git a/packages-rs/drainpipe-store/Cargo.toml b/packages-rs/drainpipe-store/Cargo.toml index f204e14..fdb5a07 100644 --- a/packages-rs/drainpipe-store/Cargo.toml +++ b/packages-rs/drainpipe-store/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] anyhow = "1.0.93" bincode = "1.3.3" +jetstream = { path = "../jetstream" } log = "0.4.22" serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0.133" sled = "0.34.7" diff --git a/packages-rs/drainpipe-store/src/lib.rs b/packages-rs/drainpipe-store/src/lib.rs index 3c6cb72..e557f60 100644 --- a/packages-rs/drainpipe-store/src/lib.rs +++ b/packages-rs/drainpipe-store/src/lib.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use sled::Tree; pub struct Store { + db: sled::Db, cursor_tree: Tree, dead_letter_tree: Tree, } @@ -50,18 +51,13 @@ impl DeadLetter { error_message, }) } - - pub fn key(&self) -> &String { - match &self.0 { - DeadLetterInner::V1 { key, .. } => key, - } - } } impl Store { pub fn open(path: &PathBuf) -> anyhow::Result { let db = sled::open(path)?; Ok(Self { + db: db.clone(), cursor_tree: db.open_tree("cursor")?, dead_letter_tree: db.open_tree("dead_letter")?, }) @@ -92,9 +88,27 @@ impl Store { .transpose() } - pub fn record_dead_letter(&self, dead_letter: &DeadLetter) -> anyhow::Result<()> { - self.dead_letter_tree - .insert(dead_letter.key(), bincode::serialize(&dead_letter)?)?; + fn record_dead_letter(&self, commit_json: String, error_message: String) -> anyhow::Result<()> { + let key = self.db.generate_id()?.to_string(); + self.dead_letter_tree.insert( + key.clone(), + bincode::serialize(&DeadLetter::new(key, commit_json, error_message))?, + )?; Ok(()) } + + pub fn record_dead_letter_commit( + &self, + commit: &jetstream::event::CommitEvent, + error_message: String, + ) -> anyhow::Result<()> { + self.record_dead_letter(serde_json::to_string(commit)?, error_message) + } + + pub fn record_dead_letter_jetstream_error( + &self, + error: &jetstream::error::JetstreamEventError, + ) -> anyhow::Result<()> { + self.record_dead_letter("null".into(), error.to_string()) + } } diff --git a/packages-rs/drainpipe/Cargo.toml b/packages-rs/drainpipe/Cargo.toml index a6ec715..9bc2db6 100644 --- a/packages-rs/drainpipe/Cargo.toml +++ b/packages-rs/drainpipe/Cargo.toml @@ -6,21 +6,16 @@ edition = "2021" [dependencies] anyhow = "1.0.93" atrium-api = "0.24.7" -bincode = "1.3.3" -chrono = { version = "0.4.38", features = ["serde"] } +chrono = "0.4.38" dotenv-flow = "0.16.2" drainpipe-store = { path = "../drainpipe-store" } env_logger = "0.11.5" -flume = "0.11.1" futures-util = "0.3" +jetstream = { path = "../jetstream" } log = "0.4.22" reqwest = { version = "0.12.4", features = ["json"] } serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.132" sled = "0.34.7" -thiserror = "2.0.3" tokio = { version = "1.38.0", features = ["full"] } tokio-metrics = "0.3.1" -tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] } -url = "2.5.3" -zstd = "0.13.2" diff --git a/packages-rs/drainpipe/src/main.rs b/packages-rs/drainpipe/src/main.rs index 875f2bb..bd55a87 100644 --- a/packages-rs/drainpipe/src/main.rs +++ b/packages-rs/drainpipe/src/main.rs @@ -1,5 +1,4 @@ mod config; -mod jetstream; use chrono::{TimeZone, Utc}; use config::Config; @@ -76,11 +75,7 @@ async fn main() -> anyhow::Result<()> { send_frontpage_commit(&config, commit).await.or_else(|e| { log::error!("Error processing commit: {:?}", e); - store.record_dead_letter(&drainpipe_store::DeadLetter::new( - commit.info().time_us.to_string(), - serde_json::to_string(commit)?, - e.to_string(), - )) + store.record_dead_letter_commit(&commit, e.to_string()) })? } @@ -92,7 +87,7 @@ async fn main() -> anyhow::Result<()> { } Ok(Err(e)) => { - // TODO: This should add a dead letter + store.record_dead_letter_jetstream_error(&e)?; log::error!( "Error receiving event (possible junk event structure?): {:?}", e diff --git a/packages-rs/jetstream/Cargo.toml b/packages-rs/jetstream/Cargo.toml new file mode 100644 index 0000000..c1c3007 --- /dev/null +++ b/packages-rs/jetstream/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "jetstream" +version = "0.1.0" +edition = "2021" + +[dependencies] +atrium-api = "0.24.8" +chrono = { version = "0.4.38", features = ["serde"] } +flume = "0.11.1" +futures-util = "0.3.31" +log = "0.4.22" +serde = { version = "1.0.215", features = ["derive"] } +serde_json = "1.0.132" +thiserror = "2.0.3" +tokio = { version = "1.38.0", features = ["full"] } +tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] } +url = "2.5.4" +zstd = "0.13.2" diff --git a/packages-rs/jetstream/README.md b/packages-rs/jetstream/README.md new file mode 100644 index 0000000..8640f3d --- /dev/null +++ b/packages-rs/jetstream/README.md @@ -0,0 +1,5 @@ +# Jetstream Client + +A client for [jetstream](https://github.com/bluesky-social/jetstream). + +This started as a fork of [jetstream-oxide](https://github.com/videah/jetstream-oxide) but has since diverged with a few breaking changes and critical fixes. diff --git a/packages-rs/drainpipe/src/jetstream/error.rs b/packages-rs/jetstream/src/error.rs similarity index 100% rename from packages-rs/drainpipe/src/jetstream/error.rs rename to packages-rs/jetstream/src/error.rs diff --git a/packages-rs/drainpipe/src/jetstream/event.rs b/packages-rs/jetstream/src/event.rs similarity index 100% rename from packages-rs/drainpipe/src/jetstream/event.rs rename to packages-rs/jetstream/src/event.rs diff --git a/packages-rs/drainpipe/src/jetstream.rs b/packages-rs/jetstream/src/lib.rs similarity index 100% rename from packages-rs/drainpipe/src/jetstream.rs rename to packages-rs/jetstream/src/lib.rs diff --git a/packages-rs/drainpipe/zstd_dictionary b/packages-rs/jetstream/zstd_dictionary similarity index 100% rename from packages-rs/drainpipe/zstd_dictionary rename to packages-rs/jetstream/zstd_dictionary