diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 787ea4353512..7ec5956adca7 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -20,14 +20,13 @@ use std::time::Duration; use async_trait::async_trait; use clap::Parser; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::etcd::EtcdStore; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::table_name::TableName; use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; -use meta_srv::service::store::etcd::EtcdStore; -use meta_srv::service::store::kv::KvBackendAdapter; use rand::Rng; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; @@ -64,9 +63,7 @@ impl BenchTableMetadataCommand { pub async fn build(&self) -> Result { let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap(); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - etcd_store, - ))); + let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store)); let tool = BenchTableMetadata { table_metadata_manager, diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 5cc4fb46ce07..6a996bca6b20 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -27,6 +27,8 @@ use common_meta::key::table_name::{TableNameKey, TableNameValue}; use common_meta::key::table_region::{TableRegionKey, TableRegionValue}; use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue}; use common_meta::key::{RegionDistribution, TableMetaKey}; +use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::KvBackendRef; use common_meta::range_stream::PaginationStream; use common_meta::rpc::router::TableRoute; use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest}; @@ 
-35,8 +37,6 @@ use common_meta::util::get_prefix_end_key; use common_telemetry::info; use etcd_client::Client; use futures::TryStreamExt; -use meta_srv::service::store::etcd::EtcdStore; -use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef}; use prost::Message; use snafu::ResultExt; use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue}; @@ -81,7 +81,7 @@ impl UpgradeCommand { } struct MigrateTableMetadata { - etcd_store: KvStoreRef, + etcd_store: KvBackendRef, dryrun: bool, skip_table_global_keys: bool, @@ -123,7 +123,7 @@ impl MigrateTableMetadata { info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); let mut stream = PaginationStream::new( - KvBackendAdapter::wrap(self.etcd_store.clone()), + self.etcd_store.clone(), RangeRequest::new().with_range(key, range_end), PAGE_SIZE, Arc::new(|kv: KeyValue| { @@ -182,7 +182,7 @@ impl MigrateTableMetadata { let mut keys = Vec::new(); info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); let mut stream = PaginationStream::new( - KvBackendAdapter::wrap(self.etcd_store.clone()), + self.etcd_store.clone(), RangeRequest::new().with_range(key, range_end), PAGE_SIZE, Arc::new(|kv: KeyValue| { @@ -234,7 +234,7 @@ impl MigrateTableMetadata { let mut keys = Vec::new(); info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); let mut stream = PaginationStream::new( - KvBackendAdapter::wrap(self.etcd_store.clone()), + self.etcd_store.clone(), RangeRequest::new().with_range(key, range_end), PAGE_SIZE, Arc::new(|kv: KeyValue| { @@ -284,7 +284,7 @@ impl MigrateTableMetadata { info!("Start scanning key from: {}", String::from_utf8_lossy(&key)); let mut stream = PaginationStream::new( - KvBackendAdapter::wrap(self.etcd_store.clone()), + self.etcd_store.clone(), RangeRequest::new().with_range(key, range_end.clone()), PAGE_SIZE, Arc::new(|kv: KeyValue| { diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 9bd6fc07e46d..b48b57c237f9 100644 
--- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_config::KvStoreConfig; +use common_config::KvBackendConfig; use common_telemetry::logging::LoggingOptions; use config::{Config, Environment, File, FileFormat}; use datanode::config::{DatanodeOptions, ProcedureConfig}; @@ -30,7 +30,7 @@ pub const ENV_LIST_SEP: &str = ","; pub struct MixOptions { pub data_home: String, pub procedure: ProcedureConfig, - pub metadata_store: KvStoreConfig, + pub metadata_store: KvBackendConfig, pub frontend: FrontendOptions, pub datanode: DatanodeOptions, pub logging: LoggingOptions, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 35d091707529..7d23702ee328 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,7 +19,7 @@ use catalog::kvbackend::KvBackendCatalogManager; use catalog::CatalogManagerRef; use clap::Parser; use common_base::Plugins; -use common_config::{metadata_store_dir, KvStoreConfig, WalConfig}; +use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; use common_meta::cache_invalidator::DummyKvCacheInvalidator; use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; @@ -97,7 +97,7 @@ pub struct StandaloneOptions { pub prom_store: PromStoreOptions, pub wal: WalConfig, pub storage: StorageConfig, - pub metadata_store: KvStoreConfig, + pub metadata_store: KvBackendConfig, pub procedure: ProcedureConfig, pub logging: LoggingOptions, pub user_provider: Option, @@ -119,7 +119,7 @@ impl Default for StandaloneOptions { prom_store: PromStoreOptions::default(), wal: WalConfig::default(), storage: StorageConfig::default(), - metadata_store: KvStoreConfig::default(), + metadata_store: KvBackendConfig::default(), procedure: ProcedureConfig::default(), logging: LoggingOptions::default(), user_provider: None, @@ -336,7 +336,7 @@ impl StartCommand { })?; let 
metadata_dir = metadata_store_dir(&opts.data_home); - let (kv_store, procedure_manager) = FeInstance::try_build_standalone_components( + let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components( metadata_dir, opts.metadata_store, opts.procedure, @@ -344,15 +344,18 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let datanode = - DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), Default::default()) - .build() - .await - .context(StartDatanodeSnafu)?; + let datanode = DatanodeBuilder::new( + dn_opts.clone(), + Some(kv_backend.clone()), + Default::default(), + ) + .build() + .await + .context(StartDatanodeSnafu)?; let region_server = datanode.region_server(); let catalog_manager = KvBackendCatalogManager::new( - kv_store.clone(), + kv_backend.clone(), Arc::new(DummyKvCacheInvalidator), Arc::new(StandaloneDatanodeManager(region_server.clone())), ); @@ -366,7 +369,7 @@ impl StartCommand { // TODO: build frontend instance like in distributed mode let mut frontend = build_frontend( fe_plugins, - kv_store, + kv_backend, procedure_manager.clone(), catalog_manager, region_server, @@ -389,13 +392,13 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( plugins: Plugins, - kv_store: KvBackendRef, + kv_backend: KvBackendRef, procedure_manager: ProcedureManagerRef, catalog_manager: CatalogManagerRef, region_server: RegionServer, ) -> Result { let frontend_instance = FeInstance::try_new_standalone( - kv_store, + kv_backend, procedure_manager, catalog_manager, plugins, diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 34418c147cf1..4cf4a0804eb8 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -54,14 +54,14 @@ pub fn metadata_store_dir(store_dir: &str) -> String { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] -pub struct KvStoreConfig { +pub struct KvBackendConfig { // Kv file size in bytes pub 
file_size: ReadableSize, // Kv purge threshold in bytes pub purge_threshold: ReadableSize, } -impl Default for KvStoreConfig { +impl Default for KvBackendConfig { fn default() -> Self { Self { // log file size 256MB diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4484a82a7cfb..7fdc256d8413 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -28,6 +28,26 @@ use crate::peer::Peer; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Empty key is not allowed"))] + EmptyKey { location: Location }, + + #[snafu(display("Invalid result with a txn response: {}", err_msg))] + InvalidTxnResult { err_msg: String, location: Location }, + + #[snafu(display("Failed to connect to Etcd"))] + ConnectEtcd { + #[snafu(source)] + error: etcd_client::Error, + location: Location, + }, + + #[snafu(display("Failed to execute via Etcd"))] + EtcdFailed { + #[snafu(source)] + error: etcd_client::Error, + location: Location, + }, + #[snafu(display("Failed to get sequence: {}", err_msg))] NextSequence { err_msg: String, location: Location }, @@ -254,7 +274,10 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal, + IllegalServerState { .. } + | EtcdTxnOpResponse { .. } + | EtcdFailed { .. } + | ConnectEtcd { .. } => StatusCode::Internal, SerdeJson { .. } | ParseOption { .. } @@ -267,7 +290,8 @@ impl ErrorExt for Error { | NextSequence { .. } | SequenceOutOfRange { .. } | UnexpectedSequenceValue { .. } - | InvalidHeartbeatResponse { .. } => StatusCode::Unexpected, + | InvalidHeartbeatResponse { .. } + | InvalidTxnResult { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } @@ -277,7 +301,7 @@ impl ErrorExt for Error { | RenameTable { .. } | Unsupported { .. } => StatusCode::Internal, - PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, + PrimaryKeyNotFound { .. 
} | &EmptyKey { .. } => StatusCode::InvalidArguments, TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index 8a67ebd888d5..689e1a3864a2 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod etcd; pub mod memory; pub mod test; pub mod txn; @@ -127,3 +128,12 @@ where } } } + +pub trait ResettableKvBackend: KvBackend +where + Self::Error: ErrorExt, +{ + fn reset(&self); +} + +pub type ResettableKvBackendRef = Arc + Send + Sync>; diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs similarity index 94% rename from src/meta-srv/src/service/store/etcd.rs rename to src/common/meta/src/kv_backend/etcd.rs index 9df574708a67..ee1ebb05aa25 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -15,25 +15,47 @@ use std::any::Any; use std::sync::Arc; -use common_meta::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; -use common_meta::kv_backend::{KvBackend, TxnService}; -use common_meta::metrics::METRIC_META_TXN_REQUEST; -use common_meta::rpc::store::{ - BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, -}; -use common_meta::rpc::KeyValue; use etcd_client::{ Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse, }; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error; -use crate::error::{ConvertEtcdTxnObjectSnafu, Error, Result}; -use crate::service::store::etcd_util::KvPair; -use 
crate::service::store::kv::KvStoreRef; +use super::KvBackendRef; +use crate::error::{self, Error, Result}; +use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse}; +use crate::kv_backend::{KvBackend, TxnService}; +use crate::metrics::METRIC_META_TXN_REQUEST; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +pub struct KvPair<'a>(&'a etcd_client::KeyValue); + +impl<'a> KvPair<'a> { + /// Creates a `KvPair` from etcd KeyValue + #[inline] + pub fn new(kv: &'a etcd_client::KeyValue) -> Self { + Self(kv) + } + + #[inline] + pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { + KeyValue::from(KvPair::new(kv)) + } +} + +impl<'a> From> for KeyValue { + fn from(kv: KvPair<'a>) -> Self { + Self { + key: kv.0.key().to_vec(), + value: kv.0.value().to_vec(), + } + } +} // Maximum number of operations permitted in a transaction. // The etcd default configuration's `--max-txn-ops` is 128. 
@@ -46,7 +68,7 @@ pub struct EtcdStore { } impl EtcdStore { - pub async fn with_endpoints(endpoints: S) -> Result + pub async fn with_endpoints(endpoints: S) -> Result where E: AsRef, S: AsRef<[E]>, @@ -58,7 +80,7 @@ impl EtcdStore { Ok(Self::with_etcd_client(client)) } - pub fn with_etcd_client(client: Client) -> KvStoreRef { + pub fn with_etcd_client(client: Client) -> KvBackendRef { Arc::new(Self { client }) } @@ -305,7 +327,7 @@ impl TxnService for EtcdStore { .txn(etcd_txn) .await .context(error::EtcdFailedSnafu)?; - txn_res.try_into().context(ConvertEtcdTxnObjectSnafu) + txn_res.try_into() } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index 702fcc9d05bb..bc0d4df25479 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -23,6 +23,7 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::Serializer; +use super::ResettableKvBackend; use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse}; use crate::kv_backend::{KvBackend, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; @@ -324,6 +325,12 @@ impl TxnService for MemoryKvBackend { } } +impl ResettableKvBackend for MemoryKvBackend { + fn reset(&self) { + self.clear(); + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -336,24 +343,24 @@ mod tests { }; async fn mock_mem_store_with_data() -> MemoryKvBackend { - let kv_store = MemoryKvBackend::::new(); - prepare_kv(&kv_store).await; + let kv_backend = MemoryKvBackend::::new(); + prepare_kv(&kv_backend).await; - kv_store + kv_backend } #[tokio::test] async fn test_put() { - let kv_store = mock_mem_store_with_data().await; + let kv_backend = mock_mem_store_with_data().await; - test_kv_put(kv_store).await; + test_kv_put(kv_backend).await; } #[tokio::test] async fn test_range() { - let kv_store = mock_mem_store_with_data().await; + let kv_backend = mock_mem_store_with_data().await; - 
test_kv_range(kv_store).await; + test_kv_range(kv_backend).await; } #[tokio::test] @@ -365,29 +372,29 @@ mod tests { #[tokio::test] async fn test_batch_get() { - let kv_store = mock_mem_store_with_data().await; + let kv_backend = mock_mem_store_with_data().await; - test_kv_batch_get(kv_store).await; + test_kv_batch_get(kv_backend).await; } #[tokio::test(flavor = "multi_thread")] async fn test_compare_and_put() { - let kv_store = Arc::new(MemoryKvBackend::::new()); + let kv_backend = Arc::new(MemoryKvBackend::::new()); - test_kv_compare_and_put(kv_store).await; + test_kv_compare_and_put(kv_backend).await; } #[tokio::test] async fn test_delete_range() { - let kv_store = mock_mem_store_with_data().await; + let kv_backend = mock_mem_store_with_data().await; - test_kv_delete_range(kv_store).await; + test_kv_delete_range(kv_backend).await; } #[tokio::test] async fn test_batch_delete() { - let kv_store = mock_mem_store_with_data().await; + let kv_backend = mock_mem_store_with_data().await; - test_kv_batch_delete(kv_store).await; + test_kv_batch_delete(kv_backend).await; } } diff --git a/src/common/meta/src/kv_backend/test.rs b/src/common/meta/src/kv_backend/test.rs index 4f8911910072..b079a3dc2d52 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -38,9 +38,9 @@ pub fn mock_kvs() -> Vec { ] } -pub async fn prepare_kv(kv_store: &impl KvBackend) { +pub async fn prepare_kv(kv_backend: &impl KvBackend) { let kvs = mock_kvs(); - assert!(kv_store + assert!(kv_backend .batch_put(BatchPutRequest { kvs, ..Default::default() @@ -48,7 +48,7 @@ pub async fn prepare_kv(kv_store: &impl KvBackend) { .await .is_ok()); - assert!(kv_store + assert!(kv_backend .put(PutRequest { key: b"key11".to_vec(), value: b"val11".to_vec(), @@ -58,8 +58,8 @@ pub async fn prepare_kv(kv_store: &impl KvBackend) { .is_ok()); } -pub async fn test_kv_put(kv_store: impl KvBackend) { - let resp = kv_store +pub async fn test_kv_put(kv_backend: impl KvBackend) { + 
let resp = kv_backend .put(PutRequest { key: b"key11".to_vec(), value: b"val12".to_vec(), @@ -69,7 +69,7 @@ pub async fn test_kv_put(kv_store: impl KvBackend) { .unwrap(); assert!(resp.prev_kv.is_none()); - let resp = kv_store + let resp = kv_backend .put(PutRequest { key: b"key11".to_vec(), value: b"val13".to_vec(), @@ -82,11 +82,11 @@ pub async fn test_kv_put(kv_store: impl KvBackend) { assert_eq!(b"val12", prev_kv.value()); } -pub async fn test_kv_range(kv_store: impl KvBackend) { +pub async fn test_kv_range(kv_backend: impl KvBackend) { let key = b"key1".to_vec(); let range_end = util::get_prefix_end_key(b"key1"); - let resp = kv_store + let resp = kv_backend .range(RangeRequest { key: key.clone(), range_end: range_end.clone(), @@ -102,7 +102,7 @@ pub async fn test_kv_range(kv_store: impl KvBackend) { assert_eq!(b"key11", resp.kvs[1].key()); assert_eq!(b"val11", resp.kvs[1].value()); - let resp = kv_store + let resp = kv_backend .range(RangeRequest { key: key.clone(), range_end: range_end.clone(), @@ -118,7 +118,7 @@ pub async fn test_kv_range(kv_store: impl KvBackend) { assert_eq!(b"key11", resp.kvs[1].key()); assert_eq!(b"", resp.kvs[1].value()); - let resp = kv_store + let resp = kv_backend .range(RangeRequest { key: key.clone(), limit: 0, @@ -132,7 +132,7 @@ pub async fn test_kv_range(kv_store: impl KvBackend) { assert_eq!(b"key1", resp.kvs[0].key()); assert_eq!(b"val1", resp.kvs[0].value()); - let resp = kv_store + let resp = kv_backend .range(RangeRequest { key, range_end, @@ -147,19 +147,19 @@ pub async fn test_kv_range(kv_store: impl KvBackend) { assert_eq!(b"val1", resp.kvs[0].value()); } -pub async fn test_kv_range_2(kv_store: impl KvBackend) { - kv_store +pub async fn test_kv_range_2(kv_backend: impl KvBackend) { + kv_backend .put(PutRequest::new().with_key("atest").with_value("value")) .await .unwrap(); - kv_store + kv_backend .put(PutRequest::new().with_key("test").with_value("value")) .await .unwrap(); // If both key and range_end are ‘\0’, then 
range represents all keys. - let result = kv_store + let result = kv_backend .range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec())) .await .unwrap(); @@ -168,14 +168,14 @@ pub async fn test_kv_range_2(kv_store: impl KvBackend) { assert!(!result.more); // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument. - let result = kv_store + let result = kv_backend .range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec())) .await .unwrap(); assert_eq!(result.kvs.len(), 2); - let result = kv_store + let result = kv_backend .range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec())) .await .unwrap(); @@ -184,7 +184,7 @@ pub async fn test_kv_range_2(kv_store: impl KvBackend) { assert_eq!(result.kvs[0].key, b"test"); // Fetches the keys >= "a", set limit to 1, the `more` should be true. - let result = kv_store + let result = kv_backend .range( RangeRequest::new() .with_range(b"a".to_vec(), b"\0".to_vec()) @@ -196,7 +196,7 @@ pub async fn test_kv_range_2(kv_store: impl KvBackend) { assert!(result.more); // Fetches the keys >= "a", set limit to 2, the `more` should be false. - let result = kv_store + let result = kv_backend .range( RangeRequest::new() .with_range(b"a".to_vec(), b"\0".to_vec()) @@ -208,7 +208,7 @@ pub async fn test_kv_range_2(kv_store: impl KvBackend) { assert!(!result.more); // Fetches the keys >= "a", set limit to 3, the `more` should be false. 
- let result = kv_store + let result = kv_backend .range( RangeRequest::new() .with_range(b"a".to_vec(), b"\0".to_vec()) @@ -220,19 +220,28 @@ pub async fn test_kv_range_2(kv_store: impl KvBackend) { assert!(!result.more); } -pub async fn test_kv_batch_get(kv_store: impl KvBackend) { +pub async fn test_kv_batch_get(kv_backend: impl KvBackend) { let keys = vec![]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + let resp = kv_backend + .batch_get(BatchGetRequest { keys }) + .await + .unwrap(); assert!(resp.kvs.is_empty()); let keys = vec![b"key10".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + let resp = kv_backend + .batch_get(BatchGetRequest { keys }) + .await + .unwrap(); assert!(resp.kvs.is_empty()); let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; - let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap(); + let resp = kv_backend + .batch_get(BatchGetRequest { keys }) + .await + .unwrap(); assert_eq!(2, resp.kvs.len()); assert_eq!(b"key1", resp.kvs[0].key()); @@ -241,12 +250,12 @@ pub async fn test_kv_batch_get(kv_store: impl KvBackend) { assert_eq!(b"val3", resp.kvs[1].value()); } -pub async fn test_kv_compare_and_put(kv_store: Arc>) { +pub async fn test_kv_compare_and_put(kv_backend: Arc>) { let success = Arc::new(AtomicU8::new(0)); let mut joins = vec![]; for _ in 0..20 { - let kv_store_clone = kv_store.clone(); + let kv_backend_clone = kv_backend.clone(); let success_clone = success.clone(); let join = tokio::spawn(async move { let req = CompareAndPutRequest { @@ -254,7 +263,7 @@ pub async fn test_kv_compare_and_put(kv_store: Arc> expect: vec![], value: b"val_new".to_vec(), }; - let resp = kv_store_clone.compare_and_put(req).await.unwrap(); + let resp = kv_backend_clone.compare_and_put(req).await.unwrap(); if resp.success { success_clone.fetch_add(1, Ordering::SeqCst); } @@ -269,20 +278,20 @@ pub async fn test_kv_compare_and_put(kv_store: Arc> 
assert_eq!(1, success.load(Ordering::SeqCst)); } -pub async fn test_kv_delete_range(kv_store: impl KvBackend) { +pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { let req = DeleteRangeRequest { key: b"key3".to_vec(), range_end: vec![], prev_kv: true, }; - let resp = kv_store.delete_range(req).await.unwrap(); + let resp = kv_backend.delete_range(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!(1, resp.deleted); assert_eq!(b"key3", resp.prev_kvs[0].key()); assert_eq!(b"val3", resp.prev_kvs[0].value()); - let resp = kv_store.get(b"key3").await.unwrap(); + let resp = kv_backend.get(b"key3").await.unwrap(); assert!(resp.is_none()); let req = DeleteRangeRequest { @@ -291,11 +300,11 @@ pub async fn test_kv_delete_range(kv_store: impl KvBackend) { prev_kv: false, }; - let resp = kv_store.delete_range(req).await.unwrap(); + let resp = kv_backend.delete_range(req).await.unwrap(); assert_eq!(1, resp.deleted); assert!(resp.prev_kvs.is_empty()); - let resp = kv_store.get(b"key2").await.unwrap(); + let resp = kv_backend.get(b"key2").await.unwrap(); assert!(resp.is_none()); let key = b"key1".to_vec(); @@ -306,7 +315,7 @@ pub async fn test_kv_delete_range(kv_store: impl KvBackend) { range_end: range_end.clone(), prev_kv: true, }; - let resp = kv_store.delete_range(req).await.unwrap(); + let resp = kv_backend.delete_range(req).await.unwrap(); assert_eq!(2, resp.prev_kvs.len()); let req = RangeRequest { @@ -314,19 +323,19 @@ pub async fn test_kv_delete_range(kv_store: impl KvBackend) { range_end, ..Default::default() }; - let resp = kv_store.range(req).await.unwrap(); + let resp = kv_backend.range(req).await.unwrap(); assert!(resp.kvs.is_empty()); } -pub async fn test_kv_batch_delete(kv_store: impl KvBackend) { - assert!(kv_store.get(b"key1").await.unwrap().is_some()); - assert!(kv_store.get(b"key100").await.unwrap().is_none()); +pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) { + 
assert!(kv_backend.get(b"key1").await.unwrap().is_some()); + assert!(kv_backend.get(b"key100").await.unwrap().is_none()); let req = BatchDeleteRequest { keys: vec![b"key1".to_vec(), b"key100".to_vec()], prev_kv: true, }; - let resp = kv_store.batch_delete(req).await.unwrap(); + let resp = kv_backend.batch_delete(req).await.unwrap(); assert_eq!(1, resp.prev_kvs.len()); assert_eq!( vec![KeyValue { @@ -335,18 +344,18 @@ pub async fn test_kv_batch_delete(kv_store: impl KvBackend) { }], resp.prev_kvs ); - assert!(kv_store.get(b"key1").await.unwrap().is_none()); + assert!(kv_backend.get(b"key1").await.unwrap().is_none()); - assert!(kv_store.get(b"key2").await.unwrap().is_some()); - assert!(kv_store.get(b"key3").await.unwrap().is_some()); + assert!(kv_backend.get(b"key2").await.unwrap().is_some()); + assert!(kv_backend.get(b"key3").await.unwrap().is_some()); let req = BatchDeleteRequest { keys: vec![b"key2".to_vec(), b"key3".to_vec()], prev_kv: false, }; - let resp = kv_store.batch_delete(req).await.unwrap(); + let resp = kv_backend.batch_delete(req).await.unwrap(); assert!(resp.prev_kvs.is_empty()); - assert!(kv_store.get(b"key2").await.unwrap().is_none()); - assert!(kv_store.get(b"key3").await.unwrap().is_none()); + assert!(kv_backend.get(b"key2").await.unwrap().is_none()); + assert!(kv_backend.get(b"key3").await.unwrap().is_none()); } diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index adf2b41a66a1..b07fe1ae33ec 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -268,9 +268,9 @@ mod tests { #[tokio::test] async fn test_txn_one_compare_op() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; - let _ = kv_store + let _ = kv_backend .put(PutRequest { key: vec![11], value: vec![3], @@ -288,7 +288,7 @@ mod tests { .and_then(vec![TxnOp::Put(vec![11], vec![1])]) .or_else(vec![TxnOp::Put(vec![11], vec![2])]); - let txn_response = 
kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(txn_response.succeeded); assert_eq!(txn_response.responses.len(), 1); @@ -296,10 +296,10 @@ mod tests { #[tokio::test] async fn test_txn_multi_compare_op() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; for i in 1..3 { - let _ = kv_store + let _ = kv_backend .put(PutRequest { key: vec![i], value: vec![i], @@ -321,7 +321,7 @@ mod tests { ]) .or_else(vec![TxnOp::Put(vec![1], vec![11])]); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(txn_response.succeeded); assert_eq!(txn_response.responses.len(), 2); @@ -329,9 +329,9 @@ mod tests { #[tokio::test] async fn test_txn_compare_equal() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; let key = vec![101u8]; - kv_store.delete(&key, false).await.unwrap(); + kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() .when(vec![Compare::with_not_exist_value( @@ -340,10 +340,10 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); assert!(txn_response.succeeded); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(!txn_response.succeeded); let txn = Txn::new() @@ -354,15 +354,15 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) .or_else(vec![TxnOp::Put(key, vec![4])]); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(txn_response.succeeded); } #[tokio::test] async fn test_txn_compare_greater() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; let key = vec![102u8]; - 
kv_store.delete(&key, false).await.unwrap(); + kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() .when(vec![Compare::with_not_exist_value( @@ -371,10 +371,10 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); assert!(!txn_response.succeeded); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(txn_response.succeeded); let txn = Txn::new() @@ -385,7 +385,7 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_store.txn(txn).await.unwrap(); + let mut txn_response = kv_backend.txn(txn).await.unwrap(); assert!(!txn_response.succeeded); let res = txn_response.responses.pop().unwrap(); assert_eq!( @@ -402,9 +402,9 @@ mod tests { #[tokio::test] async fn test_txn_compare_less() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; let key = vec![103u8]; - kv_store.delete(&[3], false).await.unwrap(); + kv_backend.delete(&[3], false).await.unwrap(); let txn = Txn::new() .when(vec![Compare::with_not_exist_value( @@ -413,10 +413,10 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); assert!(!txn_response.succeeded); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(!txn_response.succeeded); let txn = Txn::new() @@ -427,7 +427,7 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_store.txn(txn).await.unwrap(); + let mut txn_response = 
kv_backend.txn(txn).await.unwrap(); assert!(!txn_response.succeeded); let res = txn_response.responses.pop().unwrap(); assert_eq!( @@ -444,9 +444,9 @@ mod tests { #[tokio::test] async fn test_txn_compare_not_equal() { - let kv_store = create_kv_store().await; + let kv_backend = create_kv_backend().await; let key = vec![104u8]; - kv_store.delete(&key, false).await.unwrap(); + kv_backend.delete(&key, false).await.unwrap(); let txn = Txn::new() .when(vec![Compare::with_not_exist_value( @@ -455,10 +455,10 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); - let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + let txn_response = kv_backend.txn(txn.clone()).await.unwrap(); assert!(!txn_response.succeeded); - let txn_response = kv_store.txn(txn).await.unwrap(); + let txn_response = kv_backend.txn(txn).await.unwrap(); assert!(txn_response.succeeded); let txn = Txn::new() @@ -469,7 +469,7 @@ mod tests { )]) .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) .or_else(vec![TxnOp::Get(key.clone())]); - let mut txn_response = kv_store.txn(txn).await.unwrap(); + let mut txn_response = kv_backend.txn(txn).await.unwrap(); assert!(!txn_response.succeeded); let res = txn_response.responses.pop().unwrap(); assert_eq!( @@ -484,7 +484,7 @@ mod tests { ); } - async fn create_kv_store() -> KvBackendRef { + async fn create_kv_backend() -> KvBackendRef { Arc::new(MemoryKvBackend::::new()) // TODO(jiachun): Add a feature to test against etcd in github CI // diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs index 76b13928f1d1..5fb70d1b7010 100644 --- a/src/common/meta/src/range_stream.rs +++ b/src/common/meta/src/range_stream.rs @@ -230,10 +230,10 @@ mod tests { #[tokio::test] async fn test_range_empty() { - let kv_store = Arc::new(MemoryKvBackend::::new()); + let kv_backend = Arc::new(MemoryKvBackend::::new()); let stream = PaginationStream::new( - kv_store.clone(), + 
kv_backend.clone(), RangeRequest { key: b"a".to_vec(), ..Default::default() @@ -248,14 +248,14 @@ mod tests { #[tokio::test] async fn test_range() { - let kv_store = Arc::new(MemoryKvBackend::::new()); + let kv_backend = Arc::new(MemoryKvBackend::::new()); let total = 26; let mut expected = BTreeMap::, ()>::new(); for i in 0..total { let key = vec![97 + i]; - assert!(kv_store + assert!(kv_backend .put(PutRequest { key: key.clone(), value: key.clone(), @@ -271,7 +271,7 @@ mod tests { let range_end = b"f".to_vec(); let stream = PaginationStream::new( - kv_store.clone(), + kv_backend.clone(), RangeRequest { key, range_end, diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index f5e2b69c94bc..b87befcf5268 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -171,9 +171,9 @@ mod tests { #[tokio::test] async fn test_sequence() { - let kv_store = Arc::new(MemoryKvBackend::default()); + let kv_backend = Arc::new(MemoryKvBackend::default()); let initial = 1024; - let seq = Sequence::new("test_seq", initial, 10, kv_store); + let seq = Sequence::new("test_seq", initial, 10, kv_backend); for i in initial..initial + 100 { assert_eq!(i, seq.next().await.unwrap()); @@ -182,9 +182,9 @@ mod tests { #[tokio::test] async fn test_sequence_out_of_rage() { - let kv_store = Arc::new(MemoryKvBackend::default()); + let kv_backend = Arc::new(MemoryKvBackend::default()); let initial = u64::MAX - 10; - let seq = Sequence::new("test_seq", initial, 10, kv_store); + let seq = Sequence::new("test_seq", initial, 10, kv_backend); for _ in 0..10 { let _ = seq.next().await.unwrap(); @@ -248,8 +248,8 @@ mod tests { } } - let kv_store = Arc::new(Noop {}); - let seq = Sequence::new("test_seq", 0, 10, kv_store); + let kv_backend = Arc::new(Noop {}); + let seq = Sequence::new("test_seq", 0, 10, kv_backend); let next = seq.next().await; assert!(next.is_err()); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 
6120990ebd0b..2e5ef085f806 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -30,7 +30,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_config::KvStoreConfig; +use common_config::KvBackendConfig; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cache_invalidator::DummyCacheInvalidator; @@ -257,23 +257,23 @@ impl Instance { pub async fn try_build_standalone_components( dir: String, - kv_store_config: KvStoreConfig, + kv_backend_config: KvBackendConfig, procedure_config: ProcedureConfig, ) -> Result<(KvBackendRef, ProcedureManagerRef)> { - let kv_store = Arc::new( + let kv_backend = Arc::new( RaftEngineBackend::try_open_with_cfg(Config { dir, - purge_threshold: ReadableSize(kv_store_config.purge_threshold.0), + purge_threshold: ReadableSize(kv_backend_config.purge_threshold.0), recovery_mode: RecoveryMode::TolerateTailCorruption, batch_compression_threshold: ReadableSize::kb(8), - target_file_size: ReadableSize(kv_store_config.file_size.0), + target_file_size: ReadableSize(kv_backend_config.file_size.0), ..Default::default() }) .map_err(BoxedError::new) .context(error::OpenRaftEngineBackendSnafu)?, ); - let state_store = Arc::new(KvStateStore::new(kv_store.clone())); + let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let manager_config = ManagerConfig { max_retry_times: procedure_config.max_retry_times, @@ -282,7 +282,7 @@ impl Instance { }; let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store)); - Ok((kv_store, procedure_manager)) + Ok((kv_backend, procedure_manager)) } pub async fn try_new_standalone( diff --git a/src/meta-srv/examples/kv_store.rs b/src/meta-srv/examples/kv_store.rs index aa5dbbc19a71..c11bc0508efb 100644 --- a/src/meta-srv/examples/kv_store.rs +++ 
b/src/meta-srv/examples/kv_store.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_meta::kv_backend::etcd::EtcdStore; use common_meta::rpc::store::{DeleteRangeRequest, PutRequest, RangeRequest}; -use meta_srv::service::store::etcd::EtcdStore; use tracing::{event, subscriber, Level}; use tracing_subscriber::FmtSubscriber; @@ -24,7 +24,7 @@ fn main() { #[tokio::main] async fn run() { - let kv_store = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap(); + let kv_backend = EtcdStore::with_endpoints(["127.0.0.1:2380"]).await.unwrap(); // put let put_req = PutRequest { @@ -32,7 +32,7 @@ async fn run() { value: b"value1".to_vec(), prev_kv: true, }; - let res = kv_store.put(put_req).await; + let res = kv_backend.put(put_req).await; event!(Level::INFO, "put result: {:#?}", res); // get @@ -40,7 +40,7 @@ async fn run() { key: b"key1".to_vec(), ..Default::default() }; - let res = kv_store.range(range_req.clone()).await; + let res = kv_backend.range(range_req.clone()).await; event!(Level::INFO, "get range result: {:#?}", res); // delete @@ -48,10 +48,10 @@ async fn run() { key: b"key1".to_vec(), ..Default::default() }; - let res = kv_store.delete_range(delete_range_req).await; + let res = kv_backend.delete_range(delete_range_req).await; event!(Level::INFO, "delete range result: {:#?}", res); // get none - let res = kv_store.range(range_req).await; + let res = kv_backend.range(range_req).await; event!(Level::INFO, "get range result: {:#?}", res); } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 5b3470d8b94c..975a3fb66f87 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,6 +20,9 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; +use common_meta::kv_backend::etcd::EtcdStore; +use 
common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::ResettableKvBackendRef; use common_telemetry::info; use etcd_client::Client; use servers::configurator::ConfiguratorRef; @@ -41,9 +44,6 @@ use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::SelectorType; use crate::service::admin; -use crate::service::store::etcd::EtcdStore; -use crate::service::store::kv::ResettableKvStoreRef; -use crate::service::store::memory::MemStore; use crate::{error, Result}; #[derive(Clone)] @@ -162,9 +162,9 @@ pub fn router(meta_srv: MetaSrv) -> Router { } pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result { - let (kv_store, election, lock) = if opts.use_memory_store { + let (kv_backend, election, lock) = if opts.use_memory_store { ( - Arc::new(MemStore::new()) as _, + Arc::new(MemoryKvBackend::new()) as _, None, Some(Arc::new(MemLock::default()) as _), ) @@ -185,7 +185,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result Arc::new(LoadBasedSelector) as SelectorRef, @@ -194,7 +194,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result; #[derive(Builder)] pub struct MetaPeerClient { election: Option, - in_memory: ResettableKvStoreRef, + in_memory: ResettableKvBackendRef, #[builder(default = "ChannelManager::default()")] channel_manager: ChannelManager, #[builder(default = "3")] @@ -93,7 +93,12 @@ impl MetaPeerClient { ..Default::default() }; - return self.in_memory.range(request).await.map(|resp| resp.kvs); + return self + .in_memory + .range(request) + .await + .map(|resp| resp.kvs) + .context(error::KvBackendSnafu); } let max_retry_count = self.max_retry_count; @@ -162,7 +167,12 @@ impl MetaPeerClient { if self.is_leader() { let request = BatchGetRequest { keys }; - return self.in_memory.batch_get(request).await.map(|resp| resp.kvs); + return self + .in_memory + .batch_get(request) + .await + 
.map(|resp| resp.kvs) + .context(error::KvBackendSnafu); } let max_retry_count = self.max_retry_count; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 0a5334501a09..e05168d9d68c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -253,9 +253,6 @@ pub enum Error { #[snafu(display("Invalid arguments: {}", err_msg))] InvalidArguments { err_msg: String, location: Location }, - #[snafu(display("Invalid result with a txn response: {}", err_msg))] - InvalidTxnResult { err_msg: String, location: Location }, - #[snafu(display("Cannot parse catalog value"))] InvalidCatalogValue { location: Location, @@ -502,12 +499,6 @@ pub enum Error { source: common_meta::error::Error, }, - #[snafu(display("Failed to convert Etcd txn object: "))] - ConvertEtcdTxnObject { - source: common_meta::error::Error, - location: Location, - }, - // this error is used for custom error mapping // please do not delete it #[snafu(display("Other error"))] @@ -522,6 +513,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Keyvalue backend error"))] + KvBackend { + source: common_meta::error::Error, + location: Location, + }, + #[snafu(display("Failed to update table route"))] UpdateTableRoute { source: common_meta::error::Error, @@ -617,7 +614,6 @@ impl ErrorExt for Error { | Error::TableInfoNotFound { .. } | Error::CorruptedTableRoute { .. } | Error::MoveValue { .. } - | Error::InvalidTxnResult { .. } | Error::InvalidUtf8Value { .. } | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } @@ -651,8 +647,8 @@ impl ErrorExt for Error { Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } | Error::TableMetadataManager { source, .. } + | Error::KvBackend { source, .. } | Error::UpdateTableRoute { source, .. } - | Error::ConvertEtcdTxnObject { source, .. } | Error::GetFullTableInfo { source, .. } => source.status_code(), Error::InitMetadata { source, .. 
} => source.status_code(), diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 6fae2721b668..0b0fff204f60 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -409,6 +409,7 @@ mod tests { use std::time::Duration; use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; + use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; use tokio::sync::mpsc; @@ -420,8 +421,6 @@ mod tests { use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; - use crate::service::store::kv::KvBackendAdapter; - use crate::service::store::memory::MemStore; #[tokio::test] async fn test_mailbox() { @@ -463,8 +462,8 @@ mod tests { .register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher) .await; - let kv_store = Arc::new(MemStore::new()); - let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let seq = Sequence::new("test_seq", 0, 10, kv_backend); let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq); let msg = MailboxMessage { diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index e2666ef05c82..325bcf42a46a 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -36,7 +36,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { if election.in_infancy() { ctx.is_infancy = true; ctx.reset_in_memory(); - ctx.reset_leader_cached_kv_store(); + ctx.reset_leader_cached_kv_backend(); } } Ok(()) diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index e0835de2439d..9ce411c3e51f 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ 
b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -18,8 +18,9 @@ use api::v1::meta::{HeartbeatRequest, Role}; use common_meta::rpc::store::PutRequest; use common_telemetry::warn; use dashmap::DashMap; +use snafu::ResultExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::handler::node_stat::Stat; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{StatKey, StatValue}; @@ -130,7 +131,11 @@ impl HeartbeatHandler for PersistStatsHandler { ..Default::default() }; - let _ = ctx.in_memory.put(put).await?; + let _ = ctx + .in_memory + .put(put) + .await + .context(error::KvBackendSnafu)?; Ok(()) } @@ -142,23 +147,23 @@ mod tests { use std::sync::Arc; use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::keys::StatKey; - use crate::service::store::cached_kv::LeaderCachedKvStore; - use crate::service::store::kv::KvBackendAdapter; - use crate::service::store::memory::MemStore; + use crate::service::store::cached_kv::LeaderCachedKvBackend; #[tokio::test] async fn test_handle_datanode_stats() { - let in_memory = Arc::new(MemStore::new()); - let kv_store = Arc::new(MemStore::new()); - let leader_cached_kv_store = - Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); - let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store.clone())); + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) @@ -170,16 +175,14 @@ mod tests { let ctx = 
Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, - kv_store: kv_store.clone(), - leader_cached_kv_store, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))), + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), }; let handler = PersistStatsHandler::default(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index fc156ff99149..1383ff828ad9 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -78,23 +78,20 @@ mod test { use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; - use crate::service::store::kv::KvBackendAdapter; use crate::test_util; #[tokio::test] async fn test_handle_region_lease() { let region_failover_manager = test_util::create_region_failover_manager(); - let kv_store = region_failover_manager + let kv_backend = region_failover_manager .create_context() .selector_ctx - .kv_store + .kv_backend .clone(); let table_id = 1; let table_name = "my_table"; - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); test_util::prepare_table_region_and_info_value(&table_metadata_manager, table_name).await; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 2926fcc25c94..a945c50ef8e9 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -51,22 +51,22 @@ mod tests { use 
api::v1::meta::{HeartbeatResponse, RequestHeader}; use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{Context, HeartbeatMailbox, Pushers}; - use crate::service::store::cached_kv::LeaderCachedKvStore; - use crate::service::store::kv::KvBackendAdapter; - use crate::service::store::memory::MemStore; + use crate::service::store::cached_kv::LeaderCachedKvBackend; #[tokio::test] async fn test_handle_heartbeat_resp_header() { - let in_memory = Arc::new(MemStore::new()); - let kv_store = Arc::new(MemStore::new()); - let leader_cached_kv_store = - Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); - let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store.clone())); + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = Sequence::new("test_seq", 0, 10, kv_backend.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) @@ -78,16 +78,14 @@ mod tests { let mut ctx = Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, - kv_store: kv_store.clone(), - leader_cached_kv_store, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))), + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/inactive_region_manager.rs b/src/meta-srv/src/inactive_region_manager.rs index e00c3b456f0f..273aad844b4d 100644 --- 
a/src/meta-srv/src/inactive_region_manager.rs +++ b/src/meta-srv/src/inactive_region_manager.rs @@ -14,20 +14,21 @@ use std::collections::HashSet; +use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::rpc::store::{BatchGetRequest, DeleteRangeRequest, PutRequest, RangeRequest}; use common_meta::RegionIdent; +use snafu::ResultExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::keys::InactiveRegionKey; use crate::metrics::METRIC_META_INACTIVE_REGIONS; -use crate::service::store::kv::ResettableKvStoreRef; pub struct InactiveRegionManager<'a> { - store: &'a ResettableKvStoreRef, + store: &'a ResettableKvBackendRef, } impl<'a> InactiveRegionManager<'a> { - pub fn new(store: &'a ResettableKvStoreRef) -> Self { + pub fn new(store: &'a ResettableKvBackendRef) -> Self { Self { store } } @@ -43,7 +44,7 @@ impl<'a> InactiveRegionManager<'a> { value: vec![], prev_kv: false, }; - self.store.put(req).await?; + self.store.put(req).await.context(error::KvBackendSnafu)?; METRIC_META_INACTIVE_REGIONS.inc(); @@ -58,7 +59,10 @@ impl<'a> InactiveRegionManager<'a> { region_id, } .into(); - self.store.delete(&key, false).await?; + self.store + .delete(&key, false) + .await + .context(error::KvBackendSnafu)?; METRIC_META_INACTIVE_REGIONS.dec(); @@ -89,7 +93,11 @@ impl<'a> InactiveRegionManager<'a> { }) .collect::, _)>>(); let keys = key_region_ids.iter().map(|(key, _)| key.clone()).collect(); - let resp = self.store.batch_get(BatchGetRequest { keys }).await?; + let resp = self + .store + .batch_get(BatchGetRequest { keys }) + .await + .context(error::KvBackendSnafu)?; let kvs = resp.kvs; if kvs.is_empty() { return Ok(HashSet::new()); @@ -124,7 +132,11 @@ impl<'a> InactiveRegionManager<'a> { ) -> Result> { let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); let request = RangeRequest::new().with_prefix(prefix); - let resp = self.store.range(request).await?; + let resp = self + .store + .range(request) + .await + 
.context(error::KvBackendSnafu)?; let kvs = resp.kvs; kvs.into_iter() .map(|kv| InactiveRegionKey::try_from(kv.key)) @@ -134,7 +146,11 @@ impl<'a> InactiveRegionManager<'a> { pub async fn clear_all_inactive_regions(&self, cluster_id: u64) -> Result<()> { let prefix = InactiveRegionKey::get_prefix_by_cluster(cluster_id); let request = DeleteRangeRequest::new().with_prefix(prefix); - let _ = self.store.delete_range(request).await?; + let _ = self + .store + .delete_range(request) + .await + .context(error::KvBackendSnafu)?; Ok(()) } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index a4d354badea3..c72e2d59e779 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -24,6 +24,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -46,7 +47,6 @@ use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; use crate::service::mailbox::MailboxRef; -use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; pub const TABLE_ID_SEQ: &str = "table_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; @@ -129,9 +129,9 @@ impl Default for DatanodeClientOptions { #[derive(Clone)] pub struct Context { pub server_addr: String, - pub in_memory: ResettableKvStoreRef, - pub kv_store: KvStoreRef, - pub leader_cached_kv_store: ResettableKvStoreRef, + pub in_memory: ResettableKvBackendRef, + pub kv_backend: KvBackendRef, + pub leader_cached_kv_backend: ResettableKvBackendRef, pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, @@ -153,8 +153,8 @@ impl Context { self.in_memory.reset(); } - pub fn 
reset_leader_cached_kv_store(&self) { - self.leader_cached_kv_store.reset(); + pub fn reset_leader_cached_kv_backend(&self) { + self.leader_cached_kv_backend.reset(); } } @@ -164,7 +164,7 @@ pub struct LeaderValue(pub String); pub struct SelectorContext { pub server_addr: String, pub datanode_lease_secs: u64, - pub kv_store: KvStoreRef, + pub kv_backend: KvBackendRef, pub meta_peer_client: MetaPeerClientRef, pub table_id: Option, } @@ -209,9 +209,9 @@ pub struct MetaSrv { options: MetaSrvOptions, // It is only valid at the leader node and is used to temporarily // store some data that will not be persisted. - in_memory: ResettableKvStoreRef, - kv_store: KvStoreRef, - leader_cached_kv_store: ResettableKvStoreRef, + in_memory: ResettableKvBackendRef, + kv_backend: KvBackendRef, + leader_cached_kv_backend: ResettableKvBackendRef, table_id_sequence: SequenceRef, meta_peer_client: MetaPeerClientRef, selector: SelectorRef, @@ -243,7 +243,7 @@ impl MetaSrv { if let Some(election) = self.election() { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); - let leader_cached_kv_store = self.leader_cached_kv_store.clone(); + let leader_cached_kv_backend = self.leader_cached_kv_backend.clone(); let subscribe_manager = self.subscribe_manager(); let mut rx = election.subscribe_leader_change(); let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone(); @@ -260,7 +260,7 @@ impl MetaSrv { match rx.recv().await { Ok(msg) => { in_memory.reset(); - leader_cached_kv_store.reset(); + leader_cached_kv_backend.reset(); info!("Leader's cache has bean cleared on leader change: {msg}"); match msg { LeaderChangeMessage::Elected(_) => { @@ -329,16 +329,16 @@ impl MetaSrv { &self.options } - pub fn in_memory(&self) -> &ResettableKvStoreRef { + pub fn in_memory(&self) -> &ResettableKvBackendRef { &self.in_memory } - pub fn kv_store(&self) -> &KvStoreRef { - &self.kv_store + pub fn kv_backend(&self) -> &KvBackendRef { + &self.kv_backend } - 
pub fn leader_cached_kv_store(&self) -> &ResettableKvStoreRef { - &self.leader_cached_kv_store + pub fn leader_cached_kv_backend(&self) -> &ResettableKvBackendRef { + &self.leader_cached_kv_backend } pub fn meta_peer_client(&self) -> &MetaPeerClientRef { @@ -397,8 +397,8 @@ impl MetaSrv { pub fn new_ctx(&self) -> Context { let server_addr = self.options().server_addr.clone(); let in_memory = self.in_memory.clone(); - let kv_store = self.kv_store.clone(); - let leader_cached_kv_store = self.leader_cached_kv_store.clone(); + let kv_backend = self.kv_backend.clone(); + let leader_cached_kv_backend = self.leader_cached_kv_backend.clone(); let meta_peer_client = self.meta_peer_client.clone(); let mailbox = self.mailbox.clone(); let election = self.election.clone(); @@ -408,8 +408,8 @@ impl MetaSrv { Context { server_addr, in_memory, - kv_store, - leader_cached_kv_store, + kv_backend, + leader_cached_kv_backend, meta_peer_client, mailbox, election, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 076484e7f999..0f2dc0f78da7 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -22,6 +22,8 @@ use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::sequence::{Sequence, SequenceRef}; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; @@ -52,16 +54,14 @@ use crate::procedure::region_failover::RegionFailoverManager; use crate::pubsub::PublishRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; -use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; -use 
crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; -use crate::service::store::memory::MemStore; +use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend}; use crate::table_meta_alloc::MetaSrvTableMetadataAllocator; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { options: Option, - kv_store: Option, - in_memory: Option, + kv_backend: Option, + in_memory: Option, selector: Option, handler_group: Option, election: Option, @@ -74,7 +74,7 @@ pub struct MetaSrvBuilder { impl MetaSrvBuilder { pub fn new() -> Self { Self { - kv_store: None, + kv_backend: None, in_memory: None, selector: None, handler_group: None, @@ -92,12 +92,12 @@ impl MetaSrvBuilder { self } - pub fn kv_store(mut self, kv_store: KvStoreRef) -> Self { - self.kv_store = Some(kv_store); + pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self { + self.kv_backend = Some(kv_backend); self } - pub fn in_memory(mut self, in_memory: ResettableKvStoreRef) -> Self { + pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self { self.in_memory = Some(in_memory); self } @@ -144,7 +144,7 @@ impl MetaSrvBuilder { election, meta_peer_client, options, - kv_store, + kv_backend, in_memory, selector, handler_group, @@ -155,23 +155,22 @@ impl MetaSrvBuilder { let options = options.unwrap_or_default(); - let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default())); - let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default())); - let leader_cached_kv_store = build_leader_cached_kv_store(&election, &kv_store); + let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new())); + let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new())); + let leader_cached_kv_backend = build_leader_cached_kv_backend(&election, &kv_backend); let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); let selector = 
selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); let pushers = Pushers::default(); - let mailbox = build_mailbox(&kv_store, &pushers); - let procedure_manager = build_procedure_manager(&options, &kv_store); - let kv_backend = KvBackendAdapter::wrap(kv_store.clone()); + let mailbox = build_mailbox(&kv_backend, &pushers); + let procedure_manager = build_procedure_manager(&options, &kv_backend); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, - kv_store: kv_store.clone(), + kv_backend: kv_backend.clone(), meta_peer_client: meta_peer_client.clone(), table_id: None, }; @@ -245,8 +244,8 @@ impl MetaSrvBuilder { started, options, in_memory, - kv_store, - leader_cached_kv_store, + kv_backend, + leader_cached_kv_backend, meta_peer_client: meta_peer_client.clone(), table_id_sequence, selector, @@ -268,19 +267,19 @@ impl MetaSrvBuilder { } } -fn build_leader_cached_kv_store( +fn build_leader_cached_kv_backend( election: &Option, - kv_store: &KvStoreRef, -) -> Arc { - Arc::new(LeaderCachedKvStore::new( + kv_backend: &KvBackendRef, +) -> Arc { + Arc::new(LeaderCachedKvBackend::new( Arc::new(CheckLeaderByElection(election.clone())), - kv_store.clone(), + kv_backend.clone(), )) } fn build_default_meta_peer_client( election: &Option, - in_memory: &ResettableKvStoreRef, + in_memory: &ResettableKvBackendRef, ) -> MetaPeerClientRef { MetaPeerClientBuilder::default() .election(election.clone()) @@ -291,23 +290,21 @@ fn build_default_meta_peer_client( .unwrap() } -fn build_mailbox(kv_store: &KvStoreRef, pushers: &Pushers) -> MailboxRef { - let mailbox_sequence = Sequence::new( - "heartbeat_mailbox", - 1, - 100, - 
KvBackendAdapter::wrap(kv_store.clone()), - ); +fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef { + let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_backend.clone()); HeartbeatMailbox::create(pushers.clone(), mailbox_sequence) } -fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> ProcedureManagerRef { +fn build_procedure_manager( + options: &MetaSrvOptions, + kv_backend: &KvBackendRef, +) -> ProcedureManagerRef { let manager_config = ManagerConfig { max_retry_times: options.procedure.max_retry_times, retry_delay: options.procedure.retry_delay, ..Default::default() }; - let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); + let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); Arc::new(LocalManager::new(manager_config, state_store)) } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index acc49ab61ddf..4e633e672ddd 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -21,13 +21,13 @@ use api::v1::meta::store_server::StoreServer; use client::client_manager::DatanodeClients; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::etcd::EtcdStore; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::KvBackendRef; use tower::service_fn; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; -use crate::service::store::etcd::EtcdStore; -use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; -use crate::service::store::memory::MemStore; #[derive(Clone)] pub struct MockInfo { @@ -37,34 +37,32 @@ pub struct MockInfo { } pub async fn mock_with_memstore() -> MockInfo { - let kv_store = Arc::new(MemStore::default()); - mock(Default::default(), kv_store, None, None).await + let kv_backend = Arc::new(MemoryKvBackend::new()); + 
mock(Default::default(), kv_backend, None, None).await } pub async fn mock_with_etcdstore(addr: &str) -> MockInfo { - let kv_store = EtcdStore::with_endpoints([addr]).await.unwrap(); - mock(Default::default(), kv_store, None, None).await + let kv_backend = EtcdStore::with_endpoints([addr]).await.unwrap(); + mock(Default::default(), kv_backend, None, None).await } pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo { - let kv_store = Arc::new(MemStore::default()); - mock(Default::default(), kv_store, Some(selector), None).await + let kv_backend = Arc::new(MemoryKvBackend::new()); + mock(Default::default(), kv_backend, Some(selector), None).await } pub async fn mock( opts: MetaSrvOptions, - kv_store: KvStoreRef, + kv_backend: KvBackendRef, selector: Option, datanode_clients: Option>, ) -> MockInfo { let server_addr = opts.server_addr.clone(); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); - let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store); + let builder = MetaSrvBuilder::new().options(opts).kv_backend(kv_backend); let builder = match selector { Some(s) => builder.selector(s), diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 350c17d6fbe1..a09bb1c2c0c4 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -27,6 +27,7 @@ use std::time::Duration; use async_trait::async_trait; use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::key::TableMetadataManagerRef; +use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -46,7 +47,6 @@ use 
crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataMan use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; -use crate::service::store::kv::ResettableKvStoreRef; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); @@ -70,7 +70,7 @@ impl From for RegionFailoverKey { pub(crate) struct RegionFailoverManager { region_lease_secs: u64, - in_memory: ResettableKvStoreRef, + in_memory: ResettableKvBackendRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, selector: SelectorRef, @@ -94,7 +94,7 @@ impl Drop for FailoverProcedureGuard { impl RegionFailoverManager { pub(crate) fn new( region_lease_secs: u64, - in_memory: ResettableKvStoreRef, + in_memory: ResettableKvBackendRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, (selector, selector_ctx): (SelectorRef, SelectorContext), @@ -249,7 +249,7 @@ struct Node { #[derive(Clone)] pub struct RegionFailoverContext { pub region_lease_secs: u64, - pub in_memory: ResettableKvStoreRef, + pub in_memory: ResettableKvBackendRef, pub mailbox: MailboxRef, pub selector: SelectorRef, pub selector_ctx: SelectorContext, @@ -396,6 +396,7 @@ mod tests { use common_meta::ddl::utils::region_storage_path; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; use common_meta::DatanodeId; use common_procedure::{BoxedProcedure, ProcedureId}; @@ -409,8 +410,6 @@ mod tests { use crate::lock::memory::MemLock; use crate::selector::{Namespace, Selector}; use crate::service::mailbox::Channel; - use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; - use crate::service::store::memory::MemStore; use crate::test_util; struct RandomNodeSelector { @@ -482,11 +481,11 @@ mod tests { } pub async fn build(self) -> TestingEnv { - let in_memory = 
Arc::new(MemStore::new()); - let kv_store: KvStoreRef = Arc::new(MemStore::new()); + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) - .in_memory(Arc::new(MemStore::new())) + .in_memory(Arc::new(MemoryKvBackend::new())) .build() .map(Arc::new) // Safety: all required fields set at initialization @@ -494,9 +493,7 @@ mod tests { let table_id = 1; let table = "my_table"; - let table_metadata_manager = Arc::new(TableMetadataManager::new( - KvBackendAdapter::wrap(kv_store.clone()), - )); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); test_util::prepare_table_region_and_info_value(&table_metadata_manager, table).await; let region_distribution = table_metadata_manager .table_route_manager() @@ -517,12 +514,8 @@ mod tests { let _ = heartbeat_receivers.insert(datanode_id, rx); } - let mailbox_sequence = Sequence::new( - "test_heartbeat_mailbox", - 0, - 100, - KvBackendAdapter::wrap(kv_store.clone()), - ); + let mailbox_sequence = + Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let selector = self.selector.unwrap_or_else(|| { @@ -537,7 +530,7 @@ mod tests { let selector_ctx = SelectorContext { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), - kv_store: kv_store.clone(), + kv_backend: kv_backend.clone(), meta_peer_client, table_id: Some(table_id), }; diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index b884e70a0a03..c57925697dd7 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -107,6 +107,7 @@ pub mod test_data { use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::DdlContext; use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; use 
common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_meta::sequence::Sequence; @@ -118,8 +119,6 @@ pub mod test_data { use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::metasrv::MetasrvInfo; - use crate::service::store::kv::KvBackendAdapter; - use crate::service::store::memory::MemStore; use crate::test_util::new_region_route; pub fn new_region_routes() -> Vec { @@ -186,17 +185,11 @@ pub mod test_data { } pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { - let kv_store = Arc::new(MemStore::new()); - - let mailbox_sequence = Sequence::new( - "test_heartbeat_mailbox", - 0, - 100, - KvBackendAdapter::wrap(kv_store.clone()), - ); + let kv_backend = Arc::new(MemoryKvBackend::new()); + + let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); - let kv_backend = KvBackendAdapter::wrap(kv_store); DdlContext { datanode_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index bad7719ac8e2..ae14e35652c9 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -100,16 +100,15 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; use super::RegionLeaseKeeper; - use crate::service::store::kv::KvBackendAdapter; - use crate::service::store::memory::MemStore; fn new_test_keeper() -> RegionLeaseKeeper { - let store = KvBackendAdapter::wrap(Arc::new(MemStore::new())); + let store = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = 
Arc::new(TableMetadataManager::new(store)); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index ecafa97d1a5e..e816cc5cdc4c 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -24,7 +24,6 @@ use crate::keys::{LeaseKey, LeaseValue, StatKey}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::{Namespace, Selector}; -use crate::service::store::kv::KvBackendAdapter; const MAX_REGION_NUMBER: u64 = u64::MAX; @@ -66,8 +65,7 @@ impl Selector for LoadBasedSelector { let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; let leader_peer_ids = if let Some(table_id) = ctx.table_id { - let table_metadata_manager = - TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone())); + let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone()); get_leader_peer_ids(&table_metadata_manager, table_id).await? } else { diff --git a/src/meta-srv/src/service/admin/inactive_regions.rs b/src/meta-srv/src/service/admin/inactive_regions.rs index 18b1b5c48b56..6c3c4184903e 100644 --- a/src/meta-srv/src/service/admin/inactive_regions.rs +++ b/src/meta-srv/src/service/admin/inactive_regions.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use common_meta::kv_backend::ResettableKvBackendRef; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tonic::codegen::http; @@ -22,10 +23,9 @@ use crate::error::{self, Result}; use crate::inactive_region_manager::InactiveRegionManager; use crate::keys::InactiveRegionKey; use crate::service::admin::{util, HttpHandler}; -use crate::service::store::kv::ResettableKvStoreRef; pub struct ViewInactiveRegionsHandler { - pub store: ResettableKvStoreRef, + pub store: ResettableKvBackendRef, } #[async_trait::async_trait] @@ -51,7 +51,7 @@ impl HttpHandler for ViewInactiveRegionsHandler { } pub struct ClearInactiveRegionsHandler { - pub store: ResettableKvStoreRef, + pub store: 
ResettableKvBackendRef, } #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index ffe6d61fb08c..5b8c38aaca98 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -17,8 +17,10 @@ use api::v1::meta::{ Error, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader, }; use common_telemetry::warn; +use snafu::ResultExt; use tonic::{Request, Response}; +use crate::error; use crate::metasrv::MetaSrv; use crate::service::GrpcResult; @@ -37,7 +39,11 @@ impl cluster_server::Cluster for MetaSrv { } let req = req.into_inner().into(); - let resp = self.in_memory().batch_get(req).await?; + let resp = self + .in_memory() + .batch_get(req) + .await + .context(error::KvBackendSnafu)?; let resp = resp.to_proto_resp(ResponseHeader::success(0)); Ok(Response::new(resp)) @@ -56,7 +62,11 @@ impl cluster_server::Cluster for MetaSrv { } let req = req.into_inner().into(); - let res = self.in_memory().range(req).await?; + let res = self + .in_memory() + .range(req) + .await + .context(error::KvBackendSnafu)?; let resp = res.to_proto_resp(ResponseHeader::success(0)); Ok(Response::new(resp)) diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 628202d934a1..f250d1b7e9d2 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -167,18 +167,18 @@ mod tests { use api::v1::meta::heartbeat_server::Heartbeat; use api::v1::meta::*; + use common_meta::kv_backend::memory::MemoryKvBackend; use tonic::IntoRequest; use super::get_node_id; use crate::metasrv::builder::MetaSrvBuilder; - use crate::service::store::memory::MemStore; #[tokio::test] async fn test_ask_leader() { - let kv_store = Arc::new(MemStore::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); let meta_srv = MetaSrvBuilder::new() - .kv_store(kv_store) + .kv_backend(kv_backend) .build() .await .unwrap(); diff --git 
a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index aa58501653cc..55a0c555a0bb 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -13,10 +13,6 @@ // limitations under the License. pub mod cached_kv; -pub mod etcd; -pub(crate) mod etcd_util; -pub mod kv; -pub mod memory; use api::v1::meta::{ store_server, BatchDeleteRequest as PbBatchDeleteRequest, @@ -32,10 +28,10 @@ use common_meta::rpc::store::{ BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest, }; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; -use crate::error::MissingRequestHeaderSnafu; +use crate::error::{self, MissingRequestHeaderSnafu}; use crate::metasrv::MetaSrv; use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::GrpcResult; @@ -53,12 +49,16 @@ impl store_server::Store for MetaSrv { let cluster_id_str = cluster_id.to_string(); let _timer = METRIC_META_KV_REQUEST - .with_label_values(&[self.kv_store().name(), "range", cluster_id_str.as_str()]) + .with_label_values(&[self.kv_backend().name(), "range", cluster_id_str.as_str()]) .start_timer(); let req: RangeRequest = req.into(); - let res = self.kv_store().range(req).await?; + let res = self + .kv_backend() + .range(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -75,12 +75,16 @@ impl store_server::Store for MetaSrv { let cluster_id_str = cluster_id.to_string(); let _timer = METRIC_META_KV_REQUEST - .with_label_values(&[self.kv_store().name(), "put", cluster_id_str.as_str()]) + .with_label_values(&[self.kv_backend().name(), "put", cluster_id_str.as_str()]) .start_timer(); let req: PutRequest = req.into(); - let res = self.kv_store().put(req).await?; + let res = self + .kv_backend() + .put(req) + .await + .context(error::KvBackendSnafu)?; let res = 
res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -97,12 +101,20 @@ impl store_server::Store for MetaSrv { let cluster_id_str = cluster_id.to_string(); let _timer = METRIC_META_KV_REQUEST - .with_label_values(&[self.kv_store().name(), "batch_get", cluster_id_str.as_str()]) + .with_label_values(&[ + self.kv_backend().name(), + "batch_get", + cluster_id_str.as_str(), + ]) .start_timer(); let req: BatchGetRequest = req.into(); - let res = self.kv_store().batch_get(req).await?; + let res = self + .kv_backend() + .batch_get(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -119,12 +131,20 @@ impl store_server::Store for MetaSrv { let cluster_id_str = cluster_id.to_string(); let _timer = METRIC_META_KV_REQUEST - .with_label_values(&[self.kv_store().name(), "batch_pub", cluster_id_str.as_str()]) + .with_label_values(&[ + self.kv_backend().name(), + "batch_put", + cluster_id_str.as_str(), + ]) .start_timer(); let req: BatchPutRequest = req.into(); - let res = self.kv_store().batch_put(req).await?; + let res = self + .kv_backend() + .batch_put(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -145,7 +165,7 @@ impl store_server::Store for MetaSrv { let _timer = METRIC_META_KV_REQUEST .with_label_values(&[ - self.kv_store().name(), + self.kv_backend().name(), "batch_delete", cluster_id_str.as_str(), ]) @@ -153,7 +173,7 @@ impl store_server::Store for MetaSrv { let req: BatchDeleteRequest = req.into(); - let res = self.kv_store().batch_delete(req).await?; + let res = self + .kv_backend() + .batch_delete(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -174,7 +198,7 @@ impl store_server::Store for MetaSrv { let _timer = METRIC_META_KV_REQUEST .with_label_values(&[ - 
self.kv_store().name(), + self.kv_backend().name(), "compare_and_put", cluster_id_str.as_str(), ]) @@ -182,7 +206,11 @@ impl store_server::Store for MetaSrv { let req: CompareAndPutRequest = req.into(); - let res = self.kv_store().compare_and_put(req).await?; + let res = self + .kv_backend() + .compare_and_put(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -203,7 +231,7 @@ impl store_server::Store for MetaSrv { let _timer = METRIC_META_KV_REQUEST .with_label_values(&[ - self.kv_store().name(), + self.kv_backend().name(), "delete_range", cluster_id_str.as_str(), ]) @@ -211,7 +239,11 @@ impl store_server::Store for MetaSrv { let req: DeleteRangeRequest = req.into(); - let res = self.kv_store().delete_range(req).await?; + let res = self + .kv_backend() + .delete_range(req) + .await + .context(error::KvBackendSnafu)?; let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) @@ -224,15 +256,15 @@ mod tests { use api::v1::meta::store_server::Store; use api::v1::meta::*; + use common_meta::kv_backend::memory::MemoryKvBackend; use tonic::IntoRequest; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::MetaSrv; - use crate::service::store::memory::MemStore; async fn new_meta_srv() -> MetaSrv { MetaSrvBuilder::new() - .kv_store(Arc::new(MemStore::new())) + .kv_backend(Arc::new(MemoryKvBackend::new())) .build() .await .unwrap() diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index 907ab6715a92..f92bd1fb6199 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -17,8 +17,12 @@ use std::collections::HashSet; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use common_meta::error::{Error, Result}; +use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, 
TxnResponse}; -use common_meta::kv_backend::{KvBackend, TxnService}; +use common_meta::kv_backend::{ + KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService, +}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, @@ -26,10 +30,6 @@ use common_meta::rpc::store::{ }; use common_meta::rpc::KeyValue; -use crate::error::{Error, Result}; -use crate::service::store::kv::{KvStoreRef, ResettableKvStore, ResettableKvStoreRef}; -use crate::service::store::memory::MemStore; - pub type CheckLeaderRef = Arc; pub trait CheckLeader: Sync + Send { @@ -53,21 +53,21 @@ impl CheckLeader for AlwaysLeader { /// 3. Only the leader node can update this metadata, as the cache cannot detect /// modifications made to the data on the follower node. /// 4. Only the leader node can delete this metadata for the same reason mentioned above. -pub struct LeaderCachedKvStore { +pub struct LeaderCachedKvBackend { check_leader: CheckLeaderRef, - store: KvStoreRef, - cache: ResettableKvStoreRef, + store: KvBackendRef, + cache: ResettableKvBackendRef, version: AtomicUsize, name: String, } -impl LeaderCachedKvStore { - pub fn new(check_leader: CheckLeaderRef, store: KvStoreRef) -> Self { +impl LeaderCachedKvBackend { + pub fn new(check_leader: CheckLeaderRef, store: KvBackendRef) -> Self { let name = format!("LeaderCached({})", store.name()); Self { check_leader, store, - cache: Arc::new(MemStore::new()), + cache: Arc::new(MemoryKvBackend::new()), version: AtomicUsize::new(0), name, } @@ -75,7 +75,7 @@ impl LeaderCachedKvStore { /// With a leader checker which always returns true when checking, /// mainly used in test scenarios. 
- pub fn with_always_leader(store: KvStoreRef) -> Self { + pub fn with_always_leader(store: KvBackendRef) -> Self { Self::new(Arc::new(AlwaysLeader), store) } @@ -114,7 +114,7 @@ impl LeaderCachedKvStore { } #[async_trait::async_trait] -impl KvBackend for LeaderCachedKvStore { +impl KvBackend for LeaderCachedKvBackend { fn name(&self) -> &str { &self.name } @@ -284,7 +284,7 @@ impl KvBackend for LeaderCachedKvStore { } #[async_trait::async_trait] -impl TxnService for LeaderCachedKvStore { +impl TxnService for LeaderCachedKvBackend { type Error = Error; async fn txn(&self, txn: Txn) -> Result { @@ -322,7 +322,7 @@ impl TxnService for LeaderCachedKvStore { } } -impl ResettableKvStore for LeaderCachedKvStore { +impl ResettableKvBackend for LeaderCachedKvBackend { fn reset(&self) { self.cache.reset() } @@ -333,16 +333,15 @@ mod tests { use common_meta::rpc::KeyValue; use super::*; - use crate::service::store::memory::MemStore; - fn create_leader_cached_kv_store() -> LeaderCachedKvStore { - let store = Arc::new(MemStore::new()); - LeaderCachedKvStore::with_always_leader(store) + fn create_leader_cached_kv_backend() -> LeaderCachedKvBackend { + let store = Arc::new(MemoryKvBackend::new()); + LeaderCachedKvBackend::with_always_leader(store) } #[tokio::test] async fn test_get_put_delete() { - let cached_store = create_leader_cached_kv_store(); + let cached_store = create_leader_cached_kv_backend(); let inner_store = cached_store.store.clone(); let inner_cache = cached_store.cache.clone(); @@ -374,7 +373,7 @@ mod tests { #[tokio::test] async fn test_batch_get_put_delete() { - let cached_store = create_leader_cached_kv_store(); + let cached_store = create_leader_cached_kv_backend(); let inner_store = cached_store.store.clone(); let inner_cache = cached_store.cache.clone(); @@ -416,7 +415,7 @@ mod tests { #[tokio::test] async fn test_txn() { - let cached_store = create_leader_cached_kv_store(); + let cached_store = create_leader_cached_kv_backend(); let inner_cache = 
cached_store.cache.clone(); let kvs = (1..5) diff --git a/src/meta-srv/src/service/store/etcd_util.rs b/src/meta-srv/src/service/store/etcd_util.rs deleted file mode 100644 index 5baaa8ae67c1..000000000000 --- a/src/meta-srv/src/service/store/etcd_util.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_meta::rpc::KeyValue; - -pub struct KvPair<'a>(&'a etcd_client::KeyValue); - -impl<'a> KvPair<'a> { - /// Creates a `KvPair` from etcd KeyValue - #[inline] - pub fn new(kv: &'a etcd_client::KeyValue) -> Self { - Self(kv) - } - - #[inline] - pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { - KeyValue::from(KvPair::new(kv)) - } -} - -impl<'a> From> for KeyValue { - fn from(kv: KvPair<'a>) -> Self { - Self { - key: kv.0.key().to_vec(), - value: kv.0.value().to_vec(), - } - } -} diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs deleted file mode 100644 index 3e60798605e9..000000000000 --- a/src/meta-srv/src/service/store/kv.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use async_trait::async_trait; -use common_error::ext::BoxedError; -use common_meta::error::ExternalSnafu; -use common_meta::kv_backend::txn::{Txn, TxnResponse}; -use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; -use common_meta::rpc::store::{ - BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, -}; -use snafu::ResultExt; - -use crate::error::Error; - -pub type KvStoreRef = Arc>; -pub type ResettableKvStoreRef = Arc; - -pub trait ResettableKvStore: KvBackend { - fn reset(&self); -} - -/// An adaptor to bridge [KvStoreRef] and [KvBackendRef]. 
-pub struct KvBackendAdapter(KvStoreRef); - -impl KvBackendAdapter { - pub fn wrap(kv_store: KvStoreRef) -> KvBackendRef { - Arc::new(Self(kv_store)) - } -} - -#[async_trait] -impl TxnService for KvBackendAdapter { - type Error = common_meta::error::Error; - - async fn txn(&self, txn: Txn) -> Result { - self.0 - .txn(txn) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } -} - -#[async_trait] -impl KvBackend for KvBackendAdapter { - fn name(&self) -> &str { - self.0.name() - } - - fn as_any(&self) -> &dyn Any { - self.0.as_any() - } - - async fn range(&self, req: RangeRequest) -> Result { - self.0 - .range(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn put(&self, req: PutRequest) -> Result { - self.0 - .put(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn batch_put(&self, req: BatchPutRequest) -> Result { - self.0 - .batch_put(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn batch_get(&self, req: BatchGetRequest) -> Result { - self.0 - .batch_get(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn compare_and_put( - &self, - req: CompareAndPutRequest, - ) -> Result { - self.0 - .compare_and_put(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn delete_range( - &self, - req: DeleteRangeRequest, - ) -> Result { - self.0 - .delete_range(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - - async fn batch_delete( - &self, - req: BatchDeleteRequest, - ) -> Result { - self.0 - .batch_delete(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } -} diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs deleted file mode 100644 index 51ded2ab3be0..000000000000 --- a/src/meta-srv/src/service/store/memory.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache 
License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_meta::kv_backend::memory::MemoryKvBackend; - -use crate::error::Error; -use crate::service::store::kv::ResettableKvStore; - -pub type MemStore = MemoryKvBackend; - -impl ResettableKvStore for MemStore { - fn reset(&self) { - self.clear(); - } -} diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 63d68c9b7b9f..4428ec625822 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::Sequence; @@ -33,8 +34,6 @@ use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; use crate::procedure::region_failover::RegionFailoverManager; use crate::selector::lease_based::LeaseBasedSelector; -use crate::service::store::kv::KvBackendAdapter; -use crate::service::store::memory::MemStore; pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute { let region = Region { @@ -53,21 +52,16 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) } pub(crate) fn create_region_failover_manager() -> Arc { - let kv_store = Arc::new(MemStore::new()); + let 
kv_backend = Arc::new(MemoryKvBackend::new()); let pushers = Pushers::default(); - let mailbox_sequence = Sequence::new( - "test_heartbeat_mailbox", - 0, - 100, - KvBackendAdapter::wrap(kv_store.clone()), - ); + let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_backend.clone()); let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); - let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); + let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - let in_memory = Arc::new(MemStore::new()); + let in_memory = Arc::new(MemoryKvBackend::new()); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) .in_memory(in_memory.clone()) @@ -80,7 +74,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { let selector_ctx = SelectorContext { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), - kv_store: kv_store.clone(), + kv_backend: kv_backend.clone(), meta_peer_client, table_id: None, }; @@ -92,7 +86,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { procedure_manager, (selector, selector_ctx), Arc::new(MemLock::default()), - Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))), + Arc::new(TableMetadataManager::new(kv_backend)), )) } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index a2d7bcbce86f..3e2c1383a7fb 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -21,6 +21,8 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; @@ -33,8 +35,6 @@ use 
meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; -use meta_srv::service::store::kv::KvStoreRef; -use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; use servers::Mode; use tonic::transport::Server; @@ -50,14 +50,14 @@ pub struct GreptimeDbCluster { pub _dir_guards: Vec, pub datanode_instances: HashMap, - pub kv_store: KvStoreRef, + pub kv_backend: KvBackendRef, pub meta_srv: MetaSrv, pub frontend: Arc, } pub struct GreptimeDbClusterBuilder { cluster_name: String, - kv_store: KvStoreRef, + kv_backend: KvBackendRef, store_config: Option, datanodes: Option, } @@ -66,7 +66,7 @@ impl GreptimeDbClusterBuilder { pub fn new(cluster_name: &str) -> Self { Self { cluster_name: cluster_name.to_string(), - kv_store: Arc::new(MemStore::default()), + kv_backend: Arc::new(MemoryKvBackend::new()), store_config: None, datanodes: None, } @@ -110,7 +110,7 @@ impl GreptimeDbClusterBuilder { storage_guards, _dir_guards: dir_guards, datanode_instances, - kv_store: self.kv_store.clone(), + kv_backend: self.kv_backend.clone(), meta_srv: meta_srv.meta_srv, frontend, } @@ -127,7 +127,7 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; - meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await + meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await } async fn build_datanodes( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 1cb34d5a12f0..c8c66cbd3cc3 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; use common_base::Plugins; -use common_config::KvStoreConfig; +use common_config::KvBackendConfig; use common_meta::cache_invalidator::DummyKvCacheInvalidator; use common_procedure::options::ProcedureConfig; use 
datanode::config::DatanodeOptions; @@ -66,9 +66,9 @@ impl GreptimeDbStandaloneBuilder { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, &self.instance_name); - let (kv_store, procedure_manager) = Instance::try_build_standalone_components( + let (kv_backend, procedure_manager) = Instance::try_build_standalone_components( format!("{}/kv", &opts.storage.data_home), - KvStoreConfig::default(), + KvBackendConfig::default(), ProcedureConfig::default(), ) .await @@ -76,13 +76,14 @@ impl GreptimeDbStandaloneBuilder { let plugins = self.plugin.unwrap_or_default(); - let datanode = DatanodeBuilder::new(opts.clone(), Some(kv_store.clone()), plugins.clone()) - .build() - .await - .unwrap(); + let datanode = + DatanodeBuilder::new(opts.clone(), Some(kv_backend.clone()), plugins.clone()) + .build() + .await + .unwrap(); let catalog_manager = KvBackendCatalogManager::new( - kv_store.clone(), + kv_backend.clone(), Arc::new(DummyKvCacheInvalidator), Arc::new(StandaloneDatanodeManager(datanode.region_server())), ); @@ -94,7 +95,7 @@ impl GreptimeDbStandaloneBuilder { .unwrap(); procedure_manager.start().await.unwrap(); let instance = Instance::try_new_standalone( - kv_store, + kv_backend, procedure_manager, catalog_manager, plugins, diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 2405b0a62943..de60fd8f84dd 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -347,7 +347,7 @@ async fn run_region_failover_procedure( selector_ctx: SelectorContext { datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, server_addr: meta_srv.options().server_addr.clone(), - kv_store: meta_srv.kv_store().clone(), + kv_backend: meta_srv.kv_backend().clone(), meta_peer_client: meta_srv.meta_peer_client().clone(), table_id: None, },