Skip to content

Commit

Permalink
refactor: unify the KvBackend (GreptimeTeam#2684)
Browse files Browse the repository at this point in the history
* refactor: unify the KvBackend

* refactor: rename kv_store to kv_backend

* chore: apply suggestions from CR

* refactor: rename kv_store to kv_backend

* refactor: rename KvStoreConfig to KvBackendConfig
  • Loading branch information
WenyXu authored Nov 3, 2023
1 parent ce867fb commit fb8d0c6
Show file tree
Hide file tree
Showing 43 changed files with 520 additions and 615 deletions.
7 changes: 2 additions & 5 deletions src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::table_name::TableName;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::KvBackendAdapter;
use rand::Rng;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};

Expand Down Expand Up @@ -64,9 +63,7 @@ impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();

let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
etcd_store,
)));
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));

let tool = BenchTableMetadata {
table_metadata_manager,
Expand Down
14 changes: 7 additions & 7 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::store::{BatchDeleteRequest, BatchPutRequest, PutRequest, RangeRequest};
Expand All @@ -35,8 +37,6 @@ use common_meta::util::get_prefix_end_key;
use common_telemetry::info;
use etcd_client::Client;
use futures::TryStreamExt;
use meta_srv::service::store::etcd::EtcdStore;
use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef};
use prost::Message;
use snafu::ResultExt;
use v1_helper::{CatalogKey as v1CatalogKey, SchemaKey as v1SchemaKey, TableGlobalValue};
Expand Down Expand Up @@ -81,7 +81,7 @@ impl UpgradeCommand {
}

struct MigrateTableMetadata {
etcd_store: KvStoreRef,
etcd_store: KvBackendRef,
dryrun: bool,

skip_table_global_keys: bool,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl MigrateTableMetadata {
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));

let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -182,7 +182,7 @@ impl MigrateTableMetadata {
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl MigrateTableMetadata {
let mut keys = Vec::new();
info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down Expand Up @@ -284,7 +284,7 @@ impl MigrateTableMetadata {

info!("Start scanning key from: {}", String::from_utf8_lossy(&key));
let mut stream = PaginationStream::new(
KvBackendAdapter::wrap(self.etcd_store.clone()),
self.etcd_store.clone(),
RangeRequest::new().with_range(key, range_end.clone()),
PAGE_SIZE,
Arc::new(|kv: KeyValue| {
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_config::KvStoreConfig;
use common_config::KvBackendConfig;
use common_telemetry::logging::LoggingOptions;
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
Expand All @@ -30,7 +30,7 @@ pub const ENV_LIST_SEP: &str = ",";
pub struct MixOptions {
pub data_home: String,
pub procedure: ProcedureConfig,
pub metadata_store: KvStoreConfig,
pub metadata_store: KvBackendConfig,
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
Expand Down
29 changes: 16 additions & 13 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{metadata_store_dir, KvStoreConfig, WalConfig};
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
Expand Down Expand Up @@ -97,7 +97,7 @@ pub struct StandaloneOptions {
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub storage: StorageConfig,
pub metadata_store: KvStoreConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
pub logging: LoggingOptions,
pub user_provider: Option<String>,
Expand All @@ -119,7 +119,7 @@ impl Default for StandaloneOptions {
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvStoreConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
user_provider: None,
Expand Down Expand Up @@ -336,23 +336,26 @@ impl StartCommand {
})?;

let metadata_dir = metadata_store_dir(&opts.data_home);
let (kv_store, procedure_manager) = FeInstance::try_build_standalone_components(
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store,
opts.procedure,
)
.await
.context(StartFrontendSnafu)?;

let datanode =
DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(
dn_opts.clone(),
Some(kv_backend.clone()),
Default::default(),
)
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();

let catalog_manager = KvBackendCatalogManager::new(
kv_store.clone(),
kv_backend.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
);
Expand All @@ -366,7 +369,7 @@ impl StartCommand {
// TODO: build frontend instance like in distributed mode
let mut frontend = build_frontend(
fe_plugins,
kv_store,
kv_backend,
procedure_manager.clone(),
catalog_manager,
region_server,
Expand All @@ -389,13 +392,13 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
plugins: Plugins,
kv_store: KvBackendRef,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let frontend_instance = FeInstance::try_new_standalone(
kv_store,
kv_backend,
procedure_manager,
catalog_manager,
plugins,
Expand Down
4 changes: 2 additions & 2 deletions src/common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ pub fn metadata_store_dir(store_dir: &str) -> String {

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvStoreConfig {
pub struct KvBackendConfig {
// Kv file size in bytes
pub file_size: ReadableSize,
// Kv purge threshold in bytes
pub purge_threshold: ReadableSize,
}

impl Default for KvStoreConfig {
impl Default for KvBackendConfig {
fn default() -> Self {
Self {
// log file size 256MB
Expand Down
30 changes: 27 additions & 3 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,26 @@ use crate::peer::Peer;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Empty key is not allowed"))]
EmptyKey { location: Location },

#[snafu(display("Invalid result with a txn response: {}", err_msg))]
InvalidTxnResult { err_msg: String, location: Location },

#[snafu(display("Failed to connect to Etcd"))]
ConnectEtcd {
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to execute via Etcd"))]
EtcdFailed {
#[snafu(source)]
error: etcd_client::Error,
location: Location,
},

#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence { err_msg: String, location: Location },

Expand Down Expand Up @@ -254,7 +274,10 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal,
IllegalServerState { .. }
| EtcdTxnOpResponse { .. }
| EtcdFailed { .. }
| ConnectEtcd { .. } => StatusCode::Internal,

SerdeJson { .. }
| ParseOption { .. }
Expand All @@ -267,7 +290,8 @@ impl ErrorExt for Error {
| NextSequence { .. }
| SequenceOutOfRange { .. }
| UnexpectedSequenceValue { .. }
| InvalidHeartbeatResponse { .. } => StatusCode::Unexpected,
| InvalidHeartbeatResponse { .. }
| InvalidTxnResult { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand All @@ -277,7 +301,7 @@ impl ErrorExt for Error {
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,

PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
PrimaryKeyNotFound { .. } | &EmptyKey { .. } => StatusCode::InvalidArguments,

TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Expand Down
10 changes: 10 additions & 0 deletions src/common/meta/src/kv_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod etcd;
pub mod memory;
pub mod test;
pub mod txn;
Expand Down Expand Up @@ -127,3 +128,12 @@ where
}
}
}

pub trait ResettableKvBackend: KvBackend
where
Self::Error: ErrorExt,
{
fn reset(&self);
}

pub type ResettableKvBackendRef = Arc<dyn ResettableKvBackend<Error = Error> + Send + Sync>;
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,47 @@
use std::any::Any;
use std::sync::Arc;

use common_meta::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::metrics::METRIC_META_TXN_REQUEST;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
TxnResponse,
};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error;
use crate::error::{ConvertEtcdTxnObjectSnafu, Error, Result};
use crate::service::store::etcd_util::KvPair;
use crate::service::store::kv::KvStoreRef;
use super::KvBackendRef;
use crate::error::{self, Error, Result};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;

pub struct KvPair<'a>(&'a etcd_client::KeyValue);

impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
pub fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}

#[inline]
pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}

impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}

// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
Expand All @@ -46,7 +68,7 @@ pub struct EtcdStore {
}

impl EtcdStore {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvStoreRef>
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<KvBackendRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -58,7 +80,7 @@ impl EtcdStore {
Ok(Self::with_etcd_client(client))
}

pub fn with_etcd_client(client: Client) -> KvStoreRef {
pub fn with_etcd_client(client: Client) -> KvBackendRef {
Arc::new(Self { client })
}

Expand Down Expand Up @@ -305,7 +327,7 @@ impl TxnService for EtcdStore {
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_res.try_into().context(ConvertEtcdTxnObjectSnafu)
txn_res.try_into()
}
}

Expand Down
Loading

0 comments on commit fb8d0c6

Please sign in to comment.