diff --git a/graph-gateway/src/block_constraints.rs b/graph-gateway/src/block_constraints.rs index b580ad34..17655d6c 100644 --- a/graph-gateway/src/block_constraints.rs +++ b/graph-gateway/src/block_constraints.rs @@ -1,23 +1,19 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Write as _, -}; +use std::collections::{BTreeMap, BTreeSet}; use alloy_primitives::{BlockHash, BlockNumber}; use anyhow::{anyhow, bail}; use cost_model::Context; -use gateway_common::time::unix_timestamp; use gateway_framework::{ - blocks::{Block, BlockConstraint, UnresolvedBlock}, + blocks::{BlockConstraint, UnresolvedBlock}, chain::Chain, errors::Error, }; use graphql::{ - graphql_parser::query::{Field, OperationDefinition, Selection, SelectionSet, Text, Value}, + graphql_parser::query::{OperationDefinition, Selection, Text, Value}, IntoStaticValue as _, StaticValue, }; use itertools::Itertools as _; -use serde_json::{self, json}; +use serde_json::{self, json, value::RawValue}; #[derive(Debug)] pub struct BlockRequirements { @@ -132,181 +128,23 @@ fn block_constraints(context: &Context) -> Result, Err Ok(constraints) } -pub fn rewrite_query<'q>( - chain: &Chain, - ctx: &Context<'q>, - requirements: &BlockRequirements, - blocks_behind: u64, -) -> String { - let mut buf: String = Default::default(); - for fragment in &ctx.fragments { - write!(&mut buf, "{}", fragment).unwrap(); - } - if contains_introspection(ctx) { - for operation in &ctx.operations { - write!(&mut buf, "{}", operation).unwrap(); - } - } else { - let latest_block = requirements.latest.then_some(()).and_then(|_| { - let mut block = chain.latest()?; - if (blocks_behind > 0) || requirements.number_gte.is_some() { - let number = block - .number - .saturating_sub(blocks_behind) - .max(requirements.number_gte.unwrap_or(0)); - block = chain.find(&UnresolvedBlock::WithNumber(number))?; - } - let now = unix_timestamp() / 1_000; - if now.saturating_sub(block.timestamp) > 30 { - return None; - } - Some(block.clone()) - }); +pub fn rewrite_query(document: &str, variables: &Option>) -> String { + // Don't insert block data probe if we suspect the query contains introspection. `_meta` queries + // and introspection do not mix. + // Examples of introspection queries: + // - `{ __schema { queryType { name } } }` + // - `{ __type(name: "Droid") { name description } }` + let contains_introspection = document.contains("__"); - let serialize_field = - |buf: &mut String, - field: &Field<'q, &'q str>, - defaults: &BTreeMap| { - buf.push_str(" "); - if let Some(alias) = field.alias { - write!(buf, "{alias}: ").unwrap(); - } - write!(buf, "{}", field.name).unwrap(); - for directive in &field.directives { - write!(buf, " {}", directive).unwrap(); - } - buf.push_str("(block: "); - if let Some(constraint) = field - .arguments - .iter() - .find(|(n, _)| *n == "block") - .and_then(|(_, field)| field_constraint(&ctx.variables, defaults, field).ok()) - { - match (constraint, &latest_block) { - (BlockConstraint::Hash(hash), _) => { - write!(buf, "{{ hash: \"{hash}\" }}").unwrap(); - } - (BlockConstraint::Number(number), _) => { - match chain.find(&UnresolvedBlock::WithNumber(number)) { - Some(Block { hash, .. }) => { - write!(buf, "{{ hash: \"{hash}\" }}").unwrap(); - } - None => { - write!(buf, "{{ number: {number} }}").unwrap(); - } - } - } - (_, Some(Block { hash, .. })) => { - write!(buf, "{{ hash: \"{hash}\" }}",).unwrap(); - } - (BlockConstraint::NumberGTE(number), None) => { - write!(buf, "{{ number_gte: {number} }}").unwrap(); - } - (BlockConstraint::Unconstrained, None) => { - write!(buf, "null").unwrap(); - } - }; - } else if let Some(block) = &latest_block { - write!(buf, "{{ hash: \"{}\" }}", block.hash).unwrap(); - } else { - buf.push_str("null"); - } - for (name, value) in &field.arguments { - if *name != "block" { - write!(buf, ", {name}: {value}").unwrap(); - } - } - buf.push(')'); - if !field.selection_set.items.is_empty() { - buf.push_str(" {\n"); - for selection in &field.selection_set.items { - match selection { - Selection::Field(field) => { - write!(buf, " {}", field).unwrap(); - } - Selection::FragmentSpread(spread) => { - write!(buf, " {}", spread).unwrap(); - } - Selection::InlineFragment(fragment) => { - write!(buf, " {}", fragment).unwrap(); - } - }; - } - buf.push_str(" }"); - } - buf.push('\n'); - }; - let serialize_selection_set = - |buf: &mut String, - selection_set: &SelectionSet<'q, &'q str>, - defaults: &BTreeMap| { - buf.push_str("{\n"); - for selection in &selection_set.items { - match selection { - Selection::Field(field) => serialize_field(buf, field, defaults), - Selection::FragmentSpread(spread) => { - write!(buf, " {}", spread).unwrap(); - } - Selection::InlineFragment(fragment) => { - write!(buf, " {}", fragment).unwrap() - } - }; - } - buf.push_str(" _gateway_probe_: _meta { block { hash number timestamp } }\n}\n"); - }; - let serialize_operation = - |buf: &mut String, operation: &OperationDefinition<'q, &'q str>| { - match operation { - OperationDefinition::SelectionSet(selection_set) => { - serialize_selection_set(buf, selection_set, &BTreeMap::default()); - } - OperationDefinition::Query(query) => { - buf.push_str("query"); - if let Some(name) = query.name { - write!(buf, " {name}").unwrap(); - } - if !query.variable_definitions.is_empty() { - write!(buf, "({}", query.variable_definitions[0]).unwrap(); - for var in &query.variable_definitions[1..] { - write!(buf, ", {var}").unwrap(); - } - buf.push(')'); - } - debug_assert!(query.directives.is_empty()); - buf.push(' '); - let defaults = query - .variable_definitions - .iter() - .filter(|d| !ctx.variables.0.contains_key(d.name)) - .filter_map(|d| { - Some((d.name.to_string(), d.default_value.as_ref()?.to_graphql())) - }) - .collect::>(); - serialize_selection_set(buf, &query.selection_set, &defaults); - } - OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => (), - }; - }; - for operation in &ctx.operations { - serialize_operation(&mut buf, operation); - } - } - - serde_json::to_string(&json!({ "query": buf, "variables": ctx.variables })).unwrap() -} + let query = match document.rfind('}') { + Some(end) if !contains_introspection => format!( + "{} _gateway_probe_: _meta {{ block {{ hash number timestamp }} }}\n}}\n", + &document[..end], + ), + _ => document.to_string(), + }; -fn contains_introspection(ctx: &Context<'_>) -> bool { - fn selection_set_has_introspection<'q>(s: &SelectionSet<'q, &'q str>) -> bool { - s.items.iter().any(|selection| match selection { - Selection::Field(f) => f.name.starts_with("__"), // only check top level - Selection::InlineFragment(_) | Selection::FragmentSpread(_) => false, - }) - } - ctx.operations.iter().any(|op| match op { - OperationDefinition::Query(q) => selection_set_has_introspection(&q.selection_set), - OperationDefinition::SelectionSet(s) => selection_set_has_introspection(s), - OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => false, - }) + serde_json::to_string(&json!({ "query": query, "variables": variables })).unwrap() } fn field_constraint<'c, T: Text<'c>>( @@ -406,8 +244,7 @@ fn parse_number<'c, T: Text<'c>>( mod tests { use std::iter::FromIterator as _; - use alloy_primitives::{hex, Address}; - use gateway_framework::blocks::Block; + use alloy_primitives::hex; use super::*; @@ -466,120 +303,4 @@ mod tests { assert_eq!(constraints, expected); } } - - #[test] - fn query_contains_introspection() { - let examples = [ - "{ __schema { queryType { name } } }", - "{ __type(name:\"Droid\") { name description } }", - ]; - for example in examples { - let context = Context::new(example, "").unwrap(); - assert!(super::contains_introspection(&context)); - } - } - - #[test] - fn query_rewrite() { - let mut chain = Chain::default(); - let now = unix_timestamp() / 1_000; - chain.insert( - Block { - hash: hex!("0000000000000000000000000000000000000000000000000000000000000000") - .into(), - number: 123, - timestamp: now - 1, - }, - Address::default().into(), - ); - chain.insert( - Block { - hash: hex!("0000000000000000000000000000000000000000000000000000000000000001") - .into(), - number: 124, - timestamp: now, - }, - Address::default().into(), - ); - - let tests = [ - ( - r#"{ - bundle0: bundle(id:"1" block:{number:123}) { ethPriceUSD } - bundle1: bundle(id:"1") { ethPriceUSD } - }"#, - BlockRequirements { - latest: true, - number_gte: None, - range: Some((123, 123)), - }, - "{\n bundle0: bundle(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000000\" }, id: \"1\") {\n ethPriceUSD\n }\n bundle1: bundle(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, id: \"1\") {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n", - ), - ( - r#"{ - bundle0: bundle(id:"1" block:{number:125}) { ethPriceUSD } - }"#, - BlockRequirements { - latest: true, - number_gte: None, - range: Some((125, 125)), - }, - "{\n bundle0: bundle(block: { number: 125 }, id: \"1\") {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n", - ), - ( - r#"{ bundle(block:{number_gte:125}) { ethPriceUSD } }"#, - BlockRequirements { - latest: true, - number_gte: Some(125), - range: None, - }, - "{\n bundle(block: { number_gte: 125 }) {\n ethPriceUSD\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n", - ), - ( - r#"query GetTopSales { - events(where: { type: "Sale" }, first: 1, orderBy: value, orderDirection: desc) { - type - } - }"#, - BlockRequirements { - latest: true, - number_gte: None, - range: None, - }, - "query GetTopSales {\n events(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, where: {type: \"Sale\"}, first: 1, orderBy: value, orderDirection: desc) {\n type\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n", - ), - ( - r#" - fragment Foo on Delegation { - id - } - { - delegations(first: 1) { - delegator - ...Foo - } - } - "#, - BlockRequirements { - latest: true, - number_gte: None, - range: None, - }, - "fragment Foo on Delegation {\n id\n}\n{\n delegations(block: { hash: \"0x0000000000000000000000000000000000000000000000000000000000000001\" }, first: 1) {\n delegator\n ...Foo\n }\n _gateway_probe_: _meta { block { hash number timestamp } }\n}\n", - ), - ]; - - for (client_query, requirements, expected_indexer_query) in tests { - let context = Context::new(client_query, "").unwrap(); - let indexer_request = rewrite_query(&chain, &context, &requirements, 0); - let doc = serde_json::from_str::(&indexer_request).unwrap(); - let doc = doc - .as_object() - .and_then(|o| o.get("query")?.as_str()) - .unwrap(); - println!("{}", doc); - assert!(Context::new(doc, "").is_ok()); - assert_eq!(doc, expected_indexer_query); - } - } } diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index f2a41daa..d6dfd31a 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -312,8 +312,9 @@ async fn run_indexer_queries( tracing::debug!(?candidates); } + let indexer_query = rewrite_query(&client_request.query, &client_request.variables); + let mut indexer_requests: Vec = Default::default(); - let mut indexer_request_rewrites: BTreeMap = Default::default(); let mut client_response_time: Option = None; // If a client query cannot be handled by the available indexers, we should give a reason for @@ -356,24 +357,8 @@ async fn run_indexer_queries( debug_assert!(fee == receipt.grt_value()); let blocks_behind = blocks_behind(seconds_behind, blocks_per_minute); - let indexer_query = match indexer_request_rewrites.get(&seconds_behind) { - Some(indexer_query) => indexer_query.clone(), - None => { - let chain = chain.read(); - let indexer_query = - rewrite_query(&chain, &agora_context, &block_requirements, blocks_behind); - if selections - .iter() - .filter(|s| s.seconds_behind == seconds_behind) - .count() - > 1 - { - indexer_request_rewrites.insert(seconds_behind, indexer_query.clone()); - } - indexer_query - } - }; let indexer_client = ctx.indexer_client.clone(); + let indexer_query = indexer_query.clone(); let tx = tx.clone(); tokio::spawn( async move {