diff --git a/src/datanode/src/region_server/catalog.rs b/src/datanode/src/region_server/catalog.rs index 0d750ae881..0aaec7498c 100644 --- a/src/datanode/src/region_server/catalog.rs +++ b/src/datanode/src/region_server/catalog.rs @@ -72,7 +72,7 @@ impl RegionServer { })? }; - let entries = mito.all_ssts_from_manifest().collect::>(); + let entries = mito.all_ssts_from_manifest().await; let schema = ManifestSstEntry::schema().arrow_schema().clone(); let batch = ManifestSstEntry::to_record_batch(&entries) .map_err(DataFusionError::from) diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index e750f8920e..7a4871254e 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -111,6 +111,8 @@ mod tests { let mito = env.mito(); let debug_format = mito .all_ssts_from_manifest() + .await + .into_iter() .map(|mut e| { e.file_path = e.file_path.replace(&e.file_id, ""); e.index_file_path = e @@ -125,12 +127,12 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# ); // list from storage diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index b2142f1a5e..7b325ab309 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -414,16 +414,20 @@ impl MitoEngine { } /// Lists all SSTs from the manifest of all regions in the engine. - pub fn all_ssts_from_manifest(&self) -> impl Iterator + use<'_> { + pub async fn all_ssts_from_manifest(&self) -> Vec { let node_id = self.inner.workers.file_ref_manager().node_id(); - self.inner - .workers - .all_regions() - .flat_map(|region| region.manifest_sst_entries()) - .map(move |mut entry| { - entry.node_id = node_id; - entry - }) + let regions = self.inner.workers.all_regions(); + + let mut results = Vec::new(); + for region in regions { + let mut entries = region.manifest_sst_entries().await; + for e in &mut entries { + e.node_id = node_id; + } + results.extend(entries); + } + + results } /// Lists all SSTs from the storage layer of all regions in the engine. diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index c9704e9304..5b82f20807 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -771,6 +771,8 @@ async fn test_list_ssts() { // list from manifest let debug_format = engine .all_ssts_from_manifest() + .await + .into_iter() .map(|mut e| { e.file_path = e.file_path.replace(&e.file_id, ""); e.index_file_path = e @@ -785,9 +787,9 @@ async fn test_list_ssts() { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None }"# +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ); // list from storage diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 137a9e9b5f..b801617826 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -369,6 +369,11 @@ async fn test_staging_exit_success_with_manifests() { "No data should be readable before exit staging mode" ); + // Inspect SSTs from manifest + let sst_entries = engine.all_ssts_from_manifest().await; + assert_eq!(sst_entries.len(), 2); + assert!(sst_entries.iter().all(|e| !e.visible)); + // Exit staging mode successfully engine .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader) @@ -424,4 +429,9 @@ async fn test_staging_exit_success_with_manifests() { let batches = RecordBatches::try_collect(stream).await.unwrap(); let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); assert_eq!(total_rows, 10, "Expected to read all staged rows"); + + // Inspect SSTs from manifest + let sst_entries = engine.all_ssts_from_manifest().await; + assert_eq!(sst_entries.len(), 2); + assert!(sst_entries.iter().all(|e| e.visible)); } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index e280ebe493..b611387fd2 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -18,8 +18,8 @@ pub mod opener; pub mod options; pub(crate) mod version; -use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; @@ -545,46 +545,55 @@ impl MitoRegion { } /// Returns the SST entries of the region. - pub fn manifest_sst_entries(&self) -> Vec { + pub async fn manifest_sst_entries(&self) -> Vec { let table_dir = self.table_dir(); let path_type = self.access_layer.path_type(); - self.version() + + let visible_ssts = self + .version() .ssts .levels() .iter() - .flat_map(|level| { - level.files().map(|file| { - let meta = file.meta_ref(); - let region_id = self.region_id; - let origin_region_id = meta.region_id; - let (index_file_path, index_file_size) = if meta.index_file_size > 0 { - let index_file_path = index_file_path(table_dir, meta.file_id(), path_type); - (Some(index_file_path), Some(meta.index_file_size)) - } else { - (None, None) - }; - ManifestSstEntry { - table_dir: table_dir.to_string(), - region_id, - table_id: region_id.table_id(), - region_number: region_id.region_number(), - region_group: region_id.region_group(), - region_sequence: region_id.region_sequence(), - file_id: meta.file_id.to_string(), - level: meta.level, - file_path: sst_file_path(table_dir, meta.file_id(), path_type), - file_size: meta.file_size, - index_file_path, - index_file_size, - num_rows: meta.num_rows, - num_row_groups: meta.num_row_groups, - min_ts: meta.time_range.0, - max_ts: meta.time_range.1, - sequence: meta.sequence.map(|s| s.get()), - origin_region_id, - node_id: None, - } - }) + .flat_map(|level| level.files().map(|file| file.file_id().file_id())) + .collect::>(); + + self.manifest_ctx + .manifest() + .await + .files + .values() + .map(|meta| { + let region_id = self.region_id; + let origin_region_id = meta.region_id; + let (index_file_path, index_file_size) = if meta.index_file_size > 0 { + let index_file_path = index_file_path(table_dir, meta.file_id(), path_type); + (Some(index_file_path), Some(meta.index_file_size)) + } else { + (None, None) + }; + let visible = visible_ssts.contains(&meta.file_id); + ManifestSstEntry { + table_dir: table_dir.to_string(), + region_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + region_group: region_id.region_group(), + region_sequence: region_id.region_sequence(), + file_id: meta.file_id.to_string(), + level: meta.level, + file_path: sst_file_path(table_dir, meta.file_id(), path_type), + file_size: meta.file_size, + index_file_path, + index_file_size, + num_rows: meta.num_rows, + num_row_groups: meta.num_row_groups, + min_ts: meta.time_range.0, + max_ts: meta.time_range.1, + sequence: meta.sequence.map(|s| s.get()), + origin_region_id, + node_id: None, + visible, + } }) .collect() } diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index eabfc46e06..a680e438bc 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -20,8 +20,8 @@ use common_time::timestamp::TimeUnit; use datafusion_common::DataFusionError; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, LogicalTableSource}; use datatypes::arrow::array::{ - ArrayRef, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, UInt32Array, - UInt64Array, + ArrayRef, BooleanArray, TimestampMillisecondArray, TimestampNanosecondArray, UInt8Array, + UInt32Array, UInt64Array, }; use datatypes::arrow::error::ArrowError; use datatypes::arrow_array::StringArray; @@ -71,6 +71,8 @@ pub struct ManifestSstEntry { pub origin_region_id: RegionId, /// The node id fetched from the manifest. pub node_id: Option, + /// Whether this file is visible in current version. + pub visible: bool, } impl ManifestSstEntry { @@ -97,6 +99,7 @@ impl ManifestSstEntry { ColumnSchema::new("sequence", Ty::uint64_datatype(), true), ColumnSchema::new("origin_region_id", Ty::uint64_datatype(), false), ColumnSchema::new("node_id", Ty::uint64_datatype(), true), + ColumnSchema::new("visible", Ty::boolean_datatype(), false), ])) } @@ -130,6 +133,7 @@ impl ManifestSstEntry { let sequences = entries.iter().map(|e| e.sequence); let origin_region_ids = entries.iter().map(|e| e.origin_region_id.as_u64()); let node_ids = entries.iter().map(|e| e.node_id); + let visible_flags = entries.iter().map(|e| Some(e.visible)); let columns: Vec = vec![ Arc::new(StringArray::from_iter_values(table_dirs)), @@ -151,6 +155,7 @@ impl ManifestSstEntry { Arc::new(UInt64Array::from_iter(sequences)), Arc::new(UInt64Array::from_iter_values(origin_region_ids)), Arc::new(UInt64Array::from_iter(node_ids)), + Arc::new(BooleanArray::from_iter(visible_flags)), ]; DfRecordBatch::try_new(schema.arrow_schema().clone(), columns) @@ -316,6 +321,7 @@ mod tests { sequence: None, origin_region_id: region_id1, node_id: Some(1), + visible: false, }, ManifestSstEntry { table_dir: "tdir2".to_string(), @@ -337,6 +343,7 @@ mod tests { sequence: Some(9), origin_region_id: region_id2, node_id: None, + visible: true, }, ]; @@ -504,6 +511,14 @@ mod tests { .unwrap(); assert_eq!(1, node_ids.value(0)); assert!(node_ids.is_null(1)); + + let visible = batch + .column(19) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(!visible.value(0)); + assert!(visible.value(1)); } #[test]