Skip to content

Commit

Permalink
history: rename QueryHistory[Result] to Request*
Browse files Browse the repository at this point in the history
  • Loading branch information
muzarski committed Jan 23, 2025
1 parent 701927b commit ff85ad0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 54 deletions.
101 changes: 57 additions & 44 deletions scylla/src/observability/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ pub struct AttemptId(pub usize);
pub struct SpeculativeId(pub usize);

/// Any type implementing this trait can be passed to Session
/// to collect execution history of specific queries.\
/// to collect execution history of specific requests.\
/// In order to use it call `set_history_listener` on
/// `Query`, `PreparedStatement`, etc...\
/// The listener has to generate unique IDs for new queries, attempts and speculative fibers.
/// The listener has to generate unique IDs for new requests, attempts and speculative fibers.
/// These ids are then used by the caller to identify them.\
/// It's important to note that even after a query is finished there still might come events related to it.
/// These events come from speculative futures that didn't notice the query is done already.
Expand Down Expand Up @@ -131,7 +131,7 @@ impl HistoryCollector {

/// Takes the data out of the collector. The collected events are cleared.\
/// It's possible that after finishing a query and taking out the events
/// new ones will still come - from queries that haven't been cancelled yet.
/// new ones will still come - from requests that haven't been cancelled yet.
pub fn take_collected(&self) -> HistoryCollectorData {
self.do_with_data(|data| {
let mut data_to_swap = HistoryCollectorData {
Expand Down Expand Up @@ -246,25 +246,25 @@ impl HistoryListener for HistoryCollector {
}
}

/// Structured representation of queries history.\
/// Structured representation of requests history.\
/// HistoryCollector collects raw events which later can be converted
/// to this pretty representation.\
/// It has a `Display` impl which can be used for printing pretty query history.
/// It has a `Display` impl which can be used for printing pretty request history.
#[derive(Debug, Clone)]
pub struct StructuredHistory {
pub queries: Vec<QueryHistory>,
pub requests: Vec<RequestHistory>,
}

#[derive(Debug, Clone)]
pub struct QueryHistory {
pub struct RequestHistory {
pub start_time: TimePoint,
pub non_speculative_fiber: FiberHistory,
pub speculative_fibers: Vec<FiberHistory>,
pub result: Option<QueryHistoryResult>,
pub result: Option<RequestHistoryResult>,
}

#[derive(Debug, Clone)]
pub enum QueryHistoryResult {
pub enum RequestHistoryResult {
Success(TimePoint),
Error(TimePoint, RequestError),
}
Expand All @@ -291,10 +291,10 @@ pub enum AttemptResult {
impl From<&HistoryCollectorData> for StructuredHistory {
fn from(data: &HistoryCollectorData) -> StructuredHistory {
let mut attempts: BTreeMap<AttemptId, AttemptHistory> = BTreeMap::new();
let mut queries: BTreeMap<RequestId, QueryHistory> = BTreeMap::new();
let mut requests: BTreeMap<RequestId, RequestHistory> = BTreeMap::new();
let mut fibers: BTreeMap<SpeculativeId, FiberHistory> = BTreeMap::new();

// Collect basic data about queries, attempts and speculative fibers
// Collect basic data about requests, attempts and speculative fibers
for (event, event_time) in &data.events {
match event {
HistoryEvent::NewAttempt(attempt_id, _, _, node_addr) => {
Expand Down Expand Up @@ -324,9 +324,9 @@ impl From<&HistoryCollectorData> for StructuredHistory {
}
}
HistoryEvent::NewQuery(request_id) => {
queries.insert(
requests.insert(
*request_id,
QueryHistory {
RequestHistory {
start_time: *event_time,
non_speculative_fiber: FiberHistory {
start_time: *event_time,
Expand All @@ -338,13 +338,14 @@ impl From<&HistoryCollectorData> for StructuredHistory {
);
}
HistoryEvent::QuerySuccess(request_id) => {
if let Some(query) = queries.get_mut(request_id) {
query.result = Some(QueryHistoryResult::Success(*event_time));
if let Some(query) = requests.get_mut(request_id) {
query.result = Some(RequestHistoryResult::Success(*event_time));
}
}
HistoryEvent::QueryError(request_id, error) => {
if let Some(query) = queries.get_mut(request_id) {
query.result = Some(QueryHistoryResult::Error(*event_time, error.clone()));
if let Some(query) = requests.get_mut(request_id) {
query.result =
Some(RequestHistoryResult::Error(*event_time, error.clone()));
}
}
HistoryEvent::NewSpeculativeFiber(speculative_id, _) => {
Expand All @@ -370,7 +371,7 @@ impl From<&HistoryCollectorData> for StructuredHistory {
}
}
None => {
if let Some(query) = queries.get_mut(request_id) {
if let Some(query) = requests.get_mut(request_id) {
query.non_speculative_fiber.attempts.push(attempt);
}
}
Expand All @@ -379,19 +380,19 @@ impl From<&HistoryCollectorData> for StructuredHistory {
}
}

// Move speculative fibers to their queries
// Move speculative fibers to their requests
for (event, _) in &data.events {
if let HistoryEvent::NewSpeculativeFiber(speculative_id, request_id) = event {
if let Some(fiber) = fibers.remove(speculative_id) {
if let Some(query) = queries.get_mut(request_id) {
if let Some(query) = requests.get_mut(request_id) {
query.speculative_fibers.push(fiber);
}
}
}
}

StructuredHistory {
queries: queries.into_values().collect(),
requests: requests.into_values().collect(),
}
}
}
Expand All @@ -400,7 +401,7 @@ impl From<&HistoryCollectorData> for StructuredHistory {
impl Display for StructuredHistory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Queries History:")?;
for (i, query) in self.queries.iter().enumerate() {
for (i, query) in self.requests.iter().enumerate() {
writeln!(f, "=== Query #{} ===", i)?;
writeln!(f, "| start_time: {}", query.start_time)?;
writeln!(f, "| Non-speculative attempts:")?;
Expand All @@ -414,10 +415,10 @@ impl Display for StructuredHistory {
}
writeln!(f, "|")?;
match &query.result {
Some(QueryHistoryResult::Success(succ_time)) => {
Some(RequestHistoryResult::Success(succ_time)) => {
writeln!(f, "| Query successful at {}", succ_time)?;
}
Some(QueryHistoryResult::Error(err_time, error)) => {
Some(RequestHistoryResult::Error(err_time, error)) => {
writeln!(f, "| Query failed at {}", err_time)?;
writeln!(f, "| Error: {}", error)?;
}
Expand Down Expand Up @@ -461,8 +462,8 @@ mod tests {
};

use super::{
AttemptId, AttemptResult, HistoryCollector, HistoryListener, QueryHistoryResult, RequestId,
SpeculativeId, StructuredHistory, TimePoint,
AttemptId, AttemptResult, HistoryCollector, HistoryListener, RequestHistoryResult,
RequestId, SpeculativeId, StructuredHistory, TimePoint,
};
use assert_matches::assert_matches;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
Expand All @@ -480,11 +481,11 @@ mod tests {
Utc,
);

for query in &mut history.queries {
for query in &mut history.requests {
query.start_time = the_time;
match &mut query.result {
Some(QueryHistoryResult::Success(succ_time)) => *succ_time = the_time,
Some(QueryHistoryResult::Error(err_time, _)) => *err_time = the_time,
Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time,
Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time,
None => {}
};

Expand Down Expand Up @@ -543,7 +544,7 @@ mod tests {
let history_collector = HistoryCollector::new();
let history: StructuredHistory = history_collector.clone_structured_history();

assert!(history.queries.is_empty());
assert!(history.requests.is_empty());

let displayed = "Queries History:
";
Expand All @@ -559,9 +560,12 @@ mod tests {

let history: StructuredHistory = history_collector.clone_structured_history();

assert_eq!(history.queries.len(), 1);
assert!(history.queries[0].non_speculative_fiber.attempts.is_empty());
assert!(history.queries[0].speculative_fibers.is_empty());
assert_eq!(history.requests.len(), 1);
assert!(history.requests[0]
.non_speculative_fiber
.attempts
.is_empty());
assert!(history.requests[0].speculative_fibers.is_empty());

let displayed = "Queries History:
=== Query #0 ===
Expand All @@ -588,11 +592,11 @@ mod tests {

let history: StructuredHistory = history_collector.clone_structured_history();

assert_eq!(history.queries.len(), 1);
assert_eq!(history.queries[0].non_speculative_fiber.attempts.len(), 1);
assert!(history.queries[0].speculative_fibers.is_empty());
assert_eq!(history.requests.len(), 1);
assert_eq!(history.requests[0].non_speculative_fiber.attempts.len(), 1);
assert!(history.requests[0].speculative_fibers.is_empty());
assert_matches!(
history.queries[0].non_speculative_fiber.attempts[0].result,
history.requests[0].non_speculative_fiber.attempts[0].result,
Some(AttemptResult::Success(_))
);

Expand Down Expand Up @@ -676,12 +680,21 @@ mod tests {

let history: StructuredHistory = history_collector.clone_structured_history();

assert_eq!(history.queries.len(), 1);
assert!(history.queries[0].non_speculative_fiber.attempts.is_empty());
assert_eq!(history.queries[0].speculative_fibers.len(), 3);
assert!(history.queries[0].speculative_fibers[0].attempts.is_empty());
assert!(history.queries[0].speculative_fibers[1].attempts.is_empty());
assert!(history.queries[0].speculative_fibers[2].attempts.is_empty());
assert_eq!(history.requests.len(), 1);
assert!(history.requests[0]
.non_speculative_fiber
.attempts
.is_empty());
assert_eq!(history.requests[0].speculative_fibers.len(), 3);
assert!(history.requests[0].speculative_fibers[0]
.attempts
.is_empty());
assert!(history.requests[0].speculative_fibers[1]
.attempts
.is_empty());
assert!(history.requests[0].speculative_fibers[2]
.attempts
.is_empty());

let displayed = "Queries History:
=== Query #0 ===
Expand Down Expand Up @@ -813,7 +826,7 @@ mod tests {
}

#[test]
fn multiple_queries() {
fn multiple_requests() {
setup_tracing();
let history_collector = HistoryCollector::new();

Expand Down
20 changes: 10 additions & 10 deletions scylla/tests/integration/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::StreamExt;
use scylla::errors::{RequestAttemptError, RequestError};
use scylla::frame::response::result::Row;
use scylla::observability::history::{
AttemptResult, HistoryCollector, QueryHistoryResult, StructuredHistory, TimePoint,
AttemptResult, HistoryCollector, RequestHistoryResult, StructuredHistory, TimePoint,
};
use scylla::query::Query;

Expand All @@ -24,11 +24,11 @@ fn set_one_time(mut history: StructuredHistory) -> StructuredHistory {
Utc,
);

for query in &mut history.queries {
for query in &mut history.requests {
query.start_time = the_time;
match &mut query.result {
Some(QueryHistoryResult::Success(succ_time)) => *succ_time = the_time,
Some(QueryHistoryResult::Error(err_time, _)) => *err_time = the_time,
Some(RequestHistoryResult::Success(succ_time)) => *succ_time = the_time,
Some(RequestHistoryResult::Error(err_time, _)) => *err_time = the_time,
None => {}
};

Expand Down Expand Up @@ -56,7 +56,7 @@ fn set_one_time(mut history: StructuredHistory) -> StructuredHistory {
fn set_one_node(mut history: StructuredHistory) -> StructuredHistory {
let the_node: SocketAddr = node1_addr();

for query in &mut history.queries {
for query in &mut history.requests {
for fiber in std::iter::once(&mut query.non_speculative_fiber)
.chain(query.speculative_fibers.iter_mut())
{
Expand Down Expand Up @@ -84,12 +84,12 @@ fn set_one_db_error_message(mut history: StructuredHistory) -> StructuredHistory
}
};

for query in &mut history.queries {
if let Some(QueryHistoryResult::Error(_, err)) = &mut query.result {
for request in &mut history.requests {
if let Some(RequestHistoryResult::Error(_, err)) = &mut request.result {
set_msg_request_error(err);
}
for fiber in std::iter::once(&mut query.non_speculative_fiber)
.chain(query.speculative_fibers.iter_mut())
for fiber in std::iter::once(&mut request.non_speculative_fiber)
.chain(request.speculative_fibers.iter_mut())
{
for attempt in &mut fiber.attempts {
if let Some(AttemptResult::Error(_, err, _)) = &mut attempt.result {
Expand Down Expand Up @@ -249,7 +249,7 @@ async fn iterator_query_history() {

let history = history_collector.clone_structured_history();

assert!(history.queries.len() >= 4);
assert!(history.requests.len() >= 4);

let displayed_prefix = "Queries History:
=== Query #0 ===
Expand Down

0 comments on commit ff85ad0

Please sign in to comment.