Skip to content

Commit

Permalink
feat: add fallback logic for vmagent sending wrong content type (Grep…
Browse files Browse the repository at this point in the history
…timeTeam#4009)

* feat: add fallback logic for vmagent sending wrong content type

* fix: resolve lint issues

* Update src/servers/src/http/prom_store.rs

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
sunng87 and evenyag authored May 23, 2024
1 parent 418090b commit b90b7ad
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
23 changes: 19 additions & 4 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}

fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
Ok(Bytes::from(if is_zstd {
zstd_decompress(body)?
} else {
snappy_decompress(body)?
}))
}

async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
Expand All @@ -247,11 +255,18 @@ async fn decode_remote_write_request(
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
// due to vmagent's limitation, there is a chance that vmagent is
// sending content type wrong so we have to apply a fallback with decoding
// the content in another method.
//
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
// see https://github.com/GreptimeTeam/greptimedb/issues/3929
let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
buf
} else {
snappy_decompress(&body[..])?
});
// fallback to the other compression method
try_decompress(!is_zstd, &body[..])?
};

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
Expand Down
25 changes: 25 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,5 +963,30 @@ pub async fn test_vm_proto_remote_write(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

// also test fallback logic, vmagent could sent data in wrong content-type
// we encode it as zstd but send it as snappy
let compressed_request =
zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

// reversed
let compressed_request =
prom_store::snappy_compress(&serialized_request[..]).expect("Failed to encode snappy");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "zstd")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}

0 comments on commit b90b7ad

Please sign in to comment.