Skip to content

Commit

Permalink
Move handling of MissingUserDefinedType to ClusterData::new
Browse files Browse the repository at this point in the history
This commit changes type of `keyspaces` field in `Metadata` from
`HashMap<String, Keyspace>` to `HashMap<String, Result<Keyspace, MissingUserDefinedType>>`.
Because of that, it also removed `MissingUserDefinedType` handling from
`query_metadata`. Now handling this error is done in `ClusterData::new`.
This has an advantage: we can use older version of the keyspace metadata
if the new version has this error.
  • Loading branch information
Lorak-mmk committed Jan 20, 2025
1 parent 82561f1 commit 800aabc
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 33 deletions.
15 changes: 2 additions & 13 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) struct MetadataReader {
/// Describes all metadata retrieved from the cluster
pub(crate) struct Metadata {
pub(crate) peers: Vec<Peer>,
pub(crate) keyspaces: HashMap<String, Keyspace>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
}

#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
Expand Down Expand Up @@ -297,7 +297,7 @@ pub struct UserDefinedType {
/// Represents a user defined type whose definition is missing from the metadata.
#[derive(Clone, Debug, Error)]
#[error("Missing UDT: {keyspace}, {name}")]
struct MissingUserDefinedType {
pub(crate) struct MissingUserDefinedType {
name: String,
keyspace: String,
}
Expand Down Expand Up @@ -800,17 +800,6 @@ async fn query_metadata(
return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists).into());
}

let keyspaces = keyspaces
.into_iter()
.filter_map(|(ks_name, ks)| match ks {
Ok(ks) => Some((ks_name, ks)),
Err(e) => {
warn!("Error while processing keyspace \"{ks_name}\": {e}");
None
}
})
.collect();

Ok(Metadata { peers, keyspaces })
}

Expand Down
26 changes: 23 additions & 3 deletions scylla/src/cluster/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use scylla_cql::frame::response::result::TableSpec;
use scylla_cql::types::serialize::row::SerializedValues;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::debug;
use tracing::{debug, warn};
use uuid::Uuid;

use super::metadata::{Keyspace, Metadata, Strategy};
Expand Down Expand Up @@ -64,6 +64,7 @@ impl ClusterState {
used_keyspace: &Option<VerifiedKeyspaceName>,
host_filter: Option<&dyn HostFilter>,
mut tablets: TabletsInfo,
old_keyspaces: &HashMap<String, Keyspace>,
) -> Self {
// Create new updated known_peers and ring
let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
Expand Down Expand Up @@ -109,6 +110,26 @@ impl ClusterState {
}
}

let keyspaces: HashMap<String, Keyspace> = metadata
.keyspaces
.into_iter()
.filter_map(|(ks_name, ks)| match ks {
Ok(ks) => Some((ks_name, ks)),
Err(e) => {
if let Some(old_ks) = old_keyspaces.get(&ks_name) {
warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\
Re-using older version of this keyspace metadata");
Some((ks_name, old_ks.clone()))
} else {
warn!("Encountered an error while processing metadata of keyspace \"{ks_name}\": {e}.\
No previous version of this keyspace metadata found, so it will not be\
present in ClusterData until next refresh.");
None
}
}
})
.collect();

{
let removed_nodes = {
let mut removed_nodes = HashSet::new();
Expand All @@ -122,7 +143,7 @@ impl ClusterState {
};

let table_predicate = |spec: &TableSpec| {
if let Some(ks) = metadata.keyspaces.get(spec.ks_name()) {
if let Some(ks) = keyspaces.get(spec.ks_name()) {
ks.tables.contains_key(spec.table_name())
} else {
false
Expand Down Expand Up @@ -150,7 +171,6 @@ impl ClusterState {
)
}

let keyspaces = metadata.keyspaces;
let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets);
Expand Down
2 changes: 2 additions & 0 deletions scylla/src/cluster/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl Cluster {
&None,
host_filter.as_deref(),
TabletsInfo::new(),
&HashMap::new(),
)
.await;
cluster_data.wait_until_all_pools_are_initialized().await;
Expand Down Expand Up @@ -413,6 +414,7 @@ impl ClusterWorker {
&self.used_keyspace,
self.host_filter.as_deref(),
cluster_data.locator.tablets.clone(),
&cluster_data.keyspaces,
)
.await,
);
Expand Down
3 changes: 3 additions & 0 deletions scylla/src/policies/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,7 @@ mod tests {
&None,
None,
TabletsInfo::new(),
&HashMap::new(),
)
.await
}
Expand Down Expand Up @@ -1449,6 +1450,7 @@ mod tests {
&None,
None,
TabletsInfo::new(),
&HashMap::new(),
)
.await
}
Expand Down Expand Up @@ -2498,6 +2500,7 @@ mod tests {
Some(&FHostFilter)
},
TabletsInfo::new(),
&HashMap::new(),
)
.await;

Expand Down
48 changes: 42 additions & 6 deletions scylla/src/routing/locator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,43 +860,79 @@ mod tests {
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, A, C, D, G, E],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_NTS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_2,
vec![F, A, D, G],
);
check(
160,
None,
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![F, A],
);

check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![A, C, G],
);
check(
160,
Some("us"),
&metadata.keyspaces.get(KEYSPACE_NTS_RF_3).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_NTS_RF_3)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_NTS_RF_3,
vec![F, D, E],
);
check(
160,
Some("eu"),
&metadata.keyspaces.get(KEYSPACE_SS_RF_2).unwrap().strategy,
&metadata
.keyspaces
.get(KEYSPACE_SS_RF_2)
.unwrap()
.as_ref()
.unwrap()
.strategy,
TABLE_SS_RF_2,
vec![A],
);
Expand Down
8 changes: 4 additions & 4 deletions scylla/src/routing/locator/precomputed_replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ mod tests {
let mut metadata = mock_metadata_for_token_aware_tests();
metadata.keyspaces = [(
"SimpleStrategy{rf=2}".into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::SimpleStrategy {
replication_factor: 2,
},
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
)]
.iter()
.cloned()
Expand All @@ -251,7 +251,7 @@ mod tests {
metadata
.keyspaces
.values()
.map(|keyspace| &keyspace.strategy),
.map(|keyspace| &keyspace.as_ref().unwrap().strategy),
);

let check = |token, replication_factor, expected_node_ids| {
Expand Down Expand Up @@ -293,7 +293,7 @@ mod tests {
metadata
.keyspaces
.values()
.map(|keyspace| &keyspace.strategy),
.map(|keyspace| &keyspace.as_ref().unwrap().strategy),
);

let check = |token, dc, replication_factor, expected_node_ids| {
Expand Down
17 changes: 10 additions & 7 deletions scylla/src/routing/locator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
let keyspaces = [
(
KEYSPACE_SS_RF_2.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::SimpleStrategy {
replication_factor: 2,
},
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
(
KEYSPACE_NTS_RF_2.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 2), ("us".to_owned(), 2)]
.into_iter()
Expand All @@ -138,11 +138,11 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
(
KEYSPACE_NTS_RF_3.into(),
Keyspace {
Ok(Keyspace {
strategy: Strategy::NetworkTopologyStrategy {
datacenter_repfactors: [("eu".to_owned(), 3), ("us".to_owned(), 3)]
.into_iter()
Expand All @@ -151,7 +151,7 @@ pub(crate) fn mock_metadata_for_token_aware_tests() -> Metadata {
tables: HashMap::new(),
views: HashMap::new(),
user_defined_types: HashMap::new(),
},
}),
),
]
.iter()
Expand Down Expand Up @@ -199,7 +199,10 @@ pub(crate) fn create_ring(metadata: &Metadata) -> impl Iterator<Item = (Token, A

pub(crate) fn create_locator(metadata: &Metadata) -> ReplicaLocator {
let ring = create_ring(metadata);
let strategies = metadata.keyspaces.values().map(|ks| &ks.strategy);
let strategies = metadata
.keyspaces
.values()
.map(|ks| &ks.as_ref().unwrap().strategy);

ReplicaLocator::new(ring, strategies, TabletsInfo::new())
}
Expand Down

0 comments on commit 800aabc

Please sign in to comment.