feat: add written_bytes_since_open column to region_statistics table (#6904)

* feat: add `write_bytes` column to `region_statistics` table

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename `write_bytes` to `written_bytes`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename `written_bytes` to `written_bytes_since_open`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-09-05 15:27:30 +08:00
committed by Yingwen
parent e6831704d8
commit e2393d27b2
17 changed files with 87 additions and 15 deletions

View File

@@ -40,6 +40,7 @@ const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const WRITTEN_BYTES: &str = "written_bytes_since_open";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
@@ -56,6 +57,7 @@ const INIT_CAPACITY: usize = 42;
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in region.
/// - `written_bytes_since_open`: The total bytes written of the region since region opened.
/// - `memtable_size`: The memtable size in bytes.
/// - `disk_size`: The approximate disk size in bytes.
/// - `manifest_size`: The manifest size in bytes.
@@ -83,6 +85,7 @@ impl InformationSchemaRegionStatistics {
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
@@ -145,6 +148,7 @@ struct InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder,
region_numbers: UInt32VectorBuilder,
region_rows: UInt64VectorBuilder,
written_bytes: UInt64VectorBuilder,
disk_sizes: UInt64VectorBuilder,
memtable_sizes: UInt64VectorBuilder,
manifest_sizes: UInt64VectorBuilder,
@@ -163,6 +167,7 @@ impl InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
@@ -193,6 +198,7 @@ impl InformationSchemaRegionStatisticsBuilder {
(TABLE_ID, &Value::from(region_stat.id.table_id())),
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
(REGION_ROWS, &Value::from(region_stat.num_rows)),
(WRITTEN_BYTES, &Value::from(region_stat.written_bytes)),
(DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
@@ -211,6 +217,7 @@ impl InformationSchemaRegionStatisticsBuilder {
self.region_numbers
.push(Some(region_stat.id.region_number()));
self.region_rows.push(Some(region_stat.num_rows));
self.written_bytes.push(Some(region_stat.written_bytes));
self.disk_sizes.push(Some(region_stat.approximate_bytes));
self.memtable_sizes.push(Some(region_stat.memtable_size));
self.manifest_sizes.push(Some(region_stat.manifest_size));
@@ -226,6 +233,7 @@ impl InformationSchemaRegionStatisticsBuilder {
Arc::new(self.table_ids.finish()),
Arc::new(self.region_numbers.finish()),
Arc::new(self.region_rows.finish()),
Arc::new(self.written_bytes.finish()),
Arc::new(self.disk_sizes.finish()),
Arc::new(self.memtable_sizes.finish()),
Arc::new(self.manifest_sizes.finish()),

View File

@@ -792,6 +792,7 @@ impl InformationExtension for StandaloneInformationExtension {
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
written_bytes: region_stat.written_bytes,
}
})
.collect::<Vec<_>>();

View File

@@ -97,6 +97,8 @@ pub struct RegionStat {
pub index_size: u64,
/// The manifest infoof the region.
pub region_manifest: RegionManifestInfo,
/// The total bytes written of the region since region opened.
pub written_bytes: u64,
/// The latest entry id of topic used by data.
/// **Only used by remote WAL prune.**
pub data_topic_latest_entry_id: u64,
@@ -277,6 +279,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
written_bytes: region_stat.written_bytes,
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
}

View File

@@ -124,6 +124,7 @@ mod tests {
index_size: 0,
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}
}

View File

@@ -104,6 +104,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}
}
acc.stat = Some(Stat {

View File

@@ -166,6 +166,7 @@ mod test {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}
}

View File

@@ -197,6 +197,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}],
..Default::default()
}
@@ -224,6 +225,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}],
..Default::default()
}
@@ -251,6 +253,7 @@ mod tests {
},
data_topic_latest_entry_id: 0,
metadata_topic_latest_entry_id: 0,
written_bytes: 0,
}],
..Default::default()
}

View File

@@ -61,6 +61,7 @@ pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option<Re
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
written_bytes: metadata_stat.written_bytes + data_stat.written_bytes,
data_topic_latest_entry_id: data_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: metadata_stat.metadata_topic_latest_entry_id,
}),

View File

@@ -15,6 +15,7 @@
//! Basic tests for mito engine.
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use api::v1::value::ValueData;
use api::v1::{Rows, SemanticType};
@@ -86,11 +87,15 @@ async fn test_write_to_region() {
rows: build_rows(0, 42),
};
put_rows(&engine, region_id, rows).await;
let region = engine.get_region(region_id).unwrap();
assert!(region.written_bytes.load(Ordering::Relaxed) > 0);
}
#[apply(multiple_log_store_factories)]
async fn test_region_replay(factory: Option<LogStoreFactory>) {
use std::sync::atomic::Ordering;
use common_wal::options::{KafkaWalOptions, WalOptions};
common_telemetry::init_default_ut_logging();
@@ -155,6 +160,10 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
.unwrap();
assert_eq!(0, result.affected_rows);
// The replay won't update the write bytes rate meter.
let region = engine.get_region(region_id).unwrap();
assert_eq!(region.written_bytes.load(Ordering::Relaxed), 0);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -129,6 +129,8 @@ pub struct MitoRegion {
/// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
/// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// manifest stats
@@ -313,6 +315,7 @@ impl MitoRegion {
let manifest_version = self.stats.manifest_version();
let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
let written_bytes = self.written_bytes.load(Ordering::Relaxed);
RegionStatistic {
num_rows,
@@ -327,6 +330,7 @@ impl MitoRegion {
},
data_topic_latest_entry_id: topic_latest_entry_id,
metadata_topic_latest_entry_id: topic_latest_entry_id,
written_bytes,
}
}

View File

@@ -277,6 +277,7 @@ impl RegionOpener {
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
})
}
@@ -455,6 +456,7 @@ impl RegionOpener {
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder,
stats: self.stats.clone(),
};
@@ -634,7 +636,7 @@ where
last_entry_id = last_entry_id.max(entry_id);
let mut region_write_ctx =
RegionWriteCtx::new(region_id, version_control, provider.clone());
RegionWriteCtx::new(region_id, version_control, provider.clone(), None);
for mutation in entry.mutations {
rows_replayed += mutation
.rows

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
@@ -106,6 +107,8 @@ pub(crate) struct RegionWriteCtx {
pub(crate) put_num: usize,
/// Rows to delete.
pub(crate) delete_num: usize,
/// The total bytes written to the region.
pub(crate) written_bytes: Option<Arc<AtomicU64>>,
}
impl RegionWriteCtx {
@@ -114,6 +117,7 @@ impl RegionWriteCtx {
region_id: RegionId,
version_control: &VersionControlRef,
provider: Provider,
written_bytes: Option<Arc<AtomicU64>>,
) -> RegionWriteCtx {
let VersionControlData {
version,
@@ -136,6 +140,7 @@ impl RegionWriteCtx {
put_num: 0,
delete_num: 0,
bulk_parts: vec![],
written_bytes,
}
}
@@ -214,6 +219,12 @@ impl RegionWriteCtx {
}
let mutable = self.version.memtables.mutable.clone();
let prev_memory_usage = if self.written_bytes.is_some() {
Some(mutable.memory_usage())
} else {
None
};
let mutations = mem::take(&mut self.wal_entry.mutations)
.into_iter()
.enumerate()
@@ -246,6 +257,11 @@ impl RegionWriteCtx {
}
}
if let Some(written_bytes) = &self.written_bytes {
let new_memory_usage = mutable.memory_usage();
let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
}
// Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
// to decrease `next_sequence` and `next_entry_id` by 1.
self.version_control
@@ -271,6 +287,13 @@ impl RegionWriteCtx {
.with_label_values(&["write_bulk"])
.start_timer();
let mutable_memtable = &self.version.memtables.mutable;
let prev_memory_usage = if self.written_bytes.is_some() {
Some(mutable_memtable.memory_usage())
} else {
None
};
if self.bulk_parts.len() == 1 {
let part = self.bulk_parts.swap_remove(0);
let num_rows = part.num_rows();
@@ -300,6 +323,11 @@ impl RegionWriteCtx {
}
}
if let Some(written_bytes) = &self.written_bytes {
let new_memory_usage = mutable_memtable.memory_usage();
let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
}
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}

View File

@@ -247,6 +247,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.written_bytes.clone()),
);
e.insert(region_ctx);
@@ -350,6 +351,7 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
&region.version_control,
region.provider.clone(),
Some(region.written_bytes.clone()),
);
e.insert(region_ctx);

View File

@@ -450,6 +450,9 @@ pub struct RegionStatistic {
/// The details of the region.
#[serde(default)]
pub manifest: RegionManifestInfo,
#[serde(default)]
/// The total bytes written of the region since region opened.
pub written_bytes: u64,
/// The latest entry id of the region's remote WAL since last flush.
/// For metric engine, there're two latest entry ids, one for data and one for metadata.
/// TODO(weny): remove this two fields and use single instead.

View File

@@ -22,15 +22,17 @@ INSERT INTO test VALUES
Affected Rows: 3
-- SQLNESS SLEEP 3s
SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
-- For regions using different WAL implementations, the manifest size may vary.
-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
SELECT SUM(region_rows),SUM(written_bytes_since_open), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.disk_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| 3 | 2238 | 0 | 0 |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
+-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| sum(information_schema.region_statistics.region_rows) | sum(information_schema.region_statistics.written_bytes_since_open) | sum(information_schema.region_statistics.memtable_size) | sum(information_schema.region_statistics.sst_size) | sum(information_schema.region_statistics.index_size) |
+-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| 3 | 78 | 78 | 0 | 0 |
+-------------------------------------------------------+--------------------------------------------------------------------+---------------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

View File

@@ -17,7 +17,9 @@ INSERT INTO test VALUES
(21, 'c', 21);
-- SQLNESS SLEEP 3s
SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
-- For regions using different WAL implementations, the manifest size may vary.
-- The remote WAL implementation additionally stores a flushed entry ID when creating the manifest.
SELECT SUM(region_rows),SUM(written_bytes_since_open), SUM(memtable_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

View File

@@ -317,17 +317,18 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | region_peers | table_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_peers | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_peers | table_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | region_statistics | disk_size | 5 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | engine | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_statistics | index_size | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | manifest_size | 7 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | memtable_size | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | disk_size | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | engine | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_statistics | index_size | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | manifest_size | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | memtable_size | 7 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | |
| greptime | information_schema | region_statistics | region_number | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | region_statistics | region_role | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_statistics | region_role | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | region_statistics | region_rows | 4 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | sst_size | 8 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | sst_size | 9 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | region_statistics | table_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | region_statistics | written_bytes_since_open | 5 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | |
| greptime | information_schema | routines | character_maximum_length | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | routines | character_octet_length | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | routines | character_set_client | 29 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |