Skip to content

Commit

Permalink
fix: avoid acquiring lock during reading stats (GreptimeTeam#4070)
Browse files Browse the repository at this point in the history
* fix: avoid acquiring lock during reading stats

* chore: apply suggestions from CR

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 30, 2024
1 parent 7de336f commit eab309f
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl RegionServer {

pub async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
match self.inner.region_map.get(&region_id) {
Some(e) => e.region_disk_usage(region_id).await,
Some(e) => e.region_disk_usage(region_id),
None => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}

async fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, _region_id: RegionId) -> Option<i64> {
unimplemented!()
}

Expand Down
2 changes: 1 addition & 1 deletion src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl RegionEngine for FileRegionEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

async fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
fn region_disk_usage(&self, _: RegionId) -> Option<i64> {
None
}

Expand Down
16 changes: 4 additions & 12 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ impl RegionEngine for MetricEngine {
/// Retrieves region's disk usage.
///
/// Note: Returns `None` if it's a logical region.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
if self.inner.is_physical_region(region_id) {
self.inner.mito.region_disk_usage(region_id).await
self.inner.mito.region_disk_usage(region_id)
} else {
None
}
Expand Down Expand Up @@ -383,15 +383,7 @@ mod test {
let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();

assert!(env
.metric()
.region_disk_usage(logical_region_id)
.await
.is_none());
assert!(env
.metric()
.region_disk_usage(physical_region_id)
.await
.is_some());
assert!(env.metric().region_disk_usage(logical_region_id).is_none());
assert!(env.metric().region_disk_usage(physical_region_id).is_some());
}
}
7 changes: 3 additions & 4 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ impl MitoEngine {
}

/// Returns the region disk/memory usage information.
pub async fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
pub fn get_region_usage(&self, region_id: RegionId) -> Result<RegionUsage> {
let region = self
.inner
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;

Ok(region.region_usage().await)
Ok(region.region_usage())
}

/// Handle substrait query and return a stream of record batches
Expand Down Expand Up @@ -368,10 +368,9 @@ impl RegionEngine for MitoEngine {
self.inner.stop().await.map_err(BoxedError::new)
}

async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
let size = self
.get_region_usage(region_id)
.await
.map(|usage| usage.disk_usage())
.ok()?;
size.try_into().ok()
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async fn test_region_usage() {
.unwrap();
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert_eq!(region_stat.manifest_usage, 686);

// put some rows
Expand All @@ -535,7 +535,7 @@ async fn test_region_usage() {

put_rows(&engine, region_id, rows).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert!(region_stat.wal_usage > 0);

// delete some rows
Expand All @@ -545,13 +545,13 @@ async fn test_region_usage() {
};
delete_rows(&engine, region_id, rows).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert!(region_stat.wal_usage > 0);

// flush region
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
let region_stat = region.region_usage();
assert_eq!(region_stat.sst_usage, 3010);

// region total usage
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use common_datasource::compression::CompressionType;
Expand Down Expand Up @@ -121,12 +122,17 @@ pub struct RegionManifestManager {

impl RegionManifestManager {
/// Constructs a region's manifest and persist it.
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
pub async fn new(
metadata: RegionMetadataRef,
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
);

info!(
Expand Down Expand Up @@ -168,7 +174,10 @@ impl RegionManifestManager {
/// Opens an existing manifest.
///
/// Returns `Ok(None)` if no such manifest.
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
pub async fn open(
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
) -> Result<Option<Self>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["open"])
.start_timer();
Expand All @@ -178,6 +187,7 @@ impl RegionManifestManager {
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
total_manifest_size,
);

// recover from storage
Expand Down
39 changes: 34 additions & 5 deletions src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
Expand Down Expand Up @@ -133,15 +135,22 @@ pub struct ManifestObjectStore {
path: String,
/// Stores the size of each manifest file.
manifest_size_map: HashMap<FileKey, u64>,
total_manifest_size: Arc<AtomicU64>,
}

impl ManifestObjectStore {
pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self {
pub fn new(
path: &str,
object_store: ObjectStore,
compress_type: CompressionType,
total_manifest_size: Arc<AtomicU64>,
) -> Self {
Self {
object_store,
compress_type,
path: util::normalize_dir(path),
manifest_size_map: HashMap::new(),
total_manifest_size,
}
}

Expand Down Expand Up @@ -338,10 +347,9 @@ impl ManifestObjectStore {
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
if *is_checkpoint {
self.manifest_size_map
.remove(&FileKey::Checkpoint(*version));
self.unset_file_size(&FileKey::Checkpoint(*version));
} else {
self.manifest_size_map.remove(&FileKey::Delta(*version));
self.unset_file_size(&FileKey::Delta(*version));
}
}

Expand Down Expand Up @@ -564,12 +572,28 @@ impl ManifestObjectStore {
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map.insert(FileKey::Delta(version), size);
self.inc_total_manifest_size(size);
}

/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map
.insert(FileKey::Checkpoint(version), size);
self.inc_total_manifest_size(size);
}

fn unset_file_size(&mut self, key: &FileKey) {
if let Some(val) = self.manifest_size_map.remove(key) {
self.dec_total_manifest_size(val);
}
}

fn inc_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
}

fn dec_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
}
}

Expand Down Expand Up @@ -610,7 +634,12 @@ mod tests {
let mut builder = Fs::default();
let _ = builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed)
ManifestObjectStore::new(
"/",
object_store,
CompressionType::Uncompressed,
Default::default(),
)
}

fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
Expand Down
26 changes: 17 additions & 9 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(crate) mod version;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_telemetry::{error, info, warn};
Expand Down Expand Up @@ -106,6 +106,8 @@ pub(crate) struct MitoRegion {
time_provider: TimeProviderRef,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// manifest stats
stats: ManifestStats,
}

pub(crate) type MitoRegionRef = Arc<MitoRegion>;
Expand Down Expand Up @@ -233,7 +235,7 @@ impl MitoRegion {
}

/// Returns the region usage in bytes.
pub(crate) async fn region_usage(&self) -> RegionUsage {
pub(crate) fn region_usage(&self) -> RegionUsage {
let region_id = self.region_id;

let version = self.version();
Expand All @@ -243,13 +245,7 @@ impl MitoRegion {
let sst_usage = version.ssts.sst_usage();

let wal_usage = self.estimated_wal_usage(memtable_usage);

let manifest_usage = self
.manifest_ctx
.manifest_manager
.read()
.await
.manifest_usage();
let manifest_usage = self.stats.total_manifest_size();

RegionUsage {
region_id,
Expand Down Expand Up @@ -526,6 +522,18 @@ impl OpeningRegions {

pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;

/// Manifest stats.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
total_manifest_size: Arc<AtomicU64>,
}

impl ManifestStats {
fn total_manifest_size(&self) -> u64 {
self.total_manifest_size.load(Ordering::Relaxed)
}
}

#[cfg(test)]
mod tests {
use crossbeam_utils::atomic::AtomicCell;
Expand Down
20 changes: 16 additions & 4 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{ManifestContext, MitoRegion, RegionState};
use crate::region::{ManifestContext, ManifestStats, MitoRegion, RegionState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
Expand All @@ -63,6 +63,7 @@ pub(crate) struct RegionOpener {
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
stats: ManifestStats,
}

impl RegionOpener {
Expand All @@ -87,6 +88,7 @@ impl RegionOpener {
skip_wal_replay: false,
intermediate_manager,
time_provider: None,
stats: Default::default(),
}
}

Expand Down Expand Up @@ -169,8 +171,12 @@ impl RegionOpener {
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let metadata = Arc::new(self.metadata.unwrap());
let manifest_manager =
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
self.stats.total_manifest_size.clone(),
)
.await?;

let memtable_builder = self
.memtable_builder_provider
Expand Down Expand Up @@ -217,6 +223,7 @@ impl RegionOpener {
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
stats: self.stats,
})
}

Expand Down Expand Up @@ -267,7 +274,11 @@ impl RegionOpener {
let region_options = self.options.as_ref().unwrap().clone();

let region_manifest_options = self.manifest_options(config, &region_options)?;
let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await?
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
)
.await?
else {
return Ok(None);
};
Expand Down Expand Up @@ -350,6 +361,7 @@ impl RegionOpener {
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
stats: self.stats.clone(),
};
Ok(Some(region))
}
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ impl TestEnv {
};

if let Some(metadata) = initial_metadata {
RegionManifestManager::new(metadata, manifest_opts)
RegionManifestManager::new(metadata, manifest_opts, Default::default())
.await
.map(Some)
} else {
RegionManifestManager::open(manifest_opts).await
RegionManifestManager::open(manifest_opts, Default::default()).await
}
}

Expand Down
Loading

0 comments on commit eab309f

Please sign in to comment.