feat: enable metasrv leader cached kv (GreptimeTeam#2629)
* feat: enable metasrv leader cached kv

* fix: fix cached kv caching the empty value bug

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: change DEFAULT_PAGE_SIZE to 1536
WenyXu authored Nov 3, 2023
1 parent fb8d0c6 commit 39d52f2
Showing 9 changed files with 274 additions and 28 deletions.
18 changes: 13 additions & 5 deletions src/common/meta/src/key.rs
@@ -91,12 +91,20 @@ pub const REMOVED_PREFIX: &str = "__removed";
const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";

const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
const TABLE_ROUTE_PREFIX: &str = "__table_route";

pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";

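/// Key prefixes that `LeaderCachedKvBackend::load()` preloads into the
/// leader's cache (see cached_kv.rs below).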
pub const CACHE_KEY_PREFIXES: [&str; 4] = [
TABLE_NAME_KEY_PREFIX,
CATALOG_NAME_KEY_PREFIX,
SCHEMA_NAME_KEY_PREFIX,
TABLE_ROUTE_PREFIX,
];

pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;

11 changes: 10 additions & 1 deletion src/common/meta/src/range_stream.rs
@@ -39,7 +39,16 @@ enum PaginationStreamState<K, V> {
Error,
}

pub const DEFAULT_PAGE_SIZE: usize = 512;
/// The default page size for a range request.
///
/// It depends on the upstream KvStore server's gRPC message size limit
/// (e.g., etcd's default gRPC message size limit is 4 MiB).
///
/// Generally, almost all metadata entries are smaller than 2700 bytes,
/// so we can statically set [DEFAULT_PAGE_SIZE] to 1536.
///
/// TODO(weny): Consider updating the default page size dynamically.
pub const DEFAULT_PAGE_SIZE: usize = 1536;

struct PaginationStreamFactory {
kv: KvBackendRef,
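As a quick sanity check on the new constant (my arithmetic, not part of the commit): a full page of worst-case entries stays just under etcd's default gRPC limit.

// Illustrative compile-time check, assuming the ~2700-byte upper bound
// cited in the doc comment above.
const MAX_ENTRY_SIZE: usize = 2700;
const ETCD_GRPC_LIMIT: usize = 4 * 1024 * 1024; // 4 MiB = 4_194_304 bytes
const _: () = assert!(1536 * MAX_ENTRY_SIZE <= ETCD_GRPC_LIMIT); // 4_147_200 bytes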
4 changes: 3 additions & 1 deletion src/meta-srv/src/handler/on_leader_start_handler.rs
@@ -35,8 +35,10 @@ impl HeartbeatHandler for OnLeaderStartHandler {
if let Some(election) = &ctx.election {
if election.in_infancy() {
ctx.is_infancy = true;
// TODO(weny): Unify the multiple leader states between Context and MetaSrv.
// We can't ensure the in-memory kv has already been reset in the outer loop,
// so we still use heartbeat requests to trigger resetting the in-memory kv.
ctx.reset_in_memory();
ctx.reset_leader_cached_kv_backend();
}
}
Ok(())
2 changes: 2 additions & 0 deletions src/meta-srv/src/lib.rs
@@ -14,6 +14,7 @@

#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(assert_matches)]

pub mod bootstrap;
mod cache_invalidator;
@@ -34,6 +35,7 @@ pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;

pub use crate::error::Result;
33 changes: 26 additions & 7 deletions src/meta-srv/src/metasrv.rs
@@ -24,7 +24,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
@@ -39,14 +39,17 @@ use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
StopProcedureManagerSnafu,
};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";

@@ -176,17 +179,29 @@ pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
subscribe_manager: Option<SubscribeManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
state: StateRef,
}

impl MetaStateHandler {
pub async fn on_become_leader(&self) {
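// Become the leader with the cache disabled first; enable it only after
// the cache has been fully loaded, so reads never hit a partially
// populated cache.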
self.state.write().unwrap().next_state(become_leader(false));

if let Err(e) = self.leader_cached_kv_backend.load().await {
error!(e; "Failed to load kv into leader cache kv store");
} else {
self.state.write().unwrap().next_state(become_leader(true));
}

if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}
self.greptimedb_telemetry_task.should_report(true);
}

pub async fn on_become_follower(&self) {
self.state.write().unwrap().next_state(become_follower());

// Stops the procedures.
if let Err(e) = self.procedure_manager.stop().await {
error!(e; "Failed to stop procedure manager");
Expand All @@ -205,13 +220,14 @@ impl MetaStateHandler {

#[derive(Clone)]
pub struct MetaSrv {
state: StateRef,
started: Arc<AtomicBool>,
options: MetaSrvOptions,
// It is only valid on the leader node and is used to temporarily
// store data that will not be persisted.
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
leader_cached_kv_backend: ResettableKvBackendRef,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
table_id_sequence: SequenceRef,
meta_peer_client: MetaPeerClientRef,
selector: SelectorRef,
@@ -254,6 +270,8 @@ impl MetaSrv {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
};
let _handle = common_runtime::spawn_bg(async move {
loop {
@@ -299,6 +317,11 @@ impl MetaSrv {
info!("MetaSrv stopped");
});
} else {
// No election: this node is always the leader, so load the kv into the cached kv store up front.
self.leader_cached_kv_backend
.load()
.await
.context(error::KvBackendSnafu)?;
self.procedure_manager
.start()
.await
@@ -337,10 +360,6 @@ impl MetaSrv {
&self.kv_backend
}

pub fn leader_cached_kv_backend(&self) -> &ResettableKvBackendRef {
&self.leader_cached_kv_backend
}

pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
&self.meta_peer_client
}
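The new `state` module is registered in lib.rs above, but its diff is the ninth changed file and is not shown on this page. A minimal sketch of its assumed shape, inferred only from the call sites here and in builder.rs (`State::leader`, `State::follower`, `become_leader(bool)`, `become_follower()`, `enable_leader_cache()`) — the names and fields are assumptions, not the real file:

use std::sync::{Arc, RwLock};

// Sketch only: the assumed shape of src/meta-srv/src/state.rs.
#[derive(Clone)]
pub enum State {
    Leader {
        server_addr: String,
        // Set to true only once the leader cache has been fully loaded.
        enable_leader_cache: bool,
    },
    Follower {
        server_addr: String,
    },
}

impl State {
    pub fn enable_leader_cache(&self) -> bool {
        matches!(
            self,
            State::Leader {
                enable_leader_cache: true,
                ..
            }
        )
    }
}

pub type StateRef = Arc<RwLock<State>>;

Read together with the `impl CheckLeader for RwLock<State>` block in cached_kv.rs below, this is what lets the cached backend serve reads only while the node is a fully loaded leader.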
27 changes: 15 additions & 12 deletions src/meta-srv/src/metasrv/builder.rs
@@ -13,7 +13,7 @@
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use client::client_manager::DatanodeClients;
@@ -55,6 +55,7 @@ use crate::pubsub::PublishRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
use crate::state::State;
use crate::table_meta_alloc::MetaSrvTableMetadataAllocator;

// TODO(fys): try using the derive_builder macro
@@ -157,7 +158,18 @@ impl MetaSrvBuilder {

let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let leader_cached_kv_backend = build_leader_cached_kv_backend(&election, &kv_backend);

let state = Arc::new(RwLock::new(match election {
None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::follower(options.server_addr.to_string()),
}));

let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
state.clone(),
kv_backend.clone(),
));
let kv_backend = leader_cached_kv_backend.clone() as _;
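// `kv_backend` is intentionally shadowed: every component constructed
// below now reads and writes through the leader cached backend.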

let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
@@ -241,6 +253,7 @@
let metasrv_home = options.data_home.to_string();

Ok(MetaSrv {
state,
started,
options,
in_memory,
@@ -267,16 +280,6 @@
}
}

fn build_leader_cached_kv_backend(
election: &Option<ElectionRef>,
kv_backend: &KvBackendRef,
) -> Arc<LeaderCachedKvBackend> {
Arc::new(LeaderCachedKvBackend::new(
Arc::new(CheckLeaderByElection(election.clone())),
kv_backend.clone(),
))
}

fn build_default_meta_peer_client(
election: &Option<ElectionRef>,
in_memory: &ResettableKvBackendRef,
3 changes: 3 additions & 0 deletions src/meta-srv/src/metrics.rs
@@ -31,4 +31,7 @@ lazy_static! {
register_histogram_vec!("meta_handler_execute", "meta handler execute", &["name"]).unwrap();
pub static ref METRIC_META_INACTIVE_REGIONS: IntGauge =
register_int_gauge!("meta_inactive_regions", "meta inactive regions").unwrap();
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
}
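For reference, the usage pattern for the new histogram (a sketch; `start_timer()` is the standard prometheus timer guard, and the prefix label here is illustrative):

let _timer = metrics::METRIC_META_LEADER_CACHED_KV_LOAD
    .with_label_values(&["__table_name"])
    .start_timer();
// ... load all keys under the prefix into the cache ...
// `_timer` drops here and observes the elapsed seconds.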
54 changes: 52 additions & 2 deletions src/meta-srv/src/service/store/cached_kv.rs
@@ -15,20 +15,26 @@
use std::any::Any;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use common_meta::error::{Error, Result};
use common_meta::key::CACHE_KEY_PREFIXES;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
use common_meta::kv_backend::{
KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use futures::TryStreamExt;

use crate::metrics;
use crate::state::State;

pub type CheckLeaderRef = Arc<dyn CheckLeader>;

@@ -44,6 +50,12 @@ impl CheckLeader for AlwaysLeader {
}
}

impl CheckLeader for RwLock<State> {
fn check(&self) -> bool {
self.read().unwrap().enable_leader_cache()
}
}

/// A cache dedicated to the leader node, for caching metadata.
///
/// To use this cache, the following constraints must be followed:
@@ -79,6 +91,37 @@ impl LeaderCachedKvBackend {
Self::new(Arc::new(AlwaysLeader), store)
}

/// Loads the cache. The caller MUST ensure that no mutation requests
/// reach the `LeaderCachedKvStore` while the cache is being loaded.
pub async fn load(&self) -> Result<()> {
for prefix in &CACHE_KEY_PREFIXES[..] {
let _timer = metrics::METRIC_META_LEADER_CACHED_KV_LOAD
    .with_label_values(&[prefix])
    .start_timer();

// TODO(weny): Refactor PaginationStream's output to a unary output.
let stream = PaginationStream::new(
self.store.clone(),
RangeRequest::new().with_prefix(prefix.as_bytes()),
DEFAULT_PAGE_SIZE,
Arc::new(|kv| Ok((kv, ()))),
);

let kvs = stream
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|(kv, _)| kv)
.collect();

self.cache
.batch_put(BatchPutRequest {
kvs,
prev_kv: false,
})
.await?;
}

Ok(())
}

#[inline]
fn is_leader(&self) -> bool {
self.check_leader.check()
@@ -141,7 +184,14 @@ impl KvBackend for LeaderCachedKvBackend {

let ver = self.get_version();

let res = self.store.range(req.clone()).await?;
let res = self
.store
.range(RangeRequest {
// Always fetch full values here: honoring `keys_only` would cache
// entries with empty values (the "empty value bug" this commit fixes).
keys_only: false,
..req.clone()
})
.await?;
if !res.kvs.is_empty() {
let KeyValue { key, value } = res.kvs[0].clone();
let put_req = PutRequest {
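To make the `keys_only` override concrete, here is the failure mode it guards against — the "empty value bug" from the commit message (a sketch; the prefix is illustrative, and only `RangeRequest` and its fields come from this diff):

use common_meta::rpc::store::RangeRequest;

// The store answers a keys-only request with empty values by design. If
// that response were used to populate the cache, every later full read
// through the cache would observe an empty value. Hence the cache fill
// always fetches complete values, regardless of what the client asked for.
fn keys_only_request() -> RangeRequest {
    RangeRequest {
        keys_only: true,
        ..RangeRequest::new().with_prefix("__table_info".as_bytes())
    }
}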
