From b94f6b50b8bdc38ef98aa04b291b3da7d9a38ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Uzarski?= Date: Mon, 23 Dec 2024 08:57:41 +0100 Subject: [PATCH] retry_policy: decision based on RequestAttemptError Same as for LBP::on_query_failure, it narrows the type. --- scylla/src/client/pager.rs | 9 +- scylla/src/client/session.rs | 16 ++-- scylla/src/errors.rs | 2 +- scylla/src/policies/retry/default.rs | 79 +++++++++-------- .../policies/retry/downgrading_consistency.rs | 87 +++++++++++-------- scylla/src/policies/retry/retry_policy.rs | 4 +- 6 files changed, 114 insertions(+), 83 deletions(-) diff --git a/scylla/src/client/pager.rs b/scylla/src/client/pager.rs index 33f99ee79d..ee58ba0e8b 100644 --- a/scylla/src/client/pager.rs +++ b/scylla/src/client/pager.rs @@ -204,7 +204,7 @@ where .instrument(span.clone()) .await; - last_error = match queries_result { + let request_error: RequestAttemptError = match queries_result { Ok(proof) => { trace!(parent: &span, "Query succeeded"); // query_pages returned Ok, so we are guaranteed @@ -218,13 +218,13 @@ where error = %error, "Query failed" ); - error.into_query_error() + error } }; // Use retry policy to decide what to do next let query_info = QueryInfo { - error: &last_error, + error: &request_error, is_idempotent: self.query_is_idempotent, consistency: self.query_consistency, }; @@ -234,7 +234,10 @@ where parent: &span, retry_decision = format!("{:?}", retry_decision).as_str() ); + + last_error = request_error.into_query_error(); self.log_attempt_error(&last_error, &retry_decision); + match retry_decision { RetryDecision::RetrySameNode(cl) => { self.metrics.inc_retries_num(); diff --git a/scylla/src/client/session.rs b/scylla/src/client/session.rs index 3c0e7179ca..4f97a4e73a 100644 --- a/scylla/src/client/session.rs +++ b/scylla/src/client/session.rs @@ -2055,7 +2055,7 @@ where .await; let elapsed = request_start.elapsed(); - last_error = match request_result { + let request_error: RequestAttemptError = match request_result { Ok(response) => { trace!(parent: &span, "Request succeeded"); let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); @@ -2080,14 +2080,13 @@ where node, &e, ); - Some(e.into_query_error()) + e } }; - let the_error: &QueryError = last_error.as_ref().unwrap(); // Use retry policy to decide what to do next let query_info = QueryInfo { - error: the_error, + error: &request_error, is_idempotent: context.is_idempotent, consistency: context .consistency_set_on_statement @@ -2099,7 +2098,14 @@ where parent: &span, retry_decision = format!("{:?}", retry_decision).as_str() ); - context.log_attempt_error(&attempt_id, the_error, &retry_decision); + + last_error = Some(request_error.into_query_error()); + context.log_attempt_error( + &attempt_id, + last_error.as_ref().unwrap(), + &retry_decision, + ); + match retry_decision { RetryDecision::RetrySameNode(new_cl) => { self.metrics.inc_retries_num(); diff --git a/scylla/src/errors.rs b/scylla/src/errors.rs index 60de5661a1..92e162f73b 100644 --- a/scylla/src/errors.rs +++ b/scylla/src/errors.rs @@ -879,7 +879,7 @@ pub enum CqlEventHandlingError { /// /// requests. The retry decision is made based /// on this error. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum RequestAttemptError { /// Failed to serialize query parameters. 
This error occurs, when user executes diff --git a/scylla/src/policies/retry/default.rs b/scylla/src/policies/retry/default.rs index 0691445a96..66de11e6e2 100644 --- a/scylla/src/policies/retry/default.rs +++ b/scylla/src/policies/retry/default.rs @@ -1,6 +1,6 @@ use scylla_cql::frame::response::error::{DbError, WriteType}; -use crate::errors::QueryError; +use crate::errors::RequestAttemptError; use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; @@ -57,11 +57,10 @@ impl RetrySession for DefaultRetrySession { match query_info.error { // Basic errors - there are some problems on this node // Retry on a different one if possible - QueryError::BrokenConnection(_) - | QueryError::ConnectionPoolError(_) - | QueryError::DbError(DbError::Overloaded, _) - | QueryError::DbError(DbError::ServerError, _) - | QueryError::DbError(DbError::TruncateError, _) => { + RequestAttemptError::BrokenConnectionError(_) + | RequestAttemptError::DbError(DbError::Overloaded, _) + | RequestAttemptError::DbError(DbError::ServerError, _) + | RequestAttemptError::DbError(DbError::TruncateError, _) => { if query_info.is_idempotent { RetryDecision::RetryNextNode(None) } else { @@ -73,7 +72,7 @@ impl RetrySession for DefaultRetrySession { // Maybe this node has network problems - try a different one. // Perform at most one retry - it's unlikely that two nodes // have network problems at the same time - QueryError::DbError(DbError::Unavailable { .. }, _) => { + RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => { if !self.was_unavailable_retry { self.was_unavailable_retry = true; RetryDecision::RetryNextNode(None) @@ -87,7 +86,7 @@ impl RetrySession for DefaultRetrySession { // This happens when the coordinator picked replicas that were overloaded/dying. // Retried request should have some useful response because the node will detect // that these replicas are dead. - QueryError::DbError( + RequestAttemptError::DbError( DbError::ReadTimeout { received, required, @@ -107,7 +106,7 @@ impl RetrySession for DefaultRetrySession { // Retry at most once and only for BatchLog write. // Coordinator probably didn't detect the nodes as dead. // By the time we retry they should be detected as dead. - QueryError::DbError(DbError::WriteTimeout { write_type, .. }, _) => { + RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. 
}, _) => { if !self.was_write_timeout_retry && query_info.is_idempotent && *write_type == WriteType::BatchLog @@ -119,9 +118,11 @@ impl RetrySession for DefaultRetrySession { } } // The node is still bootstrapping it can't execute the query, we should try another one - QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None), + RequestAttemptError::DbError(DbError::IsBootstrapping, _) => { + RetryDecision::RetryNextNode(None) + } // Connection to the contacted node is overloaded, try another one - QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), + RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), // In all other cases propagate the error to the user _ => RetryDecision::DontRetry, } @@ -135,15 +136,14 @@ impl RetrySession for DefaultRetrySession { #[cfg(test)] mod tests { use super::{DefaultRetryPolicy, QueryInfo, RetryDecision, RetryPolicy}; - use crate::errors::{ - BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, QueryError, - }; + use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError}; use crate::errors::{DbError, WriteType}; use crate::statement::Consistency; use crate::test_utils::setup_tracing; use bytes::Bytes; + use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError}; - fn make_query_info(error: &QueryError, is_idempotent: bool) -> QueryInfo<'_> { + fn make_query_info(error: &RequestAttemptError, is_idempotent: bool) -> QueryInfo<'_> { QueryInfo { error, is_idempotent, @@ -152,7 +152,7 @@ mod tests { } // Asserts that default policy never retries for this Error - fn default_policy_assert_never_retries(error: QueryError) { + fn default_policy_assert_never_retries(error: RequestAttemptError) { let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( policy.decide_should_retry(make_query_info(&error, false)), @@ -206,19 +206,27 @@ mod tests { ]; for dberror in never_retried_dberrors { - default_policy_assert_never_retries(QueryError::DbError(dberror, String::new())); + default_policy_assert_never_retries(RequestAttemptError::DbError( + dberror, + String::new(), + )); } - default_policy_assert_never_retries(QueryError::BadQuery(BadQuery::Other( - "Length of provided values must be equal to number of batch statements \ - (got 1 values, 2 statements)" - .to_owned(), - ))); - default_policy_assert_never_retries(ProtocolError::NonfinishedPagingState.into()); + default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch); + default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged { + statement: String::new(), + expected_id: vec![], + reprepared_id: vec![], + }); + default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization( + CqlRequestSerializationError::BatchSerialization( + BatchSerializationError::TooManyStatements(u16::MAX as usize + 1), + ), + )); } // Asserts that for this error policy retries on next on idempotent queries only - fn default_policy_assert_idempotent_next(error: QueryError) { + fn default_policy_assert_idempotent_next(error: RequestAttemptError) { let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( policy.decide_should_retry(make_query_info(&error, false)), @@ -236,13 +244,12 @@ mod tests { fn default_idempotent_next_retries() { setup_tracing(); let idempotent_next_errors = vec![ - QueryError::DbError(DbError::Overloaded, String::new()), - QueryError::DbError(DbError::TruncateError, String::new()), - 
QueryError::DbError(DbError::ServerError, String::new()), - QueryError::BrokenConnection( + RequestAttemptError::DbError(DbError::Overloaded, String::new()), + RequestAttemptError::DbError(DbError::TruncateError, String::new()), + RequestAttemptError::DbError(DbError::ServerError, String::new()), + RequestAttemptError::BrokenConnectionError( BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(), ), - QueryError::ConnectionPoolError(ConnectionPoolError::Initializing), ]; for error in idempotent_next_errors { @@ -254,7 +261,7 @@ mod tests { #[test] fn default_bootstrapping() { setup_tracing(); - let error = QueryError::DbError(DbError::IsBootstrapping, String::new()); + let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new()); let mut policy = DefaultRetryPolicy::new().new_session(); assert_eq!( @@ -273,7 +280,7 @@ mod tests { #[test] fn default_unavailable() { setup_tracing(); - let error = QueryError::DbError( + let error = RequestAttemptError::DbError( DbError::Unavailable { consistency: Consistency::Two, required: 2, @@ -308,7 +315,7 @@ mod tests { fn default_read_timeout() { setup_tracing(); // Enough responses and data_present == false - coordinator received only checksums - let enough_responses_no_data = QueryError::DbError( + let enough_responses_no_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -342,7 +349,7 @@ mod tests { // Enough responses but data_present == true - coordinator probably timed out // waiting for read-repair acknowledgement. - let enough_responses_with_data = QueryError::DbError( + let enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -367,7 +374,7 @@ mod tests { ); // Not enough responses, data_present == true - let not_enough_responses_with_data = QueryError::DbError( + let not_enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 1, @@ -397,7 +404,7 @@ mod tests { fn default_write_timeout() { setup_tracing(); // WriteType == BatchLog - let good_write_type = QueryError::DbError( + let good_write_type = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received: 1, @@ -426,7 +433,7 @@ mod tests { ); // WriteType != BatchLog - let bad_write_type = QueryError::DbError( + let bad_write_type = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received: 4, diff --git a/scylla/src/policies/retry/downgrading_consistency.rs b/scylla/src/policies/retry/downgrading_consistency.rs index 27e5eb132d..291e73fc24 100644 --- a/scylla/src/policies/retry/downgrading_consistency.rs +++ b/scylla/src/policies/retry/downgrading_consistency.rs @@ -2,7 +2,7 @@ use scylla_cql::Consistency; use tracing::debug; use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession}; -use crate::errors::{DbError, QueryError, WriteType}; +use crate::errors::{DbError, RequestAttemptError, WriteType}; /// Downgrading consistency retry policy - retries with lower consistency level if it knows\ /// that the initial CL is unreachable. Also, it behaves as [DefaultRetryPolicy](crate::policies::retry::DefaultRetryPolicy) @@ -51,7 +51,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { let cl = match query_info.consistency { Consistency::Serial | Consistency::LocalSerial => { return match query_info.error { - QueryError::DbError(DbError::Unavailable { .. 
}, _) => { + RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => { // JAVA-764: if the requested consistency level is serial, it means that the operation failed at // the paxos phase of a LWT. // Retry on the next host, on the assumption that the initial coordinator could be network-isolated. @@ -88,11 +88,10 @@ impl RetrySession for DowngradingConsistencyRetrySession { match query_info.error { // Basic errors - there are some problems on this node // Retry on a different one if possible - QueryError::BrokenConnection(_) - | QueryError::ConnectionPoolError(_) - | QueryError::DbError(DbError::Overloaded, _) - | QueryError::DbError(DbError::ServerError, _) - | QueryError::DbError(DbError::TruncateError, _) => { + RequestAttemptError::BrokenConnectionError(_) + | RequestAttemptError::DbError(DbError::Overloaded, _) + | RequestAttemptError::DbError(DbError::ServerError, _) + | RequestAttemptError::DbError(DbError::TruncateError, _) => { if query_info.is_idempotent { RetryDecision::RetryNextNode(None) } else { @@ -101,7 +100,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } // Unavailable - the current node believes that not enough nodes // are alive to satisfy specified consistency requirements. - QueryError::DbError(DbError::Unavailable { alive, .. }, _) => { + RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => { if !self.was_retry { self.was_retry = true; max_likely_to_work_cl(*alive, cl) @@ -110,7 +109,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } // ReadTimeout - coordinator didn't receive enough replies in time. - QueryError::DbError( + RequestAttemptError::DbError( DbError::ReadTimeout { received, required, @@ -132,7 +131,7 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } // Write timeout - coordinator didn't receive enough replies in time. 
- QueryError::DbError( + RequestAttemptError::DbError( DbError::WriteTimeout { write_type, received, @@ -161,9 +160,11 @@ impl RetrySession for DowngradingConsistencyRetrySession { } } // The node is still bootstrapping it can't execute the query, we should try another one - QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None), + RequestAttemptError::DbError(DbError::IsBootstrapping, _) => { + RetryDecision::RetryNextNode(None) + } // Connection to the contacted node is overloaded, try another one - QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), + RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None), // In all other cases propagate the error to the user _ => RetryDecision::DontRetry, } @@ -177,8 +178,9 @@ impl RetrySession for DowngradingConsistencyRetrySession { #[cfg(test)] mod tests { use bytes::Bytes; + use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError}; - use crate::errors::{BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError}; + use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError}; use crate::test_utils::setup_tracing; use super::*; @@ -196,7 +198,7 @@ mod tests { ]; fn make_query_info_with_cl( - error: &QueryError, + error: &RequestAttemptError, is_idempotent: bool, cl: Consistency, ) -> QueryInfo<'_> { @@ -208,7 +210,10 @@ mod tests { } // Asserts that downgrading consistency policy never retries for this Error - fn downgrading_consistency_policy_assert_never_retries(error: QueryError, cl: Consistency) { + fn downgrading_consistency_policy_assert_never_retries( + error: RequestAttemptError, + cl: Consistency, + ) { let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)), @@ -264,28 +269,39 @@ mod tests { for &cl in CONSISTENCY_LEVELS { for dberror in never_retried_dberrors.clone() { downgrading_consistency_policy_assert_never_retries( - QueryError::DbError(dberror, String::new()), + RequestAttemptError::DbError(dberror, String::new()), cl, ); } downgrading_consistency_policy_assert_never_retries( - QueryError::BadQuery(BadQuery::Other( - "Length of provided values must be equal to number of batch statements \ - (got 1 values, 2 statements)" - .to_owned(), - )), + RequestAttemptError::RepreparedIdMissingInBatch, cl, ); downgrading_consistency_policy_assert_never_retries( - ProtocolError::NonfinishedPagingState.into(), + RequestAttemptError::RepreparedIdChanged { + statement: String::new(), + expected_id: vec![], + reprepared_id: vec![], + }, + cl, + ); + downgrading_consistency_policy_assert_never_retries( + RequestAttemptError::CqlRequestSerialization( + CqlRequestSerializationError::BatchSerialization( + BatchSerializationError::TooManyStatements(u16::MAX as usize + 1), + ), + ), cl, ); } } // Asserts that for this error policy retries on next on idempotent queries only - fn downgrading_consistency_policy_assert_idempotent_next(error: QueryError, cl: Consistency) { + fn downgrading_consistency_policy_assert_idempotent_next( + error: RequestAttemptError, + cl: Consistency, + ) { let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); assert_eq!( policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)), @@ -318,13 +334,12 @@ mod tests { fn downgrading_consistency_idempotent_next_retries() { setup_tracing(); let idempotent_next_errors = vec![ - QueryError::DbError(DbError::Overloaded, 
String::new()), - QueryError::DbError(DbError::TruncateError, String::new()), - QueryError::DbError(DbError::ServerError, String::new()), - QueryError::BrokenConnection( + RequestAttemptError::DbError(DbError::Overloaded, String::new()), + RequestAttemptError::DbError(DbError::TruncateError, String::new()), + RequestAttemptError::DbError(DbError::ServerError, String::new()), + RequestAttemptError::BrokenConnectionError( BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(), ), - QueryError::ConnectionPoolError(ConnectionPoolError::Initializing), ]; for &cl in CONSISTENCY_LEVELS { @@ -338,7 +353,7 @@ mod tests { #[test] fn downgrading_consistency_bootstrapping() { setup_tracing(); - let error = QueryError::DbError(DbError::IsBootstrapping, String::new()); + let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new()); for &cl in CONSISTENCY_LEVELS { let mut policy = DowngradingConsistencyRetryPolicy::new().new_session(); @@ -360,7 +375,7 @@ mod tests { fn downgrading_consistency_unavailable() { setup_tracing(); let alive = 1; - let error = QueryError::DbError( + let error = RequestAttemptError::DbError( DbError::Unavailable { consistency: Consistency::Two, required: 2, @@ -399,7 +414,7 @@ mod tests { fn downgrading_consistency_read_timeout() { setup_tracing(); // Enough responses and data_present == false - coordinator received only checksums - let enough_responses_no_data = QueryError::DbError( + let enough_responses_no_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -450,7 +465,7 @@ mod tests { } // Enough responses but data_present == true - coordinator probably timed out // waiting for read-repair acknowledgement. - let enough_responses_with_data = QueryError::DbError( + let enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received: 2, @@ -486,7 +501,7 @@ mod tests { // Not enough responses, data_present == true let received = 1; - let not_enough_responses_with_data = QueryError::DbError( + let not_enough_responses_with_data = RequestAttemptError::DbError( DbError::ReadTimeout { consistency: Consistency::Two, received, @@ -548,7 +563,7 @@ mod tests { setup_tracing(); for (received, required) in (1..=5).zip(2..=6) { // WriteType == BatchLog - let write_type_batchlog = QueryError::DbError( + let write_type_batchlog = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received, @@ -591,7 +606,7 @@ mod tests { } // WriteType == UnloggedBatch - let write_type_unlogged_batch = QueryError::DbError( + let write_type_unlogged_batch = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received, @@ -634,7 +649,7 @@ mod tests { } // WriteType == other - let write_type_other = QueryError::DbError( + let write_type_other = RequestAttemptError::DbError( DbError::WriteTimeout { consistency: Consistency::Two, received, diff --git a/scylla/src/policies/retry/retry_policy.rs b/scylla/src/policies/retry/retry_policy.rs index 92949f9a02..2b48a608d5 100644 --- a/scylla/src/policies/retry/retry_policy.rs +++ b/scylla/src/policies/retry/retry_policy.rs @@ -2,13 +2,13 @@ //! To decide when to retry a query the `Session` can use any object which implements //! 
the `RetryPolicy` trait -use crate::errors::QueryError; +use crate::errors::RequestAttemptError; use crate::frame::types::Consistency; /// Information about a failed query pub struct QueryInfo<'a> { /// The error with which the query failed - pub error: &'a QueryError, + pub error: &'a RequestAttemptError, /// A query is idempotent if it can be applied multiple times without changing the result of the initial application\ /// If set to `true` we can be sure that it is idempotent\ /// If set to `false` it is unknown whether it is idempotent
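
For downstream code, the visible effect of this patch is that a custom `RetrySession` now matches on `RequestAttemptError` instead of `QueryError`, so variants that cannot occur on a single request attempt (e.g. `ConnectionPoolError`) no longer need match arms. Below is a minimal sketch of a policy written against the narrowed signature. It is illustrative only: the import paths (`scylla::errors`, `scylla::policies::retry`) are inferred from the file layout in this patch, the policy name is hypothetical, and the trait surface is assumed to be exactly what the diff shows (`new_session`, `decide_should_retry`, `reset`); any additional supertrait bounds are elided.

```rust
use scylla::errors::{DbError, RequestAttemptError};
use scylla::policies::retry::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};

/// Hypothetical policy: hop to the next node on Overloaded, but only for
/// idempotent statements; everything else is propagated to the caller.
#[derive(Debug)]
struct NextNodeOnOverload;

impl RetryPolicy for NextNodeOnOverload {
    fn new_session(&self) -> Box<dyn RetrySession> {
        Box::new(NextNodeOnOverloadSession)
    }
}

struct NextNodeOnOverloadSession;

impl RetrySession for NextNodeOnOverloadSession {
    fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision {
        // After this patch the error is already a RequestAttemptError,
        // so no narrowing from QueryError is needed here.
        match query_info.error {
            RequestAttemptError::DbError(DbError::Overloaded, _)
                if query_info.is_idempotent =>
            {
                // None keeps the statement's original consistency level.
                RetryDecision::RetryNextNode(None)
            }
            _ => RetryDecision::DontRetry,
        }
    }

    fn reset(&mut self) {
        // Stateless session: nothing to clear between requests.
    }
}
```

As with `DefaultRetrySession`, the catch-all arm means that any variants later added to the `#[non_exhaustive]` `RequestAttemptError` enum default to not retrying.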