feat: add RateMeter for tracking memtable write throughput (#6744)

* feat: introduce `RateMeter`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-18 14:45:31 +08:00
committed by GitHub
parent 326198162e
commit f64fc3a57a
10 changed files with 313 additions and 14 deletions

View File

@@ -15,6 +15,7 @@
//! Basic tests for mito engine.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::helper::row;
use api::v1::value::ValueData;
@@ -89,6 +90,12 @@ async fn test_write_to_region() {
rows: build_rows(0, 42),
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
// Update the write bytes rate.
region
.write_bytes_per_sec
.update_rate(Duration::from_secs(1));
assert!(region.write_bytes_per_sec.get_rate() > 0);
}
#[apply(multiple_log_store_factories)]
@@ -159,6 +166,10 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
.unwrap();
assert_eq!(0, result.affected_rows);
// The replay won't update the write bytes rate meter.
let region = engine.get_region(region_id).unwrap();
assert_eq!(region.write_bytes_per_sec.get_total(), 0);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -1041,6 +1041,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to start repeated task: {}", name))]
StartRepeatedTask {
name: String,
#[snafu(implicit)]
location: Location,
source: common_runtime::error::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1203,6 +1211,8 @@ impl ErrorExt for Error {
InconsistentTimestampLength { .. } => StatusCode::InvalidArguments,
TooManyFilesToRead { .. } => StatusCode::RateLimited,
StartRepeatedTask { .. } => StatusCode::Internal,
}
}

View File

@@ -38,6 +38,7 @@ pub mod extension;
pub mod flush;
pub mod manifest;
pub mod memtable;
pub mod meter;
mod metrics;
pub mod read;
pub mod region;

15
src/mito2/src/meter.rs Normal file
View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Metering utilities for the mito engine.

pub mod rate_meter;

View File

@@ -0,0 +1,163 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// `RateMeter` tracks a cumulative value and computes the rate per interval.
///
/// Cloning a `RateMeter` is cheap and yields a handle to the same shared
/// counters (the state lives behind an [`Arc`]).
#[derive(Default, Debug, Clone)]
pub struct RateMeter {
    inner: Arc<RateMeterInner>,
}

#[derive(Default, Debug)]
struct RateMeterInner {
    /// Accumulated value since last rate calculation.
    value: AtomicU64,
    /// The last computed rate (per second).
    last_rate: AtomicU64,
    /// Total accumulated value, never reset.
    total: AtomicU64,
}

impl RateMeter {
    /// Creates a new `RateMeter` with all counters starting at zero.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RateMeterInner::default()),
        }
    }

    /// Increments the accumulated value (and the running total) by `v`.
    pub fn inc_by(&self, v: u64) {
        self.inner.value.fetch_add(v, Ordering::Relaxed);
        self.inner.total.fetch_add(v, Ordering::Relaxed);
    }

    /// Returns the current accumulated value since last rate calculation.
    pub fn get_value(&self) -> u64 {
        self.inner.value.load(Ordering::Relaxed)
    }

    /// Returns the total accumulated value since creation.
    pub fn get_total(&self) -> u64 {
        self.inner.total.load(Ordering::Relaxed)
    }

    /// Returns the last computed rate (per second).
    pub fn get_rate(&self) -> u64 {
        self.inner.last_rate.load(Ordering::Relaxed)
    }

    /// Updates the current rate based on the accumulated value over the given interval.
    ///
    /// `interval` should represent the duration since the last `update_rate` call.
    /// On a successful update the internal accumulated counter (`value`) is reset to 0.
    /// If `interval` truncates to zero seconds, the rate is left unchanged and the
    /// accumulated value is preserved for the next call.
    pub fn update_rate(&self, interval: Duration) {
        let interval_secs = interval.as_secs();
        // Check the interval *before* swapping the counter. The previous
        // implementation swapped first, so a zero-second interval silently
        // discarded the accumulated bytes from all future rate computations.
        if interval_secs == 0 {
            return;
        }
        let current_value = self.inner.value.swap(0, Ordering::Relaxed);
        let rate = current_value / interval_secs;
        self.inner.last_rate.store(rate, Ordering::Relaxed);
    }

    /// Resets the meter: clears both current value and last rate.
    /// Total accumulated value remains unchanged.
    pub fn reset(&self) {
        self.inner.value.store(0, Ordering::Relaxed);
        self.inner.last_rate.store(0, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[test]
    fn test_inc_and_get_value_and_total() {
        let m = RateMeter::new();
        assert_eq!((m.get_value(), m.get_total()), (0, 0));
        m.inc_by(10);
        assert_eq!((m.get_value(), m.get_total()), (10, 10));
        m.inc_by(5);
        assert_eq!((m.get_value(), m.get_total()), (15, 15));
    }

    #[test]
    fn test_update_rate_and_get_rate() {
        let m = RateMeter::new();
        m.inc_by(100);
        // 100 units over 2 seconds -> 50/s, and the window counter resets.
        m.update_rate(Duration::from_secs(2));
        assert_eq!(m.get_rate(), 50);
        assert_eq!(m.get_value(), 0);
        // A zero-length interval must leave the previous rate untouched.
        m.inc_by(30);
        m.update_rate(Duration::from_secs(0));
        assert_eq!(m.get_rate(), 50);
    }

    #[test]
    fn test_reset() {
        let m = RateMeter::new();
        m.inc_by(100);
        m.update_rate(Duration::from_secs(1));
        assert_eq!(m.get_rate(), 100);
        // Reset clears the window counter and the rate, but never the total.
        m.reset();
        assert_eq!(m.get_value(), 0);
        assert_eq!(m.get_rate(), 0);
        assert_eq!(m.get_total(), 100);
    }

    #[test]
    fn test_total_accumulates() {
        let m = RateMeter::new();
        m.inc_by(10);
        m.update_rate(Duration::from_secs(1));
        m.inc_by(20);
        m.update_rate(Duration::from_secs(2));
        assert_eq!(m.get_total(), 30);
        assert_eq!(m.get_rate(), 10);
    }

    #[test]
    fn test_clone_and_shared_state() {
        let a = RateMeter::new();
        let b = a.clone();
        a.inc_by(10);
        b.inc_by(5);
        // Both handles observe the same shared counters.
        assert_eq!(a.get_value(), 15);
        assert_eq!(b.get_value(), 15);
        assert_eq!(a.get_total(), 15);
        assert_eq!(b.get_total(), 15);
        a.update_rate(Duration::from_secs(1));
        assert_eq!(b.get_rate(), 15);
    }
}

View File

@@ -43,6 +43,7 @@ use crate::error::{
use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::meter::rate_meter::RateMeter;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
@@ -131,6 +132,9 @@ pub struct MitoRegion {
/// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
/// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
pub(crate) topic_latest_entry_id: AtomicU64,
/// `write_bytes_per_sec` tracks the memtable write throughput in bytes per second.
/// [RateUpdater](crate::worker::RateUpdater) will update the rate periodically.
pub(crate) write_bytes_per_sec: RateMeter,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// manifest stats
@@ -767,6 +771,14 @@ impl RegionMap {
regions.get(&region_id).cloned()
}
/// Iterates over all regions, invoking `f` on each one.
///
/// The region map's read lock is held for the whole iteration, so `f`
/// should be cheap and must not attempt to lock the map again.
pub(crate) fn for_each_region(&self, f: impl Fn(&MitoRegionRef)) {
    let regions = self.regions.read().unwrap();
    // Iterate values directly instead of destructuring `(_, region)`
    // (clippy::for_kv_map).
    for region in regions.values() {
        f(region);
    }
}
/// Gets writable region by region id.
///
/// Returns error if the region does not exist or is readonly.
@@ -998,6 +1010,7 @@ mod tests {
use crate::access_layer::AccessLayer;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::meter::rate_meter::RateMeter;
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
@@ -1167,6 +1180,7 @@ mod tests {
last_compaction_millis: Default::default(),
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
write_bytes_per_sec: RateMeter::default(),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
stats: ManifestStats::default(),
};

View File

@@ -52,6 +52,7 @@ use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::meter::rate_meter::RateMeter;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
@@ -287,6 +288,7 @@ impl RegionOpener {
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
write_bytes_per_sec: RateMeter::default(),
stats: self.stats,
})
}
@@ -470,6 +472,7 @@ impl RegionOpener {
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
write_bytes_per_sec: RateMeter::default(),
memtable_builder,
stats: self.stats.clone(),
};
@@ -649,8 +652,13 @@ where
}
last_entry_id = last_entry_id.max(entry_id);
let mut region_write_ctx =
RegionWriteCtx::new(region_id, version_control, provider.clone());
let mut region_write_ctx = RegionWriteCtx::new(
region_id,
version_control,
provider.clone(),
// For WAL replay, we don't need to track the write bytes rate.
None,
);
for mutation in entry.mutations {
rows_replayed += mutation
.rows

View File

@@ -25,6 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::KeyValues;
use crate::meter::rate_meter::RateMeter;
use crate::metrics;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
@@ -106,6 +107,8 @@ pub(crate) struct RegionWriteCtx {
pub(crate) put_num: usize,
/// Rows to delete.
pub(crate) delete_num: usize,
/// Write bytes per second.
pub(crate) write_bytes_per_sec: Option<RateMeter>,
}
impl RegionWriteCtx {
@@ -114,6 +117,7 @@ impl RegionWriteCtx {
region_id: RegionId,
version_control: &VersionControlRef,
provider: Provider,
write_bytes_per_sec: Option<RateMeter>,
) -> RegionWriteCtx {
let VersionControlData {
version,
@@ -136,6 +140,7 @@ impl RegionWriteCtx {
put_num: 0,
delete_num: 0,
bulk_parts: vec![],
write_bytes_per_sec,
}
}
@@ -213,7 +218,13 @@ impl RegionWriteCtx {
return;
}
let mutable = self.version.memtables.mutable.clone();
let mutable_memtable = self.version.memtables.mutable.clone();
let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
Some(mutable_memtable.memory_usage())
} else {
None
};
let mutations = mem::take(&mut self.wal_entry.mutations)
.into_iter()
.enumerate()
@@ -224,13 +235,13 @@ impl RegionWriteCtx {
.collect::<Vec<_>>();
if mutations.len() == 1 {
if let Err(err) = mutable.write(&mutations[0].1) {
if let Err(err) = mutable_memtable.write(&mutations[0].1) {
self.notifiers[mutations[0].0].err = Some(Arc::new(err));
}
} else {
let mut tasks = FuturesUnordered::new();
for (i, kvs) in mutations {
let mutable = mutable.clone();
let mutable = mutable_memtable.clone();
// use tokio runtime to schedule tasks.
tasks.push(common_runtime::spawn_blocking_global(move || {
(i, mutable.write(&kvs))
@@ -246,6 +257,12 @@ impl RegionWriteCtx {
}
}
if let Some(meter) = &self.write_bytes_per_sec {
let new_memory_usage = mutable_memtable.memory_usage();
let written_bytes =
new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
meter.inc_by(written_bytes as u64);
}
// Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
// to decrease `next_sequence` and `next_entry_id` by 1.
self.version_control
@@ -280,6 +297,13 @@ impl RegionWriteCtx {
.with_label_values(&["write_bulk"])
.start_timer();
let mutable_memtable = &self.version.memtables.mutable;
let prev_memory_usage = if self.write_bytes_per_sec.is_some() {
Some(mutable_memtable.memory_usage())
} else {
None
};
if self.bulk_parts.len() == 1 {
let part = self.bulk_parts.swap_remove(0);
let num_rows = part.num_rows();
@@ -293,7 +317,7 @@ impl RegionWriteCtx {
let mut tasks = FuturesUnordered::new();
for (i, part) in self.bulk_parts.drain(..).enumerate() {
let mutable = self.version.memtables.mutable.clone();
let mutable = mutable_memtable.clone();
tasks.push(common_runtime::spawn_blocking_global(move || {
let num_rows = part.num_rows();
(i, mutable.write_bulk(part), num_rows)
@@ -309,6 +333,12 @@ impl RegionWriteCtx {
}
}
if let Some(meter) = &self.write_bytes_per_sec {
let new_memory_usage = mutable_memtable.memory_usage();
let written_bytes =
new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
meter.inc_by(written_bytes as u64);
}
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}

View File

@@ -36,7 +36,7 @@ use std::time::Duration;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_runtime::{JoinHandle, RepeatedTask, TaskFunction};
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
@@ -55,8 +55,9 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::error::{
self, CreateDirSnafu, Error, JoinSnafu, Result, StartRepeatedTaskSnafu, WorkerStoppedSnafu,
};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
@@ -82,6 +83,8 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay to check region periodical tasks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
/// Interval to update the rate meter for regions.
const RATE_UPDATE_INTERVAL: Duration = Duration::from_secs(30);
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
@@ -208,7 +211,7 @@ impl WorkerGroup {
}
.start()
})
.collect();
.collect::<Result<Vec<_>>>()?;
Ok(WorkerGroup {
workers,
@@ -348,7 +351,7 @@ impl WorkerGroup {
}
.start()
})
.collect();
.collect::<Result<Vec<_>>>()?;
Ok(WorkerGroup {
workers,
@@ -427,7 +430,7 @@ struct WorkerStarter<S> {
impl<S: LogStore> WorkerStarter<S> {
/// Starts a region worker and its background thread.
fn start(self) -> RegionWorker {
fn start(self) -> Result<RegionWorker> {
let regions = Arc::new(RegionMap::default());
let opening_regions = Arc::new(OpeningRegions::default());
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
@@ -480,14 +483,28 @@ impl<S: LogStore> WorkerStarter<S> {
worker_thread.run().await;
});
RegionWorker {
let rate_update_task = RepeatedTask::new(
RATE_UPDATE_INTERVAL,
Box::new(RateUpdater {
regions: regions.clone(),
interval: RATE_UPDATE_INTERVAL,
}),
);
rate_update_task
.start(common_runtime::global_runtime())
.context(StartRepeatedTaskSnafu {
name: RateUpdater::NAME,
})?;
Ok(RegionWorker {
id: self.id,
regions,
opening_regions,
sender,
handle: Mutex::new(Some(handle)),
rate_update_task,
running,
}
})
}
}
@@ -503,6 +520,8 @@ pub(crate) struct RegionWorker {
sender: Sender<WorkerRequestWithTime>,
/// Handle to the worker thread.
handle: Mutex<Option<JoinHandle<()>>>,
/// rate update task.
rate_update_task: RepeatedTask<Error>,
/// Whether to run the worker thread.
running: Arc<AtomicBool>,
}
@@ -545,6 +564,9 @@ impl RegionWorker {
handle.await.context(JoinSnafu)?;
}
if let Err(err) = self.rate_update_task.stop().await {
error!(err; "Failed to stop rate update task");
}
Ok(())
}
@@ -664,6 +686,29 @@ impl StalledRequests {
}
}
struct RateUpdater {
regions: RegionMapRef,
interval: Duration,
}
impl RateUpdater {
const NAME: &str = "RateUpdater";
}
#[async_trait::async_trait]
impl TaskFunction<Error> for RateUpdater {
fn name(&self) -> &str {
Self::NAME
}
async fn call(&mut self) -> Result<()> {
self.regions.for_each_region(|region| {
region.write_bytes_per_sec.update_rate(self.interval);
});
Ok(())
}
}
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
/// Id of the worker.

View File

@@ -248,6 +248,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.write_bytes_per_sec.clone()),
);
e.insert(region_ctx);
@@ -351,6 +352,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.write_bytes_per_sec.clone()),
);
e.insert(region_ctx);