diff --git a/scylla/src/client/pager.rs b/scylla/src/client/pager.rs index e229090c90..039d747590 100644 --- a/scylla/src/client/pager.rs +++ b/scylla/src/client/pager.rs @@ -30,14 +30,14 @@ use crate::cluster::{ClusterState, NodeRef}; use crate::cql_to_rust::{FromRow, FromRowError}; use crate::deserialize::DeserializeOwnedRow; use crate::errors::ProtocolError; -use crate::errors::{QueryError, UserRequestError}; +use crate::errors::{QueryError, RequestAttemptError}; use crate::frame::response::result; use crate::network::Connection; use crate::observability::driver_tracing::RequestSpan; use crate::observability::history::{self, HistoryListener}; use crate::observability::metrics::Metrics; use crate::policies::load_balancing::{self, RoutingInfo}; -use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession}; +use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession}; use crate::response::query_result::ColumnSpecs; use crate::response::{NonErrorQueryResponse, QueryResponse}; use crate::statement::{prepared_statement::PreparedStatement, query::Query}; @@ -135,7 +135,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> { sender: ProvingSender>, // Closure used to perform a single page query - // AsyncFn(Arc, Option>) -> Result + // AsyncFn(Arc, Option>) -> Result page_query: QueryFunc, statement_info: RoutingInfo<'a>, @@ -158,7 +158,7 @@ struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> { impl PagerWorker<'_, QueryFunc, SpanCreator> where QueryFunc: Fn(Arc, Consistency, PagingState) -> QueryFut, - QueryFut: Future>, + QueryFut: Future>, SpanCreator: Fn() -> RequestSpan, { // Contract: this function MUST send at least one item through self.sender @@ -199,14 +199,14 @@ where 'same_node_retries: loop { trace!(parent: &span, "Execution started"); // Query pages until an error occurs - let queries_result: Result = self + let queries_result: Result = self .query_pages(&connection, current_consistency, node) .instrument(span.clone()) .await; - last_error = match queries_result { + let request_error: RequestAttemptError = match queries_result { Ok(proof) => { - trace!(parent: &span, "Query succeeded"); + trace!(parent: &span, "Request succeeded"); // query_pages returned Ok, so we are guaranteed // that it attempted to send at least one page // through self.sender and we can safely return now. 
@@ -216,15 +216,15 @@ where trace!( parent: &span, error = %error, - "Query failed" + "Request failed" ); error } }; // Use retry policy to decide what to do next - let query_info = QueryInfo { - error: &last_error, + let query_info = RequestInfo { + error: &request_error, is_idempotent: self.query_is_idempotent, consistency: self.query_consistency, }; @@ -234,7 +234,10 @@ where parent: &span, retry_decision = format!("{:?}", retry_decision).as_str() ); + + last_error = request_error.into_query_error(); self.log_attempt_error(&last_error, &retry_decision); + match retry_decision { RetryDecision::RetrySameNode(cl) => { self.metrics.inc_retries_num(); @@ -278,7 +281,7 @@ where connection: &Arc, consistency: Consistency, node: NodeRef<'_>, - ) -> Result { + ) -> Result { loop { let request_span = (self.span_creator)(); match self @@ -298,7 +301,7 @@ where consistency: Consistency, node: NodeRef<'_>, request_span: &RequestSpan, - ) -> Result, QueryError> { + ) -> Result, RequestAttemptError> { self.metrics.inc_total_paged_queries(); let query_start = std::time::Instant::now(); @@ -329,7 +332,7 @@ where self.log_query_success(); self.execution_profile .load_balancing_policy - .on_query_success(&self.statement_info, elapsed, node); + .on_request_success(&self.statement_info, elapsed, node); request_span.record_raw_rows_fields(&rows); @@ -359,11 +362,10 @@ where Ok(ControlFlow::Continue(())) } Err(err) => { - let err = err.into(); self.metrics.inc_failed_paged_queries(); self.execution_profile .load_balancing_policy - .on_query_failure(&self.statement_info, elapsed, node, &err); + .on_request_failure(&self.statement_info, elapsed, node, &err); Err(err) } Ok(NonErrorQueryResponse { @@ -381,10 +383,10 @@ where Ok(response) => { self.metrics.inc_failed_paged_queries(); let err = - ProtocolError::UnexpectedResponse(response.response.to_response_kind()).into(); + RequestAttemptError::UnexpectedResponse(response.response.to_response_kind()); self.execution_profile .load_balancing_policy - .on_query_failure(&self.statement_info, elapsed, node, &err); + .on_request_failure(&self.statement_info, elapsed, node, &err); Err(err) } } @@ -482,7 +484,7 @@ struct SingleConnectionPagerWorker { impl SingleConnectionPagerWorker where Fetcher: Fn(PagingState) -> FetchFut + Send + Sync, - FetchFut: Future> + Send, + FetchFut: Future> + Send, { async fn work(mut self) -> PageSendAttemptedProof { match self.do_work().await { @@ -497,8 +499,12 @@ where async fn do_work(&mut self) -> Result { let mut paging_state = PagingState::start(); loop { - let result = (self.fetcher)(paging_state).await?; - let response = result.into_non_error_query_response()?; + let result = (self.fetcher)(paging_state) + .await + .map_err(RequestAttemptError::into_query_error)?; + let response = result + .into_non_error_query_response() + .map_err(RequestAttemptError::into_query_error)?; match response.response { NonErrorResponse::Result(result::Result::Rows((rows, paging_state_response))) => { let (proof, send_result) = self diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs index a0a29db4d1..2ba11889b7 100644 --- a/scylla/src/client/session.rs +++ b/scylla/src/client/session.rs @@ -15,7 +15,7 @@ use crate::cluster::node::CloudEndpoint; use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef}; use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState}; use crate::errors::{ - BadQuery, NewSessionError, ProtocolError, QueryError, TracingProtocolError, UserRequestError, + BadQuery, NewSessionError, ProtocolError, 
QueryError, RequestAttemptError, TracingProtocolError, }; use crate::frame::response::result; #[cfg(feature = "ssl")] @@ -28,7 +28,7 @@ use crate::observability::tracing::TracingInfo; use crate::policies::address_translator::AddressTranslator; use crate::policies::host_filter::HostFilter; use crate::policies::load_balancing::{self, RoutingInfo}; -use crate::policies::retry::{QueryInfo, RetryDecision, RetrySession}; +use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession}; use crate::policies::speculative_execution; use crate::prepared_statement::PreparedStatement; use crate::query::Query; @@ -1138,7 +1138,6 @@ where ) .await .and_then(QueryResponse::into_non_error_query_response) - .map_err(Into::into) } else { let prepared = connection.prepare(query_ref).await?; let serialized = prepared.serialize_values(values_ref)?; @@ -1154,7 +1153,6 @@ where ) .await .and_then(QueryResponse::into_non_error_query_response) - .map_err(Into::into) } } }, @@ -1175,7 +1173,9 @@ where self.handle_set_keyspace_response(&response).await?; self.handle_auto_await_schema_agreement(&response).await?; - let (result, paging_state_response) = response.into_query_result_and_paging_state()?; + let (result, paging_state_response) = response + .into_query_result_and_paging_state() + .map_err(RequestAttemptError::into_query_error)?; span.record_result_fields(&result); Ok((result, paging_state_response)) @@ -1303,9 +1303,10 @@ where // Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()` // returns either an error or an iterator with at least one connection, so there will be at least one result. - let first_ok: Result = + let first_ok: Result = results.by_ref().find_or_first(Result::is_ok).unwrap(); - let mut prepared: PreparedStatement = first_ok?; + let mut prepared: PreparedStatement = + first_ok.map_err(RequestAttemptError::into_query_error)?; // Validate prepared ids equality for statement in results.flatten() { @@ -1456,7 +1457,6 @@ where ) .await .and_then(QueryResponse::into_non_error_query_response) - .map_err(Into::into) } }, &span, @@ -1476,7 +1476,9 @@ where self.handle_set_keyspace_response(&response).await?; self.handle_auto_await_schema_agreement(&response).await?; - let (result, paging_state_response) = response.into_query_result_and_paging_state()?; + let (result, paging_state_response) = response + .into_query_result_and_paging_state() + .map_err(RequestAttemptError::into_query_error)?; span.record_result_fields(&result); Ok((result, paging_state_response)) @@ -1556,7 +1558,7 @@ where let span = RequestSpan::new_batch(); - let run_request_result = self + let run_request_result: RunRequestResult = self .run_request( statement_info, &batch.config, @@ -1577,6 +1579,7 @@ where serial_consistency, ) .await + .and_then(QueryResponse::into_non_error_query_response) } }, &span, @@ -1586,7 +1589,8 @@ where let result = match run_request_result { RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(), - RunRequestResult::Completed(result) => { + RunRequestResult::Completed(non_error_query_response) => { + let result = non_error_query_response.into_query_result()?; span.record_result_fields(&result); result } @@ -1844,7 +1848,7 @@ where request_span: &'a RequestSpan, ) -> Result, QueryError> where - QueryFut: Future>, + QueryFut: Future>, ResT: AllowedRunRequestResTType, { let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> = @@ -2008,7 +2012,7 @@ where mut context: ExecuteRequestContext<'a>, ) -> Option, QueryError>> 
where - QueryFut: Future>, + QueryFut: Future>, ResT: AllowedRunRequestResTType, { let mut last_error: Option = None; @@ -2045,18 +2049,18 @@ where ); let attempt_id: Option = context.log_attempt_start(connection.get_connect_address()); - let request_result: Result = + let request_result: Result = run_request_once(connection, current_consistency, execution_profile) .instrument(span.clone()) .await; let elapsed = request_start.elapsed(); - last_error = match request_result { + let request_error: RequestAttemptError = match request_result { Ok(response) => { trace!(parent: &span, "Request succeeded"); let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); context.log_attempt_success(&attempt_id); - execution_profile.load_balancing_policy.on_query_success( + execution_profile.load_balancing_policy.on_request_success( context.query_info, elapsed, node, @@ -2070,20 +2074,19 @@ where "Request failed" ); self.metrics.inc_failed_nonpaged_queries(); - execution_profile.load_balancing_policy.on_query_failure( + execution_profile.load_balancing_policy.on_request_failure( context.query_info, elapsed, node, &e, ); - Some(e) + e } }; - let the_error: &QueryError = last_error.as_ref().unwrap(); // Use retry policy to decide what to do next - let query_info = QueryInfo { - error: the_error, + let query_info = RequestInfo { + error: &request_error, is_idempotent: context.is_idempotent, consistency: context .consistency_set_on_statement @@ -2095,7 +2098,14 @@ where parent: &span, retry_decision = format!("{:?}", retry_decision).as_str() ); - context.log_attempt_error(&attempt_id, the_error, &retry_decision); + + last_error = Some(request_error.into_query_error()); + context.log_attempt_error( + &attempt_id, + last_error.as_ref().unwrap(), + &retry_decision, + ); + match retry_decision { RetryDecision::RetrySameNode(new_cl) => { self.metrics.inc_retries_num(); diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs index 677acef49b..5f7e6f90cf 100644 --- a/scylla/src/client/session_test.rs +++ b/scylla/src/client/session_test.rs @@ -9,7 +9,7 @@ use crate::cluster::metadata::{CollectionType, ColumnKind, CqlType, NativeType, use crate::deserialize::DeserializeOwnedValue; use crate::errors::{BadKeyspaceName, BadQuery, DbError, QueryError}; use crate::observability::tracing::TracingInfo; -use crate::policies::retry::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; +use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}; use crate::prepared_statement::PreparedStatement; use crate::query::Query; use crate::routing::partitioner::{ @@ -2725,7 +2725,7 @@ async fn test_iter_works_when_retry_policy_returns_ignore_write_error() { struct MyRetrySession(Arc); impl RetrySession for MyRetrySession { - fn decide_should_retry(&mut self, _: QueryInfo) -> RetryDecision { + fn decide_should_retry(&mut self, _: RequestInfo) -> RetryDecision { self.0.store(true, Ordering::Relaxed); RetryDecision::IgnoreWriteError } diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs index d5d4c0e091..31657b33be 100644 --- a/scylla/src/cluster/metadata.rs +++ b/scylla/src/cluster/metadata.rs @@ -19,7 +19,7 @@ use crate::client::pager::QueryPager; use crate::cluster::node::resolve_contact_points; use crate::deserialize::DeserializeOwnedRow; -use crate::errors::{DbError, NewSessionError, QueryError}; +use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError}; use crate::frame::response::event::Event; use 
crate::network::{Connection, ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize}; use crate::policies::host_filter::HostFilter; @@ -986,7 +986,10 @@ where let mut query = Query::new(query_str); query.set_page_size(METADATA_QUERY_PAGE_SIZE); - let prepared = conn.prepare(&query).await?; + let prepared = conn + .prepare(&query) + .await + .map_err(RequestAttemptError::into_query_error)?; let serialized_values = prepared.serialize_values(&keyspaces)?; conn.execute_iter(prepared, serialized_values).await } diff --git a/scylla/src/errors.rs b/scylla/src/errors.rs index 032b16bede..92e162f73b 100644 --- a/scylla/src/errors.rs +++ b/scylla/src/errors.rs @@ -145,33 +145,6 @@ impl From for QueryError { } } -impl From for QueryError { - fn from(value: UserRequestError) -> Self { - match value { - UserRequestError::CqlRequestSerialization(e) => e.into(), - UserRequestError::DbError(err, msg) => QueryError::DbError(err, msg), - UserRequestError::CqlResultParseError(e) => e.into(), - UserRequestError::CqlErrorParseError(e) => e.into(), - UserRequestError::BrokenConnectionError(e) => e.into(), - UserRequestError::UnexpectedResponse(response) => { - ProtocolError::UnexpectedResponse(response).into() - } - UserRequestError::BodyExtensionsParseError(e) => e.into(), - UserRequestError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, - UserRequestError::RepreparedIdChanged { - statement, - expected_id, - reprepared_id, - } => ProtocolError::RepreparedIdChanged { - statement, - expected_id, - reprepared_id, - } - .into(), - } - } -} - impl From for NewSessionError { fn from(query_error: QueryError) -> NewSessionError { match query_error { @@ -898,33 +871,58 @@ pub enum CqlEventHandlingError { SendError, } -/// An error type that occurred when executing one of: -/// - QUERY -/// - PREPARE -/// - EXECUTE -/// - BATCH +/// An error type that occurred during single attempt of: +/// - `QUERY` +/// - `PREPARE` +/// - `EXECUTE` +/// - `BATCH` /// -/// requests. -#[derive(Error, Debug)] -pub(crate) enum UserRequestError { +/// requests. The retry decision is made based +/// on this error. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum RequestAttemptError { + /// Failed to serialize query parameters. This error occurs, when user executes + /// a CQL `QUERY` request with non-empty parameter's value list and the serialization + /// of provided values fails during statement preparation. + #[error("Failed to serialize query parameters: {0}")] + SerializationError(#[from] SerializationError), + + /// Failed to serialize CQL request. #[error("Failed to serialize CQL request: {0}")] CqlRequestSerialization(#[from] CqlRequestSerializationError), - #[error("Database returned an error: {0}, Error message: {1}")] - DbError(DbError, String), + + /// Driver was unable to allocate a stream id to execute a query on. + #[error("Unable to allocate stream id")] + UnableToAllocStreamId, + + /// A connection has been broken during query execution. + #[error(transparent)] + BrokenConnectionError(#[from] BrokenConnectionError), + + /// Failed to deserialize frame body extensions. + #[error(transparent)] + BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError), + + /// Received a RESULT server response, but failed to deserialize it. #[error(transparent)] CqlResultParseError(#[from] CqlResultParseError), + + /// Received an ERROR server response, but failed to deserialize it. 
#[error("Failed to deserialize ERROR response: {0}")] CqlErrorParseError(#[from] CqlErrorParseError), + + /// Database sent a response containing some error with a message + #[error("Database returned an error: {0}, Error message: {1}")] + DbError(DbError, String), + + /// Received an unexpected response from the server. #[error( "Received unexpected response from the server: {0}. Expected RESULT or ERROR response." )] UnexpectedResponse(CqlResponseKind), - #[error(transparent)] - BrokenConnectionError(#[from] BrokenConnectionError), - #[error(transparent)] - BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError), - #[error("Unable to allocate stream id")] - UnableToAllocStreamId, + + /// Prepared statement id changed after repreparation. #[error( "Prepared statement id changed after repreparation; md5 sum (computed from the query string) should stay the same;\ Statement: \"{statement}\"; expected id: {expected_id:?}; reprepared id: {reprepared_id:?}" @@ -934,15 +932,52 @@ pub(crate) enum UserRequestError { expected_id: Vec, reprepared_id: Vec, }, + + /// Driver tried to reprepare a statement in the batch, but the reprepared + /// statement's id is not included in the batch. + #[error("Reprepared statement's id does not exist in the batch.")] + RepreparedIdMissingInBatch, } -impl From for UserRequestError { +impl RequestAttemptError { + /// Converts the error to [`QueryError`]. + pub fn into_query_error(self) -> QueryError { + match self { + RequestAttemptError::CqlRequestSerialization(e) => e.into(), + RequestAttemptError::DbError(err, msg) => QueryError::DbError(err, msg), + RequestAttemptError::CqlResultParseError(e) => e.into(), + RequestAttemptError::CqlErrorParseError(e) => e.into(), + RequestAttemptError::BrokenConnectionError(e) => e.into(), + RequestAttemptError::UnexpectedResponse(response) => { + ProtocolError::UnexpectedResponse(response).into() + } + RequestAttemptError::BodyExtensionsParseError(e) => e.into(), + RequestAttemptError::UnableToAllocStreamId => QueryError::UnableToAllocStreamId, + RequestAttemptError::RepreparedIdChanged { + statement, + expected_id, + reprepared_id, + } => ProtocolError::RepreparedIdChanged { + statement, + expected_id, + reprepared_id, + } + .into(), + RequestAttemptError::RepreparedIdMissingInBatch => { + ProtocolError::RepreparedIdMissingInBatch.into() + } + RequestAttemptError::SerializationError(e) => e.into(), + } + } +} + +impl From for RequestAttemptError { fn from(value: response::error::Error) -> Self { - UserRequestError::DbError(value.error, value.reason) + RequestAttemptError::DbError(value.error, value.reason) } } -impl From for UserRequestError { +impl From for RequestAttemptError { fn from(value: InternalRequestError) -> Self { match value { InternalRequestError::CqlRequestSerialization(e) => e.into(), @@ -952,10 +987,12 @@ impl From for UserRequestError { // other response, treat it as unexpected response. 
CqlResponseParseError::CqlErrorParseError(e) => e.into(), CqlResponseParseError::CqlResultParseError(e) => e.into(), - _ => UserRequestError::UnexpectedResponse(e.to_response_kind()), + _ => RequestAttemptError::UnexpectedResponse(e.to_response_kind()), }, InternalRequestError::BrokenConnection(e) => e.into(), - InternalRequestError::UnableToAllocStreamId => UserRequestError::UnableToAllocStreamId, + InternalRequestError::UnableToAllocStreamId => { + RequestAttemptError::UnableToAllocStreamId + } } } } diff --git a/scylla/src/network/connection.rs b/scylla/src/network/connection.rs index 5f0b3f760c..bb9dc20b34 100644 --- a/scylla/src/network/connection.rs +++ b/scylla/src/network/connection.rs @@ -10,8 +10,8 @@ use crate::cluster::NodeAddr; use crate::errors::{ BadKeyspaceName, BrokenConnectionError, BrokenConnectionErrorKind, ConnectionError, ConnectionSetupRequestError, ConnectionSetupRequestErrorKind, CqlEventHandlingError, DbError, - InternalRequestError, ProtocolError, QueryError, ResponseParseError, SchemaVersionFetchError, - TranslationError, UseKeyspaceProtocolError, UserRequestError, + InternalRequestError, ProtocolError, QueryError, RequestAttemptError, ResponseParseError, + SchemaVersionFetchError, TranslationError, UseKeyspaceProtocolError, }; use crate::frame::protocol_features::ProtocolFeatures; use crate::frame::{ @@ -603,7 +603,7 @@ impl Connection { pub(crate) async fn prepare( &self, query: &Query, - ) -> Result { + ) -> Result { let query_response = self .send_request( &request::Prepare { @@ -617,7 +617,7 @@ impl Connection { let mut prepared_statement = match query_response.response { Response::Error(error::Error { error, reason }) => { - return Err(UserRequestError::DbError(error, reason)) + return Err(RequestAttemptError::DbError(error, reason)) } Response::Result(result::Result::Prepared(p)) => PreparedStatement::new( p.id, @@ -631,7 +631,7 @@ impl Connection { query.config.clone(), ), _ => { - return Err(UserRequestError::UnexpectedResponse( + return Err(RequestAttemptError::UnexpectedResponse( query_response.response.to_response_kind(), )) } @@ -647,13 +647,13 @@ impl Connection { &self, query: impl Into, previous_prepared: &PreparedStatement, - ) -> Result<(), UserRequestError> { + ) -> Result<(), RequestAttemptError> { let reprepare_query: Query = query.into(); let reprepared = self.prepare(&reprepare_query).await?; // Reprepared statement should keep its id - it's the md5 sum // of statement contents if reprepared.get_id() != previous_prepared.get_id() { - Err(UserRequestError::RepreparedIdChanged { + Err(RequestAttemptError::RepreparedIdChanged { statement: reprepare_query.contents, expected_id: previous_prepared.get_id().clone().into(), reprepared_id: reprepared.get_id().clone().into(), @@ -774,7 +774,7 @@ impl Connection { &self, query: impl Into, paging_state: PagingState, - ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { + ) -> Result<(QueryResult, PagingStateResponse), RequestAttemptError> { let query: Query = query.into(); // This method is used only for driver internal queries, so no need to consult execution profile here. 
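
Editor's note: the central pattern of this patch is that each attempt now yields a narrow `RequestAttemptError`, which the retry policy inspects before the error is widened into the user-facing `QueryError` via `into_query_error`. Below is a minimal sketch of that flow, simplified from the retry loop in `session.rs` above. The helper name `on_failed_attempt` is illustrative, the public import paths are assumed to mirror the crate-internal ones shown in this diff, and constructing `RequestInfo` directly assumes its fields are public.

```rust
use scylla::errors::{QueryError, RequestAttemptError};
use scylla::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use scylla::statement::Consistency;

// Sketch only: one failed attempt is first shown to the retry policy as a
// `RequestAttemptError`, and only afterwards widened to `QueryError`
// (compare `last_error = request_error.into_query_error()` above).
fn on_failed_attempt(
    attempt_error: RequestAttemptError,
    retry_session: &mut dyn RetrySession,
    is_idempotent: bool,
) -> (RetryDecision, QueryError) {
    let decision = retry_session.decide_should_retry(RequestInfo {
        error: &attempt_error,
        is_idempotent,
        consistency: Consistency::LocalQuorum,
    });
    (decision, attempt_error.into_query_error())
}
```
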
@@ -799,7 +799,7 @@ impl Connection { paging_state: PagingState, consistency: Consistency, serial_consistency: Option, - ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> { + ) -> Result<(QueryResult, PagingStateResponse), RequestAttemptError> { let query: Query = query.into(); let page_size = query.get_validated_page_size(); @@ -824,14 +824,14 @@ impl Connection { self.query_raw_unpaged(&query) .await - .map_err(Into::into) + .map_err(RequestAttemptError::into_query_error) .and_then(QueryResponse::into_query_result) } pub(crate) async fn query_raw_unpaged( &self, query: &Query, - ) -> Result { + ) -> Result { // This method is used only for driver internal queries, so no need to consult execution profile here. self.query_raw_with_consistency( query, @@ -852,7 +852,7 @@ impl Connection { serial_consistency: Option, page_size: Option, paging_state: PagingState, - ) -> Result { + ) -> Result { let query_frame = query::Query { contents: Cow::Borrowed(&query.contents), parameters: query::QueryParameters { @@ -882,7 +882,7 @@ impl Connection { // This method is used only for driver internal queries, so no need to consult execution profile here. self.execute_raw_unpaged(prepared, values) .await - .map_err(Into::into) + .map_err(RequestAttemptError::into_query_error) .and_then(QueryResponse::into_query_result) } @@ -891,7 +891,7 @@ impl Connection { &self, prepared: &PreparedStatement, values: SerializedValues, - ) -> Result { + ) -> Result { // This method is used only for driver internal queries, so no need to consult execution profile here. self.execute_raw_with_consistency( prepared, @@ -914,7 +914,7 @@ impl Connection { serial_consistency: Option, page_size: Option, paging_state: PagingState, - ) -> Result { + ) -> Result { let execute_frame = execute::Execute { id: prepared_statement.get_id().to_owned(), parameters: query::QueryParameters { @@ -1035,6 +1035,8 @@ impl Connection { batch.config.serial_consistency.flatten(), ) .await + .map_err(RequestAttemptError::into_query_error) + .and_then(QueryResponse::into_query_result) } pub(crate) async fn batch_with_consistency( @@ -1043,7 +1045,7 @@ impl Connection { values: impl BatchValues, consistency: Consistency, serial_consistency: Option, - ) -> Result { + ) -> Result { let batch = self.prepare_batch(init_batch, &values).await?; let contexts = batch.statements.iter().map(|bs| match bs { @@ -1068,7 +1070,7 @@ impl Connection { let query_response = self .send_request(&batch_frame, true, batch.config.tracing, None) .await - .map_err(UserRequestError::from)?; + .map_err(RequestAttemptError::from)?; return match query_response.response { Response::Error(err) => match err.error { @@ -1084,16 +1086,15 @@ impl Connection { self.reprepare(p.get_statement(), p).await?; continue; } else { - return Err(ProtocolError::RepreparedIdMissingInBatch.into()); + return Err(RequestAttemptError::RepreparedIdMissingInBatch); } } _ => Err(err.into()), }, - Response::Result(_) => Ok(query_response.into_query_result()?), - _ => Err(ProtocolError::UnexpectedResponse( + Response::Result(_) => Ok(query_response), + _ => Err(RequestAttemptError::UnexpectedResponse( query_response.response.to_response_kind(), - ) - .into()), + )), }; } } @@ -1102,7 +1103,7 @@ impl Connection { &self, init_batch: &'b Batch, values: impl BatchValues, - ) -> Result, QueryError> { + ) -> Result, RequestAttemptError> { let mut to_prepare = HashSet::<&str>::new(); { @@ -1157,7 +1158,10 @@ impl Connection { false => format!("USE {}", keyspace_name.as_str()).into(), }; - let 
query_response = self.query_raw_unpaged(&query).await?; + let query_response = self + .query_raw_unpaged(&query) + .await + .map_err(RequestAttemptError::into_query_error)?; Self::verify_use_keyspace_result(keyspace_name, query_response) } diff --git a/scylla/src/policies/load_balancing/default.rs b/scylla/src/policies/load_balancing/default.rs index 600c5698f3..e15fa943e4 100644 --- a/scylla/src/policies/load_balancing/default.rs +++ b/scylla/src/policies/load_balancing/default.rs @@ -6,7 +6,7 @@ use crate::cluster::ClusterState; use crate::{ cluster::metadata::Strategy, cluster::node::Node, - errors::QueryError, + errors::RequestAttemptError, routing::locator::ReplicaSet, routing::{Shard, Token}, }; @@ -559,22 +559,27 @@ or refrain from preferring datacenters (which may ban all other datacenters, if "DefaultPolicy".to_string() } - fn on_query_success(&self, _routing_info: &RoutingInfo, latency: Duration, node: NodeRef<'_>) { + fn on_request_success( + &self, + _routing_info: &RoutingInfo, + latency: Duration, + node: NodeRef<'_>, + ) { if let Some(latency_awareness) = self.latency_awareness.as_ref() { - latency_awareness.report_query(node, latency); + latency_awareness.report_request(node, latency); } } - fn on_query_failure( + fn on_request_failure( &self, _routing_info: &RoutingInfo, latency: Duration, node: NodeRef<'_>, - error: &QueryError, + error: &RequestAttemptError, ) { if let Some(latency_awareness) = self.latency_awareness.as_ref() { if LatencyAwareness::reliable_latency_measure(error) { - latency_awareness.report_query(node, latency); + latency_awareness.report_request(node, latency); } } } @@ -2554,7 +2559,7 @@ mod latency_awareness { use uuid::Uuid; use crate::cluster::node::Node; - use crate::errors::{DbError, QueryError}; + use crate::errors::{DbError, RequestAttemptError}; use crate::policies::load_balancing::NodeRef; use crate::routing::Shard; use std::{ @@ -2808,7 +2813,7 @@ mod latency_awareness { Either::Right(skipping_penalised_targets_iterator) } - pub(super) fn report_query(&self, node: &Node, latency: Duration) { + pub(super) fn report_request(&self, node: &Node, latency: Duration) { let node_avgs_guard = self.node_avgs.read().unwrap(); if let Some(previous_node_avg) = node_avgs_guard.get(&node.host_id) { // The usual path, the node has been already noticed. @@ -2839,33 +2844,27 @@ mod latency_awareness { } } - pub(crate) fn reliable_latency_measure(error: &QueryError) -> bool { + pub(crate) fn reliable_latency_measure(error: &RequestAttemptError) -> bool { match error { // "fast" errors, i.e. ones that are returned quickly after the query begins - QueryError::BadQuery(_) - | QueryError::CqlRequestSerialization(_) - | QueryError::BrokenConnection(_) - | QueryError::ConnectionPoolError(_) - | QueryError::EmptyPlan - | QueryError::UnableToAllocStreamId - | QueryError::DbError(DbError::IsBootstrapping, _) - | QueryError::DbError(DbError::Unavailable { .. }, _) - | QueryError::DbError(DbError::Unprepared { .. }, _) - | QueryError::DbError(DbError::Overloaded { .. }, _) - | QueryError::DbError(DbError::RateLimitReached { .. }, _) => false, + RequestAttemptError::CqlRequestSerialization(_) + | RequestAttemptError::BrokenConnectionError(_) + | RequestAttemptError::UnableToAllocStreamId + | RequestAttemptError::DbError(DbError::IsBootstrapping, _) + | RequestAttemptError::DbError(DbError::Unavailable { .. }, _) + | RequestAttemptError::DbError(DbError::Unprepared { .. }, _) + | RequestAttemptError::DbError(DbError::Overloaded { .. 
}, _) + | RequestAttemptError::DbError(DbError::RateLimitReached { .. }, _) + | RequestAttemptError::SerializationError(_) => false, // "slow" errors, i.e. ones that are returned after considerable time of query being run - #[allow(deprecated)] - QueryError::DbError(_, _) - | QueryError::CqlResultParseError(_) - | QueryError::CqlErrorParseError(_) - | QueryError::BodyExtensionsParseError(_) - | QueryError::MetadataError(_) - | QueryError::ProtocolError(_) - | QueryError::TimeoutError - | QueryError::RequestTimeout(_) - | QueryError::NextRowError(_) - | QueryError::IntoLegacyQueryResultError(_) => true, + RequestAttemptError::DbError(_, _) + | RequestAttemptError::CqlResultParseError(_) + | RequestAttemptError::CqlErrorParseError(_) + | RequestAttemptError::BodyExtensionsParseError(_) + | RequestAttemptError::RepreparedIdChanged { .. } + | RequestAttemptError::RepreparedIdMissingInBatch + | RequestAttemptError::UnexpectedResponse(_) => true, } } } diff --git a/scylla/src/policies/load_balancing/mod.rs b/scylla/src/policies/load_balancing/mod.rs index 1307c156c5..6eddb74bcc 100644 --- a/scylla/src/policies/load_balancing/mod.rs +++ b/scylla/src/policies/load_balancing/mod.rs @@ -4,7 +4,7 @@ use crate::cluster::{ClusterState, NodeRef}; use crate::{ - errors::QueryError, + errors::RequestAttemptError, routing::{Shard, Token}, }; use scylla_cql::frame::{response::result::TableSpec, types}; @@ -19,8 +19,8 @@ pub use plan::Plan; /// Represents info about statement that can be used by load balancing policies. #[derive(Default, Clone, Debug)] pub struct RoutingInfo<'a> { - /// Requested consistency information allows to route queries to the appropriate - /// datacenters. E.g. queries with a LOCAL_ONE consistency should be routed to the same + /// Requested consistency information allows to route requests to the appropriate + /// datacenters. E.g. requests with a LOCAL_ONE consistency should be routed to the same /// datacenter. pub consistency: types::Consistency, pub serial_consistency: Option, @@ -33,65 +33,65 @@ pub struct RoutingInfo<'a> { /// If, while preparing, we received from the cluster information that the statement is an LWT, /// then we can use this information for routing optimisation. Namely, an optimisation - /// can be performed: the query should be routed to the replicas in a predefined order + /// can be performed: the request should be routed to the replicas in a predefined order /// (i. e. always try first to contact replica A, then B if it fails, then C, etc.). - /// If false, the query should be routed normally. + /// If false, the request should be routed normally. /// Note: this a Scylla-specific optimisation. Therefore, the flag will be always false for Cassandra. pub is_confirmed_lwt: bool, } -/// The fallback list of nodes in the query plan. +/// The fallback list of nodes in the request plan. /// /// It is computed on-demand, only if querying the most preferred node fails /// (or when speculative execution is triggered). pub type FallbackPlan<'a> = Box, Option)> + Send + Sync + 'a>; -/// Policy that decides which nodes and shards to contact for each query. +/// Policy that decides which nodes and shards to contact for each request. /// -/// When a query is prepared to be sent to ScyllaDB/Cassandra, a `LoadBalancingPolicy` +/// When a request is prepared to be sent to ScyllaDB/Cassandra, a `LoadBalancingPolicy` /// implementation constructs a load balancing plan. 
That plan is a list of /// targets (target is a node + an optional shard) to which -/// the driver will try to send the query. The first elements of the plan are the targets which are +/// the driver will try to send the request. The first elements of the plan are the targets which are /// the best to contact (e.g. they might have the lowest latency). /// -/// Most queries are sent on the first try, so the query execution layer rarely needs to know more +/// Most requests are sent on the first try, so the request execution layer rarely needs to know more /// than one target from plan. To better optimize that case, `LoadBalancingPolicy` has two methods: -/// `pick` and `fallback`. `pick` returns the first target to contact for a given query, `fallback` +/// `pick` and `fallback`. `pick` returns the first target to contact for a given request, `fallback` /// returns the rest of the load balancing plan. /// /// `fallback` is called not only if a send to `pick`ed node failed (or when executing /// speculatively), but also if `pick` returns `None`. /// -/// Usually the driver needs only the first node from load balancing plan (most queries are send +/// Usually the driver needs only the first node from load balancing plan (most requests are send /// successfully, and there is no need to retry). /// -/// This trait is used to produce an iterator of nodes to contact for a given query. +/// This trait is used to produce an iterator of nodes to contact for a given request. pub trait LoadBalancingPolicy: Send + Sync + std::fmt::Debug { - /// Returns the first node to contact for a given query. + /// Returns the first node to contact for a given request. fn pick<'a>( &'a self, - query: &'a RoutingInfo, + request: &'a RoutingInfo, cluster: &'a ClusterState, ) -> Option<(NodeRef<'a>, Option)>; - /// Returns all contact-appropriate nodes for a given query. + /// Returns all contact-appropriate nodes for a given request. fn fallback<'a>( &'a self, - query: &'a RoutingInfo, + request: &'a RoutingInfo, cluster: &'a ClusterState, ) -> FallbackPlan<'a>; - /// Invoked each time a query succeeds. - fn on_query_success(&self, _query: &RoutingInfo, _latency: Duration, _node: NodeRef<'_>) {} + /// Invoked each time a request succeeds. + fn on_request_success(&self, _request: &RoutingInfo, _latency: Duration, _node: NodeRef<'_>) {} - /// Invoked each time a query fails. - fn on_query_failure( + /// Invoked each time a request fails. 
+ fn on_request_failure( &self, - _query: &RoutingInfo, + _request: &RoutingInfo, _latency: Duration, _node: NodeRef<'_>, - _error: &QueryError, + _error: &RequestAttemptError, ) { } diff --git a/scylla/src/policies/retry/default.rs b/scylla/src/policies/retry/default.rs index 0691445a96..dacf13b864 100644 --- a/scylla/src/policies/retry/default.rs +++ b/scylla/src/policies/retry/default.rs @@ -1,8 +1,8 @@ use scylla_cql::frame::response::error::{DbError, WriteType}; -use crate::errors::QueryError; +use crate::errors::RequestAttemptError; -use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; +use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}; /// Default retry policy - retries when there is a high chance that a retry might help.\ /// Behaviour based on [DataStax Java Driver](https://docs.datastax.com/en/developer/java-driver/4.10/manual/core/retries/) @@ -50,19 +50,18 @@ impl Default for DefaultRetrySession { } impl RetrySession for DefaultRetrySession { - fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision { - if query_info.consistency.is_serial() { + fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision { + if request_info.consistency.is_serial() { return RetryDecision::DontRetry; }; - match query_info.error { + match request_info.error { // Basic errors - there are some problems on this node // Retry on a different one if possible - QueryError::BrokenConnection(_) - | QueryError::ConnectionPoolError(_) - | QueryError::DbError(DbError::Overloaded, _) - | QueryError::DbError(DbError::ServerError, _) - | QueryError::DbError(DbError::TruncateError, _) => { - if query_info.is_idempotent { + RequestAttemptError::BrokenConnectionError(_) + | RequestAttemptError::DbError(DbError::Overloaded, _) + | RequestAttemptError::DbError(DbError::ServerError, _) + | RequestAttemptError::DbError(DbError::TruncateError, _) => { + if request_info.is_idempotent { RetryDecision::RetryNextNode(None) } else { RetryDecision::DontRetry @@ -73,7 +72,7 @@ impl RetrySession for DefaultRetrySession { // Maybe this node has network problems - try a different one. // Perform at most one retry - it's unlikely that two nodes // have network problems at the same time - QueryError::DbError(DbError::Unavailable { .. }, _) => { + RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => { if !self.was_unavailable_retry { self.was_unavailable_retry = true; RetryDecision::RetryNextNode(None) @@ -87,7 +86,7 @@ impl RetrySession for DefaultRetrySession { // This happens when the coordinator picked replicas that were overloaded/dying. // Retried request should have some useful response because the node will detect // that these replicas are dead. - QueryError::DbError( + RequestAttemptError::DbError( DbError::ReadTimeout { received, required, @@ -107,9 +106,9 @@ impl RetrySession for DefaultRetrySession { // Retry at most once and only for BatchLog write. // Coordinator probably didn't detect the nodes as dead. // By the time we retry they should be detected as dead. - QueryError::DbError(DbError::WriteTimeout { write_type, .. }, _) => { + RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. 
}, _) => { if !self.was_write_timeout_retry - && query_info.is_idempotent + && request_info.is_idempotent && *write_type == WriteType::BatchLog { self.was_write_timeout_retry = true; @@ -118,10 +117,12 @@ impl RetrySession for DefaultRetrySession { RetryDecision::DontRetry } } - // The node is still bootstrapping it can't execute the query, we should try another one - QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None), + // The node is still bootstrapping it can't execute the request, we should try another one + RequestAttemptError::DbError(DbError::IsBootstrapping, _) => { + RetryDecision::RetryNextNode(None) + } // Connection to the contacted node is overloaded, try another one - QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), + RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), // In all other cases propagate the error to the user _ => RetryDecision::DontRetry, } @@ -134,17 +135,16 @@ impl RetrySession for DefaultRetrySession { #[cfg(test)] mod tests { - use super::{DefaultRetryPolicy, QueryInfo, RetryDecision, RetryPolicy}; - use crate::errors::{ - BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, QueryError, - }; + use super::{DefaultRetryPolicy, RequestInfo, RetryDecision, RetryPolicy}; + use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError}; use crate::errors::{DbError, WriteType}; use crate::statement::Consistency; use crate::test_utils::setup_tracing; use bytes::Bytes; + use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError}; - fn make_query_info(error: &QueryError, is_idempotent: bool) -> QueryInfo<'_> { - QueryInfo { + fn make_request_info(error: &RequestAttemptError, is_idempotent: bool) -> RequestInfo<'_> { + RequestInfo { error, is_idempotent, consistency: Consistency::One, @@ -152,16 +152,16 @@ mod tests { } // Asserts that default policy never retries for this Error - fn default_policy_assert_never_retries(error: QueryError) { + fn default_policy_assert_never_retries(error: RequestAttemptError) { let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, false)), + policy.decide_should_retry(make_request_info(&error, false)), RetryDecision::DontRetry ); let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, true)), + policy.decide_should_retry(make_request_info(&error, true)), RetryDecision::DontRetry ); } @@ -206,28 +206,36 @@ mod tests { ]; for dberror in never_retried_dberrors { - default_policy_assert_never_retries(QueryError::DbError(dberror, String::new())); + default_policy_assert_never_retries(RequestAttemptError::DbError( + dberror, + String::new(), + )); } - default_policy_assert_never_retries(QueryError::BadQuery(BadQuery::Other( - "Length of provided values must be equal to number of batch statements \ - (got 1 values, 2 statements)" - .to_owned(), - ))); - default_policy_assert_never_retries(ProtocolError::NonfinishedPagingState.into()); + default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch); + default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged { + statement: String::new(), + expected_id: vec![], + reprepared_id: vec![], + }); + default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization( + CqlRequestSerializationError::BatchSerialization( + 
BatchSerializationError::TooManyStatements(u16::MAX as usize + 1), + ), + )); } // Asserts that for this error policy retries on next on idempotent queries only - fn default_policy_assert_idempotent_next(error: QueryError) { + fn default_policy_assert_idempotent_next(error: RequestAttemptError) { let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, false)), + policy.decide_should_retry(make_request_info(&error, false)), RetryDecision::DontRetry ); let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, true)), + policy.decide_should_retry(make_request_info(&error, true)), RetryDecision::RetryNextNode(None) ); } @@ -236,13 +244,12 @@ mod tests { fn default_idempotent_next_retries() { setup_tracing(); let idempotent_next_errors = vec![ - QueryError::DbError(DbError::Overloaded, String::new()), - QueryError::DbError(DbError::TruncateError, String::new()), - QueryError::DbError(DbError::ServerError, String::new()), - QueryError::BrokenConnection( + RequestAttemptError::DbError(DbError::Overloaded, String::new()), + RequestAttemptError::DbError(DbError::TruncateError, String::new()), + RequestAttemptError::DbError(DbError::ServerError, String::new()), + RequestAttemptError::BrokenConnectionError( BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(), ), - QueryError::ConnectionPoolError(ConnectionPoolError::Initializing), ]; for error in idempotent_next_errors { @@ -254,17 +261,17 @@ mod tests { #[test] fn default_bootstrapping() { setup_tracing(); - let error = QueryError::DbError(DbError::IsBootstrapping, String::new()); + let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new()); let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, false)), + policy.decide_should_retry(make_request_info(&error, false)), RetryDecision::RetryNextNode(None) ); let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&error, true)), + policy.decide_should_retry(make_request_info(&error, true)), RetryDecision::RetryNextNode(None) ); } @@ -273,7 +280,7 @@ mod tests { #[test] fn default_unavailable() { setup_tracing(); - let error = QueryError::DbError( + let error = RequestAttemptError::DbError( DbError::Unavailable { consistency: Consistency::Two, required: 2, @@ -284,21 +291,21 @@ mod tests { let mut policy_not_idempotent = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy_not_idempotent.decide_should_retry(make_query_info(&error, false)), + policy_not_idempotent.decide_should_retry(make_request_info(&error, false)), RetryDecision::RetryNextNode(None) ); assert_eq!( - policy_not_idempotent.decide_should_retry(make_query_info(&error, false)), + policy_not_idempotent.decide_should_retry(make_request_info(&error, false)), RetryDecision::DontRetry ); let mut policy_idempotent = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy_idempotent.decide_should_retry(make_query_info(&error, true)), + policy_idempotent.decide_should_retry(make_request_info(&error, true)), RetryDecision::RetryNextNode(None) ); assert_eq!( - policy_idempotent.decide_should_retry(make_query_info(&error, true)), + policy_idempotent.decide_should_retry(make_request_info(&error, true)), RetryDecision::DontRetry ); } @@ -308,7 +315,7 @@ mod tests { fn default_read_timeout() { setup_tracing(); // Enough responses and data_present == 
false - coordinator received only checksums - let enough_responses_no_data = QueryError::DbError( + let enough_responses_no_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -321,28 +328,28 @@ mod tests { // Not idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_no_data, false)), + policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)), RetryDecision::RetrySameNode(None) ); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_no_data, false)), + policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)), RetryDecision::DontRetry ); // Idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_no_data, true)), + policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)), RetryDecision::RetrySameNode(None) ); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_no_data, true)), + policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)), RetryDecision::DontRetry ); // Enough responses but data_present == true - coordinator probably timed out // waiting for read-repair acknowledgement. - let enough_responses_with_data = QueryError::DbError( + let enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -355,19 +362,19 @@ mod tests { // Not idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_with_data, false)), + policy.decide_should_retry(make_request_info(&enough_responses_with_data, false)), RetryDecision::DontRetry ); // Idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&enough_responses_with_data, true)), + policy.decide_should_retry(make_request_info(&enough_responses_with_data, true)), RetryDecision::DontRetry ); // Not enough responses, data_present == true - let not_enough_responses_with_data = QueryError::DbError( + let not_enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 1, @@ -380,24 +387,24 @@ mod tests { // Not idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(¬_enough_responses_with_data, false)), + policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, false)), RetryDecision::DontRetry ); // Idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(¬_enough_responses_with_data, true)), + policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, true)), RetryDecision::DontRetry ); } - // WriteTimeout will retry once when the query is idempotent and write_type == BatchLog + // WriteTimeout will retry once when the request is idempotent and write_type == BatchLog #[test] fn default_write_timeout() { setup_tracing(); // WriteType == BatchLog - let good_write_type = QueryError::DbError( + let good_write_type = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received: 1, @@ -410,23 +417,23 @@ mod tests { // Not idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - 
policy.decide_should_retry(make_query_info(&good_write_type, false)), + policy.decide_should_retry(make_request_info(&good_write_type, false)), RetryDecision::DontRetry ); // Idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&good_write_type, true)), + policy.decide_should_retry(make_request_info(&good_write_type, true)), RetryDecision::RetrySameNode(None) ); assert_eq!( - policy.decide_should_retry(make_query_info(&good_write_type, true)), + policy.decide_should_retry(make_request_info(&good_write_type, true)), RetryDecision::DontRetry ); // WriteType != BatchLog - let bad_write_type = QueryError::DbError( + let bad_write_type = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received: 4, @@ -439,14 +446,14 @@ mod tests { // Not idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&bad_write_type, false)), + policy.decide_should_retry(make_request_info(&bad_write_type, false)), RetryDecision::DontRetry ); // Idempotent let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info(&bad_write_type, true)), + policy.decide_should_retry(make_request_info(&bad_write_type, true)), RetryDecision::DontRetry ); } diff --git a/scylla/src/policies/retry/downgrading_consistency.rs b/scylla/src/policies/retry/downgrading_consistency.rs index 27e5eb132d..79d0be396c 100644 --- a/scylla/src/policies/retry/downgrading_consistency.rs +++ b/scylla/src/policies/retry/downgrading_consistency.rs @@ -1,8 +1,8 @@ use scylla_cql::Consistency; use tracing::debug; -use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; -use crate::errors::{DbError, QueryError, WriteType}; +use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession}; +use crate::errors::{DbError, RequestAttemptError, WriteType}; /// Downgrading consistency retry policy - retries with lower consistency level if it knows\ /// that the initial CL is unreachable. Also, it behaves as [DefaultRetryPolicy](crate::policies::retry::DefaultRetryPolicy) @@ -47,11 +47,11 @@ impl Default for DowngradingConsistencyRetrySession { } impl RetrySession for DowngradingConsistencyRetrySession { - fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision { - let cl = match query_info.consistency { + fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision { + let cl = match request_info.consistency { Consistency::Serial | Consistency::LocalSerial => { - return match query_info.error { - QueryError::DbError(DbError::Unavailable { .. }, _) => { + return match request_info.error { + RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => { // JAVA-764: if the requested consistency level is serial, it means that the operation failed at // the paxos phase of a LWT. // Retry on the next host, on the assumption that the initial coordinator could be network-isolated. 
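
Editor's note: the user-visible effect of the `QueryInfo` → `RequestInfo` rename is that custom retry policies now match on the attempt-scoped error directly. A minimal sketch of a custom session against the new interface, modeled on `MyRetrySession` from `session_test.rs` above; the public import paths are assumed to mirror the crate-internal ones in this diff, and `reset` is assumed to be a required trait method.

```rust
use scylla::errors::{DbError, RequestAttemptError};
use scylla::policies::retry::{RequestInfo, RetryDecision, RetrySession};

// Sketch only: retry idempotent requests on another node when the contacted
// node looks overloaded, otherwise surface the error to the caller.
struct NextNodeOnOverloadSession;

impl RetrySession for NextNodeOnOverloadSession {
    fn decide_should_retry(&mut self, info: RequestInfo) -> RetryDecision {
        match info.error {
            // Attempt-scoped errors are matched directly, without first being
            // wrapped in QueryError.
            RequestAttemptError::DbError(DbError::Overloaded, _)
            | RequestAttemptError::UnableToAllocStreamId
                if info.is_idempotent =>
            {
                RetryDecision::RetryNextNode(None)
            }
            _ => RetryDecision::DontRetry,
        }
    }

    fn reset(&mut self) {}
}
```
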
@@ -85,15 +85,14 @@ impl RetrySession for DowngradingConsistencyRetrySession { decision } - match query_info.error { + match request_info.error { // Basic errors - there are some problems on this node // Retry on a different one if possible - QueryError::BrokenConnection(_) - | QueryError::ConnectionPoolError(_) - | QueryError::DbError(DbError::Overloaded, _) - | QueryError::DbError(DbError::ServerError, _) - | QueryError::DbError(DbError::TruncateError, _) => { - if query_info.is_idempotent { + RequestAttemptError::BrokenConnectionError(_) + | RequestAttemptError::DbError(DbError::Overloaded, _) + | RequestAttemptError::DbError(DbError::ServerError, _) + | RequestAttemptError::DbError(DbError::TruncateError, _) => { + if request_info.is_idempotent { RetryDecision::RetryNextNode(None) } else { RetryDecision::DontRetry @@ -101,7 +100,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } // Unavailable - the current node believes that not enough nodes // are alive to satisfy specified consistency requirements. - QueryError::DbError(DbError::Unavailable { alive, .. }, _) => { + RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => { if !self.was_retry { self.was_retry = true; max_likely_to_work_cl(*alive, cl) @@ -110,7 +109,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } // ReadTimeout - coordinator didn't receive enough replies in time. - QueryError::DbError( + RequestAttemptError::DbError( DbError::ReadTimeout { received, required, @@ -132,7 +131,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } // Write timeout - coordinator didn't receive enough replies in time. - QueryError::DbError( + RequestAttemptError::DbError( DbError::WriteTimeout { write_type, received, @@ -140,7 +139,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { }, _, ) => { - if self.was_retry || !query_info.is_idempotent { + if self.was_retry || !request_info.is_idempotent { RetryDecision::DontRetry } else { self.was_retry = true; @@ -160,10 +159,12 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } } - // The node is still bootstrapping it can't execute the query, we should try another one - QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None), + // The node is still bootstrapping it can't execute the request, we should try another one + RequestAttemptError::DbError(DbError::IsBootstrapping, _) => { + RetryDecision::RetryNextNode(None) + } // Connection to the contacted node is overloaded, try another one - QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), + RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), // In all other cases propagate the error to the user _ => RetryDecision::DontRetry, } @@ -177,8 +178,9 @@ impl RetrySession for DowngradingConsistencyRetrySession { #[cfg(test)] mod tests { use bytes::Bytes; + use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError}; - use crate::errors::{BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError}; + use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError}; use crate::test_utils::setup_tracing; use super::*; @@ -195,12 +197,12 @@ mod tests { Consistency::Two, ]; - fn make_query_info_with_cl( - error: &QueryError, + fn make_request_info_with_cl( + error: &RequestAttemptError, is_idempotent: bool, cl: Consistency, - ) -> QueryInfo<'_> { - QueryInfo { + ) -> RequestInfo<'_> { + RequestInfo { error, is_idempotent, consistency: cl, 
@@ -208,16 +210,19 @@ mod tests { } // Asserts that downgrading consistency policy never retries for this Error - fn downgrading_consistency_policy_assert_never_retries(error: QueryError, cl: Consistency) { + fn downgrading_consistency_policy_assert_never_retries( + error: RequestAttemptError, + cl: Consistency, + ) { let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)), + policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)), RetryDecision::DontRetry ); let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)), + policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)), RetryDecision::DontRetry ); } @@ -264,37 +269,48 @@ mod tests { for &cl in CONSISTENCY_LEVELS { for dberror in never_retried_dberrors.clone() { downgrading_consistency_policy_assert_never_retries( - QueryError::DbError(dberror, String::new()), + RequestAttemptError::DbError(dberror, String::new()), cl, ); } downgrading_consistency_policy_assert_never_retries( - QueryError::BadQuery(BadQuery::Other( - "Length of provided values must be equal to number of batch statements \ - (got 1 values, 2 statements)" - .to_owned(), - )), + RequestAttemptError::RepreparedIdMissingInBatch, cl, ); downgrading_consistency_policy_assert_never_retries( - ProtocolError::NonfinishedPagingState.into(), + RequestAttemptError::RepreparedIdChanged { + statement: String::new(), + expected_id: vec![], + reprepared_id: vec![], + }, + cl, + ); + downgrading_consistency_policy_assert_never_retries( + RequestAttemptError::CqlRequestSerialization( + CqlRequestSerializationError::BatchSerialization( + BatchSerializationError::TooManyStatements(u16::MAX as usize + 1), + ), + ), cl, ); } } // Asserts that for this error policy retries on next on idempotent queries only - fn downgrading_consistency_policy_assert_idempotent_next(error: QueryError, cl: Consistency) { + fn downgrading_consistency_policy_assert_idempotent_next( + error: RequestAttemptError, + cl: Consistency, + ) { let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)), + policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)), RetryDecision::DontRetry ); let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( - policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)), + policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)), RetryDecision::RetryNextNode(None) ); } @@ -318,13 +334,12 @@ mod tests { fn downgrading_consistency_idempotent_next_retries() { setup_tracing(); let idempotent_next_errors = vec![ - QueryError::DbError(DbError::Overloaded, String::new()), - QueryError::DbError(DbError::TruncateError, String::new()), - QueryError::DbError(DbError::ServerError, String::new()), - QueryError::BrokenConnection( + RequestAttemptError::DbError(DbError::Overloaded, String::new()), + RequestAttemptError::DbError(DbError::TruncateError, String::new()), + RequestAttemptError::DbError(DbError::ServerError, String::new()), + RequestAttemptError::BrokenConnectionError( BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(), ), - QueryError::ConnectionPoolError(ConnectionPoolError::Initializing), ]; for &cl in CONSISTENCY_LEVELS { @@ -338,18 +353,18 @@ mod tests { #[test] fn 
    fn downgrading_consistency_bootstrapping() {
         setup_tracing();
-        let error = QueryError::DbError(DbError::IsBootstrapping, String::new());
+        let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());
 
         for &cl in CONSISTENCY_LEVELS {
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
+                policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)),
                 RetryDecision::RetryNextNode(None)
             );
 
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
+                policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
                 RetryDecision::RetryNextNode(None)
             );
         }
@@ -360,7 +375,7 @@ mod tests {
     fn downgrading_consistency_unavailable() {
         setup_tracing();
         let alive = 1;
-        let error = QueryError::DbError(
+        let error = RequestAttemptError::DbError(
             DbError::Unavailable {
                 consistency: Consistency::Two,
                 required: 2,
@@ -373,22 +388,22 @@ mod tests {
         let mut policy_not_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
         assert_eq!(
             policy_not_idempotent
-                .decide_should_retry(make_query_info_with_cl(&error, false, cl)),
+                .decide_should_retry(make_request_info_with_cl(&error, false, cl)),
             max_likely_to_work_cl(alive, cl)
         );
         assert_eq!(
             policy_not_idempotent
-                .decide_should_retry(make_query_info_with_cl(&error, false, cl)),
+                .decide_should_retry(make_request_info_with_cl(&error, false, cl)),
             RetryDecision::DontRetry
         );
 
         let mut policy_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
         assert_eq!(
-            policy_idempotent.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
+            policy_idempotent.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
             max_likely_to_work_cl(alive, cl)
         );
         assert_eq!(
-            policy_idempotent.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
+            policy_idempotent.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
             RetryDecision::DontRetry
         );
     }
@@ -399,7 +414,7 @@ mod tests {
     fn downgrading_consistency_read_timeout() {
         setup_tracing();
         // Enough responses and data_present == false - coordinator received only checksums
-        let enough_responses_no_data = QueryError::DbError(
+        let enough_responses_no_data = RequestAttemptError::DbError(
             DbError::ReadTimeout {
                 consistency: Consistency::Two,
                 received: 2,
@@ -413,7 +428,7 @@ mod tests {
             // Not idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_no_data,
                     false,
                     cl
@@ -421,7 +436,7 @@ mod tests {
                 RetryDecision::RetrySameNode(None)
             );
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_no_data,
                     false,
                     cl
@@ -432,7 +447,7 @@ mod tests {
             // Idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_no_data,
                     true,
                     cl
@@ -440,7 +455,7 @@ mod tests {
                 RetryDecision::RetrySameNode(None)
             );
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_no_data,
                     true,
                     cl
@@ -450,7 +465,7 @@ mod tests {
         }
 
         // Enough responses but data_present == true - coordinator probably timed out
         // waiting for read-repair acknowledgement.
-        let enough_responses_with_data = QueryError::DbError(
+        let enough_responses_with_data = RequestAttemptError::DbError(
             DbError::ReadTimeout {
                 consistency: Consistency::Two,
                 received: 2,
@@ -464,7 +479,7 @@ mod tests {
             // Not idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_with_data,
                     false,
                     cl
@@ -475,7 +490,7 @@ mod tests {
             // Idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &enough_responses_with_data,
                     true,
                     cl
@@ -486,7 +501,7 @@ mod tests {
 
         // Not enough responses, data_present == true
         let received = 1;
-        let not_enough_responses_with_data = QueryError::DbError(
+        let not_enough_responses_with_data = RequestAttemptError::DbError(
             DbError::ReadTimeout {
                 consistency: Consistency::Two,
                 received,
@@ -501,7 +516,7 @@ mod tests {
             // Not idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &not_enough_responses_with_data,
                     false,
                     cl
@@ -510,7 +525,7 @@ mod tests {
             );
             if let RetryDecision::RetrySameNode(new_cl) = expected_decision {
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &not_enough_responses_with_data,
                         false,
                         new_cl.unwrap_or(cl)
@@ -522,7 +537,7 @@ mod tests {
             // Idempotent
             let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
             assert_eq!(
-                policy.decide_should_retry(make_query_info_with_cl(
+                policy.decide_should_retry(make_request_info_with_cl(
                     &not_enough_responses_with_data,
                     true,
                     cl
@@ -531,7 +546,7 @@ mod tests {
             );
             if let RetryDecision::RetrySameNode(new_cl) = expected_decision {
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &not_enough_responses_with_data,
                         true,
                         new_cl.unwrap_or(cl)
@@ -542,13 +557,13 @@ mod tests {
         }
     }
 
-    // WriteTimeout will retry once when the query is idempotent and write_type == BatchLog
+    // WriteTimeout will retry once when the request is idempotent and write_type == BatchLog
     #[test]
     fn downgrading_consistency_write_timeout() {
         setup_tracing();
         for (received, required) in (1..=5).zip(2..=6) {
             // WriteType == BatchLog
-            let write_type_batchlog = QueryError::DbError(
+            let write_type_batchlog = RequestAttemptError::DbError(
                 DbError::WriteTimeout {
                     consistency: Consistency::Two,
                     received,
@@ -562,7 +577,7 @@ mod tests {
                 // Not idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_batchlog,
                         false,
                         cl
@@ -573,7 +588,7 @@ mod tests {
                 // Idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_batchlog,
                         true,
                         cl
@@ -581,7 +596,7 @@ mod tests {
                     RetryDecision::RetrySameNode(None)
                 );
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_batchlog,
                         true,
                         cl
@@ -591,7 +606,7 @@ mod tests {
             }
 
             // WriteType == UnloggedBatch
-            let write_type_unlogged_batch = QueryError::DbError(
+            let write_type_unlogged_batch = RequestAttemptError::DbError(
                 DbError::WriteTimeout {
                     consistency: Consistency::Two,
                     received,
@@ -605,7 +620,7 @@ mod tests {
                 // Not idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_unlogged_batch,
                         false,
                         cl
@@ -616,7 +631,7 @@ mod tests {
                 // Idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_unlogged_batch,
                         true,
                         cl
@@ -624,7 +639,7 @@ mod tests {
                     max_likely_to_work_cl(received, cl)
                 );
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_unlogged_batch,
                         true,
                         cl
@@ -634,7 +649,7 @@ mod tests {
             }
 
             // WriteType == other
-            let write_type_other = QueryError::DbError(
+            let write_type_other = RequestAttemptError::DbError(
                 DbError::WriteTimeout {
                     consistency: Consistency::Two,
                     received,
@@ -648,7 +663,7 @@ mod tests {
                 // Not idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_other,
                         false,
                         cl
@@ -659,7 +674,7 @@ mod tests {
                 // Idempotent
                 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_other,
                         true,
                         cl
@@ -667,7 +682,7 @@ mod tests {
                     RetryDecision::IgnoreWriteError
                 );
                 assert_eq!(
-                    policy.decide_should_retry(make_query_info_with_cl(
+                    policy.decide_should_retry(make_request_info_with_cl(
                         &write_type_other,
                         true,
                         cl
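For orientation, the behaviour pinned down by the tests above can also be exercised directly against the renamed API. The snippet below is an illustrative sketch only and is not part of this patch: the `scylla::errors` and `scylla::policies::retry` paths follow the ones used in `execution_profiles.rs` further down, the `scylla::statement::Consistency` import is an assumption about the public re-export, and the concrete downgraded consistency returned by `max_likely_to_work_cl` is deliberately not asserted.

```rust
// Illustrative sketch (not part of this patch): feeding an Unavailable error
// into DowngradingConsistencyRetryPolicy through the renamed RequestInfo type.
// Import paths for DbError and Consistency are assumptions, not taken from the diff.
use scylla::errors::{DbError, RequestAttemptError};
use scylla::policies::retry::{
    DowngradingConsistencyRetryPolicy, RequestInfo, RetryPolicy, RetrySession,
};
use scylla::statement::Consistency;

fn main() {
    // The coordinator reports that only one replica is alive.
    let error = RequestAttemptError::DbError(
        DbError::Unavailable {
            consistency: Consistency::Quorum,
            required: 2,
            alive: 1,
        },
        "not enough replicas".to_string(),
    );

    let mut session = DowngradingConsistencyRetryPolicy::new().new_session();
    let decision = session.decide_should_retry(RequestInfo {
        error: &error,
        is_idempotent: false,
        consistency: Consistency::Quorum,
    });

    // Per the tests above, the first attempt is expected to be retried on the
    // same node with a consistency the policy believes can still succeed.
    println!("first decision: {decision:?}");
}
```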
diff --git a/scylla/src/policies/retry/fallthrough.rs b/scylla/src/policies/retry/fallthrough.rs
index 48a866e5fd..6a9600026e 100644
--- a/scylla/src/policies/retry/fallthrough.rs
+++ b/scylla/src/policies/retry/fallthrough.rs
@@ -1,4 +1,4 @@
-use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
+use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
 
 /// Forwards all errors directly to the user, never retries
 #[derive(Debug)]
@@ -24,7 +24,7 @@ impl RetryPolicy for FallthroughRetryPolicy {
 }
 
 impl RetrySession for FallthroughRetrySession {
-    fn decide_should_retry(&mut self, _query_info: QueryInfo) -> RetryDecision {
+    fn decide_should_retry(&mut self, _query_info: RequestInfo) -> RetryDecision {
         RetryDecision::DontRetry
     }
 
diff --git a/scylla/src/policies/retry/mod.rs b/scylla/src/policies/retry/mod.rs
index f065e6a881..9076673e36 100644
--- a/scylla/src/policies/retry/mod.rs
+++ b/scylla/src/policies/retry/mod.rs
@@ -8,4 +8,4 @@ pub use downgrading_consistency::{
     DowngradingConsistencyRetryPolicy, DowngradingConsistencyRetrySession,
 };
 pub use fallthrough::{FallthroughRetryPolicy, FallthroughRetrySession};
-pub use retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};
+pub use retry_policy::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
diff --git a/scylla/src/policies/retry/retry_policy.rs b/scylla/src/policies/retry/retry_policy.rs
index 92949f9a02..7507db3788 100644
--- a/scylla/src/policies/retry/retry_policy.rs
+++ b/scylla/src/policies/retry/retry_policy.rs
@@ -1,19 +1,19 @@
-//! Query retries configurations\
-//! To decide when to retry a query the `Session` can use any object which implements
+//! Request retries configurations\
+//! To decide when to retry a request the `Session` can use any object which implements
 //! the `RetryPolicy` trait
 
-use crate::errors::QueryError;
+use crate::errors::RequestAttemptError;
 use crate::frame::types::Consistency;
 
-/// Information about a failed query
-pub struct QueryInfo<'a> {
-    /// The error with which the query failed
-    pub error: &'a QueryError,
-    /// A query is idempotent if it can be applied multiple times without changing the result of the initial application\
+/// Information about a failed request
+pub struct RequestInfo<'a> {
+    /// The error with which the request failed
+    pub error: &'a RequestAttemptError,
+    /// A request is idempotent if it can be applied multiple times without changing the result of the initial application\
     /// If set to `true` we can be sure that it is idempotent\
     /// If set to `false` it is unknown whether it is idempotent
     pub is_idempotent: bool,
-    /// Consistency with which the query failed
+    /// Consistency with which the request failed
     pub consistency: Consistency,
 }
 
@@ -25,18 +25,18 @@ pub enum RetryDecision {
     IgnoreWriteError,
 }
 
-/// Specifies a policy used to decide when to retry a query
+/// Specifies a policy used to decide when to retry a request
 pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
-    /// Called for each new query, starts a session of deciding about retries
+    /// Called for each new request, starts a session of deciding about retries
     fn new_session(&self) -> Box<dyn RetrySession>;
 }
 
-/// Used throughout a single query to decide when to retry it
-/// After this query is finished it is destroyed or reset
+/// Used throughout a single request to decide when to retry it
+/// After this request is finished it is destroyed or reset
 pub trait RetrySession: Send + Sync {
-    /// Called after the query failed - decide what to do next
-    fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision;
+    /// Called after the request failed - decide what to do next
+    fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision;
 
-    /// Reset before using for a new query
+    /// Reset before using for a new request
     fn reset(&mut self);
 }
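Downstream implementors migrate by switching their `RetrySession` impls from `QueryInfo`/`QueryError` to `RequestInfo`/`RequestAttemptError`. A minimal, hypothetical policy written against the renamed traits could look as follows; this is a sketch, not part of the patch, and the `scylla::errors::DbError` import path is an assumption.

```rust
// Hypothetical example (not part of this patch): a retry policy written
// against the renamed API, using the trait signatures shown in retry_policy.rs.
use scylla::errors::{DbError, RequestAttemptError};
use scylla::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};

/// Retries idempotent requests once on a different node after an Overloaded error.
#[derive(Debug)]
struct RetryOnceOnOverload;

struct RetryOnceOnOverloadSession {
    retried: bool,
}

impl RetryPolicy for RetryOnceOnOverload {
    fn new_session(&self) -> Box<dyn RetrySession> {
        Box::new(RetryOnceOnOverloadSession { retried: false })
    }
}

impl RetrySession for RetryOnceOnOverloadSession {
    fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
        match request_info.error {
            // The attempt-scoped error type lets the policy match on
            // RequestAttemptError variants directly, without unwrapping QueryError.
            RequestAttemptError::DbError(DbError::Overloaded, _)
                if request_info.is_idempotent && !self.retried =>
            {
                self.retried = true;
                RetryDecision::RetryNextNode(None)
            }
            _ => RetryDecision::DontRetry,
        }
    }

    fn reset(&mut self) {
        self.retried = false;
    }
}
```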
diff --git a/scylla/src/response/request_response.rs b/scylla/src/response/request_response.rs
index 950611cc36..91b0dc64e6 100644
--- a/scylla/src/response/request_response.rs
+++ b/scylla/src/response/request_response.rs
@@ -6,7 +6,7 @@
 use scylla_cql::frame::response::{NonErrorResponse, Response};
 use tracing::error;
 use uuid::Uuid;
 
-use crate::errors::{ProtocolError, QueryError, UserRequestError};
+use crate::errors::{ProtocolError, QueryError, RequestAttemptError};
 use crate::frame::response::{self, result};
 use crate::response::query_result::QueryResult;
 
@@ -28,7 +28,7 @@ pub(crate) struct NonErrorQueryResponse {
 impl QueryResponse {
     pub(crate) fn into_non_error_query_response(
         self,
-    ) -> Result<NonErrorQueryResponse, UserRequestError> {
+    ) -> Result<NonErrorQueryResponse, RequestAttemptError> {
         Ok(NonErrorQueryResponse {
             response: self.response.into_non_error_response()?,
             tracing_id: self.tracing_id,
@@ -38,13 +38,15 @@ impl QueryResponse {
 
     pub(crate) fn into_query_result_and_paging_state(
         self,
-    ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> {
+    ) -> Result<(QueryResult, PagingStateResponse), RequestAttemptError> {
         self.into_non_error_query_response()?
             .into_query_result_and_paging_state()
     }
 
     pub(crate) fn into_query_result(self) -> Result<QueryResult, QueryError> {
-        self.into_non_error_query_response()?.into_query_result()
+        self.into_non_error_query_response()
+            .map_err(RequestAttemptError::into_query_error)?
+            .into_query_result()
     }
 }
 
@@ -65,14 +67,14 @@ impl NonErrorQueryResponse {
 
     pub(crate) fn into_query_result_and_paging_state(
         self,
-    ) -> Result<(QueryResult, PagingStateResponse), UserRequestError> {
+    ) -> Result<(QueryResult, PagingStateResponse), RequestAttemptError> {
         let (raw_rows, paging_state_response) = match self.response {
             NonErrorResponse::Result(result::Result::Rows((rs, paging_state_response))) => {
                 (Some(rs), paging_state_response)
             }
             NonErrorResponse::Result(_) => (None, PagingStateResponse::NoMorePages),
             _ => {
-                return Err(UserRequestError::UnexpectedResponse(
+                return Err(RequestAttemptError::UnexpectedResponse(
                     self.response.to_response_kind(),
                 ))
             }
@@ -85,7 +87,9 @@ impl NonErrorQueryResponse {
     }
 
     pub(crate) fn into_query_result(self) -> Result<QueryResult, QueryError> {
-        let (result, paging_state) = self.into_query_result_and_paging_state()?;
+        let (result, paging_state) = self
+            .into_query_result_and_paging_state()
+            .map_err(RequestAttemptError::into_query_error)?;
 
         if !paging_state.finished() {
             error!(
diff --git a/scylla/tests/integration/execution_profiles.rs b/scylla/tests/integration/execution_profiles.rs
index 587e70dfeb..8c732472bb 100644
--- a/scylla/tests/integration/execution_profiles.rs
+++ b/scylla/tests/integration/execution_profiles.rs
@@ -65,7 +65,7 @@ impl LoadBalancingPolicy for BoundToPredefinedNodePolicy {
         Box::new(std::iter::empty())
     }
 
-    fn on_query_success(
+    fn on_request_success(
         &self,
         _query: &RoutingInfo,
         _latency: std::time::Duration,
@@ -73,12 +73,12 @@ impl LoadBalancingPolicy for BoundToPredefinedNodePolicy {
     ) {
     }
 
-    fn on_query_failure(
+    fn on_request_failure(
         &self,
         _query: &RoutingInfo,
         _latency: std::time::Duration,
         _node: NodeRef<'_>,
-        _error: &scylla::errors::QueryError,
+        _error: &scylla::errors::RequestAttemptError,
     ) {
     }
 
@@ -97,9 +97,9 @@ impl RetryPolicy for BoundToPredefinedNodePolicy {
 impl RetrySession for BoundToPredefinedNodePolicy {
     fn decide_should_retry(
         &mut self,
-        query_info: scylla::policies::retry::QueryInfo,
+        request_info: scylla::policies::retry::RequestInfo,
     ) -> scylla::policies::retry::RetryDecision {
-        self.report_consistency(query_info.consistency);
+        self.report_consistency(request_info.consistency);
         scylla::policies::retry::RetryDecision::DontRetry
     }
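End to end, a policy like the one sketched after `retry_policy.rs` would be attached through an execution profile. The snippet below is a usage sketch only, not part of this patch: the `ExecutionProfile`/`SessionBuilder` module paths, the `Arc`-taking `retry_policy` setter, and `into_handle`/`default_execution_profile_handle` are assumptions about the surrounding driver API of this era rather than anything introduced here.

```rust
// Usage sketch (not part of this patch): plugging a retry policy into a session
// via an execution profile. `RetryOnceOnOverload` refers to the hypothetical
// policy sketched earlier; builder names and paths are assumptions.
use std::sync::Arc;

use scylla::client::execution_profile::ExecutionProfile;
use scylla::client::session_builder::SessionBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let profile = ExecutionProfile::builder()
        .retry_policy(Arc::new(RetryOnceOnOverload))
        .build();

    let session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .default_execution_profile_handle(profile.into_handle())
        .build()
        .await?;

    // Requests executed through `session` now consult the custom policy, which
    // receives a RequestAttemptError-backed RequestInfo for every failed attempt.
    drop(session);
    Ok(())
}
```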