From 83290be3bacf7bec7c499fc9b6c73e8a06866d0b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 9 Sep 2025 16:37:29 -0700 Subject: [PATCH] feat: store partition expr per file in region manifest (#6849) * feat: store partition expr per file in region manifest Signed-off-by: Ruihang Xia * use deserialized form in memory Signed-off-by: Ruihang Xia * test non-existing partition expr Signed-off-by: Ruihang Xia * try remove partition expr dup Signed-off-by: Ruihang Xia * Revert "try remove partition expr dup" This reverts commit a0f2c8ca6a127ef356a5510466e84513cf4b13d8. * fix merge test Signed-off-by: Ruihang Xia * format doc Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/mito2/Cargo.toml | 1 + src/mito2/src/compaction/compactor.rs | 20 ++- src/mito2/src/compaction/test_util.rs | 1 + src/mito2/src/error.rs | 9 + src/mito2/src/flush.rs | 12 +- src/mito2/src/manifest/tests/checkpoint.rs | 2 + src/mito2/src/sst/file.rs | 193 ++++++++++++++++++++- src/mito2/src/sst/file_purger.rs | 2 + src/mito2/src/sst/file_ref.rs | 1 + src/mito2/src/sst/parquet.rs | 5 + src/mito2/src/test_util/sst_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 10 ++ 13 files changed, 255 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b41073e6e..bab38fc9de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7612,6 +7612,7 @@ dependencies = [ "moka", "object-store", "parquet", + "partition", "paste", "pin-project", "prometheus", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index ec090cd222..1909c14071 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -60,6 +60,7 @@ paste.workspace = true pin-project.workspace = true prometheus.workspace = true prost.workspace = true +partition.workspace = true puffin.workspace = true rand.workspace = true rayon = "1.10" diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 2006f89741..bac14906d5 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -23,6 +23,7 @@ use common_time::TimeToLive; use either::Either; use itertools::Itertools; use object_store::manager::ObjectStoreManagerRef; +use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -34,7 +35,9 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::{PickerOutput, new_picker}; use crate::compaction::{CompactionSstReaderBuilder, find_ttl}; use crate::config::MitoConfig; -use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; +use crate::error::{ + EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result, +}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; use crate::manifest::storage::manifest_compress_type; @@ -349,6 +352,7 @@ impl Compactor for DefaultCompactor { .map(|f| f.meta_ref().sequence) .max() .flatten(); + let region_metadata_for_filemeta = region_metadata.clone(); futs.push(async move { let input_file_names = output .inputs @@ -385,6 +389,19 @@ impl Compactor for DefaultCompactor { WriteType::Compaction, ) .await?; + // Convert partition expression once outside the map + let partition_expr = match ®ion_metadata_for_filemeta.partition_expr { + None => None, + Some(json_str) if json_str.is_empty() => None, + Some(json_str) => { + PartitionExpr::from_json_str(json_str).with_context(|_| { + InvalidPartitionExprSnafu { + expr: json_str.clone(), + } + })? + } + }; + let output_files = sst_infos .into_iter() .map(|sst_info| FileMeta { @@ -398,6 +415,7 @@ impl Compactor for DefaultCompactor { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: max_sequence, + partition_expr: partition_expr.clone(), }) .collect::>(); let output_file_names = diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index b05b8b5399..eb29a7c427 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -78,6 +78,7 @@ pub fn new_file_handle_with_size_and_sequence( num_rows: 0, num_row_groups: 0, sequence: NonZeroU64::new(sequence), + partition_expr: None, }, file_purger, ) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index e7df6d8b9a..f1b4191aa1 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -986,6 +986,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid partition expression: {}", expr))] + InvalidPartitionExpr { + expr: String, + #[snafu(implicit)] + location: Location, + source: partition::error::Error, + }, + #[snafu(display("Failed to decode bulk wal entry"))] ConvertBulkWalEntry { #[snafu(implicit)] @@ -1159,6 +1167,7 @@ impl ErrorExt for Error { ArrowReader { .. } => StatusCode::StorageUnavailable, ConvertValue { source, .. } => source.status_code(), ApplyBloomFilterIndex { source, .. } => source.status_code(), + InvalidPartitionExpr { source, .. } => source.status_code(), BuildIndexApplier { source, .. } | PushIndexValue { source, .. } | ApplyInvertedIndex { source, .. } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index d421b0e824..0c8e40dda5 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -29,7 +29,8 @@ use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteReques use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ - Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, + Error, FlushRegionSnafu, InvalidPartitionExprSnafu, RegionClosedSnafu, RegionDroppedSnafu, + RegionTruncatedSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::memtable::MemtableRanges; @@ -411,6 +412,14 @@ impl RegionFlushTask { } flush_metrics = flush_metrics.merge(metrics); + // Convert partition expression once outside the map + let partition_expr = match &version.metadata.partition_expr { + None => None, + Some(json_expr) if json_expr.is_empty() => None, + Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) + .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?, + }; + file_metas.extend(ssts_written.into_iter().map(|sst_info| { flushed_bytes += sst_info.file_size; FileMeta { @@ -424,6 +433,7 @@ impl RegionFlushTask { num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, sequence: NonZeroU64::new(max_sequence), + partition_expr: partition_expr.clone(), } })); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index a160c24d11..40663c3f4b 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -267,6 +267,7 @@ async fn checkpoint_with_different_compression_types() { num_rows: 0, num_row_groups: 0, sequence: None, + partition_expr: None, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], @@ -330,6 +331,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) num_rows: 0, num_row_groups: 0, sequence: None, + partition_expr: None, }; let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 35373e55c1..4c20001c7f 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use common_base::readable_size::ReadableSize; use common_time::Timestamp; +use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; @@ -33,6 +34,46 @@ use uuid::Uuid; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location; +/// Custom serde functions for partition_expr field in FileMeta +fn serialize_partition_expr( + partition_expr: &Option, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + use serde::ser::Error; + + match partition_expr { + None => serializer.serialize_none(), + Some(expr) => { + let json_str = expr.as_json_str().map_err(S::Error::custom)?; + serializer.serialize_some(&json_str) + } + } +} + +fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error; + + let opt_json_str: Option = Option::deserialize(deserializer)?; + match opt_json_str { + None => Ok(None), + Some(json_str) => { + if json_str.is_empty() { + // Empty string represents explicit "single-region/no-partition" designation + Ok(None) + } else { + // Parse the JSON string to PartitionExpr + PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom) + } + } + } +} + /// Type to store SST level. pub type Level = u8; /// Maximum level of SSTs. @@ -132,7 +173,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool { } /// Metadata of a SST file. -#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(default)] pub struct FileMeta { /// Region that created the file. The region id may not be the id of the current region. @@ -167,6 +208,18 @@ pub struct FileMeta { /// This sequence is the only sequence in this file. And it's retrieved from the max /// sequence of the rows on generating this file. pub sequence: Option, + /// Partition expression from the region metadata when the file is created. + /// + /// This is stored as a PartitionExpr object in memory for convenience, + /// but serialized as JSON string for manifest compatibility. + /// Compatibility behavior: + /// - None: no partition expr was set when the file was created (legacy files). + /// - Some(expr): partition expression from region metadata. + #[serde( + serialize_with = "serialize_partition_expr", + deserialize_with = "deserialize_partition_expr" + )] + pub partition_expr: Option, } impl Debug for FileMeta { @@ -201,6 +254,7 @@ impl Debug for FileMeta { write!(f, "{}", seq) } }) + .field("partition_expr", &self.partition_expr) .finish() } } @@ -356,6 +410,9 @@ impl FileHandleInner { #[cfg(test)] mod tests { + use datatypes::value::Value; + use partition::expr::{PartitionExpr, col}; + use super::*; #[test] @@ -392,6 +449,7 @@ mod tests { num_rows: 0, num_row_groups: 0, sequence: None, + partition_expr: None, } } @@ -415,4 +473,137 @@ mod tests { let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap(); assert_eq!(file_meta, deserialized_file_meta); } + + #[test] + fn test_file_meta_with_partition_expr() { + let file_id = FileId::random(); + let partition_expr = PartitionExpr::new( + col("a"), + partition::expr::RestrictedOp::GtEq, + Value::UInt32(10).into(), + ); + + let file_meta_with_partition = FileMeta { + region_id: 0.into(), + file_id, + time_range: FileTimeRange::default(), + level: 0, + file_size: 0, + available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), + index_file_size: 0, + num_rows: 0, + num_row_groups: 0, + sequence: None, + partition_expr: Some(partition_expr.clone()), + }; + + // Test serialization/deserialization + let serialized = serde_json::to_string(&file_meta_with_partition).unwrap(); + let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap(); + assert_eq!(file_meta_with_partition, deserialized); + + // Verify the serialized JSON contains the expected partition expression string + let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap(); + assert!(serialized_value["partition_expr"].as_str().is_some()); + let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap(); + assert!(partition_expr_json.contains("\"Column\":\"a\"")); + assert!(partition_expr_json.contains("\"op\":\"GtEq\"")); + + // Test with None (legacy files) + let file_meta_none = FileMeta { + partition_expr: None, + ..file_meta_with_partition.clone() + }; + let serialized_none = serde_json::to_string(&file_meta_none).unwrap(); + let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap(); + assert_eq!(file_meta_none, deserialized_none); + } + + #[test] + fn test_file_meta_partition_expr_backward_compatibility() { + // Test that we can deserialize old JSON format with partition_expr as string + let json_with_partition_expr = r#"{ + "region_id": 0, + "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55", + "time_range": [ + {"value": 0, "unit": "Millisecond"}, + {"value": 0, "unit": "Millisecond"} + ], + "level": 0, + "file_size": 0, + "available_indexes": ["InvertedIndex"], + "index_file_size": 0, + "num_rows": 0, + "num_row_groups": 0, + "sequence": null, + "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}" + }"#; + + let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap(); + assert!(file_meta.partition_expr.is_some()); + let expr = file_meta.partition_expr.unwrap(); + assert_eq!(format!("{}", expr), "a >= 10"); + + // Test empty partition expression string + let json_with_empty_expr = r#"{ + "region_id": 0, + "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55", + "time_range": [ + {"value": 0, "unit": "Millisecond"}, + {"value": 0, "unit": "Millisecond"} + ], + "level": 0, + "file_size": 0, + "available_indexes": [], + "index_file_size": 0, + "num_rows": 0, + "num_row_groups": 0, + "sequence": null, + "partition_expr": "" + }"#; + + let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap(); + assert!(file_meta_empty.partition_expr.is_none()); + + // Test null partition expression + let json_with_null_expr = r#"{ + "region_id": 0, + "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55", + "time_range": [ + {"value": 0, "unit": "Millisecond"}, + {"value": 0, "unit": "Millisecond"} + ], + "level": 0, + "file_size": 0, + "available_indexes": [], + "index_file_size": 0, + "num_rows": 0, + "num_row_groups": 0, + "sequence": null, + "partition_expr": null + }"#; + + let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap(); + assert!(file_meta_null.partition_expr.is_none()); + + // Test partition expression doesn't exist + let json_with_empty_expr = r#"{ + "region_id": 0, + "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55", + "time_range": [ + {"value": 0, "unit": "Millisecond"}, + {"value": 0, "unit": "Millisecond"} + ], + "level": 0, + "file_size": 0, + "available_indexes": [], + "index_file_size": 0, + "num_rows": 0, + "num_row_groups": 0, + "sequence": null + }"#; + + let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap(); + assert!(file_meta_empty.partition_expr.is_none()); + } } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index a672028aeb..efed3f6721 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -280,6 +280,7 @@ mod tests { num_rows: 0, num_row_groups: 0, sequence: None, + partition_expr: None, }, file_purger, ); @@ -345,6 +346,7 @@ mod tests { num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), + partition_expr: None, }, file_purger, ); diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index dbb54cdf61..4a507a1fdd 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -258,6 +258,7 @@ mod tests { num_rows: 1024, num_row_groups: 1, sequence: NonZeroU64::new(4096), + partition_expr: None, }; file_ref_mgr.add_file(&file_meta); diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 27a221c146..d14de32128 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -753,6 +753,11 @@ mod tests { num_row_groups: info.num_row_groups, num_rows: info.num_rows as u64, sequence: None, + partition_expr: match &metadata.partition_expr { + Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) + .expect("partition expression should be valid JSON"), + None => None, + }, }, Arc::new(NoopFilePurger), ); diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index a55a58465a..928ed63c6b 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -128,6 +128,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) num_rows: 0, num_row_groups: 0, sequence: None, + partition_expr: None, }, file_purger, ) diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index 63aca919a9..d023e50f53 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -106,6 +106,11 @@ impl VersionControlBuilder { num_rows: 0, num_row_groups: 0, sequence: NonZeroU64::new(start_ms as u64), + partition_expr: match &self.metadata.partition_expr { + Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) + .expect("partition expression should be valid JSON"), + None => None, + }, }, ); self @@ -189,6 +194,11 @@ pub(crate) fn apply_edit( num_rows: 0, num_row_groups: 0, sequence: NonZeroU64::new(*start_ms as u64), + partition_expr: match &version_control.current().version.metadata.partition_expr { + Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) + .expect("partition expression should be valid JSON"), + None => None, + }, } }) .collect();