From 21ee981b497da24bed05ea59b0536966f3ae5cb6 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 9 Sep 2025 17:45:15 +0800 Subject: [PATCH] feat: add origin_region_id and node_id to sst entry (#6937) Signed-off-by: Zhenchi --- src/metric-engine/src/engine/flush.rs | 30 ++++++++--------- src/mito2/src/access_layer.rs | 1 + src/mito2/src/engine.rs | 12 +++++++ src/mito2/src/engine/basic_test.rs | 18 +++++----- src/mito2/src/region.rs | 5 ++- src/mito2/src/sst/file_ref.rs | 4 +++ src/store-api/src/sst_entry.rs | 47 ++++++++++++++++++++++++++- 7 files changed, 91 insertions(+), 26 deletions(-) diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 87b054dfde..e750f8920e 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -125,12 +125,12 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None }"# ); // list from storage @@ -152,15 +152,15 @@ ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, assert_eq!( debug_format, r#" -StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/index/.puffin", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/index/.puffin", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/index/.puffin", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: None, last_modified_ms: None }"# +StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: None, last_modified_ms: None, node_id: None }"# ); } } diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index f6c2953588..1ab47c6386 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -343,6 +343,7 @@ impl AccessLayer { file_path: path.to_string(), file_size, last_modified_ms, + node_id: None, }; yield entry; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d50c1a28bd..b2142f1a5e 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -415,14 +415,20 @@ impl MitoEngine { /// Lists all SSTs from the manifest of all regions in the engine. pub fn all_ssts_from_manifest(&self) -> impl Iterator + use<'_> { + let node_id = self.inner.workers.file_ref_manager().node_id(); self.inner .workers .all_regions() .flat_map(|region| region.manifest_sst_entries()) + .map(move |mut entry| { + entry.node_id = node_id; + entry + }) } /// Lists all SSTs from the storage layer of all regions in the engine. pub fn all_ssts_from_storage(&self) -> impl Stream> { + let node_id = self.inner.workers.file_ref_manager().node_id(); let regions = self.inner.workers.all_regions(); let mut layers_distinct_table_dirs = HashMap::new(); @@ -437,6 +443,12 @@ impl MitoEngine { stream::iter(layers_distinct_table_dirs) .map(|(_, access_layer)| access_layer.storage_sst_entries()) .flatten() + .map(move |entry| { + entry.map(move |mut entry| { + entry.node_id = node_id; + entry + }) + }) } } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index bf19befa06..c9704e9304 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -785,9 +785,9 @@ async fn test_list_ssts() { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10) } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10) }"# +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None }"# ); // list from storage @@ -809,11 +809,11 @@ ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: assert_eq!( debug_format, r#" -StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None } -StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None }"# +StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"# ); } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index d3fa407279..69ef6f4b59 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -543,7 +543,8 @@ impl MitoRegion { .flat_map(|level| { level.files().map(|file| { let meta = file.meta_ref(); - let region_id = meta.region_id; + let region_id = self.region_id; + let origin_region_id = meta.region_id; let (index_file_path, index_file_size) = if meta.index_file_size > 0 { let index_file_path = index_file_path(table_dir, meta.file_id(), path_type); (Some(index_file_path), Some(meta.index_file_size)) @@ -568,6 +569,8 @@ impl MitoRegion { min_ts: meta.time_range.0, max_ts: meta.time_range.1, sequence: meta.sequence.map(|s| s.get()), + origin_region_id, + node_id: None, } }) }) diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 0a78a37cc3..dbb54cdf61 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -223,6 +223,10 @@ impl FileReferenceManager { GC_REF_FILE_CNT.dec(); } } + + pub fn node_id(&self) -> Option { + self.node_id + } } #[cfg(test)] diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index c50d7d57f7..eabfc46e06 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -35,7 +35,7 @@ use crate::storage::{RegionGroup, RegionId, RegionNumber, RegionSeq, ScanRequest pub struct ManifestSstEntry { /// The table directory this file belongs to. pub table_dir: String, - /// The region id this file belongs to. + /// The region id of region that refers to the file. pub region_id: RegionId, /// The table id this file belongs to. pub table_id: TableId, @@ -67,6 +67,10 @@ pub struct ManifestSstEntry { pub max_ts: Timestamp, /// The sequence number associated with this file. pub sequence: Option, + /// The region id of region that creates the file. + pub origin_region_id: RegionId, + /// The node id fetched from the manifest. + pub node_id: Option, } impl ManifestSstEntry { @@ -91,6 +95,8 @@ impl ManifestSstEntry { ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true), ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true), ColumnSchema::new("sequence", Ty::uint64_datatype(), true), + ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false), + ColumnSchema::new("node_id", Ty::uint64_datatype(), true), ])) } @@ -122,6 +128,8 @@ impl ManifestSstEntry { .map(|ts| ts.value()) }); let sequences = entries.iter().map(|e| e.sequence); + let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64()); + let node_ids = entries.iter().map(|e| e.node_id); let columns: Vec = vec![ Arc::new(StringArray::from_iter_values(table_dirs)), @@ -141,6 +149,8 @@ impl ManifestSstEntry { Arc::new(TimestampNanosecondArray::from_iter(min_ts)), Arc::new(TimestampNanosecondArray::from_iter(max_ts)), Arc::new(UInt64Array::from_iter(sequences)), + Arc::new(UInt64Array::from_iter_values(origin_region_ids)), + Arc::new(UInt64Array::from_iter(node_ids)), ]; DfRecordBatch::try_new(schema.arrow_schema().clone(), columns) @@ -174,6 +184,8 @@ pub struct StorageSstEntry { pub file_size: Option, /// Last modified time in milliseconds since epoch, if available from storage. pub last_modified_ms: Option, + /// The node id fetched from the manifest. + pub node_id: Option, } impl StorageSstEntry { @@ -188,6 +200,7 @@ impl StorageSstEntry { Ty::timestamp_millisecond_datatype(), true, ), + ColumnSchema::new("node_id", Ty::uint64_datatype(), true), ])) } @@ -200,11 +213,13 @@ impl StorageSstEntry { e.last_modified_ms .and_then(|ts| ts.convert_to(TimeUnit::Millisecond).map(|ts| ts.value())) }); + let node_ids = entries.iter().map(|e| e.node_id); let columns: Vec = vec![ Arc::new(StringArray::from_iter_values(file_paths)), Arc::new(UInt64Array::from_iter(file_sizes)), Arc::new(TimestampMillisecondArray::from_iter(last_modified_ms)), + Arc::new(UInt64Array::from_iter(node_ids)), ]; DfRecordBatch::try_new(schema.arrow_schema().clone(), columns) @@ -299,6 +314,8 @@ mod tests { min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns max_ts: Timestamp::new_second(2), // 2s -> 2_000_000_000ns sequence: None, + origin_region_id: region_id1, + node_id: Some(1), }, ManifestSstEntry { table_dir: "tdir2".to_string(), @@ -318,6 +335,8 @@ mod tests { min_ts: Timestamp::new_nanosecond(5), // 5ns max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns sequence: Some(9), + origin_region_id: region_id2, + node_id: None, }, ]; @@ -469,6 +488,22 @@ mod tests { .unwrap(); assert!(sequences.is_null(0)); assert_eq!(9, sequences.value(1)); + + let origin_region_ids = batch + .column(17) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(region_id1.as_u64(), origin_region_ids.value(0)); + assert_eq!(region_id2.as_u64(), origin_region_ids.value(1)); + + let node_ids = batch + .column(18) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1, node_ids.value(0)); + assert!(node_ids.is_null(1)); } #[test] @@ -478,11 +513,13 @@ mod tests { file_path: "/s1".to_string(), file_size: None, last_modified_ms: None, + node_id: Some(1), }, StorageSstEntry { file_path: "/s2".to_string(), file_size: Some(123), last_modified_ms: Some(Timestamp::new_millisecond(456)), + node_id: None, }, ]; @@ -515,6 +552,14 @@ mod tests { .unwrap(); assert!(last_modified.is_null(0)); assert_eq!(456, last_modified.value(1)); + + let node_ids = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(1, node_ids.value(0)); + assert!(node_ids.is_null(1)); } #[test]