Skip to content

Commit

Permalink
Massage receive hook payload to match current old shape
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-sherman committed Nov 21, 2024
1 parent e344fb2 commit 43e00d8
Showing 1 changed file with 23 additions and 22 deletions.
45 changes: 23 additions & 22 deletions packages-rs/drainpipe/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ mod jetstream;

use chrono::{TimeZone, Utc};
use config::Config;
use jetstream::event::JetstreamEvent;
use jetstream::event::{CommitEvent, JetstreamEvent};
use jetstream::{
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
};
use serde::Serialize;
use serde_json::json;
use std::path::PathBuf;
use std::time::Duration;
use std::vec;
Expand Down Expand Up @@ -121,27 +121,28 @@ async fn main() -> anyhow::Result<()> {
}
}

fn u64_serialize<S>(x: &u64, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_str(&x.to_string())
}

#[derive(Serialize, Debug)]
struct ConsumerBody<'a> {
ops: &'a Vec<&'a jetstream::event::CommitEvent>,
repo: &'a str,
#[serde(serialize_with = "u64_serialize")]
seq: u64,
}

async fn send_frontpage_commit(
cfg: &Config,
commit: &jetstream::event::CommitEvent,
) -> anyhow::Result<()> {
let client = reqwest::Client::new();

// Structure of the "ops" json array and the body of the request in general is a little whacky because it's
// matching the old drainpipe code where we would send the relay event to the consumer verbatim.
// There is potential for improvement here.
let ops = match commit {
CommitEvent::Update { .. } => anyhow::bail!("Update commits are not supported"),
CommitEvent::Create { commit, .. } => json!([{
"action": "create",
"path": format!("{}/{}", commit.info.collection.to_string(), commit.info.rkey),
"cid": commit.cid,
}]),
CommitEvent::Delete { commit, .. } => json!([{
"action": "create",
"path": format!("{}/{}", commit.collection.to_string(), commit.rkey)
}]),
};

let commit_info = commit.info();

let response = client
Expand All @@ -150,11 +151,11 @@ async fn send_frontpage_commit(
"Authorization",
format!("Bearer {}", cfg.frontpage_consumer_secret),
)
.json(&ConsumerBody {
ops: &vec![commit],
repo: &commit_info.did,
seq: commit_info.time_us,
})
.json(&json!({
"repo": commit_info.did,
"seq": commit_info.time_us.to_string(),
"ops": ops
}))
.send()
.await?;

Expand Down

0 comments on commit 43e00d8

Please sign in to comment.