Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: internal request execution API error types refactor #1157

Merged
merged 17 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions scylla/src/client/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ use crate::cluster::{ClusterState, NodeRef};
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::ProtocolError;
use crate::errors::{QueryError, UserRequestError};
use crate::errors::{QueryError, RequestAttemptError};
use crate::frame::response::result;
use crate::network::Connection;
use crate::observability::driver_tracing::RequestSpan;
use crate::observability::history::{self, HistoryListener};
use crate::observability::metrics::Metrics;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::response::query_result::ColumnSpecs;
use crate::response::{NonErrorQueryResponse, QueryResponse};
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
Expand Down Expand Up @@ -135,7 +135,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
sender: ProvingSender<Result<ReceivedPage, QueryError>>,

// Closure used to perform a single page query
// AsyncFn(Arc<Connection>, Option<Arc<[u8]>>) -> Result<QueryResponse, UserRequestError>
// AsyncFn(Arc<Connection>, Option<Arc<[u8]>>) -> Result<QueryResponse, RequestAttemptError>
page_query: QueryFunc,

statement_info: RoutingInfo<'a>,
Expand All @@ -158,7 +158,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
impl<QueryFunc, QueryFut, SpanCreator> PagerWorker<'_, QueryFunc, SpanCreator>
where
QueryFunc: Fn(Arc<Connection>, Consistency, PagingState) -> QueryFut,
QueryFut: Future<Output = Result<QueryResponse, UserRequestError>>,
QueryFut: Future<Output = Result<QueryResponse, RequestAttemptError>>,
SpanCreator: Fn() -> RequestSpan,
{
// Contract: this function MUST send at least one item through self.sender
Expand Down Expand Up @@ -199,14 +199,14 @@ where
'same_node_retries: loop {
trace!(parent: &span, "Execution started");
// Query pages until an error occurs
let queries_result: Result<PageSendAttemptedProof, QueryError> = self
let queries_result: Result<PageSendAttemptedProof, RequestAttemptError> = self
.query_pages(&connection, current_consistency, node)
.instrument(span.clone())
.await;

last_error = match queries_result {
let request_error: RequestAttemptError = match queries_result {
Ok(proof) => {
trace!(parent: &span, "Query succeeded");
trace!(parent: &span, "Request succeeded");
// query_pages returned Ok, so we are guaranteed
// that it attempted to send at least one page
// through self.sender and we can safely return now.
Expand All @@ -216,15 +216,15 @@ where
trace!(
parent: &span,
error = %error,
"Query failed"
"Request failed"
);
error
}
};

// Use retry policy to decide what to do next
let query_info = QueryInfo {
error: &last_error,
let query_info = RequestInfo {
error: &request_error,
is_idempotent: self.query_is_idempotent,
consistency: self.query_consistency,
};
Expand All @@ -234,7 +234,10 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);

last_error = request_error.into_query_error();
self.log_attempt_error(&last_error, &retry_decision);

match retry_decision {
RetryDecision::RetrySameNode(cl) => {
self.metrics.inc_retries_num();
Expand Down Expand Up @@ -278,7 +281,7 @@ where
connection: &Arc<Connection>,
consistency: Consistency,
node: NodeRef<'_>,
) -> Result<PageSendAttemptedProof, QueryError> {
) -> Result<PageSendAttemptedProof, RequestAttemptError> {
loop {
let request_span = (self.span_creator)();
match self
Expand All @@ -298,7 +301,7 @@ where
consistency: Consistency,
node: NodeRef<'_>,
request_span: &RequestSpan,
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, QueryError> {
) -> Result<ControlFlow<PageSendAttemptedProof, ()>, RequestAttemptError> {
self.metrics.inc_total_paged_queries();
let query_start = std::time::Instant::now();

Expand Down Expand Up @@ -329,7 +332,7 @@ where
self.log_query_success();
self.execution_profile
.load_balancing_policy
.on_query_success(&self.statement_info, elapsed, node);
.on_request_success(&self.statement_info, elapsed, node);

request_span.record_raw_rows_fields(&rows);

Expand Down Expand Up @@ -359,11 +362,10 @@ where
Ok(ControlFlow::Continue(()))
}
Err(err) => {
let err = err.into();
self.metrics.inc_failed_paged_queries();
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
.on_request_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
Ok(NonErrorQueryResponse {
Expand All @@ -381,10 +383,10 @@ where
Ok(response) => {
self.metrics.inc_failed_paged_queries();
let err =
ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into();
RequestAttemptError::UnexpectedResponse(response.response.to_response_kind());
self.execution_profile
.load_balancing_policy
.on_query_failure(&self.statement_info, elapsed, node, &err);
.on_request_failure(&self.statement_info, elapsed, node, &err);
Err(err)
}
}
Expand Down Expand Up @@ -482,7 +484,7 @@ struct SingleConnectionPagerWorker<Fetcher> {
impl<Fetcher, FetchFut> SingleConnectionPagerWorker<Fetcher>
where
Fetcher: Fn(PagingState) -> FetchFut + Send + Sync,
FetchFut: Future<Output = Result<QueryResponse, UserRequestError>> + Send,
FetchFut: Future<Output = Result<QueryResponse, RequestAttemptError>> + Send,
{
async fn work(mut self) -> PageSendAttemptedProof {
match self.do_work().await {
Expand All @@ -497,8 +499,12 @@ where
async fn do_work(&mut self) -> Result<PageSendAttemptedProof, QueryError> {
let mut paging_state = PagingState::start();
loop {
let result = (self.fetcher)(paging_state).await?;
let response = result.into_non_error_query_response()?;
let result = (self.fetcher)(paging_state)
.await
.map_err(RequestAttemptError::into_query_error)?;
let response = result
.into_non_error_query_response()
.map_err(RequestAttemptError::into_query_error)?;
match response.response {
NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => {
let (proof, send_result) = self
Expand Down
54 changes: 32 additions & 22 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::cluster::node::CloudEndpoint;
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
BadQuery, NewSessionError, ProtocolError, QueryError, TracingProtocolError, UserRequestError,
BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
Expand All @@ -28,7 +28,7 @@ use crate::observability::tracing::TracingInfo;
use crate::policies::address_translator::AddressTranslator;
use crate::policies::host_filter::HostFilter;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::policies::speculative_execution;
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
Expand Down Expand Up @@ -1138,7 +1138,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
} else {
let prepared = connection.prepare(query_ref).await?;
let serialized = prepared.serialize_values(values_ref)?;
Expand All @@ -1154,7 +1153,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
}
}
},
Expand All @@ -1175,7 +1173,9 @@ where
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_query_error)?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1303,9 +1303,10 @@ where

// Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()`
// returns either an error or an iterator with at least one connection, so there will be at least one result.
let first_ok: Result<PreparedStatement, UserRequestError> =
let first_ok: Result<PreparedStatement, RequestAttemptError> =
results.by_ref().find_or_first(Result::is_ok).unwrap();
let mut prepared: PreparedStatement = first_ok?;
let mut prepared: PreparedStatement =
first_ok.map_err(RequestAttemptError::into_query_error)?;

// Validate prepared ids equality
for statement in results.flatten() {
Expand Down Expand Up @@ -1456,7 +1457,6 @@ where
)
.await
.and_then(QueryResponse::into_non_error_query_response)
.map_err(Into::into)
}
},
&span,
Expand All @@ -1476,7 +1476,9 @@ where
self.handle_set_keyspace_response(&response).await?;
self.handle_auto_await_schema_agreement(&response).await?;

let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
let (result, paging_state_response) = response
.into_query_result_and_paging_state()
.map_err(RequestAttemptError::into_query_error)?;
span.record_result_fields(&result);

Ok((result, paging_state_response))
Expand Down Expand Up @@ -1556,7 +1558,7 @@ where

let span = RequestSpan::new_batch();

let run_request_result = self
let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
.run_request(
statement_info,
&batch.config,
Expand All @@ -1577,6 +1579,7 @@ where
serial_consistency,
)
.await
.and_then(QueryResponse::into_non_error_query_response)
}
},
&span,
Expand All @@ -1586,7 +1589,8 @@ where

let result = match run_request_result {
RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
RunRequestResult::Completed(result) => {
RunRequestResult::Completed(non_error_query_response) => {
let result = non_error_query_response.into_query_result()?;
span.record_result_fields(&result);
result
}
Expand Down Expand Up @@ -1844,7 +1848,7 @@ where
request_span: &'a RequestSpan,
) -> Result<RunRequestResult<ResT>, QueryError>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
Expand Down Expand Up @@ -2008,7 +2012,7 @@ where
mut context: ExecuteRequestContext<'a>,
) -> Option<Result<RunRequestResult<ResT>, QueryError>>
where
QueryFut: Future<Output = Result<ResT, QueryError>>,
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
let mut last_error: Option<QueryError> = None;
Expand Down Expand Up @@ -2045,18 +2049,18 @@ where
);
let attempt_id: Option<history::AttemptId> =
context.log_attempt_start(connection.get_connect_address());
let request_result: Result<ResT, QueryError> =
let request_result: Result<ResT, RequestAttemptError> =
run_request_once(connection, current_consistency, execution_profile)
.instrument(span.clone())
.await;

let elapsed = request_start.elapsed();
last_error = match request_result {
let request_error: RequestAttemptError = match request_result {
Ok(response) => {
trace!(parent: &span, "Request succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
execution_profile.load_balancing_policy.on_query_success(
execution_profile.load_balancing_policy.on_request_success(
context.query_info,
elapsed,
node,
Expand All @@ -2070,20 +2074,19 @@ where
"Request failed"
);
self.metrics.inc_failed_nonpaged_queries();
execution_profile.load_balancing_policy.on_query_failure(
execution_profile.load_balancing_policy.on_request_failure(
context.query_info,
elapsed,
node,
&e,
);
Some(e)
e
}
};

let the_error: &QueryError = last_error.as_ref().unwrap();
// Use retry policy to decide what to do next
let query_info = QueryInfo {
error: the_error,
let query_info = RequestInfo {
error: &request_error,
is_idempotent: context.is_idempotent,
consistency: context
.consistency_set_on_statement
Expand All @@ -2095,7 +2098,14 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);
context.log_attempt_error(&attempt_id, the_error, &retry_decision);

last_error = Some(request_error.into_query_error());
context.log_attempt_error(
&attempt_id,
last_error.as_ref().unwrap(),
&retry_decision,
);

match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
self.metrics.inc_retries_num();
Expand Down
4 changes: 2 additions & 2 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType,
use crate::deserialize::DeserializeOwnedValue;
use crate::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
use crate::observability::tracing::TracingInfo;
use crate::policies::retry::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::prepared_statement::PreparedStatement;
use crate::query::Query;
use crate::routing::partitioner::{
Expand Down Expand Up @@ -2725,7 +2725,7 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() {

struct MyRetrySession(Arc<AtomicBool>);
impl RetrySession for MyRetrySession {
fn decide_should_retry(&mut self, _: QueryInfo) -> RetryDecision {
fn decide_should_retry(&mut self, _: RequestInfo) -> RetryDecision {
self.0.store(true, Ordering::Relaxed);
RetryDecision::IgnoreWriteError
}
Expand Down
7 changes: 5 additions & 2 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::client::pager::QueryPager;
use crate::cluster::node::resolve_contact_points;
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::{DbError, NewSessionError, QueryError};
use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError};
use crate::frame::response::event::Event;
use crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
use crate::policies::host_filter::HostFilter;
Expand Down Expand Up @@ -986,7 +986,10 @@ where
let mut query = Query::new(query_str);
query.set_page_size(METADATA_QUERY_PAGE_SIZE);

let prepared = conn.prepare(&query).await?;
let prepared = conn
.prepare(&query)
.await
.map_err(RequestAttemptError::into_query_error)?;
let serialized_values = prepared.serialize_values(&keyspaces)?;
conn.execute_iter(prepared, serialized_values).await
}
Expand Down
Loading
Loading