diff --git a/src/catalog/src/system_schema/information_schema/region_statistics.rs b/src/catalog/src/system_schema/information_schema/region_statistics.rs index 8db6b860bf..ddd613d3b3 100644 --- a/src/catalog/src/system_schema/information_schema/region_statistics.rs +++ b/src/catalog/src/system_schema/information_schema/region_statistics.rs @@ -40,6 +40,7 @@ const REGION_ID: &str = "region_id"; const TABLE_ID: &str = "table_id"; const REGION_NUMBER: &str = "region_number"; const REGION_ROWS: &str = "region_rows"; +const WRITTEN_BYTES: &str = "written_bytes_since_open"; const DISK_SIZE: &str = "disk_size"; const MEMTABLE_SIZE: &str = "memtable_size"; const MANIFEST_SIZE: &str = "manifest_size"; @@ -56,6 +57,7 @@ const INIT_CAPACITY: usize = 42; /// - `table_id`: The table id. /// - `region_number`: The region number. /// - `region_rows`: The number of rows in region. +/// - `written_bytes_since_open`: The total bytes written of the region since region opened. /// - `memtable_size`: The memtable size in bytes. /// - `disk_size`: The approximate disk size in bytes. /// - `manifest_size`: The manifest size in bytes. 
@@ -83,6 +85,7 @@ impl InformationSchemaRegionStatistics { ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false), ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false), ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true), + ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true), ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true), ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true), ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true), @@ -145,6 +148,7 @@ struct InformationSchemaRegionStatisticsBuilder { table_ids: UInt32VectorBuilder, region_numbers: UInt32VectorBuilder, region_rows: UInt64VectorBuilder, + written_bytes: UInt64VectorBuilder, disk_sizes: UInt64VectorBuilder, memtable_sizes: UInt64VectorBuilder, manifest_sizes: UInt64VectorBuilder, @@ -163,6 +167,7 @@ impl InformationSchemaRegionStatisticsBuilder { table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), @@ -193,6 +198,7 @@ impl InformationSchemaRegionStatisticsBuilder { (TABLE_ID, &Value::from(region_stat.id.table_id())), (REGION_NUMBER, &Value::from(region_stat.id.region_number())), (REGION_ROWS, &Value::from(region_stat.num_rows)), + (WRITTEN_BYTES, &Value::from(region_stat.written_bytes)), (DISK_SIZE, &Value::from(region_stat.approximate_bytes)), (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)), (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)), @@ -211,6 +217,7 @@ impl InformationSchemaRegionStatisticsBuilder { self.region_numbers 
.push(Some(region_stat.id.region_number())); self.region_rows.push(Some(region_stat.num_rows)); + self.written_bytes.push(Some(region_stat.written_bytes)); self.disk_sizes.push(Some(region_stat.approximate_bytes)); self.memtable_sizes.push(Some(region_stat.memtable_size)); self.manifest_sizes.push(Some(region_stat.manifest_size)); @@ -226,6 +233,7 @@ impl InformationSchemaRegionStatisticsBuilder { Arc::new(self.table_ids.finish()), Arc::new(self.region_numbers.finish()), Arc::new(self.region_rows.finish()), + Arc::new(self.written_bytes.finish()), Arc::new(self.disk_sizes.finish()), Arc::new(self.memtable_sizes.finish()), Arc::new(self.manifest_sizes.finish()), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 830fce9f94..d9767f524a 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -792,6 +792,7 @@ impl InformationExtension for StandaloneInformationExtension { region_manifest: region_stat.manifest.into(), data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, + written_bytes: region_stat.written_bytes, } }) .collect::>(); diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index bee7f989c6..a835b8b575 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -97,6 +97,8 @@ pub struct RegionStat { pub index_size: u64, /// The manifest infoof the region. pub region_manifest: RegionManifestInfo, + /// The total bytes written of the region since region opened. + pub written_bytes: u64, /// The latest entry id of topic used by data. 
/// **Only used by remote WAL prune.** pub data_topic_latest_entry_id: u64, @@ -277,6 +279,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { sst_size: region_stat.sst_size, index_size: region_stat.index_size, region_manifest: region_stat.manifest.into(), + written_bytes: region_stat.written_bytes, data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id, metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id, } diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index 13aee5d234..25940ceeac 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -124,6 +124,7 @@ mod tests { index_size: 0, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + written_bytes: 0, } } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index cdcd9d3228..e0026c22cc 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -104,6 +104,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + written_bytes: 0, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index c133a54798..2d88a79519 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -166,6 +166,7 @@ mod test { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + written_bytes: 0, } } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index d69e2f56d3..c43b06c7a0 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -197,6 +197,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + 
written_bytes: 0, }], ..Default::default() } @@ -224,6 +225,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + written_bytes: 0, }], ..Default::default() } @@ -251,6 +253,7 @@ mod tests { }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, + written_bytes: 0, }], ..Default::default() } diff --git a/src/metric-engine/src/utils.rs b/src/metric-engine/src/utils.rs index 0f28a6365e..15072c59f4 100644 --- a/src/metric-engine/src/utils.rs +++ b/src/metric-engine/src/utils.rs @@ -61,6 +61,7 @@ pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option 0); } #[apply(multiple_log_store_factories)] async fn test_region_replay(factory: Option) { + use std::sync::atomic::Ordering; + use common_wal::options::{KafkaWalOptions, WalOptions}; common_telemetry::init_default_ut_logging(); @@ -155,6 +160,10 @@ async fn test_region_replay(factory: Option) { .unwrap(); assert_eq!(0, result.affected_rows); + // Replaying the WAL must not bump the region's `written_bytes` counter. + let region = engine.get_region(region_id).unwrap(); + assert_eq!(region.written_bytes.load(Ordering::Relaxed), 0); + let request = ScanRequest::default(); let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 08863b3b2f..3c5697b649 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -129,6 +129,8 @@ pub struct MitoRegion { /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region, /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`. pub(crate) topic_latest_entry_id: AtomicU64, + /// The total bytes written to the region. + pub(crate) written_bytes: Arc<AtomicU64>, /// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef, /// manifest stats @@ -313,6 +315,7 @@ impl MitoRegion { let manifest_version = self.stats.manifest_version(); let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed); + let written_bytes = self.written_bytes.load(Ordering::Relaxed); RegionStatistic { num_rows, @@ -327,6 +330,7 @@ impl MitoRegion { }, data_topic_latest_entry_id: topic_latest_entry_id, metadata_topic_latest_entry_id: topic_latest_entry_id, + written_bytes, } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 885963f21e..d793643e52 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -277,6 +277,7 @@ impl RegionOpener { time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), memtable_builder, + written_bytes: Arc::new(AtomicU64::new(0)), stats: self.stats, }) } @@ -455,6 +456,7 @@ impl RegionOpener { last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), + written_bytes: Arc::new(AtomicU64::new(0)), memtable_builder, stats: self.stats.clone(), }; @@ -634,7 +636,7 @@ where last_entry_id = last_entry_id.max(entry_id); let mut region_write_ctx = - RegionWriteCtx::new(region_id, version_control, provider.clone()); + RegionWriteCtx::new(region_id, version_control, provider.clone(), None); for mutation in entry.mutations { rows_replayed += mutation .rows diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index c05c66b9ef..12aed6329f 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::mem; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint}; @@ -106,6 +107,8 @@ pub(crate) struct RegionWriteCtx { pub(crate) put_num: usize, /// Rows to delete. 
pub(crate) delete_num: usize, + /// The total bytes written to the region. + pub(crate) written_bytes: Option<Arc<AtomicU64>>, } impl RegionWriteCtx { @@ -114,6 +117,7 @@ impl RegionWriteCtx { region_id: RegionId, version_control: &VersionControlRef, provider: Provider, + written_bytes: Option<Arc<AtomicU64>>, ) -> RegionWriteCtx { let VersionControlData { version, @@ -136,6 +140,7 @@ impl RegionWriteCtx { put_num: 0, delete_num: 0, bulk_parts: vec![], + written_bytes, } } @@ -214,6 +219,12 @@ impl RegionWriteCtx { } let mutable = self.version.memtables.mutable.clone(); + let prev_memory_usage = if self.written_bytes.is_some() { + Some(mutable.memory_usage()) + } else { + None + }; + let mutations = mem::take(&mut self.wal_entry.mutations) .into_iter() .enumerate() @@ -246,6 +257,11 @@ impl RegionWriteCtx { } } + if let Some(written_bytes) = &self.written_bytes { + let new_memory_usage = mutable.memory_usage(); + let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default()); + written_bytes.fetch_add(bytes as u64, Ordering::Relaxed); + } // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need // to decrease `next_sequence` and `next_entry_id` by 1.
self.version_control @@ -271,6 +287,13 @@ impl RegionWriteCtx { .with_label_values(&["write_bulk"]) .start_timer(); + let mutable_memtable = &self.version.memtables.mutable; + let prev_memory_usage = if self.written_bytes.is_some() { + Some(mutable_memtable.memory_usage()) + } else { + None + }; + if self.bulk_parts.len() == 1 { let part = self.bulk_parts.swap_remove(0); let num_rows = part.num_rows(); @@ -300,6 +323,11 @@ impl RegionWriteCtx { } } + if let Some(written_bytes) = &self.written_bytes { + let new_memory_usage = mutable_memtable.memory_usage(); + let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default()); + written_bytes.fetch_add(bytes as u64, Ordering::Relaxed); + } self.version_control .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 6ed9477687..fd9f768a57 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -247,6 +247,7 @@ impl RegionWorkerLoop { region.region_id, &region.version_control, region.provider.clone(), + Some(region.written_bytes.clone()), ); e.insert(region_ctx); @@ -350,6 +351,7 @@ impl RegionWorkerLoop { region.region_id, &region.version_control, region.provider.clone(), + Some(region.written_bytes.clone()), ); e.insert(region_ctx); diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 3cf9bb0fc1..90e057a9c6 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -450,6 +450,9 @@ pub struct RegionStatistic { /// The details of the region. #[serde(default)] pub manifest: RegionManifestInfo, + #[serde(default)] + /// The total bytes written of the region since region opened. + pub written_bytes: u64, /// The latest entry id of the region's remote WAL since last flush. /// For metric engine, there're two latest entry ids, one for data and one for metadata.
/// TODO(weny): remove this two fields and use single instead. diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result index aa8e649ec2..14b8f890b0 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.result +++ b/tests/cases/standalone/common/information_schema/region_statistics.result @@ -22,15 +22,17 @@ INSERT INTO test VALUES Affected Rows: 3 -- SQLNESS SLEEP 3s -SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) +-- For regions using different WAL implementations, the manifest size may vary. +-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest. +SELECT SUM(region_rows),SUM(written_bytes_since_open), SUM(memtable_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ -| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) | -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ -| 3 | 2238 | 0 | 0 | -+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ 
++-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.written_bytes_since_open) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) | ++-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +| 3 | 78 | 78 | 0 | 0 | ++-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; diff --git a/tests/cases/standalone/common/information_schema/region_statistics.sql b/tests/cases/standalone/common/information_schema/region_statistics.sql index ed7a7b0cfc..70947adab2 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.sql +++ b/tests/cases/standalone/common/information_schema/region_statistics.sql @@ -17,7 +17,9 @@ INSERT INTO test VALUES (21, 'c', 21); -- SQLNESS SLEEP 3s -SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) +-- For regions using different WAL implementations, the manifest size may vary. +-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest. 
+SELECT SUM(region_rows),SUM(written_bytes_since_open), SUM(memtable_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 85eb99f2a3..fe26f30906 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -317,17 +317,18 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | region_peers | table_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | region_peers | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | region_peers | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | region_statistics | disk_size | 5 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | region_statistics | engine | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | region_statistics | index_size | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | region_statistics | manifest_size | 7 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | 
information_schema | region_statistics | memtable_size | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | disk_size | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | engine | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_statistics | index_size | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | manifest_size | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | memtable_size | 7 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | region_statistics | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | region_statistics | region_number | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | region_statistics | region_role | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_statistics | region_role | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | region_statistics | region_rows | 4 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | 
information_schema | region_statistics | sst_size | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | sst_size | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | region_statistics | table_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | information_schema | region_statistics | written_bytes_since_open | 5 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | routines | character_maximum_length | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_octet_length | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_set_client | 29 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |