Skip to content

Commit

Permalink
fix: remove block constraint rewrites (#895)
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus authored Jul 22, 2024
1 parent 9f07198 commit 06941f8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 317 deletions.
319 changes: 20 additions & 299 deletions graph-gateway/src/block_constraints.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Write as _,
};
use std::collections::{BTreeMap, BTreeSet};

use alloy_primitives::{BlockHash, BlockNumber};
use anyhow::{anyhow, bail};
use cost_model::Context;
use gateway_common::time::unix_timestamp;
use gateway_framework::{
blocks::{Block, BlockConstraint, UnresolvedBlock},
blocks::{BlockConstraint, UnresolvedBlock},
chain::Chain,
errors::Error,
};
use graphql::{
graphql_parser::query::{Field, OperationDefinition, Selection, SelectionSet, Text, Value},
graphql_parser::query::{OperationDefinition, Selection, Text, Value},
IntoStaticValue as _, StaticValue,
};
use itertools::Itertools as _;
use serde_json::{self, json};
use serde_json::{self, json, value::RawValue};

#[derive(Debug)]
pub struct BlockRequirements {
Expand Down Expand Up @@ -132,181 +128,23 @@ fn block_constraints(context: &Context) -> Result<BTreeSet<BlockConstraint>, Err
Ok(constraints)
}

pub fn rewrite_query<'q>(
chain: &Chain,
ctx: &Context<'q>,
requirements: &BlockRequirements,
blocks_behind: u64,
) -> String {
let mut buf: String = Default::default();
for fragment in &ctx.fragments {
write!(&mut buf, "{}", fragment).unwrap();
}
if contains_introspection(ctx) {
for operation in &ctx.operations {
write!(&mut buf, "{}", operation).unwrap();
}
} else {
let latest_block = requirements.latest.then_some(()).and_then(|_| {
let mut block = chain.latest()?;
if (blocks_behind > 0) || requirements.number_gte.is_some() {
let number = block
.number
.saturating_sub(blocks_behind)
.max(requirements.number_gte.unwrap_or(0));
block = chain.find(&UnresolvedBlock::WithNumber(number))?;
}
let now = unix_timestamp() / 1_000;
if now.saturating_sub(block.timestamp) > 30 {
return None;
}
Some(block.clone())
});
pub fn rewrite_query(document: &str, variables: &Option<Box<RawValue>>) -> String {
// Don't insert block data probe if we suspect the query contains introspection. `_meta` queries
// and introspection do not mix.
// Examples of introspection queries:
// - `{ __schema { queryType { name } } }`
// - `{ __type(name: "Droid") { name description } }`
let contains_introspection = document.contains("__");

let serialize_field =
|buf: &mut String,
field: &Field<'q, &'q str>,
defaults: &BTreeMap<String, StaticValue>| {
buf.push_str(" ");
if let Some(alias) = field.alias {
write!(buf, "{alias}: ").unwrap();
}
write!(buf, "{}", field.name).unwrap();
for directive in &field.directives {
write!(buf, " {}", directive).unwrap();
}
buf.push_str("(block: ");
if let Some(constraint) = field
.arguments
.iter()
.find(|(n, _)| *n == "block")
.and_then(|(_, field)| field_constraint(&ctx.variables, defaults, field).ok())
{
match (constraint, &latest_block) {
(BlockConstraint::Hash(hash), _) => {
write!(buf, "{{ hash: \"{hash}\" }}").unwrap();
}
(BlockConstraint::Number(number), _) => {
match chain.find(&UnresolvedBlock::WithNumber(number)) {
Some(Block { hash, .. }) => {
write!(buf, "{{ hash: \"{hash}\" }}").unwrap();
}
None => {
write!(buf, "{{ number: {number} }}").unwrap();
}
}
}
(_, Some(Block { hash, .. })) => {
write!(buf, "{{ hash: \"{hash}\" }}",).unwrap();
}
(BlockConstraint::NumberGTE(number), None) => {
write!(buf, "{{ number_gte: {number} }}").unwrap();
}
(BlockConstraint::Unconstrained, None) => {
write!(buf, "null").unwrap();
}
};
} else if let Some(block) = &latest_block {
write!(buf, "{{ hash: \"{}\" }}", block.hash).unwrap();
} else {
buf.push_str("null");
}
for (name, value) in &field.arguments {
if *name != "block" {
write!(buf, ", {name}: {value}").unwrap();
}
}
buf.push(')');
if !field.selection_set.items.is_empty() {
buf.push_str(" {\n");
for selection in &field.selection_set.items {
match selection {
Selection::Field(field) => {
write!(buf, " {}", field).unwrap();
}
Selection::FragmentSpread(spread) => {
write!(buf, " {}", spread).unwrap();
}
Selection::InlineFragment(fragment) => {
write!(buf, " {}", fragment).unwrap();
}
};
}
buf.push_str(" }");
}
buf.push('\n');
};
let serialize_selection_set =
|buf: &mut String,
selection_set: &SelectionSet<'q, &'q str>,
defaults: &BTreeMap<String, StaticValue>| {
buf.push_str("{\n");
for selection in &selection_set.items {
match selection {
Selection::Field(field) => serialize_field(buf, field, defaults),
Selection::FragmentSpread(spread) => {
write!(buf, " {}", spread).unwrap();
}
Selection::InlineFragment(fragment) => {
write!(buf, " {}", fragment).unwrap()
}
};
}
buf.push_str(" _gateway_probe_: _meta { block { hash number timestamp } }\n}\n");
};
let serialize_operation =
|buf: &mut String, operation: &OperationDefinition<'q, &'q str>| {
match operation {
OperationDefinition::SelectionSet(selection_set) => {
serialize_selection_set(buf, selection_set, &BTreeMap::default());
}
OperationDefinition::Query(query) => {
buf.push_str("query");
if let Some(name) = query.name {
write!(buf, " {name}").unwrap();
}
if !query.variable_definitions.is_empty() {
write!(buf, "({}", query.variable_definitions[0]).unwrap();
for var in &query.variable_definitions[1..] {
write!(buf, ", {var}").unwrap();
}
buf.push(')');
}
debug_assert!(query.directives.is_empty());
buf.push(' ');
let defaults = query
.variable_definitions
.iter()
.filter(|d| !ctx.variables.0.contains_key(d.name))
.filter_map(|d| {
Some((d.name.to_string(), d.default_value.as_ref()?.to_graphql()))
})
.collect::<BTreeMap<String, StaticValue>>();
serialize_selection_set(buf, &query.selection_set, &defaults);
}
OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => (),
};
};
for operation in &ctx.operations {
serialize_operation(&mut buf, operation);
}
}

serde_json::to_string(&json!({ "query": buf, "variables": ctx.variables })).unwrap()
}
let query = match document.rfind('}') {
Some(end) if !contains_introspection => format!(
"{} _gateway_probe_: _meta {{ block {{ hash number timestamp }} }}\n}}\n",
&document[..end],
),
_ => document.to_string(),
};

fn contains_introspection(ctx: &Context<'_>) -> bool {
fn selection_set_has_introspection<'q>(s: &SelectionSet<'q, &'q str>) -> bool {
s.items.iter().any(|selection| match selection {
Selection::Field(f) => f.name.starts_with("__"), // only check top level
Selection::InlineFragment(_) | Selection::FragmentSpread(_) => false,
})
}
ctx.operations.iter().any(|op| match op {
OperationDefinition::Query(q) => selection_set_has_introspection(&q.selection_set),
OperationDefinition::SelectionSet(s) => selection_set_has_introspection(s),
OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => false,
})
serde_json::to_string(&json!({ "query": query, "variables": variables })).unwrap()
}

fn field_constraint<'c, T: Text<'c>>(
Expand Down Expand Up @@ -406,8 +244,7 @@ fn parse_number<'c, T: Text<'c>>(
mod tests {
use std::iter::FromIterator as _;

use alloy_primitives::{hex, Address};
use gateway_framework::blocks::Block;
use alloy_primitives::hex;

use super::*;

Expand Down Expand Up @@ -466,120 +303,4 @@ mod tests {
assert_eq!(constraints, expected);
}
}

#[test]
fn query_contains_introspection() {
let examples = [
"{ __schema { queryType { name } } }",
"{ __type(name:\"Droid\") { name description } }",
];
for example in examples {
let context = Context::new(example, "").unwrap();
assert!(super::contains_introspection(&context));
}
}

#[test]
fn query_rewrite() {
let mut chain = Chain::default();
let now = unix_timestamp() / 1_000;
chain.insert(
Block {
hash: hex!("0000000000000000000000000000000000000000000000000000000000000000")
.into(),
number: 123,
timestamp: now - 1,
},
Address::default().into(),
);
chain.insert(
Block {
hash: hex!("0000000000000000000000000000000000000000000000000000000000000001")
.into(),
number: 124,
timestamp: now,
},
Address::default().into(),
);

let tests = [
(
r#"{
bundle0: bundle(id:"1" block:{number:123}) { ethPriceUSD }
bundle1: bundle(id:"1") { ethPriceUSD }
}"#,
BlockRequirements {
latest: true,
number_gte: None,
range: Some((123, 123)),
},
"{\n bundle0: bundle(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000000\" }, id: \"1\") {\n ethPriceUSD\n }\n bundle1: bundle(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, id: \"1\") {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n",
),
(
r#"{
bundle0: bundle(id:"1" block:{number:125}) { ethPriceUSD }
}"#,
BlockRequirements {
latest: true,
number_gte: None,
range: Some((125, 125)),
},
"{\n bundle0: bundle(block: { number: 125 }, id: \"1\") {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n",
),
(
r#"{ bundle(block:{number_gte:125}) { ethPriceUSD } }"#,
BlockRequirements {
latest: true,
number_gte: Some(125),
range: None,
},
"{\n bundle(block: { number_gte: 125 }) {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n",
),
(
r#"query GetTopSales {
events(where: { type: "Sale" }, first: 1, orderBy: value, orderDirection: desc) {
type
}
}"#,
BlockRequirements {
latest: true,
number_gte: None,
range: None,
},
"query GetTopSales {\n events(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, where: {type: \"Sale\"}, first: 1, orderBy: value, orderDirection: desc) {\n type\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n",
),
(
r#"
fragment Foo on Delegation {
id
}
{
delegations(first: 1) {
delegator
...Foo
}
}
"#,
BlockRequirements {
latest: true,
number_gte: None,
range: None,
},
"fragment Foo on Delegation {\n id\n}\n{\n delegations(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, first: 1) {\n delegator\n ...Foo\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n",
),
];

for (client_query, requirements, expected_indexer_query) in tests {
let context = Context::new(client_query, "").unwrap();
let indexer_request = rewrite_query(&chain, &context, &requirements, 0);
let doc = serde_json::from_str::<serde_json::Value>(&indexer_request).unwrap();
let doc = doc
.as_object()
.and_then(|o| o.get("query")?.as_str())
.unwrap();
println!("{}", doc);
assert!(Context::new(doc, "").is_ok());
assert_eq!(doc, expected_indexer_query);
}
}
}
21 changes: 3 additions & 18 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ async fn run_indexer_queries(
tracing::debug!(?candidates);
}

let indexer_query = rewrite_query(&client_request.query, &client_request.variables);

let mut indexer_requests: Vec<reports::IndexerRequest> = Default::default();
let mut indexer_request_rewrites: BTreeMap<u32, String> = Default::default();
let mut client_response_time: Option<Duration> = None;

// If a client query cannot be handled by the available indexers, we should give a reason for
Expand Down Expand Up @@ -356,24 +357,8 @@ async fn run_indexer_queries(
debug_assert!(fee == receipt.grt_value());

let blocks_behind = blocks_behind(seconds_behind, blocks_per_minute);
let indexer_query = match indexer_request_rewrites.get(&seconds_behind) {
Some(indexer_query) => indexer_query.clone(),
None => {
let chain = chain.read();
let indexer_query =
rewrite_query(&chain, &agora_context, &block_requirements, blocks_behind);
if selections
.iter()
.filter(|s| s.seconds_behind == seconds_behind)
.count()
> 1
{
indexer_request_rewrites.insert(seconds_behind, indexer_query.clone());
}
indexer_query
}
};
let indexer_client = ctx.indexer_client.clone();
let indexer_query = indexer_query.clone();
let tx = tx.clone();
tokio::spawn(
async move {
Expand Down

0 comments on commit 06941f8

Please sign in to comment.