From 30894d7599e1ad40a83fb4ef3708d86a13ac01d1 Mon Sep 17 00:00:00 2001 From: Sicong Hu Date: Fri, 31 Oct 2025 10:13:17 +0800 Subject: [PATCH] feat(mito): Optimize async index building with priority-based batching (#7034) * feat: add priority-based batching to IndexBuildScheduler Signed-off-by: SNC123 * fix: clean old puffin-related cache Signed-off-by: SNC123 * test: add test for IndexBuildScheduler Signed-off-by: SNC123 * feat: different index file id for read and async write Signed-off-by: SNC123 * feat: different index file id for delete Signed-off-by: SNC123 * chore: clippy Signed-off-by: SNC123 * fix: apply suggestions Signed-off-by: SNC123 * fix: apply comments Signed-off-by: SNC123 * combine files and index files Signed-off-by: SNC123 * feat: add index_file_id into ManifestSstEntry Signed-off-by: SNC123 * Update src/mito2/src/gc.rs Signed-off-by: SNC123 * resolve conflicts Signed-off-by: SNC123 * fix: sqlness Signed-off-by: SNC123 * chore: fmt Signed-off-by: SNC123 --------- Signed-off-by: SNC123 --- src/cmd/src/datanode/objbench.rs | 1 + src/metric-engine/src/engine/flush.rs | 13 +- src/mito2/src/access_layer.rs | 8 +- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/compaction/test_util.rs | 1 + src/mito2/src/engine.rs | 10 +- src/mito2/src/engine/basic_test.rs | 13 +- src/mito2/src/flush.rs | 1 + src/mito2/src/gc.rs | 19 +- src/mito2/src/manifest/tests/checkpoint.rs | 2 + src/mito2/src/region.rs | 15 +- src/mito2/src/remap_manifest.rs | 1 + src/mito2/src/request.rs | 14 +- src/mito2/src/sst/file.rs | 52 +- src/mito2/src/sst/file_purger.rs | 4 +- src/mito2/src/sst/file_ref.rs | 1 + src/mito2/src/sst/index.rs | 496 ++++++++++++++++-- src/mito2/src/sst/parquet.rs | 1 + src/mito2/src/test_util/scheduler_util.rs | 4 +- src/mito2/src/test_util/sst_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 2 + src/mito2/src/worker.rs | 8 +- src/mito2/src/worker/handle_close.rs | 2 + src/mito2/src/worker/handle_drop.rs | 4 + src/mito2/src/worker/handle_rebuild_index.rs | 39 +- src/mito2/src/worker/handle_truncate.rs | 4 + src/store-api/src/sst_entry.rs | 43 +- .../common/information_schema/ssts.result | 33 +- .../common/system/information_schema.result | 29 +- 29 files changed, 702 insertions(+), 120 deletions(-) diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 564e8c744b..dffb971072 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -162,6 +162,7 @@ impl ObjbenchCommand { file_size, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows, num_row_groups, sequence: None, diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index c82862583d..9c06f94043 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -119,6 +119,7 @@ mod tests { .index_file_path .map(|path| path.replace(&e.file_id, "")); e.file_id = "".to_string(); + e.index_file_id = e.index_file_id.map(|_| "".to_string()); format!("\n{:?}", e) }) .sorted() @@ -127,12 +128,12 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 
0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), 
level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# ); // list from storage let storage_entries = mito diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 2eb8a2ea0e..b6891d7410 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -213,7 +213,11 @@ impl AccessLayer { } /// Deletes a SST file (and its index file if it has one) with given file id. 
- pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> { + pub(crate) async fn delete_sst( + &self, + region_file_id: &RegionFileId, + index_file_id: &RegionFileId, + ) -> Result<()> { let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type); self.object_store .delete(&path) @@ -222,7 +226,7 @@ impl AccessLayer { file_id: region_file_id.file_id(), })?; - let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type); + let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type); self.object_store .delete(&path) .await diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 52b8fe068a..8a1a44d4c2 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -432,6 +432,7 @@ impl Compactor for DefaultCompactor { file_size: sst_info.file_size, available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, + index_file_id: None, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index 3dc212ff4d..781b905349 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -76,6 +76,7 @@ pub fn new_file_handle_with_size_and_sequence( file_size, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 73cb930f77..42033e02e1 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -500,19 +500,21 @@ impl MitoEngine { return Vec::new(); }; - let file_id = match FileId::parse_str(&entry.file_id) { + let Some(index_file_id) = entry.index_file_id.as_ref() else { + return Vec::new(); + }; + let file_id = match FileId::parse_str(index_file_id) { Ok(file_id) => file_id, Err(err) => { warn!( err; "Failed to parse puffin index file id, table_dir: {}, file_id: {}", entry.table_dir, - entry.file_id + index_file_id ); return Vec::new(); } }; - let region_file_id = RegionFileId::new(entry.region_id, file_id); let context = IndexEntryContext { table_dir: &entry.table_dir, @@ -522,7 +524,7 @@ impl MitoEngine { region_number: entry.region_number, region_group: entry.region_group, region_sequence: entry.region_sequence, - file_id: &entry.file_id, + file_id: index_file_id, index_file_size: entry.index_file_size, node_id, }; diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index ca62f384c7..404b6f26c2 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -859,9 +859,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) { #[tokio::test] async fn test_list_ssts() { test_list_ssts_with_format(false, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), 
table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } @@ -869,9 +869,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_s StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; test_list_ssts_with_format(true, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2855, index_file_path: 
Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_file_id: Some(""), level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } @@ -943,6 +943,7 @@ async fn test_list_ssts_with_format( .index_file_path .map(|p| p.replace(&e.file_id, "")); e.file_id = "".to_string(); + e.index_file_id = e.index_file_id.map(|_| "".to_string()); format!("\n{:?}", e) }) .sorted() diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 
59f4d1cda5..2247b79366 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -638,6 +638,7 @@ impl RegionFlushTask { file_size: sst_info.file_size, available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, + index_file_id: None, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: NonZeroU64::new(max_sequence), diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index f7cd266eb4..822fd6820d 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -365,7 +365,22 @@ impl LocalGcWorker { unused_len, region_id ); - self.delete_files(region_id, &unused_files).await?; + let file_pairs: Vec<(FileId, FileId)> = unused_files + .iter() + .filter_map(|file_id| { + current_files + .get(file_id) + .map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id())) + }) + .collect(); + + info!( + "Found {} unused index files to delete for region {}", + file_pairs.len(), + region_id + ); + + self.delete_files(region_id, &file_pairs).await?; debug!( "Successfully deleted {} unused files for region {}", @@ -375,7 +390,7 @@ impl LocalGcWorker { Ok(unused_files) } - async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> { + async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> { delete_files( region_id, file_ids, diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index a99a7878ad..71391457bb 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -265,6 +265,7 @@ async fn checkpoint_with_different_compression_types() { file_size: 1024000, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, sequence: None, @@ -331,6 +332,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) file_size: 1024000, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, sequence: None, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index f4a9deb9c6..fd691888af 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -590,11 +590,17 @@ impl MitoRegion { .map(|meta| { let region_id = self.region_id; let origin_region_id = meta.region_id; - let (index_file_path, index_file_size) = if meta.index_file_size > 0 { - let index_file_path = index_file_path(table_dir, meta.file_id(), path_type); - (Some(index_file_path), Some(meta.index_file_size)) + let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0 + { + let index_file_path = + index_file_path(table_dir, meta.index_file_id(), path_type); + ( + Some(meta.index_file_id().file_id().to_string()), + Some(index_file_path), + Some(meta.index_file_size), + ) } else { - (None, None) + (None, None, None) }; let visible = visible_ssts.contains(&meta.file_id); ManifestSstEntry { @@ -605,6 +611,7 @@ impl MitoRegion { region_group: region_id.region_group(), region_sequence: region_id.region_sequence(), file_id: meta.file_id.to_string(), + index_file_id, level: meta.level, file_path: sst_file_path(table_dir, meta.file_id(), path_type), file_size: meta.file_size, diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index a10159401b..cafb62f191 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -427,6 +427,7 @@ mod tests { file_size: 1024, available_indexes: SmallVec::new(), 
index_file_size: 0, + index_file_id: None, num_rows: 100, num_row_groups: 1, sequence: NonZeroU64::new(1), diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index ce013b15d3..794576a23c 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -39,7 +39,7 @@ use store_api::region_request::{ RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; -use store_api::storage::RegionId; +use store_api::storage::{FileId, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ @@ -780,8 +780,9 @@ pub(crate) enum BackgroundNotify { FlushFailed(FlushFailed), /// Index build has finished. IndexBuildFinished(IndexBuildFinished), + /// Index build has been stopped (aborted or succeeded). + IndexBuildStopped(IndexBuildStopped), /// Index build has failed. - #[allow(dead_code)] IndexBuildFailed(IndexBuildFailed), /// Compaction has finished. CompactionFinished(CompactionFinished), @@ -846,10 +847,17 @@ pub(crate) struct IndexBuildFinished { pub(crate) edit: RegionEdit, } +/// Notifies an index build job has been stopped. +#[derive(Debug)] +pub(crate) struct IndexBuildStopped { + #[allow(dead_code)] + pub(crate) region_id: RegionId, + pub(crate) file_id: FileId, +} + /// Notifies an index build job has failed. #[derive(Debug)] pub(crate) struct IndexBuildFailed { - #[allow(dead_code)] pub(crate) err: Arc, } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index dc5727f9cc..70c4f5a016 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -146,6 +146,12 @@ pub struct FileMeta { pub available_indexes: SmallVec<[IndexType; 4]>, /// Size of the index file. pub index_file_size: u64, + /// File ID of the index file. + /// + /// When this field is None, it means the index file id is the same as the file id. + /// Only meaningful when index_file_size > 0. + /// Used for rebuilding index files. + pub index_file_id: Option, /// Number of rows in the file. /// /// For historical reasons, this field might be missing in old files. Thus @@ -259,6 +265,16 @@ impl FileMeta { pub fn file_id(&self) -> RegionFileId { RegionFileId::new(self.region_id, self.file_id) } + + /// Returns the cross-region index file id. + /// If the index file id is not set, returns the file id. + pub fn index_file_id(&self) -> RegionFileId { + if let Some(index_file_id) = self.index_file_id { + RegionFileId::new(self.region_id, index_file_id) + } else { + self.file_id() + } + } } /// Handle to a SST file. @@ -294,6 +310,16 @@ impl FileHandle { RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id) } + /// Returns the cross-region index file id. + /// If the index file id is not set, returns the file id. + pub fn index_file_id(&self) -> RegionFileId { + if let Some(index_file_id) = self.inner.meta.index_file_id { + RegionFileId::new(self.inner.meta.region_id, index_file_id) + } else { + self.file_id() + } + } + /// Returns the complete file path of the file. pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String { location::sst_file_path(table_dir, self.file_id(), path_type) @@ -379,22 +405,28 @@ impl FileHandleInner { /// Delete pub async fn delete_files( region_id: RegionId, - file_ids: &[FileId], + file_ids: &[(FileId, FileId)], delete_index: bool, access_layer: &AccessLayerRef, cache_manager: &Option, ) -> crate::error::Result<()> { // Remove meta of the file from cache. 
if let Some(cache) = &cache_manager { - for file_id in file_ids { + for (file_id, _) in file_ids { cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id)); } } let mut deleted_files = Vec::with_capacity(file_ids.len()); - for file_id in file_ids { + for (file_id, index_file_id) in file_ids { let region_file_id = RegionFileId::new(region_id, *file_id); - match access_layer.delete_sst(®ion_file_id).await { + match access_layer + .delete_sst( + &RegionFileId::new(region_id, *file_id), + &RegionFileId::new(region_id, *index_file_id), + ) + .await + { Ok(_) => { deleted_files.push(*file_id); } @@ -411,14 +443,12 @@ pub async fn delete_files( deleted_files ); - for file_id in file_ids { - let region_file_id = RegionFileId::new(region_id, *file_id); - + for (file_id, index_file_id) in file_ids { if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) { // Removes index file from the cache. if delete_index { write_cache - .remove(IndexKey::new(region_id, *file_id, FileType::Puffin)) + .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin)) .await; } @@ -431,11 +461,11 @@ pub async fn delete_files( // Purges index content in the stager. if let Err(e) = access_layer .puffin_manager_factory() - .purge_stager(region_file_id) + .purge_stager(RegionFileId::new(region_id, *index_file_id)) .await { error!(e; "Failed to purge stager with index file, file_id: {}, region: {}", - file_id, region_id); + index_file_id, region_id); } } Ok(()) @@ -459,6 +489,7 @@ mod tests { file_size: 0, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, sequence: None, @@ -505,6 +536,7 @@ mod tests { file_size: 0, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, sequence: None, diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 11f38ac1ad..64e83c1a54 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -128,7 +128,7 @@ impl LocalFilePurger { if let Err(e) = self.scheduler.schedule(Box::pin(async move { if let Err(e) = delete_files( file_meta.region_id, - &[file_meta.file_id], + &[(file_meta.file_id, file_meta.index_file_id().file_id())], file_meta.exists_index(), &sst_layer, &cache_manager, @@ -233,6 +233,7 @@ mod tests { file_size: 4096, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, sequence: None, @@ -300,6 +301,7 @@ mod tests { file_size: 4096, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), index_file_size: 4096, + index_file_id: None, num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 28f3e95f89..8f750ebf2a 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -235,6 +235,7 @@ mod tests { file_size: 4096, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), index_file_size: 4096, + index_file_id: None, num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index e67a4b6e98..83bceab351 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -21,11 +21,13 @@ pub mod puffin_manager; mod statistics; pub(crate) mod store; +use std::cmp::Ordering; +use std::collections::{BinaryHeap, HashMap, 
HashSet}; use std::num::NonZeroUsize; use std::sync::Arc; use bloom_filter::creator::BloomFilterIndexer; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, error, info, warn}; use datatypes::arrow::array::BinaryArray; use datatypes::arrow::record_batch::RecordBatch; use mito_codec::index::IndexValuesCodec; @@ -43,7 +45,10 @@ use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, Regio use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::write_cache::{UploadTracker, WriteCacheRef}; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; -use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result}; +use crate::error::{ + BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu, + RegionDroppedSnafu, RegionTruncatedSnafu, Result, +}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::{Batch, BatchReader}; @@ -51,7 +56,8 @@ use crate::region::options::IndexOptions; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState}; use crate::request::{ - BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime, + BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest, + WorkerRequestWithTime, }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId}; @@ -410,7 +416,7 @@ impl IndexerBuilderImpl { } /// Type of an index build task. -#[derive(Debug, Clone, PartialEq, IntoStaticStr)] +#[derive(Debug, Clone, IntoStaticStr)] pub enum IndexBuildType { /// Build index when schema change. SchemaChange, @@ -426,6 +432,16 @@ impl IndexBuildType { fn as_str(&self) -> &'static str { self.into() } + + // Higher value means higher priority. + fn priority(&self) -> u8 { + match self { + IndexBuildType::Manual => 3, + IndexBuildType::SchemaChange => 2, + IndexBuildType::Flush => 1, + IndexBuildType::Compact => 0, + } + } } impl From for IndexBuildType { @@ -447,6 +463,7 @@ pub enum IndexBuildOutcome { /// Mpsc output result sender. pub type ResultMpscSender = Sender>; +#[derive(Clone)] pub struct IndexBuildTask { /// The file meta to build index for. pub file_meta: FileMeta, @@ -465,14 +482,24 @@ pub struct IndexBuildTask { pub(crate) result_sender: ResultMpscSender, } +impl std::fmt::Debug for IndexBuildTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IndexBuildTask") + .field("region_id", &self.file_meta.region_id) + .field("file_id", &self.file_meta.file_id) + .field("reason", &self.reason) + .finish() + } +} + impl IndexBuildTask { /// Notify the caller the job is success. - pub async fn on_success(&mut self, outcome: IndexBuildOutcome) { + pub async fn on_success(&self, outcome: IndexBuildOutcome) { let _ = self.result_sender.send(Ok(outcome)).await; } /// Send index build error to waiter. 
- pub async fn on_failure(&mut self, err: Arc) { + pub async fn on_failure(&self, err: Arc) { let _ = self .result_sender .send(Err(err.clone()).context(BuildIndexAsyncSnafu { @@ -503,7 +530,18 @@ impl IndexBuildTask { ); self.on_failure(e.into()).await } + } + let worker_request = WorkerRequest::Background { + region_id: self.file_meta.region_id, + notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped { + region_id: self.file_meta.region_id, + file_id: self.file_meta.file_id, + }), }; + let _ = self + .request_sender + .send(WorkerRequestWithTime::new(worker_request)) + .await; } // Checks if the SST file still exists in object store and version to avoid conflict with compaction. @@ -542,7 +580,13 @@ impl IndexBuildTask { &mut self, version_control: VersionControlRef, ) -> Result { - let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await; + let index_file_id = if self.file_meta.index_file_size > 0 { + // Generate new file ID if index file exists to avoid overwrite. + FileId::random() + } else { + self.file_meta.file_id + }; + let mut indexer = self.indexer_builder.build(index_file_id).await; // Check SST file existence before building index to avoid failure of parquet reader. if !self.check_sst_file_exists(&version_control).await { @@ -602,9 +646,10 @@ impl IndexBuildTask { } // Upload index file if write cache is enabled. - self.maybe_upload_index_file(index_output.clone()).await?; + self.maybe_upload_index_file(index_output.clone(), index_file_id) + .await?; - let worker_request = match self.update_manifest(index_output).await { + let worker_request = match self.update_manifest(index_output, index_file_id).await { Ok(edit) => { let index_build_finished = IndexBuildFinished { region_id: self.file_meta.region_id, @@ -632,14 +677,18 @@ impl IndexBuildTask { Ok(IndexBuildOutcome::Finished) } - async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> { + async fn maybe_upload_index_file( + &self, + output: IndexOutput, + index_file_id: FileId, + ) -> Result<()> { if let Some(write_cache) = &self.write_cache { let file_id = self.file_meta.file_id; let region_id = self.file_meta.region_id; let remote_store = self.access_layer.object_store(); let mut upload_tracker = UploadTracker::new(region_id); let mut err = None; - let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); + let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin); let puffin_path = RegionFilePathFactory::new( self.access_layer.table_dir().to_string(), self.access_layer.path_type(), @@ -673,9 +722,14 @@ impl IndexBuildTask { Ok(()) } - async fn update_manifest(&mut self, output: IndexOutput) -> Result { + async fn update_manifest( + &mut self, + output: IndexOutput, + index_file_id: FileId, + ) -> Result { self.file_meta.available_indexes = output.build_available_indexes(); self.file_meta.index_file_size = output.file_size; + self.file_meta.index_file_id = Some(index_file_id); let edit = RegionEdit { files_to_add: vec![self.file_meta.clone()], files_to_remove: vec![], @@ -701,26 +755,205 @@ impl IndexBuildTask { } } -#[derive(Clone)] -pub struct IndexBuildScheduler { - scheduler: SchedulerRef, +impl PartialEq for IndexBuildTask { + fn eq(&self, other: &Self) -> bool { + self.reason.priority() == other.reason.priority() + } } -impl IndexBuildScheduler { - pub fn new(scheduler: SchedulerRef) -> Self { - IndexBuildScheduler { scheduler } +impl Eq for IndexBuildTask {} + +impl PartialOrd for IndexBuildTask { + fn partial_cmp(&self, other: &Self) -> 
Option { + Some(self.cmp(other)) + } +} + +impl Ord for IndexBuildTask { + fn cmp(&self, other: &Self) -> Ordering { + self.reason.priority().cmp(&other.reason.priority()) + } +} + +/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler]. +pub struct IndexBuildStatus { + pub region_id: RegionId, + pub building_files: HashSet, + pub pending_tasks: BinaryHeap, +} + +impl IndexBuildStatus { + pub fn new(region_id: RegionId) -> Self { + IndexBuildStatus { + region_id, + building_files: HashSet::new(), + pending_tasks: BinaryHeap::new(), + } } - pub(crate) fn schedule_build( + async fn on_failure(self, err: Arc) { + for task in self.pending_tasks { + task.on_failure(err.clone()).await; + } + } +} + +pub struct IndexBuildScheduler { + /// Background job scheduler. + scheduler: SchedulerRef, + /// Tracks regions need to build index. + region_status: HashMap, + /// Limit of files allowed to build index concurrently for a region. + files_limit: usize, +} + +/// Manager background index build tasks of a worker. +impl IndexBuildScheduler { + pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self { + IndexBuildScheduler { + scheduler, + region_status: HashMap::new(), + files_limit, + } + } + + pub(crate) async fn schedule_build( &mut self, version_control: &VersionControlRef, task: IndexBuildTask, ) -> Result<()> { - // We should clone version control to expand the lifetime. - let job = task.into_index_build_job(version_control.clone()); - self.scheduler.schedule(job)?; + let status = self + .region_status + .entry(task.file_meta.region_id) + .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id)); + + if status.building_files.contains(&task.file_meta.file_id) { + let region_file_id = + RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id); + debug!( + "Aborting index build task since index is already being built for region file {:?}", + region_file_id + ); + task.on_success(IndexBuildOutcome::Aborted(format!( + "Index is already being built for region file {:?}", + region_file_id + ))) + .await; + task.listener.on_index_build_abort(region_file_id).await; + return Ok(()); + } + + status.pending_tasks.push(task); + + self.schedule_next_build_batch(version_control); Ok(()) } + + /// Schedule tasks until reaching the files limit or no more tasks. + fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) { + let mut building_count = 0; + for status in self.region_status.values() { + building_count += status.building_files.len(); + } + + while building_count < self.files_limit { + if let Some(task) = self.find_next_task() { + let region_id = task.file_meta.region_id; + let file_id = task.file_meta.file_id; + let job = task.into_index_build_job(version_control.clone()); + if self.scheduler.schedule(job).is_ok() { + if let Some(status) = self.region_status.get_mut(®ion_id) { + status.building_files.insert(file_id); + building_count += 1; + status + .pending_tasks + .retain(|t| t.file_meta.file_id != file_id); + } else { + error!( + "Region status not found when scheduling index build task, region: {}", + region_id + ); + } + } else { + error!( + "Failed to schedule index build job, region: {}, file_id: {}", + region_id, file_id + ); + } + } else { + // No more tasks to schedule. + break; + } + } + } + + /// Find the next task which has the highest priority to run. 
+ fn find_next_task(&self) -> Option { + self.region_status + .iter() + .filter_map(|(_, status)| status.pending_tasks.peek()) + .max() + .cloned() + } + + pub(crate) fn on_task_stopped( + &mut self, + region_id: RegionId, + file_id: FileId, + version_control: &VersionControlRef, + ) { + if let Some(status) = self.region_status.get_mut(®ion_id) { + status.building_files.remove(&file_id); + if status.building_files.is_empty() && status.pending_tasks.is_empty() { + // No more tasks for this region, remove it. + self.region_status.remove(®ion_id); + } + } + + self.schedule_next_build_batch(version_control); + } + + pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc) { + error!( + err; "Index build scheduler encountered failure for region {}, removing all pending tasks.", + region_id + ); + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + status.on_failure(err).await; + } + + /// Notifies the scheduler that the region is dropped. + pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionDroppedSnafu { region_id }.build()), + ) + .await; + } + + /// Notifies the scheduler that the region is closed. + pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) { + self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build())) + .await; + } + + /// Notifies the scheduler that the region is truncated. + pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) { + self.remove_region_on_failure( + region_id, + Arc::new(RegionTruncatedSnafu { region_id }.build()), + ) + .await; + } + + async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { + let Some(status) = self.region_status.remove(®ion_id) else { + return; + }; + status.on_failure(err).await; + } } /// Decodes primary keys from a flat format RecordBatch. @@ -1192,7 +1425,7 @@ mod tests { let env = SchedulerEnv::new().await; let (tx, _rx) = mpsc::channel(4); let (result_tx, mut result_rx) = mpsc::channel::>(4); - let mut scheduler = env.mock_index_build_scheduler(); + let mut scheduler = env.mock_index_build_scheduler(4); let metadata = Arc::new(sst_region_metadata()); let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; let file_purger = Arc::new(NoopFilePurger {}); @@ -1222,7 +1455,10 @@ mod tests { }; // Schedule the build task and check result. - scheduler.schedule_build(&version_control, task).unwrap(); + scheduler + .schedule_build(&version_control, task) + .await + .unwrap(); match result_rx.recv().await.unwrap() { Ok(outcome) => { if outcome == IndexBuildOutcome::Finished { @@ -1236,7 +1472,7 @@ mod tests { #[tokio::test] async fn test_index_build_task_sst_exist() { let env = SchedulerEnv::new().await; - let mut scheduler = env.mock_index_build_scheduler(); + let mut scheduler = env.mock_index_build_scheduler(4); let metadata = Arc::new(sst_region_metadata()); let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; let region_id = metadata.region_id; @@ -1272,7 +1508,10 @@ mod tests { result_sender: result_tx, }; - scheduler.schedule_build(&version_control, task).unwrap(); + scheduler + .schedule_build(&version_control, task) + .await + .unwrap(); // The task should finish successfully. 
match result_rx.recv().await.unwrap() { @@ -1304,7 +1543,7 @@ mod tests { async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) { let env = SchedulerEnv::new().await; - let mut scheduler = env.mock_index_build_scheduler(); + let mut scheduler = env.mock_index_build_scheduler(4); let metadata = Arc::new(sst_region_metadata()); let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; let file_purger = Arc::new(NoopFilePurger {}); @@ -1340,7 +1579,10 @@ mod tests { result_sender: result_tx, }; - scheduler.schedule_build(&version_control, task).unwrap(); + scheduler + .schedule_build(&version_control, task) + .await + .unwrap(); let puffin_path = location::index_file_path( env.access_layer.table_dir(), @@ -1395,7 +1637,7 @@ mod tests { #[tokio::test] async fn test_index_build_task_no_index() { let env = SchedulerEnv::new().await; - let mut scheduler = env.mock_index_build_scheduler(); + let mut scheduler = env.mock_index_build_scheduler(4); let mut metadata = sst_region_metadata(); // Unset indexes in metadata to simulate no index scenario. metadata.column_metadatas.iter_mut().for_each(|col| { @@ -1437,7 +1679,10 @@ mod tests { result_sender: result_tx, }; - scheduler.schedule_build(&version_control, task).unwrap(); + scheduler + .schedule_build(&version_control, task) + .await + .unwrap(); // The task should finish successfully. match result_rx.recv().await.unwrap() { @@ -1454,7 +1699,7 @@ mod tests { #[tokio::test] async fn test_index_build_task_with_write_cache() { let env = SchedulerEnv::new().await; - let mut scheduler = env.mock_index_build_scheduler(); + let mut scheduler = env.mock_index_build_scheduler(4); let metadata = Arc::new(sst_region_metadata()); let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; let file_purger = Arc::new(NoopFilePurger {}); @@ -1518,7 +1763,10 @@ mod tests { result_sender: result_tx, }; - scheduler.schedule_build(&version_control, task).unwrap(); + scheduler + .schedule_build(&version_control, task) + .await + .unwrap(); // The task should finish successfully. 
match result_rx.recv().await.unwrap() { @@ -1532,4 +1780,188 @@ mod tests { let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin); assert!(write_cache.file_cache().contains_key(&index_key)); } + + async fn create_mock_task_for_schedule( + env: &SchedulerEnv, + file_id: FileId, + region_id: RegionId, + reason: IndexBuildType, + ) -> IndexBuildTask { + let metadata = Arc::new(sst_region_metadata()); + let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; + let file_purger = Arc::new(NoopFilePurger {}); + let indexer_builder = mock_indexer_builder(metadata, env).await; + let (tx, _rx) = mpsc::channel(4); + let (result_tx, _result_rx) = mpsc::channel::>(4); + + IndexBuildTask { + file_meta: FileMeta { + region_id, + file_id, + file_size: 100, + ..Default::default() + }, + reason, + access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), + manifest_ctx, + write_cache: None, + file_purger, + indexer_builder, + request_sender: tx, + result_sender: result_tx, + } + } + + #[tokio::test] + async fn test_scheduler_comprehensive() { + let env = SchedulerEnv::new().await; + let mut scheduler = env.mock_index_build_scheduler(2); + let metadata = Arc::new(sst_region_metadata()); + let region_id = metadata.region_id; + let file_purger = Arc::new(NoopFilePurger {}); + + // Prepare multiple files for testing + let file_id1 = FileId::random(); + let file_id2 = FileId::random(); + let file_id3 = FileId::random(); + let file_id4 = FileId::random(); + let file_id5 = FileId::random(); + + let mut files = HashMap::new(); + for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] { + files.insert( + file_id, + FileMeta { + region_id, + file_id, + file_size: 100, + ..Default::default() + }, + ); + } + + let version_control = mock_version_control(metadata, file_purger, files).await; + + // Test 1: Basic scheduling + let task1 = + create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await; + assert!( + scheduler + .schedule_build(&version_control, task1) + .await + .is_ok() + ); + assert!(scheduler.region_status.contains_key(®ion_id)); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 1); + assert!(status.building_files.contains(&file_id1)); + + // Test 2: Duplicate file scheduling (should be skipped) + let task1_dup = + create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await; + scheduler + .schedule_build(&version_control, task1_dup) + .await + .unwrap(); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 1); // Still only one + + // Test 3: Fill up to limit (2 building tasks) + let task2 = + create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await; + scheduler + .schedule_build(&version_control, task2) + .await + .unwrap(); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 2); // Reached limit + assert_eq!(status.pending_tasks.len(), 0); + + // Test 4: Add tasks with different priorities to pending queue + // Now all new tasks will be pending since we reached the limit + let task3 = + create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await; + let task4 = + create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange) + .await; + let task5 = + create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await; + + scheduler + 
.schedule_build(&version_control, task3) + .await + .unwrap(); + scheduler + .schedule_build(&version_control, task4) + .await + .unwrap(); + scheduler + .schedule_build(&version_control, task5) + .await + .unwrap(); + + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 2); // Still at limit + assert_eq!(status.pending_tasks.len(), 3); // Three pending + + // Test 5: Task completion triggers scheduling next highest priority task (Manual) + scheduler.on_task_stopped(region_id, file_id1, &version_control); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert!(!status.building_files.contains(&file_id1)); + assert_eq!(status.building_files.len(), 2); // Should schedule next task + assert_eq!(status.pending_tasks.len(), 2); // One less pending + // The highest priority task (Manual) should now be building + assert!(status.building_files.contains(&file_id5)); + + // Test 6: Complete another task, should schedule SchemaChange (second highest priority) + scheduler.on_task_stopped(region_id, file_id2, &version_control); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 2); + assert_eq!(status.pending_tasks.len(), 1); // One less pending + assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building + + // Test 7: Complete remaining tasks and cleanup + scheduler.on_task_stopped(region_id, file_id5, &version_control); + scheduler.on_task_stopped(region_id, file_id4, &version_control); + + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building + assert_eq!(status.pending_tasks.len(), 0); + assert!(status.building_files.contains(&file_id3)); + + scheduler.on_task_stopped(region_id, file_id3, &version_control); + + // Region should be removed when all tasks complete + assert!(!scheduler.region_status.contains_key(®ion_id)); + + // Test 8: Region dropped with pending tasks + let task6 = + create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await; + let task7 = + create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await; + let task8 = + create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await; + + scheduler + .schedule_build(&version_control, task6) + .await + .unwrap(); + scheduler + .schedule_build(&version_control, task7) + .await + .unwrap(); + scheduler + .schedule_build(&version_control, task8) + .await + .unwrap(); + + assert!(scheduler.region_status.contains_key(®ion_id)); + let status = scheduler.region_status.get(®ion_id).unwrap(); + assert_eq!(status.building_files.len(), 2); + assert_eq!(status.pending_tasks.len(), 1); + + scheduler.on_region_dropped(region_id).await; + assert!(!scheduler.region_status.contains_key(®ion_id)); + } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index a421f947a0..95ba0b28b3 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -768,6 +768,7 @@ mod tests { file_size: info.file_size, available_indexes: info.index_metadata.build_available_indexes(), index_file_size: info.index_metadata.file_size, + index_file_id: None, num_row_groups: info.num_row_groups, num_rows: info.num_rows as u64, sequence: None, diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 3fbfbd0ad1..8e5b8b9434 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ 
b/src/mito2/src/test_util/scheduler_util.rs @@ -111,10 +111,10 @@ impl SchedulerEnv { } /// Creates a new index build scheduler. - pub(crate) fn mock_index_build_scheduler(&self) -> IndexBuildScheduler { + pub(crate) fn mock_index_build_scheduler(&self, files_limit: usize) -> IndexBuildScheduler { let scheduler = self.get_scheduler(); - IndexBuildScheduler::new(scheduler) + IndexBuildScheduler::new(scheduler, files_limit) } /// Creates a new manifest context. diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index fc29ca0826..dc75a1e08c 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -125,6 +125,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) file_size: 0, available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 30da6677e3..53b28478c9 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -103,6 +103,7 @@ impl VersionControlBuilder { file_size: 0, // We don't care file size. available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, num_series: 0, @@ -192,6 +193,7 @@ pub(crate) fn apply_edit( file_size: 0, // We don't care file size. available_indexes: Default::default(), index_file_size: 0, + index_file_id: None, num_rows: 0, num_row_groups: 0, num_series: 0, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index adb88f3467..4f0b2b9fce 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -509,7 +509,10 @@ impl WorkerStarter { ), purge_scheduler: self.purge_scheduler.clone(), write_buffer_manager: self.write_buffer_manager, - index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool), + index_build_scheduler: IndexBuildScheduler::new( + self.index_build_job_pool, + self.config.max_background_index_builds, + ), flush_scheduler: FlushScheduler::new(self.flush_job_pool), compaction_scheduler: CompactionScheduler::new( self.compact_job_pool, @@ -1059,6 +1062,9 @@ impl RegionWorkerLoop { BackgroundNotify::IndexBuildFinished(req) => { self.handle_index_build_finished(region_id, req).await } + BackgroundNotify::IndexBuildStopped(req) => { + self.handle_index_build_stopped(region_id, req).await + } BackgroundNotify::IndexBuildFailed(req) => { self.handle_index_build_failed(region_id, req).await } diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 8e33fcb1eb..1568ae0799 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -38,6 +38,8 @@ impl RegionWorkerLoop { self.flush_scheduler.on_region_closed(region_id); // Clean compaction status. self.compaction_scheduler.on_region_closed(region_id); + // clean index build status. + self.index_build_scheduler.on_region_closed(region_id).await; info!("Region {} closed, worker: {}", region_id, self.id); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index fd90ef7f3e..2786126076 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -83,6 +83,10 @@ where self.flush_scheduler.on_region_dropped(region_id); // Notifies compaction scheduler. self.compaction_scheduler.on_region_dropped(region_id); + // notifies index build scheduler. 
+        self.index_build_scheduler
+            .on_region_dropped(region_id)
+            .await;
 
         // Marks region version as dropped
         region
diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs
index 38ca07f1a9..6e7e96d7f2 100644
--- a/src/mito2/src/worker/handle_rebuild_index.rs
+++ b/src/mito2/src/worker/handle_rebuild_index.rs
@@ -22,9 +22,12 @@ use store_api::region_request::RegionBuildIndexRequest;
 use store_api::storage::{FileId, RegionId};
 use tokio::sync::mpsc;
 
+use crate::cache::CacheStrategy;
 use crate::error::Result;
 use crate::region::MitoRegionRef;
-use crate::request::{BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, OptionOutputTx};
+use crate::request::{
+    BuildIndexRequest, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, OptionOutputTx,
+};
 use crate::sst::file::{FileHandle, RegionFileId};
 use crate::sst::index::{
     IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl, ResultMpscSender,
@@ -172,7 +175,8 @@ impl RegionWorkerLoop {
             );
             let _ = self
                 .index_build_scheduler
-                .schedule_build(&region.version_control, task);
+                .schedule_build(&region.version_control, task)
+                .await;
         }
         // Wait for all index build tasks to finish and notify the caller.
         common_runtime::spawn_global(async move {
@@ -203,11 +207,19 @@ impl RegionWorkerLoop {
             }
         };
 
+        // Clean old puffin-related cache for all rebuilt files.
+        let cache_strategy = CacheStrategy::EnableAll(self.cache_manager.clone());
+        for file_meta in &request.edit.files_to_add {
+            let region_file_id = RegionFileId::new(region_id, file_meta.file_id);
+            cache_strategy.evict_puffin_cache(region_file_id).await;
+        }
+
         region.version_control.apply_edit(
             Some(request.edit.clone()),
             &[],
             region.file_purger.clone(),
         );
+
         for file_meta in &request.edit.files_to_add {
             self.listener
                 .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
@@ -221,6 +233,27 @@ impl RegionWorkerLoop {
         request: IndexBuildFailed,
     ) {
         error!(request.err; "Index build failed for region: {}", region_id);
-        // TODO(SNC123): Implement error handling logic after IndexBuildScheduler optimization.
+        self.index_build_scheduler
+            .on_failure(region_id, request.err.clone())
+            .await;
+    }
+
+    pub(crate) async fn handle_index_build_stopped(
+        &mut self,
+        region_id: RegionId,
+        request: IndexBuildStopped,
+    ) {
+        let Some(region) = self.regions.get_region(region_id) else {
+            warn!(
+                "Region not found for index build stopped, region_id: {}",
+                region_id
+            );
+            return;
+        };
+        self.index_build_scheduler.on_task_stopped(
+            region_id,
+            request.file_id,
+            &region.version_control,
+        );
     }
 }
diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs
index 16a1b5a59a..64f8488da1 100644
--- a/src/mito2/src/worker/handle_truncate.rs
+++ b/src/mito2/src/worker/handle_truncate.rs
@@ -142,6 +142,10 @@ impl RegionWorkerLoop {
         self.flush_scheduler.on_region_truncated(region_id);
         // Notifies compaction scheduler.
         self.compaction_scheduler.on_region_truncated(region_id);
+        // Notifies index build scheduler.
+        self.index_build_scheduler
+            .on_region_truncated(region_id)
+            .await;
 
         if let TruncateKind::All {
             truncated_entry_id,
diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs
index 52295bdb59..d71e5f0cdc 100644
--- a/src/store-api/src/sst_entry.rs
+++ b/src/store-api/src/sst_entry.rs
@@ -47,6 +47,8 @@ pub struct ManifestSstEntry {
     pub region_sequence: RegionSeq,
     /// Engine-specific file identifier (string form).
     pub file_id: String,
+    /// Engine-specific index file identifier (string form).
+    pub index_file_id: Option<String>,
     /// SST level.
     pub level: u8,
     /// Full path of the SST file in object store.
@@ -89,6 +91,7 @@ impl ManifestSstEntry {
             ColumnSchema::new("region_group", Ty::uint8_datatype(), false),
             ColumnSchema::new("region_sequence", Ty::uint32_datatype(), false),
             ColumnSchema::new("file_id", Ty::string_datatype(), false),
+            ColumnSchema::new("index_file_id", Ty::string_datatype(), true),
             ColumnSchema::new("level", Ty::uint8_datatype(), false),
             ColumnSchema::new("file_path", Ty::string_datatype(), false),
             ColumnSchema::new("file_size", Ty::uint64_datatype(), false),
@@ -116,6 +119,7 @@ impl ManifestSstEntry {
         let region_groups = entries.iter().map(|e| e.region_group);
         let region_sequences = entries.iter().map(|e| e.region_sequence);
         let file_ids = entries.iter().map(|e| e.file_id.as_str());
+        let index_file_ids = entries.iter().map(|e| e.index_file_id.as_ref());
         let levels = entries.iter().map(|e| e.level);
         let file_paths = entries.iter().map(|e| e.file_path.as_str());
         let file_sizes = entries.iter().map(|e| e.file_size);
@@ -147,6 +151,7 @@ impl ManifestSstEntry {
             Arc::new(UInt8Array::from_iter_values(region_groups)),
             Arc::new(UInt32Array::from_iter_values(region_sequences)),
             Arc::new(StringArray::from_iter_values(file_ids)),
+            Arc::new(StringArray::from_iter(index_file_ids)),
             Arc::new(UInt8Array::from_iter_values(levels)),
             Arc::new(StringArray::from_iter_values(file_paths)),
             Arc::new(UInt64Array::from_iter_values(file_sizes)),
@@ -432,6 +437,7 @@ mod tests {
             region_group: region_group1,
             region_sequence: region_seq1,
             file_id: "f1".to_string(),
+            index_file_id: None,
             level: 1,
             file_path: "/p1".to_string(),
             file_size: 100,
@@ -455,6 +461,7 @@ mod tests {
             region_group: region_group2,
             region_sequence: region_seq2,
             file_id: "f2".to_string(),
+            index_file_id: Some("idx".to_string()),
             level: 3,
             file_path: "/p2".to_string(),
             file_size: 200,
@@ -541,16 +548,24 @@ mod tests {
         assert_eq!("f1", file_ids.value(0));
         assert_eq!("f2", file_ids.value(1));
 
-        let levels = batch
+        let index_file_ids = batch
             .column(7)
             .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert!(index_file_ids.is_null(0));
+        assert_eq!("idx", index_file_ids.value(1));
+
+        let levels = batch
+            .column(8)
+            .as_any()
             .downcast_ref::<UInt8Array>()
             .unwrap();
         assert_eq!(1, levels.value(0));
         assert_eq!(3, levels.value(1));
 
         let file_paths = batch
-            .column(8)
+            .column(9)
             .as_any()
             .downcast_ref::<StringArray>()
             .unwrap();
@@ -558,7 +573,7 @@ mod tests {
         assert_eq!("/p2", file_paths.value(1));
 
         let file_sizes = batch
-            .column(9)
+            .column(10)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -566,7 +581,7 @@ mod tests {
         assert_eq!(200, file_sizes.value(1));
 
         let index_file_paths = batch
-            .column(10)
+            .column(11)
             .as_any()
             .downcast_ref::<StringArray>()
             .unwrap();
@@ -574,7 +589,7 @@ mod tests {
         assert_eq!("idx", index_file_paths.value(1));
 
         let index_file_sizes = batch
-            .column(11)
+            .column(12)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -582,7 +597,7 @@ mod tests {
         assert_eq!(11, index_file_sizes.value(1));
 
         let num_rows = batch
-            .column(12)
+            .column(13)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -590,7 +605,7 @@ mod tests {
         assert_eq!(20, num_rows.value(1));
 
         let num_row_groups = batch
-            .column(13)
+            .column(14)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -598,7 +613,7 @@ mod tests {
         assert_eq!(4, num_row_groups.value(1));
 
         let num_series = batch
-            .column(14)
+            .column(15)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -606,7 +621,7 @@ mod tests {
         assert!(num_series.is_null(1));
 
         let min_ts = batch
-            .column(15)
+            .column(16)
             .as_any()
             .downcast_ref::<TimestampNanosecondArray>()
             .unwrap();
@@ -614,7 +629,7 @@ mod tests {
         assert_eq!(5, min_ts.value(1));
 
         let max_ts = batch
-            .column(16)
+            .column(17)
             .as_any()
             .downcast_ref::<TimestampNanosecondArray>()
             .unwrap();
@@ -622,7 +637,7 @@ mod tests {
         assert_eq!(2_000_000, max_ts.value(1));
 
         let sequences = batch
-            .column(17)
+            .column(18)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -630,7 +645,7 @@ mod tests {
         assert_eq!(9, sequences.value(1));
 
         let origin_region_ids = batch
-            .column(18)
+            .column(19)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -638,7 +653,7 @@ mod tests {
         assert_eq!(region_id2.as_u64(), origin_region_ids.value(1));
 
         let node_ids = batch
-            .column(19)
+            .column(20)
             .as_any()
             .downcast_ref::<UInt64Array>()
             .unwrap();
@@ -646,7 +661,7 @@ mod tests {
         assert!(node_ids.is_null(1));
 
         let visible = batch
-            .column(20)
+            .column(21)
             .as_any()
             .downcast_ref::<BooleanArray>()
             .unwrap();
diff --git a/tests/cases/standalone/common/information_schema/ssts.result b/tests/cases/standalone/common/information_schema/ssts.result
index d546efbdfb..f9ac0dd47b 100644
--- a/tests/cases/standalone/common/information_schema/ssts.result
+++ b/tests/cases/standalone/common/information_schema/ssts.result
@@ -10,6 +10,7 @@ DESC TABLE information_schema.ssts_manifest;
 | region_group | UInt8 | | NO | | FIELD |
 | region_sequence | UInt32 | | NO | | FIELD |
 | file_id | String | | NO | | FIELD |
+| index_file_id | String | | YES | | FIELD |
 | level | UInt8 | | NO | | FIELD |
 | file_path | String | | NO | | FIELD |
 | file_size | UInt64 | | NO | | FIELD |
@@ -96,13 +97,13 @@ ADMIN FLUSH_TABLE('sst_case');
 -- SQLNESS REPLACE (/public/\d+) /public/
 SELECT * FROM information_schema.ssts_manifest order by file_path;
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) @@ -164,15 +165,15 @@ ADMIN FLUSH_TABLE('sst_case'); -- SQLNESS REPLACE (/public/\d+) /public/ SELECT * FROM information_schema.ssts_manifest order by file_path; 
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible | -+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | -+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | index_file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible | 
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| | || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index d211938c2a..4ad8adbb10 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -406,26 +406,27 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | ssts_index_meta | target_key | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_index_meta | target_type | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_manifest | file_id | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | ssts_manifest | file_path | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | -| greptime | information_schema | ssts_manifest | file_size | 10 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | index_file_path | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | -| greptime | information_schema | ssts_manifest | index_file_size | 12 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint 
unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | level | 8 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | -| greptime | information_schema | ssts_manifest | max_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | -| greptime | information_schema | ssts_manifest | min_ts | 16 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | -| greptime | information_schema | ssts_manifest | node_id | 20 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | num_row_groups | 14 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | num_rows | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | num_series | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | origin_region_id | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | file_path | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | ssts_manifest | file_size | 11 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | index_file_id | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | ssts_manifest | index_file_path | 12 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | ssts_manifest | index_file_size | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | level | 9 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | +| greptime | information_schema | ssts_manifest | max_ts | 18 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | +| greptime | information_schema | ssts_manifest | min_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | +| greptime | information_schema | ssts_manifest | node_id | 21 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | num_row_groups | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | num_rows | 14 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | num_series | 16 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | 
Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | origin_region_id | 20 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | ssts_manifest | region_group | 5 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | | greptime | information_schema | ssts_manifest | region_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | ssts_manifest | region_number | 4 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | | greptime | information_schema | ssts_manifest | region_sequence | 6 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | ssts_manifest | sequence | 18 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | sequence | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_manifest | table_dir | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_manifest | table_id | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | ssts_manifest | visible | 21 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | | +| greptime | information_schema | ssts_manifest | visible | 22 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | | | greptime | information_schema | ssts_storage | file_path | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_storage | file_size | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_storage | last_modified_ms | 3 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |