From f64fc3a57a13931566662ca3db2a6dea66b5b68a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 18 Aug 2025 14:45:31 +0800 Subject: [PATCH] feat: add `RateMeter` for tracking memtable write throughput (#6744) * feat: introduce `RateMeter` Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/mito2/src/engine/basic_test.rs | 11 ++ src/mito2/src/error.rs | 10 ++ src/mito2/src/lib.rs | 1 + src/mito2/src/meter.rs | 15 +++ src/mito2/src/meter/rate_meter.rs | 163 +++++++++++++++++++++++++++ src/mito2/src/region.rs | 14 +++ src/mito2/src/region/opener.rs | 12 +- src/mito2/src/region_write_ctx.rs | 38 ++++++- src/mito2/src/worker.rs | 61 ++++++++-- src/mito2/src/worker/handle_write.rs | 2 + 10 files changed, 313 insertions(+), 14 deletions(-) create mode 100644 src/mito2/src/meter.rs create mode 100644 src/mito2/src/meter/rate_meter.rs diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 20f2e1eb3c..54d932155f 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -15,6 +15,7 @@ //! Basic tests for mito engine. use std::collections::HashMap; +use std::time::Duration; use api::v1::helper::row; use api::v1::value::ValueData; @@ -89,6 +90,12 @@ async fn test_write_to_region() { rows: build_rows(0, 42), }; put_rows(&engine, region_id, rows).await; + let region = engine.get_region(region_id).unwrap(); + // Update the write bytes rate. + region + .write_bytes_per_sec + .update_rate(Duration::from_secs(1)); + assert!(region.write_bytes_per_sec.get_rate() > 0); } #[apply(multiple_log_store_factories)] @@ -159,6 +166,10 @@ async fn test_region_replay(factory: Option) { .unwrap(); assert_eq!(0, result.affected_rows); + // The replay won't update the write bytes rate meter. 
+ let region = engine.get_region(region_id).unwrap(); + assert_eq!(region.write_bytes_per_sec.get_total(), 0); + let request = ScanRequest::default(); let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index aa3d91ff4b..cc7dc514c2 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1041,6 +1041,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to start repeated task: {}", name))] + StartRepeatedTask { + name: String, + #[snafu(implicit)] + location: Location, + source: common_runtime::error::Error, + }, } pub type Result = std::result::Result; @@ -1203,6 +1211,8 @@ impl ErrorExt for Error { InconsistentTimestampLength { .. } => StatusCode::InvalidArguments, TooManyFilesToRead { .. } => StatusCode::RateLimited, + + StartRepeatedTask { .. } => StatusCode::Internal, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index ad4045c86e..eba689845f 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -38,6 +38,7 @@ pub mod extension; pub mod flush; pub mod manifest; pub mod memtable; +pub mod meter; mod metrics; pub mod read; pub mod region; diff --git a/src/mito2/src/meter.rs b/src/mito2/src/meter.rs new file mode 100644 index 0000000000..7bad3a2962 --- /dev/null +++ b/src/mito2/src/meter.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod rate_meter; diff --git a/src/mito2/src/meter/rate_meter.rs b/src/mito2/src/meter/rate_meter.rs new file mode 100644 index 0000000000..87526627bd --- /dev/null +++ b/src/mito2/src/meter/rate_meter.rs @@ -0,0 +1,163 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +/// `RateMeter` tracks a cumulative value and computes the rate per interval. +#[derive(Default, Debug, Clone)] +pub struct RateMeter { + inner: Arc<RateMeterInner>, +} + +#[derive(Default, Debug)] +struct RateMeterInner { + /// Accumulated value since last rate calculation. + value: AtomicU64, + + /// The last computed rate (per second). + last_rate: AtomicU64, + + /// Optional: total accumulated value, never reset. + total: AtomicU64, +} + +impl RateMeter { + /// Creates a new `RateMeter` with all counters set to zero. + pub fn new() -> Self { + Self { + inner: Arc::new(RateMeterInner::default()), + } + } + + /// Increments the accumulated value by `v`. + pub fn inc_by(&self, v: u64) { + self.inner.value.fetch_add(v, Ordering::Relaxed); + self.inner.total.fetch_add(v, Ordering::Relaxed); + } + + /// Returns the current accumulated value since last rate calculation.
+ pub fn get_value(&self) -> u64 { + self.inner.value.load(Ordering::Relaxed) + } + + /// Returns the total accumulated value since creation. + pub fn get_total(&self) -> u64 { + self.inner.total.load(Ordering::Relaxed) + } + + /// Returns the last computed rate (per second). + pub fn get_rate(&self) -> u64 { + self.inner.last_rate.load(Ordering::Relaxed) + } + + /// Updates the current rate based on the accumulated value over the given interval. + /// + /// `interval` should represent the duration since the last `update_rate` call. + /// This method resets the internal accumulated counter (`value`) to 0. + pub fn update_rate(&self, interval: Duration) { + let current_value = self.inner.value.swap(0, Ordering::Relaxed); + + let interval_secs = interval.as_secs(); + if interval_secs > 0 { + let rate = current_value / interval_secs; + self.inner.last_rate.store(rate, Ordering::Relaxed); + } + } + + /// Resets the meter: clears both current value and last rate. + /// Total accumulated value remains unchanged. 
+ pub fn reset(&self) { + self.inner.value.store(0, Ordering::Relaxed); + self.inner.last_rate.store(0, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[test] + fn test_inc_and_get_value_and_total() { + let meter = RateMeter::new(); + assert_eq!(meter.get_value(), 0); + assert_eq!(meter.get_total(), 0); + + meter.inc_by(10); + assert_eq!(meter.get_value(), 10); + assert_eq!(meter.get_total(), 10); + + meter.inc_by(5); + assert_eq!(meter.get_value(), 15); + assert_eq!(meter.get_total(), 15); + } + + #[test] + fn test_update_rate_and_get_rate() { + let meter = RateMeter::new(); + meter.inc_by(100); + meter.update_rate(Duration::from_secs(2)); + assert_eq!(meter.get_rate(), 50); + // After update, value should be reset + assert_eq!(meter.get_value(), 0); + + // If interval is zero, rate should not be updated + meter.inc_by(30); + meter.update_rate(Duration::from_secs(0)); + // Should still be 50 + assert_eq!(meter.get_rate(), 50); + } + + #[test] + fn test_reset() { + let meter = RateMeter::new(); + meter.inc_by(100); + meter.update_rate(Duration::from_secs(1)); + assert_eq!(meter.get_rate(), 100); + meter.reset(); + assert_eq!(meter.get_value(), 0); + assert_eq!(meter.get_rate(), 0); + // Total should remain unchanged + assert_eq!(meter.get_total(), 100); + } + + #[test] + fn test_total_accumulates() { + let meter = RateMeter::new(); + meter.inc_by(10); + meter.update_rate(Duration::from_secs(1)); + meter.inc_by(20); + meter.update_rate(Duration::from_secs(2)); + assert_eq!(meter.get_total(), 30); + assert_eq!(meter.get_rate(), 10); + } + + #[test] + fn test_clone_and_shared_state() { + let meter = RateMeter::new(); + let meter2 = meter.clone(); + meter.inc_by(10); + meter2.inc_by(5); + assert_eq!(meter.get_value(), 15); + assert_eq!(meter2.get_value(), 15); + assert_eq!(meter.get_total(), 15); + assert_eq!(meter2.get_total(), 15); + + meter.update_rate(Duration::from_secs(1)); + assert_eq!(meter2.get_rate(), 
15); + } +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 631f44caf1..385253a922 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -43,6 +43,7 @@ use crate::error::{ use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; +use crate::meter::rate_meter::RateMeter; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OnFailure, OptionOutputTx}; use crate::sst::file_purger::FilePurgerRef; @@ -131,6 +132,9 @@ pub struct MitoRegion { /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region, /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`. pub(crate) topic_latest_entry_id: AtomicU64, + /// `write_bytes_per_sec` tracks the memtable write throughput in bytes per second. + /// [RateUpdater](crate::worker::RateUpdater) will update the rate periodically. + pub(crate) write_bytes_per_sec: RateMeter, /// Memtable builder for the region. pub(crate) memtable_builder: MemtableBuilderRef, /// manifest stats @@ -767,6 +771,14 @@ impl RegionMap { regions.get(®ion_id).cloned() } + /// Iterates over all regions. + pub(crate) fn for_each_region(&self, f: impl Fn(&MitoRegionRef)) { + let regions = self.regions.read().unwrap(); + for (_, region) in regions.iter() { + f(region); + } + } + /// Gets writable region by region id. /// /// Returns error if the region does not exist or is readonly. 
@@ -998,6 +1010,7 @@ mod tests { use crate::access_layer::AccessLayer; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; + use crate::meter::rate_meter::RateMeter; use crate::region::{ ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState, }; @@ -1167,6 +1180,7 @@ mod tests { last_compaction_millis: Default::default(), time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), + write_bytes_per_sec: RateMeter::default(), memtable_builder: Arc::new(EmptyMemtableBuilder::default()), stats: ManifestStats::default(), }; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 6666970ebd..54534f3e41 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -52,6 +52,7 @@ use crate::manifest::storage::manifest_compress_type; use crate::memtable::bulk::part::BulkPart; use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; +use crate::meter::rate_meter::RateMeter; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::{ @@ -287,6 +288,7 @@ impl RegionOpener { time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), memtable_builder, + write_bytes_per_sec: RateMeter::default(), stats: self.stats, }) } @@ -470,6 +472,7 @@ impl RegionOpener { last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), + write_bytes_per_sec: RateMeter::default(), memtable_builder, stats: self.stats.clone(), }; @@ -649,8 +652,13 @@ where } last_entry_id = last_entry_id.max(entry_id); - let mut region_write_ctx = - RegionWriteCtx::new(region_id, version_control, provider.clone()); + let mut region_write_ctx = RegionWriteCtx::new( + region_id, + version_control, + provider.clone(), + // For WAL replay, we don't need to track the write bytes 
+ rate. + None, + ); for mutation in entry.mutations { rows_replayed += mutation .rows diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 395a33ec20..06a9cccdc1 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -25,6 +25,7 @@ use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{Error, Result, WriteGroupSnafu}; use crate::memtable::bulk::part::BulkPart; use crate::memtable::KeyValues; +use crate::meter::rate_meter::RateMeter; use crate::metrics; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::OptionOutputTx; @@ -106,6 +107,8 @@ pub(crate) struct RegionWriteCtx { pub(crate) put_num: usize, /// Rows to delete. pub(crate) delete_num: usize, + /// Meter recording bytes written to the memtable; `None` disables tracking. + pub(crate) write_bytes_per_sec: Option<RateMeter>, } impl RegionWriteCtx { @@ -114,6 +117,7 @@ impl RegionWriteCtx { region_id: RegionId, version_control: &VersionControlRef, provider: Provider, + write_bytes_per_sec: Option<RateMeter>, ) -> RegionWriteCtx { let VersionControlData { version, @@ -136,6 +140,7 @@ impl RegionWriteCtx { put_num: 0, delete_num: 0, bulk_parts: vec![], + write_bytes_per_sec, } } @@ -213,7 +218,13 @@ impl RegionWriteCtx { return; } - let mutable = self.version.memtables.mutable.clone(); + let mutable_memtable = self.version.memtables.mutable.clone(); + let prev_memory_usage = if self.write_bytes_per_sec.is_some() { + Some(mutable_memtable.memory_usage()) + } else { + None + }; + let mutations = mem::take(&mut self.wal_entry.mutations) .into_iter() .enumerate() @@ -224,13 +235,13 @@ impl RegionWriteCtx { .collect::<Vec<_>>(); if mutations.len() == 1 { - if let Err(err) = mutable.write(&mutations[0].1) { + if let Err(err) = mutable_memtable.write(&mutations[0].1) { self.notifiers[mutations[0].0].err = Some(Arc::new(err)); } } else { let mut tasks = FuturesUnordered::new(); for (i, kvs) in mutations { - let mutable = mutable.clone(); + let mutable =
mutable_memtable.clone(); // use tokio runtime to schedule tasks. tasks.push(common_runtime::spawn_blocking_global(move || { (i, mutable.write(&kvs)) @@ -246,6 +257,12 @@ impl RegionWriteCtx { } } + if let Some(meter) = &self.write_bytes_per_sec { + let new_memory_usage = mutable_memtable.memory_usage(); + let written_bytes = + new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default()); + meter.inc_by(written_bytes as u64); + } // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need // to decrease `next_sequence` and `next_entry_id` by 1. self.version_control @@ -280,6 +297,13 @@ impl RegionWriteCtx { .with_label_values(&["write_bulk"]) .start_timer(); + let mutable_memtable = &self.version.memtables.mutable; + let prev_memory_usage = if self.write_bytes_per_sec.is_some() { + Some(mutable_memtable.memory_usage()) + } else { + None + }; + if self.bulk_parts.len() == 1 { let part = self.bulk_parts.swap_remove(0); let num_rows = part.num_rows(); @@ -293,7 +317,7 @@ impl RegionWriteCtx { let mut tasks = FuturesUnordered::new(); for (i, part) in self.bulk_parts.drain(..).enumerate() { - let mutable = self.version.memtables.mutable.clone(); + let mutable = mutable_memtable.clone(); tasks.push(common_runtime::spawn_blocking_global(move || { let num_rows = part.num_rows(); (i, mutable.write_bulk(part), num_rows) @@ -309,6 +333,12 @@ impl RegionWriteCtx { } } + if let Some(meter) = &self.write_bytes_per_sec { + let new_memory_usage = mutable_memtable.memory_usage(); + let written_bytes = + new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default()); + meter.inc_by(written_bytes as u64); + } self.version_control .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 02e5cdd391..69d1caaffa 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -36,7 +36,7 @@ use std::time::Duration; use 
common_base::Plugins; use common_error::ext::BoxedError; use common_meta::key::SchemaMetadataManagerRef; -use common_runtime::JoinHandle; +use common_runtime::{JoinHandle, RepeatedTask, TaskFunction}; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; @@ -55,8 +55,9 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; -use crate::error; -use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; +use crate::error::{ + self, CreateDirSnafu, Error, JoinSnafu, Result, StartRepeatedTaskSnafu, WorkerStoppedSnafu, +}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING}; @@ -82,6 +83,8 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping"; pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60); /// Max delay to check region periodical tasks. pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3; +/// Interval to update the rate meter for regions. +const RATE_UPDATE_INTERVAL: Duration = Duration::from_secs(30); #[cfg_attr(doc, aquamarine::aquamarine)] /// A fixed size group of [RegionWorkers](RegionWorker). @@ -208,7 +211,7 @@ impl WorkerGroup { } .start() }) - .collect(); + .collect::<Result<Vec<_>>>()?; Ok(WorkerGroup { workers, @@ -348,7 +351,7 @@ impl WorkerGroup { } .start() }) - .collect(); + .collect::<Result<Vec<_>>>()?; Ok(WorkerGroup { workers, @@ -427,7 +430,7 @@ struct WorkerStarter { impl WorkerStarter { /// Starts a region worker and its background thread.
- fn start(self) -> RegionWorker { + fn start(self) -> Result<RegionWorker> { let regions = Arc::new(RegionMap::default()); let opening_regions = Arc::new(OpeningRegions::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); @@ -480,14 +483,28 @@ impl WorkerStarter { worker_thread.run().await; }); - RegionWorker { + let rate_update_task = RepeatedTask::new( + RATE_UPDATE_INTERVAL, + Box::new(RateUpdater { + regions: regions.clone(), + interval: RATE_UPDATE_INTERVAL, + }), + ); + rate_update_task + .start(common_runtime::global_runtime()) + .context(StartRepeatedTaskSnafu { + name: RateUpdater::NAME, + })?; + + Ok(RegionWorker { id: self.id, regions, opening_regions, sender, handle: Mutex::new(Some(handle)), + rate_update_task, running, - } + }) } } @@ -503,6 +520,8 @@ pub(crate) struct RegionWorker { sender: Sender, /// Handle to the worker thread. handle: Mutex<Option<JoinHandle<()>>>, + /// rate update task. + rate_update_task: RepeatedTask<Error>, /// Whether to run the worker thread. running: Arc, } @@ -545,6 +564,9 @@ impl RegionWorker { handle.await.context(JoinSnafu)?; } + if let Err(err) = self.rate_update_task.stop().await { + error!(err; "Failed to stop rate update task"); + } Ok(()) } @@ -664,6 +686,29 @@ impl StalledRequests { } } +struct RateUpdater { + regions: RegionMapRef, + interval: Duration, +} + +impl RateUpdater { + const NAME: &str = "RateUpdater"; +} + +#[async_trait::async_trait] +impl TaskFunction<Error> for RateUpdater { + fn name(&self) -> &str { + Self::NAME + } + + async fn call(&mut self) -> Result<()> { + self.regions.for_each_region(|region| { + region.write_bytes_per_sec.update_rate(self.interval); + }); + Ok(()) + } +} + /// Background worker loop to handle requests. struct RegionWorkerLoop { /// Id of the worker.
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 68d8152d5a..32182243b3 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -248,6 +248,7 @@ impl RegionWorkerLoop { region.region_id, &region.version_control, region.provider.clone(), + Some(region.write_bytes_per_sec.clone()), ); e.insert(region_ctx); @@ -351,6 +352,7 @@ impl RegionWorkerLoop { region.region_id, &region.version_control, region.provider.clone(), + Some(region.write_bytes_per_sec.clone()), ); e.insert(region_ctx);