From 4c70b4c31d5abd4ffc47c6acc27d1e7546c4a6cd Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 24 Oct 2025 13:53:48 +0800 Subject: [PATCH] feat: store estimated series num in file meta (#7126) * feat: add num_series to FileMeta Signed-off-by: evenyag * feat: add SeriesEstimator to collect num_series Signed-off-by: evenyag * fix: set num_series in compactor Signed-off-by: evenyag * chore: print num_series in Debug for FileMeta Signed-off-by: evenyag * style: fmt code Signed-off-by: evenyag * style: fix clippy Signed-off-by: evenyag * fix: increase series count when next ts <= last Signed-off-by: evenyag * test: add tests for SeriesEstimator Signed-off-by: evenyag * feat: add num_series to ssts_manifest table Signed-off-by: evenyag * test: update sqlness tests Signed-off-by: evenyag * test: fix metric engine list entry test Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/metric-engine/src/engine/flush.rs | 12 +- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/compaction/test_util.rs | 1 + src/mito2/src/engine/basic_test.rs | 12 +- src/mito2/src/flush.rs | 1 + src/mito2/src/manifest/tests/checkpoint.rs | 2 + src/mito2/src/memtable/bulk/part.rs | 10 +- src/mito2/src/region.rs | 1 + src/mito2/src/remap_manifest.rs | 1 + src/mito2/src/sst.rs | 428 ++++++++++++++++++ src/mito2/src/sst/file.rs | 7 + src/mito2/src/sst/file_purger.rs | 2 + src/mito2/src/sst/file_ref.rs | 1 + src/mito2/src/sst/parquet.rs | 3 + src/mito2/src/sst/parquet/writer.rs | 12 +- src/mito2/src/test_util/sst_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 2 + src/store-api/src/sst_entry.rs | 27 +- .../common/information_schema/ssts.result | 33 +- .../common/system/information_schema.result | 13 +- 20 files changed, 527 insertions(+), 43 deletions(-) diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 23899cbb05..c82862583d 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ 
-127,12 +127,12 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 
0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3173, index_file_path: 
Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, 
region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# ); // list from storage let storage_entries = mito diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ba267f4a48..2b871947c0 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -433,6 +433,7 @@ impl Compactor for DefaultCompactor { num_row_groups: sst_info.num_row_groups, sequence: max_sequence, partition_expr: partition_expr.clone(), + num_series: sst_info.num_series, }) .collect::>(); let output_file_names = diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index b785d36bcb..3dc212ff4d 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -78,6 +78,7 @@ pub fn new_file_handle_with_size_and_sequence( index_file_size: 0, num_rows: 0, num_row_groups: 0, + num_series: 0, sequence: NonZeroU64::new(sequence), partition_expr: None, }, diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 39f2366659..ca62f384c7 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -859,9 +859,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) { #[tokio::test] async fn test_list_ssts() { test_list_ssts_with_format(false, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 
10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: 
Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } @@ -869,9 +869,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_s StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; test_list_ssts_with_format(true, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, 
region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 
42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index eb5e605ce1..ddad947f8a 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -641,6 +641,7 @@ impl RegionFlushTask { num_row_groups: sst_info.num_row_groups, sequence: NonZeroU64::new(max_sequence), partition_expr, + num_series: sst_info.num_series, } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index e10d3aad46..a99a7878ad 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -269,6 +269,7 @@ async fn checkpoint_with_different_compression_types() { num_row_groups: 0, sequence: None, partition_expr: None, + num_series: 0, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], @@ -334,6 +335,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) num_row_groups: 0, sequence: None, partition_expr: None, + num_series: 0, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 4eb2655755..21ac141cff 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ 
b/src/mito2/src/memtable/bulk/part.rs @@ -69,7 +69,7 @@ use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat}; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo}; -use crate::sst::to_sst_arrow_schema; +use crate::sst::{SeriesEstimator, to_sst_arrow_schema}; const INIT_DICT_VALUE_CAPACITY: usize = 8; @@ -563,6 +563,7 @@ impl EncodedBulkPart { num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64, file_metadata: Some(self.metadata.parquet_metadata.clone()), index_metadata: IndexOutput::default(), + num_series: self.metadata.num_series, } } @@ -602,6 +603,8 @@ pub struct BulkPartMeta { pub parquet_metadata: Arc, /// Part region schema. pub region_metadata: RegionMetadataRef, + /// Number of series. + pub num_series: u64, } /// Metrics for encoding a part. @@ -669,6 +672,7 @@ impl BulkPartEncoder { let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone()) .context(EncodeMemtableSnafu)?; let mut total_rows = 0; + let mut series_estimator = SeriesEstimator::default(); // Process each batch from the iterator let mut iter_start = Instant::now(); @@ -679,6 +683,7 @@ impl BulkPartEncoder { continue; } + series_estimator.update_flat(&batch); metrics.raw_size += record_batch_estimated_size(&batch); let write_start = Instant::now(); writer.write(&batch).context(EncodeMemtableSnafu)?; @@ -701,6 +706,7 @@ impl BulkPartEncoder { let buf = Bytes::from(buf); let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?); + let num_series = series_estimator.finish(); Ok(Some(EncodedBulkPart { data: buf, @@ -710,6 +716,7 @@ impl BulkPartEncoder { min_timestamp, parquet_metadata, region_metadata: self.metadata.clone(), + num_series, }, })) } @@ -742,6 +749,7 @@ impl BulkPartEncoder { min_timestamp: part.min_timestamp, parquet_metadata, region_metadata: 
self.metadata.clone(), + num_series: part.estimated_series_count() as u64, }, })) } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index aac7090174..ee49da763e 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -608,6 +608,7 @@ impl MitoRegion { index_file_size, num_rows: meta.num_rows, num_row_groups: meta.num_row_groups, + num_series: Some(meta.num_series), min_ts: meta.time_range.0, max_ts: meta.time_range.1, sequence: meta.sequence.map(|s| s.get()), diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index 6800a4bf4d..a10159401b 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -431,6 +431,7 @@ mod tests { num_row_groups: 1, sequence: NonZeroU64::new(1), partition_expr, + num_series: 1, } } diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 1d94e74eaa..f3f51bdc08 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -21,7 +21,9 @@ use common_base::readable_size::ReadableSize; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, }; +use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; +use datatypes::timestamp::timestamp_array_to_primitive; use serde::{Deserialize, Serialize}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadata; @@ -29,6 +31,9 @@ use store_api::storage::consts::{ OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, }; +use crate::read::Batch; +use crate::sst::parquet::flat_format::time_index_column_index; + pub mod file; pub mod file_purger; pub mod file_ref; @@ -241,3 +246,426 @@ fn plain_internal_fields() -> [FieldRef; 2] { Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)), ] } + +/// Gets the estimated number of series from record batches. 
+/// +/// This struct tracks the last timestamp value to detect series boundaries +/// by observing when timestamps do not increase (indicating a new series). +#[derive(Default)] +pub(crate) struct SeriesEstimator { + /// The last timestamp value seen + last_timestamp: Option, + /// The estimated number of series + series_count: u64, +} + +impl SeriesEstimator { + /// Updates the estimator with a new Batch. + /// + /// Since each Batch contains only one series, this only compares the first timestamp + /// of the batch with the last timestamp seen to detect a new series. + pub(crate) fn update(&mut self, batch: &Batch) { + let Some(last_ts) = batch.last_timestamp() else { + return; + }; + + // Checks if there's a boundary between the last batch and this batch + if let Some(prev_last_ts) = self.last_timestamp { + // If the first timestamp of this batch is less than or equal to the last + // timestamp we've seen, it indicates a new series + if let Some(first_ts) = batch.first_timestamp() + && first_ts.value() <= prev_last_ts + { + self.series_count += 1; + } + } else { + // First batch, counts as first series + self.series_count = 1; + } + + // Updates the last timestamp + self.last_timestamp = Some(last_ts.value()); + } + + /// Updates the estimator with a new record batch in flat format. + /// + /// This method examines the time index column to detect series boundaries. 
+ pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) { + let batch_rows = record_batch.num_rows(); + if batch_rows == 0 { + return; + } + + let time_index_pos = time_index_column_index(record_batch.num_columns()); + let timestamps = record_batch.column(time_index_pos); + let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else { + return; + }; + let values = ts_values.values(); + + // Checks if there's a boundary between the last batch and this batch + if let Some(last_ts) = self.last_timestamp { + if values[0] <= last_ts { + self.series_count += 1; + } + } else { + // First batch, counts as first series + self.series_count = 1; + } + + // Counts series boundaries within this batch. + for i in 0..batch_rows - 1 { + // We treat an equal timestamp as a new series, which is different from + // how we split batches. + if values[i] >= values[i + 1] { + self.series_count += 1; + } + } + + // Updates the last timestamp + self.last_timestamp = Some(values[batch_rows - 1]); + } + + /// Returns the estimated number of series and resets the estimator. 
+ pub(crate) fn finish(&mut self) -> u64 { + self.last_timestamp = None; + let count = self.series_count; + self.series_count = 0; + + count + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::OpType; + use datatypes::arrow::array::{ + BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt8Builder, + UInt32Array, UInt64Array, + }; + use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit}; + use datatypes::arrow::record_batch::RecordBatch; + + use super::*; + use crate::read::{Batch, BatchBuilder}; + + fn new_batch( + primary_key: &[u8], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + ) -> Batch { + let timestamps = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())); + let sequences = Arc::new(UInt64Array::from(sequences.to_vec())); + let mut op_type_builder = UInt8Builder::with_capacity(op_types.len()); + for op_type in op_types { + op_type_builder.append_value(*op_type as u8); + } + let op_types = Arc::new(UInt8Array::from( + op_types.iter().map(|op| *op as u8).collect::>(), + )); + + let mut builder = BatchBuilder::new(primary_key.to_vec()); + builder + .timestamps_array(timestamps) + .unwrap() + .sequences_array(sequences) + .unwrap() + .op_types_array(op_types) + .unwrap(); + builder.build().unwrap() + } + + fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch { + // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type] + let num_cols = 4; // time_index + 3 internal columns + let time_index_pos = time_index_column_index(num_cols); + assert_eq!(time_index_pos, 0); // For 4 columns, time index should be at position 0 + + let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())); + let pk_array = Arc::new(DictionaryArray::new( + UInt32Array::from(vec![0; timestamps.len()]), + Arc::new(BinaryArray::from(vec![b"test".as_slice()])), + )); + let seq_array = Arc::new(UInt64Array::from(vec![1; timestamps.len()])); 
+ let op_array = Arc::new(UInt8Array::from(vec![1; timestamps.len()])); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "time", + ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new_dictionary( + "__primary_key", + ArrowDataType::UInt32, + ArrowDataType::Binary, + false, + ), + Field::new("__sequence", ArrowDataType::UInt64, false), + Field::new("__op_type", ArrowDataType::UInt8, false), + ])); + + RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap() + } + + #[test] + fn test_series_estimator_empty_batch() { + let mut estimator = SeriesEstimator::default(); + let batch = new_batch(b"test", &[], &[], &[]); + estimator.update(&batch); + assert_eq!(0, estimator.finish()); + } + + #[test] + fn test_series_estimator_single_batch() { + let mut estimator = SeriesEstimator::default(); + let batch = new_batch( + b"test", + &[1, 2, 3], + &[1, 2, 3], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch); + assert_eq!(1, estimator.finish()); + } + + #[test] + fn test_series_estimator_multiple_batches_same_series() { + let mut estimator = SeriesEstimator::default(); + + // First batch with timestamps 1, 2, 3 + let batch1 = new_batch( + b"test", + &[1, 2, 3], + &[1, 2, 3], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch1); + + // Second batch with timestamps 4, 5, 6 (continuation) + let batch2 = new_batch( + b"test", + &[4, 5, 6], + &[4, 5, 6], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch2); + + assert_eq!(1, estimator.finish()); + } + + #[test] + fn test_series_estimator_new_series_detected() { + let mut estimator = SeriesEstimator::default(); + + // First batch with timestamps 1, 2, 3 + let batch1 = new_batch( + b"pk0", + &[1, 2, 3], + &[1, 2, 3], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch1); + + // Second batch with timestamps 2, 3, 4 (timestamp goes back, new series) + let batch2 = 
new_batch( + b"pk1", + &[2, 3, 4], + &[4, 5, 6], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch2); + + assert_eq!(2, estimator.finish()); + } + + #[test] + fn test_series_estimator_equal_timestamp_boundary() { + let mut estimator = SeriesEstimator::default(); + + // First batch ending at timestamp 5 + let batch1 = new_batch( + b"test", + &[1, 2, 5], + &[1, 2, 3], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch1); + + // Second batch starting at timestamp 5 (equal, indicates new series) + let batch2 = new_batch( + b"test", + &[5, 6, 7], + &[4, 5, 6], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch2); + + assert_eq!(2, estimator.finish()); + } + + #[test] + fn test_series_estimator_finish_resets_state() { + let mut estimator = SeriesEstimator::default(); + + let batch1 = new_batch( + b"test", + &[1, 2, 3], + &[1, 2, 3], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch1); + + assert_eq!(1, estimator.finish()); + + // After finish, state should be reset + let batch2 = new_batch( + b"test", + &[4, 5, 6], + &[4, 5, 6], + &[OpType::Put, OpType::Put, OpType::Put], + ); + estimator.update(&batch2); + + assert_eq!(1, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_empty_batch() { + let mut estimator = SeriesEstimator::default(); + let record_batch = new_flat_record_batch(&[]); + estimator.update_flat(&record_batch); + assert_eq!(0, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_single_batch() { + let mut estimator = SeriesEstimator::default(); + let record_batch = new_flat_record_batch(&[1, 2, 3]); + estimator.update_flat(&record_batch); + assert_eq!(1, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_series_boundary_within_batch() { + let mut estimator = SeriesEstimator::default(); + // Timestamps decrease from 3 to 2, indicating a series boundary + let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 
5]); + estimator.update_flat(&record_batch); + // Should detect boundary at position 3 (3 >= 2) + assert_eq!(2, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_multiple_boundaries_within_batch() { + let mut estimator = SeriesEstimator::default(); + // Multiple series boundaries: 5>=4, 6>=3 + let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]); + estimator.update_flat(&record_batch); + assert_eq!(3, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_equal_timestamps() { + let mut estimator = SeriesEstimator::default(); + // Equal timestamps are considered as new series + let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]); + estimator.update_flat(&record_batch); + // Boundaries at: 2>=2, 3>=3, 3>=3 + assert_eq!(4, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_multiple_batches_continuation() { + let mut estimator = SeriesEstimator::default(); + + // First batch: timestamps 1, 2, 3 + let batch1 = new_flat_record_batch(&[1, 2, 3]); + estimator.update_flat(&batch1); + + // Second batch: timestamps 4, 5, 6 (continuation) + let batch2 = new_flat_record_batch(&[4, 5, 6]); + estimator.update_flat(&batch2); + + assert_eq!(1, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_multiple_batches_new_series() { + let mut estimator = SeriesEstimator::default(); + + // First batch: timestamps 1, 2, 3 + let batch1 = new_flat_record_batch(&[1, 2, 3]); + estimator.update_flat(&batch1); + + // Second batch: timestamps 2, 3, 4 (goes back to 2, new series) + let batch2 = new_flat_record_batch(&[2, 3, 4]); + estimator.update_flat(&batch2); + + assert_eq!(2, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_boundary_at_batch_edge_equal() { + let mut estimator = SeriesEstimator::default(); + + // First batch ending at 5 + let batch1 = new_flat_record_batch(&[1, 2, 5]); + estimator.update_flat(&batch1); + + // Second batch starting at 5 (equal timestamp, new series) + 
let batch2 = new_flat_record_batch(&[5, 6, 7]); + estimator.update_flat(&batch2); + + assert_eq!(2, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_mixed_batches() { + let mut estimator = SeriesEstimator::default(); + + // Batch 1: single series [10, 20, 30] + let batch1 = new_flat_record_batch(&[10, 20, 30]); + estimator.update_flat(&batch1); + + // Batch 2: starts new series [5, 15], boundary within batch [15, 10, 25] + let batch2 = new_flat_record_batch(&[5, 15, 10, 25]); + estimator.update_flat(&batch2); + + // Batch 3: continues from 25 to [30, 35] + let batch3 = new_flat_record_batch(&[30, 35]); + estimator.update_flat(&batch3); + + // Expected: 1 (batch1) + 1 (batch2 start) + 1 (within batch2) = 3 + assert_eq!(3, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_descending_timestamps() { + let mut estimator = SeriesEstimator::default(); + // Strictly descending timestamps - each pair creates a boundary + let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]); + estimator.update_flat(&record_batch); + // Boundaries: 10>=9, 9>=8, 8>=7, 7>=6 = 4 boundaries + 1 initial = 5 series + assert_eq!(5, estimator.finish()); + } + + #[test] + fn test_series_estimator_flat_finish_resets_state() { + let mut estimator = SeriesEstimator::default(); + + let batch1 = new_flat_record_batch(&[1, 2, 3]); + estimator.update_flat(&batch1); + + assert_eq!(1, estimator.finish()); + + // After finish, state should be reset + let batch2 = new_flat_record_batch(&[4, 5, 6]); + estimator.update_flat(&batch2); + + assert_eq!(1, estimator.finish()); + } +} diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 4ddde55746..ae255e9407 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -175,6 +175,10 @@ pub struct FileMeta { deserialize_with = "deserialize_partition_expr" )] pub partition_expr: Option, + /// Number of series in the file. + /// + /// The number is 0 if the series number is not available. 
+ pub num_series: u64, } impl Debug for FileMeta { @@ -210,6 +214,7 @@ impl Debug for FileMeta { } }) .field("partition_expr", &self.partition_expr) + .field("num_series", &self.num_series) .finish() } } @@ -458,6 +463,7 @@ mod tests { num_row_groups: 0, sequence: None, partition_expr: None, + num_series: 0, } } @@ -503,6 +509,7 @@ mod tests { num_row_groups: 0, sequence: None, partition_expr: Some(partition_expr.clone()), + num_series: 0, }; // Test serialization/deserialization diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 7bd0e6b515..c5197ea2fb 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -236,6 +236,7 @@ mod tests { num_row_groups: 0, sequence: None, partition_expr: None, + num_series: 0, }, file_purger, ); @@ -302,6 +303,7 @@ mod tests { num_row_groups: 1, sequence: NonZeroU64::new(4096), partition_expr: None, + num_series: 0, }, file_purger, ); diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index c8b86ed0fd..de071b3f04 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -259,6 +259,7 @@ mod tests { num_row_groups: 1, sequence: NonZeroU64::new(4096), partition_expr: None, + num_series: 0, }; file_ref_mgr.add_file(&file_meta); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 9b56ffd4ae..83cd17acc8 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -84,6 +84,8 @@ pub struct SstInfo { pub file_metadata: Option<Arc<ParquetMetaData>>, /// Index Meta Data pub index_metadata: IndexOutput, + /// Number of series + pub num_series: u64, } #[cfg(test)] @@ -766,6 +768,7 @@ mod tests { .expect("partition expression should be valid JSON"), None => None, }, + num_series: 0, }, Arc::new(NoopFilePurger), ); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 01e1e95a9c..d52615690f 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++
b/src/mito2/src/sst/parquet/writer.rs @@ -57,7 +57,9 @@ use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index} use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions}; -use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions}; +use crate::sst::{ + DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator, +}; /// Parquet SST writer. pub struct ParquetWriter { @@ -176,7 +178,7 @@ where ) -> Result<()> { // maybe_init_writer will re-create a new file. if let Some(mut current_writer) = mem::take(&mut self.writer) { - let stats = mem::take(stats); + let mut stats = mem::take(stats); // At least one row has been written. assert!(stats.num_rows > 0); @@ -211,6 +213,7 @@ where // convert FileMetaData to ParquetMetaData let parquet_metadata = parse_parquet_metadata(file_meta)?; + let num_series = stats.series_estimator.finish(); ssts.push(SstInfo { file_id: self.current_file, time_range, @@ -219,6 +222,7 @@ where num_row_groups: parquet_metadata.num_row_groups() as u64, file_metadata: Some(Arc::new(parquet_metadata)), index_metadata: index_output, + num_series, }); self.current_file = FileId::random(); self.bytes_written.store(0, Ordering::Relaxed) @@ -496,6 +500,8 @@ struct SourceStats { num_rows: usize, /// Time range of fetched batches. time_range: Option<(Timestamp, Timestamp)>, + /// Series estimator for computing num_series. + series_estimator: SeriesEstimator, } impl SourceStats { @@ -505,6 +511,7 @@ impl SourceStats { } self.num_rows += batch.num_rows(); + self.series_estimator.update(batch); // Safety: batch is not empty. 
let (min_in_batch, max_in_batch) = ( batch.first_timestamp().unwrap(), @@ -524,6 +531,7 @@ impl SourceStats { } self.num_rows += record_batch.num_rows(); + self.series_estimator.update_flat(record_batch); // Get the timestamp column by index let time_index_col_idx = time_index_column_index(record_batch.num_columns()); diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 5eacf06bd5..fc29ca0826 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -127,6 +127,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) index_file_size: 0, num_rows: 0, num_row_groups: 0, + num_series: 0, sequence: None, partition_expr: None, }, diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 86cc11eaf5..30da6677e3 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -105,6 +105,7 @@ impl VersionControlBuilder { index_file_size: 0, num_rows: 0, num_row_groups: 0, + num_series: 0, sequence: NonZeroU64::new(start_ms as u64), partition_expr: match &self.metadata.partition_expr { Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) @@ -193,6 +194,7 @@ pub(crate) fn apply_edit( index_file_size: 0, num_rows: 0, num_row_groups: 0, + num_series: 0, sequence: NonZeroU64::new(*start_ms as u64), partition_expr: match &version_control.current().version.metadata.partition_expr { Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) diff --git a/src/store-api/src/sst_entry.rs b/src/store-api/src/sst_entry.rs index 8330af7b2e..52295bdb59 100644 --- a/src/store-api/src/sst_entry.rs +++ b/src/store-api/src/sst_entry.rs @@ -61,6 +61,8 @@ pub struct ManifestSstEntry { pub num_rows: u64, /// Number of row groups in the SST. pub num_row_groups: u64, + /// Number of series in the SST. + pub num_series: Option<u64>, /// Min timestamp.
pub min_ts: Timestamp, /// Max timestamp. @@ -94,6 +96,7 @@ impl ManifestSstEntry { ColumnSchema::new("index_file_size", Ty::uint64_datatype(), true), ColumnSchema::new("num_rows", Ty::uint64_datatype(), false), ColumnSchema::new("num_row_groups", Ty::uint64_datatype(), false), + ColumnSchema::new("num_series", Ty::uint64_datatype(), true), ColumnSchema::new("min_ts", Ty::timestamp_nanosecond_datatype(), true), ColumnSchema::new("max_ts", Ty::timestamp_nanosecond_datatype(), true), ColumnSchema::new("sequence", Ty::uint64_datatype(), true), @@ -120,6 +123,7 @@ impl ManifestSstEntry { let index_file_sizes = entries.iter().map(|e| e.index_file_size); let num_rows = entries.iter().map(|e| e.num_rows); let num_row_groups = entries.iter().map(|e| e.num_row_groups); + let num_series = entries.iter().map(|e| e.num_series); let min_ts = entries.iter().map(|e| { e.min_ts .convert_to(TimeUnit::Nanosecond) @@ -150,6 +154,7 @@ impl ManifestSstEntry { Arc::new(UInt64Array::from_iter(index_file_sizes)), Arc::new(UInt64Array::from_iter_values(num_rows)), Arc::new(UInt64Array::from_iter_values(num_row_groups)), + Arc::new(UInt64Array::from_iter(num_series)), Arc::new(TimestampNanosecondArray::from_iter(min_ts)), Arc::new(TimestampNanosecondArray::from_iter(max_ts)), Arc::new(UInt64Array::from_iter(sequences)), @@ -434,6 +439,7 @@ mod tests { index_file_size: None, num_rows: 10, num_row_groups: 2, + num_series: Some(5), min_ts: Timestamp::new_millisecond(1000), // 1s -> 1_000_000_000ns max_ts: Timestamp::new_second(2), // 2s -> 2_000_000_000ns sequence: None, @@ -456,6 +462,7 @@ mod tests { index_file_size: Some(11), num_rows: 20, num_row_groups: 4, + num_series: None, min_ts: Timestamp::new_nanosecond(5), // 5ns max_ts: Timestamp::new_microsecond(2000), // 2ms -> 2_000_000ns sequence: Some(9), @@ -590,16 +597,24 @@ mod tests { assert_eq!(2, num_row_groups.value(0)); assert_eq!(4, num_row_groups.value(1)); - let min_ts = batch + let num_series = batch .column(14) .as_any() + 
.downcast_ref::<UInt64Array>() .unwrap(); + assert_eq!(5, num_series.value(0)); + assert!(num_series.is_null(1)); + + let min_ts = batch + .column(15) + .as_any() .downcast_ref::<TimestampNanosecondArray>() .unwrap(); assert_eq!(1_000_000_000, min_ts.value(0)); assert_eq!(5, min_ts.value(1)); let max_ts = batch - .column(15) + .column(16) .as_any() .downcast_ref::<TimestampNanosecondArray>() .unwrap(); @@ -607,7 +622,7 @@ mod tests { assert_eq!(2_000_000, max_ts.value(1)); let sequences = batch - .column(16) + .column(17) .as_any() .downcast_ref::<UInt64Array>() .unwrap(); @@ -615,7 +630,7 @@ mod tests { assert_eq!(9, sequences.value(1)); let origin_region_ids = batch - .column(17) + .column(18) .as_any() .downcast_ref::<UInt64Array>() .unwrap(); @@ -623,7 +638,7 @@ mod tests { assert_eq!(region_id2.as_u64(), origin_region_ids.value(1)); let node_ids = batch - .column(18) + .column(19) .as_any() .downcast_ref::<UInt64Array>() .unwrap(); @@ -631,7 +646,7 @@ mod tests { assert!(node_ids.is_null(1)); let visible = batch - .column(19) + .column(20) .as_any() .downcast_ref::<BooleanArray>() .unwrap(); diff --git a/tests/cases/standalone/common/information_schema/ssts.result b/tests/cases/standalone/common/information_schema/ssts.result index 2c28e6e63c..d546efbdfb 100644 --- a/tests/cases/standalone/common/information_schema/ssts.result +++ b/tests/cases/standalone/common/information_schema/ssts.result @@ -17,6 +17,7 @@ DESC TABLE information_schema.ssts_manifest; | index_file_size | UInt64 | | YES | | FIELD | | num_rows | UInt64 | | NO | | FIELD | | num_row_groups | UInt64 | | NO | | FIELD | +| num_series | UInt64 | | YES | | FIELD | | min_ts | TimestampNanosecond | | YES | | FIELD | | max_ts | TimestampNanosecond | | YES | | FIELD | | sequence | UInt64 | | YES | | FIELD | @@ -95,13 +96,13 @@ ADMIN FLUSH_TABLE('sst_case'); -- SQLNESS REPLACE (/public/\d+) /public/ SELECT * FROM information_schema.ssts_manifest order by file_path; 
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | min_ts | max_ts | sequence | origin_region_id | node_id | visible | -+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | 
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || 
data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) @@ -163,15 +164,15 @@ ADMIN FLUSH_TABLE('sst_case'); -- SQLNESS REPLACE (/public/\d+) /public/ SELECT * FROM information_schema.ssts_manifest order by file_path; -+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | min_ts | max_ts | sequence | origin_region_id | node_id | visible | 
-+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin |||| | |||| true | -+----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+-------------------------+-------------------------+----------+------------------+---------+---------+ 
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| table_dir | region_id | table_id | region_number | region_group | region_sequence | file_id | level | file_path | file_size | index_file_path | index_file_size | num_rows | num_row_groups | num_series | min_ts | max_ts | sequence | origin_region_id | node_id | visible | ++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | +| data/greptime/public// |||||| || data/greptime/public//_/.parquet || data/greptime/public//_/index/.puffin ||||| | |||| true | 
++----------------------------+---------------+----------+---------------+--------------+-----------------+--------------------------------------+-------+----------------------------------------------------------------------------------------+-----------+---------------------------------------------------------------------------------------------+-----------------+----------+----------------+------------+-------------------------+-------------------------+----------+------------------+---------+---------+ -- SQLNESS REPLACE (\s+\d+\s+) -- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 1cb53ccfe3..d211938c2a 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -411,20 +411,21 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | ssts_manifest | index_file_path | 11 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | ssts_manifest | index_file_size | 12 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_manifest | level | 8 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | -| greptime | information_schema | ssts_manifest | max_ts | 16 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | -| greptime | information_schema | ssts_manifest | min_ts | 15 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | -| greptime | information_schema | ssts_manifest | 
node_id | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | max_ts | 17 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | +| greptime | information_schema | ssts_manifest | min_ts | 16 | | | | | 9 | | | | | select,insert | | TimestampNanosecond | timestamp(9) | FIELD | | Yes | timestamp(9) | | | +| greptime | information_schema | ssts_manifest | node_id | 20 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_manifest | num_row_groups | 14 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | ssts_manifest | num_rows | 13 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | -| greptime | information_schema | ssts_manifest | origin_region_id | 18 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | num_series | 15 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | origin_region_id | 19 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | ssts_manifest | region_group | 5 | | | 3 | 0 | | | | | | select,insert | | UInt8 | tinyint unsigned | FIELD | | No | tinyint unsigned | | | | greptime | information_schema | ssts_manifest | region_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | ssts_manifest | region_number | 4 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int 
unsigned | FIELD | | No | int unsigned | | | | greptime | information_schema | ssts_manifest | region_sequence | 6 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | ssts_manifest | sequence | 17 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | ssts_manifest | sequence | 18 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_manifest | table_dir | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_manifest | table_id | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | -| greptime | information_schema | ssts_manifest | visible | 20 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | | +| greptime | information_schema | ssts_manifest | visible | 21 | | | | | | | | | | select,insert | | Boolean | boolean | FIELD | | No | boolean | | | | greptime | information_schema | ssts_storage | file_path | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | ssts_storage | file_size | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | ssts_storage | last_modified_ms | 3 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | Yes | timestamp(3) | | |