retry_policy: decision based on RequestAttemptError
Same as for LBP::on_query_failure, this narrows the error type passed to the retry policy from QueryError to RequestAttemptError.
muzarski committed Jan 15, 2025
1 parent 8b73ff2 commit b94f6b5
Showing 6 changed files with 114 additions and 83 deletions.
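For reference, the interface being narrowed looks roughly like the sketch below. It is reconstructed only from names visible in this diff (RequestAttemptError, QueryInfo, RetryDecision, RetrySession, decide_should_retry); the stubbed enum variants, derives, and the exact receiver are assumptions, not the driver's real definitions.

```rust
// Hypothetical, self-contained sketch of the retry-policy interface after this
// change. Only the names come from the diff; everything else is a placeholder.

/// Per-attempt error (the real enum in scylla/src/errors.rs has many more variants).
#[derive(Debug, Clone)]
pub enum RequestAttemptError {
    UnableToAllocStreamId,
    RepreparedIdMissingInBatch,
    // ... DbError(..), BrokenConnectionError(..), CqlRequestSerialization(..), ...
}

/// Placeholder consistency level (the real type lives in the statement module).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Consistency {
    One,
    Two,
    Quorum,
}

#[derive(Debug, PartialEq)]
pub enum RetryDecision {
    RetrySameNode(Option<Consistency>),
    RetryNextNode(Option<Consistency>),
    DontRetry,
}

/// Input to the retry policy. Before this commit `error` was a `&QueryError`;
/// now the policy only ever sees the narrower per-attempt error.
pub struct QueryInfo<'a> {
    pub error: &'a RequestAttemptError,
    pub is_idempotent: bool,
    pub consistency: Consistency,
}

pub trait RetrySession {
    fn decide_should_retry(&mut self, query_info: QueryInfo<'_>) -> RetryDecision;
}
```

The callers in pager.rs and session.rs then convert the attempt error into a QueryError via into_query_error() only after the retry decision has been made, for logging and for the error eventually returned to the user.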
9 changes: 6 additions & 3 deletions scylla/src/client/pager.rs
@@ -204,7 +204,7 @@ where
.instrument(span.clone())
.await;

-last_error = match queries_result {
+let request_error: RequestAttemptError = match queries_result {
Ok(proof) => {
trace!(parent: &span, "Query succeeded");
// query_pages returned Ok, so we are guaranteed
@@ -218,13 +218,13 @@ where
error = %error,
"Query failed"
);
-error.into_query_error()
+error
}
};

// Use retry policy to decide what to do next
let query_info = QueryInfo {
-error: &last_error,
+error: &request_error,
is_idempotent: self.query_is_idempotent,
consistency: self.query_consistency,
};
@@ -234,7 +234,10 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);

+last_error = request_error.into_query_error();
self.log_attempt_error(&last_error, &retry_decision);

match retry_decision {
RetryDecision::RetrySameNode(cl) => {
self.metrics.inc_retries_num();
16 changes: 11 additions & 5 deletions scylla/src/client/session.rs
@@ -2055,7 +2055,7 @@ where
.await;

let elapsed = request_start.elapsed();
-last_error = match request_result {
+let request_error: RequestAttemptError = match request_result {
Ok(response) => {
trace!(parent: &span, "Request succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
@@ -2080,14 +2080,13 @@ where
node,
&e,
);
-Some(e.into_query_error())
+e
}
};

-let the_error: &QueryError = last_error.as_ref().unwrap();
// Use retry policy to decide what to do next
let query_info = QueryInfo {
-error: the_error,
+error: &request_error,
is_idempotent: context.is_idempotent,
consistency: context
.consistency_set_on_statement
@@ -2099,7 +2098,14 @@ where
parent: &span,
retry_decision = format!("{:?}", retry_decision).as_str()
);
-context.log_attempt_error(&attempt_id, the_error, &retry_decision);

+last_error = Some(request_error.into_query_error());
+context.log_attempt_error(
+    &attempt_id,
+    last_error.as_ref().unwrap(),
+    &retry_decision,
+);

match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
self.metrics.inc_retries_num();
2 changes: 1 addition & 1 deletion scylla/src/errors.rs
@@ -879,7 +879,7 @@ pub enum CqlEventHandlingError {
///
/// requests. The retry decision is made based
/// on this error.
-#[derive(Error, Debug)]
+#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum RequestAttemptError {
/// Failed to serialize query parameters. This error occurs, when user executes
79 changes: 43 additions & 36 deletions scylla/src/policies/retry/default.rs
@@ -1,6 +1,6 @@
use scylla_cql::frame::response::error::{DbError, WriteType};

-use crate::errors::QueryError;
+use crate::errors::RequestAttemptError;

use super::{QueryInfo, RetryDecision, RetryPolicy, RetrySession};

@@ -57,11 +57,10 @@ impl RetrySession for DefaultRetrySession {
match query_info.error {
// Basic errors - there are some problems on this node
// Retry on a different one if possible
-QueryError::BrokenConnection(_)
-| QueryError::ConnectionPoolError(_)
-| QueryError::DbError(DbError::Overloaded, _)
-| QueryError::DbError(DbError::ServerError, _)
-| QueryError::DbError(DbError::TruncateError, _) => {
+RequestAttemptError::BrokenConnectionError(_)
+| RequestAttemptError::DbError(DbError::Overloaded, _)
+| RequestAttemptError::DbError(DbError::ServerError, _)
+| RequestAttemptError::DbError(DbError::TruncateError, _) => {
if query_info.is_idempotent {
RetryDecision::RetryNextNode(None)
} else {
@@ -73,7 +72,7 @@ impl RetrySession for DefaultRetrySession {
// Maybe this node has network problems - try a different one.
// Perform at most one retry - it's unlikely that two nodes
// have network problems at the same time
-QueryError::DbError(DbError::Unavailable { .. }, _) => {
+RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => {
if !self.was_unavailable_retry {
self.was_unavailable_retry = true;
RetryDecision::RetryNextNode(None)
@@ -87,7 +86,7 @@ impl RetrySession for DefaultRetrySession {
// This happens when the coordinator picked replicas that were overloaded/dying.
// Retried request should have some useful response because the node will detect
// that these replicas are dead.
-QueryError::DbError(
+RequestAttemptError::DbError(
DbError::ReadTimeout {
received,
required,
@@ -107,7 +106,7 @@ impl RetrySession for DefaultRetrySession {
// Retry at most once and only for BatchLog write.
// Coordinator probably didn't detect the nodes as dead.
// By the time we retry they should be detected as dead.
-QueryError::DbError(DbError::WriteTimeout { write_type, .. }, _) => {
+RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. }, _) => {
if !self.was_write_timeout_retry
&& query_info.is_idempotent
&& *write_type == WriteType::BatchLog
@@ -119,9 +118,11 @@ impl RetrySession for DefaultRetrySession {
}
}
// The node is still bootstrapping it can't execute the query, we should try another one
-QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None),
+RequestAttemptError::DbError(DbError::IsBootstrapping, _) => {
+    RetryDecision::RetryNextNode(None)
+}
// Connection to the contacted node is overloaded, try another one
-QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None),
+RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None),
// In all other cases propagate the error to the user
_ => RetryDecision::DontRetry,
}
@@ -135,15 +136,14 @@ impl RetrySession for DefaultRetrySession {
#[cfg(test)]
mod tests {
use super::{DefaultRetryPolicy, QueryInfo, RetryDecision, RetryPolicy};
-use crate::errors::{
-    BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, QueryError,
-};
+use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
use crate::errors::{DbError, WriteType};
use crate::statement::Consistency;
use crate::test_utils::setup_tracing;
use bytes::Bytes;
+use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};

-fn make_query_info(error: &QueryError, is_idempotent: bool) -> QueryInfo<'_> {
+fn make_query_info(error: &RequestAttemptError, is_idempotent: bool) -> QueryInfo<'_> {
QueryInfo {
error,
is_idempotent,
@@ -152,7 +152,7 @@ mod tests {
}

// Asserts that default policy never retries for this Error
-fn default_policy_assert_never_retries(error: QueryError) {
+fn default_policy_assert_never_retries(error: RequestAttemptError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, false)),
@@ -206,19 +206,27 @@ mod tests {
];

for dberror in never_retried_dberrors {
-default_policy_assert_never_retries(QueryError::DbError(dberror, String::new()));
+default_policy_assert_never_retries(RequestAttemptError::DbError(
+    dberror,
+    String::new(),
+));
}

-default_policy_assert_never_retries(QueryError::BadQuery(BadQuery::Other(
-    "Length of provided values must be equal to number of batch statements \
-    (got 1 values, 2 statements)"
-        .to_owned(),
-)));
-default_policy_assert_never_retries(ProtocolError::NonfinishedPagingState.into());
+default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch);
+default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged {
+    statement: String::new(),
+    expected_id: vec![],
+    reprepared_id: vec![],
+});
+default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization(
+    CqlRequestSerializationError::BatchSerialization(
+        BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
+    ),
+));
}

// Asserts that for this error policy retries on next on idempotent queries only
-fn default_policy_assert_idempotent_next(error: QueryError) {
+fn default_policy_assert_idempotent_next(error: RequestAttemptError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, false)),
@@ -236,13 +244,12 @@ mod tests {
fn default_idempotent_next_retries() {
setup_tracing();
let idempotent_next_errors = vec![
-QueryError::DbError(DbError::Overloaded, String::new()),
-QueryError::DbError(DbError::TruncateError, String::new()),
-QueryError::DbError(DbError::ServerError, String::new()),
-QueryError::BrokenConnection(
+RequestAttemptError::DbError(DbError::Overloaded, String::new()),
+RequestAttemptError::DbError(DbError::TruncateError, String::new()),
+RequestAttemptError::DbError(DbError::ServerError, String::new()),
+RequestAttemptError::BrokenConnectionError(
BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
),
-QueryError::ConnectionPoolError(ConnectionPoolError::Initializing),
];

for error in idempotent_next_errors {
@@ -254,7 +261,7 @@ mod tests {
#[test]
fn default_bootstrapping() {
setup_tracing();
-let error = QueryError::DbError(DbError::IsBootstrapping, String::new());
+let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());

let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
@@ -273,7 +280,7 @@ mod tests {
#[test]
fn default_unavailable() {
setup_tracing();
-let error = QueryError::DbError(
+let error = RequestAttemptError::DbError(
DbError::Unavailable {
consistency: Consistency::Two,
required: 2,
@@ -308,7 +315,7 @@ mod tests {
fn default_read_timeout() {
setup_tracing();
// Enough responses and data_present == false - coordinator received only checksums
-let enough_responses_no_data = QueryError::DbError(
+let enough_responses_no_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
@@ -342,7 +349,7 @@ mod tests {

// Enough responses but data_present == true - coordinator probably timed out
// waiting for read-repair acknowledgement.
-let enough_responses_with_data = QueryError::DbError(
+let enough_responses_with_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
@@ -367,7 +374,7 @@ mod tests {
);

// Not enough responses, data_present == true
-let not_enough_responses_with_data = QueryError::DbError(
+let not_enough_responses_with_data = RequestAttemptError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 1,
@@ -397,7 +404,7 @@ mod tests {
fn default_write_timeout() {
setup_tracing();
// WriteType == BatchLog
-let good_write_type = QueryError::DbError(
+let good_write_type = RequestAttemptError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received: 1,
@@ -426,7 +433,7 @@ mod tests {
);

// WriteType != BatchLog
-let bad_write_type = QueryError::DbError(
+let bad_write_type = RequestAttemptError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received: 4,