feat: store partition expr per file in region manifest (#6849)

* feat: store partition expr per file in region manifest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use deserialized form in memory

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test non-existing partition expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* try remove partition expr dup

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Revert "try remove partition expr dup"

This reverts commit a0f2c8ca6a127ef356a5510466e84513cf4b13d8.

* fix merge test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format doc

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-09-09 16:37:29 -07:00
committed by GitHub
parent 21ee981b49
commit 83290be3ba
13 changed files with 255 additions and 3 deletions

1
Cargo.lock generated
View File

@@ -7612,6 +7612,7 @@ dependencies = [
"moka",
"object-store",
"parquet",
"partition",
"paste",
"pin-project",
"prometheus",

View File

@@ -60,6 +60,7 @@ paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
partition.workspace = true
puffin.workspace = true
rand.workspace = true
rayon = "1.10"

View File

@@ -23,6 +23,7 @@ use common_time::TimeToLive;
use either::Either;
use itertools::Itertools;
use object_store::manager::ObjectStoreManagerRef;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -34,7 +35,9 @@ use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{PickerOutput, new_picker};
use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
@@ -349,6 +352,7 @@ impl Compactor for DefaultCompactor {
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
let region_metadata_for_filemeta = region_metadata.clone();
futs.push(async move {
let input_file_names = output
.inputs
@@ -385,6 +389,19 @@ impl Compactor for DefaultCompactor {
WriteType::Compaction,
)
.await?;
// Convert partition expression once outside the map
let partition_expr = match &region_metadata_for_filemeta.partition_expr {
None => None,
Some(json_str) if json_str.is_empty() => None,
Some(json_str) => {
PartitionExpr::from_json_str(json_str).with_context(|_| {
InvalidPartitionExprSnafu {
expr: json_str.clone(),
}
})?
}
};
let output_files = sst_infos
.into_iter()
.map(|sst_info| FileMeta {
@@ -398,6 +415,7 @@ impl Compactor for DefaultCompactor {
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
partition_expr: partition_expr.clone(),
})
.collect::<Vec<_>>();
let output_file_names =

View File

@@ -78,6 +78,7 @@ pub fn new_file_handle_with_size_and_sequence(
num_rows: 0,
num_row_groups: 0,
sequence: NonZeroU64::new(sequence),
partition_expr: None,
},
file_purger,
)

View File

@@ -986,6 +986,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid partition expression: {}", expr))]
InvalidPartitionExpr {
expr: String,
#[snafu(implicit)]
location: Location,
source: partition::error::Error,
},
#[snafu(display("Failed to decode bulk wal entry"))]
ConvertBulkWalEntry {
#[snafu(implicit)]
@@ -1159,6 +1167,7 @@ impl ErrorExt for Error {
ArrowReader { .. } => StatusCode::StorageUnavailable,
ConvertValue { source, .. } => source.status_code(),
ApplyBloomFilterIndex { source, .. } => source.status_code(),
InvalidPartitionExpr { source, .. } => source.status_code(),
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyInvertedIndex { source, .. }

View File

@@ -29,7 +29,8 @@ use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteReques
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
Error, FlushRegionSnafu, InvalidPartitionExprSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableRanges;
@@ -411,6 +412,14 @@ impl RegionFlushTask {
}
flush_metrics = flush_metrics.merge(metrics);
// Convert partition expression once outside the map
let partition_expr = match &version.metadata.partition_expr {
None => None,
Some(json_expr) if json_expr.is_empty() => None,
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
};
file_metas.extend(ssts_written.into_iter().map(|sst_info| {
flushed_bytes += sst_info.file_size;
FileMeta {
@@ -424,6 +433,7 @@ impl RegionFlushTask {
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
partition_expr: partition_expr.clone(),
}
}));
}

View File

@@ -267,6 +267,7 @@ async fn checkpoint_with_different_compression_types() {
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: None,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
@@ -330,6 +331,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: None,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],

View File

@@ -23,6 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
@@ -33,6 +34,46 @@ use uuid::Uuid;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;
/// Custom serde functions for partition_expr field in FileMeta
fn serialize_partition_expr<S>(
partition_expr: &Option<PartitionExpr>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::Error;
match partition_expr {
None => serializer.serialize_none(),
Some(expr) => {
let json_str = expr.as_json_str().map_err(S::Error::custom)?;
serializer.serialize_some(&json_str)
}
}
}
fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
match opt_json_str {
None => Ok(None),
Some(json_str) => {
if json_str.is_empty() {
// Empty string represents explicit "single-region/no-partition" designation
Ok(None)
} else {
// Parse the JSON string to PartitionExpr
PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
}
}
}
}
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
@@ -132,7 +173,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
}
/// Metadata of a SST file.
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
/// Region that created the file. The region id may not be the id of the current region.
@@ -167,6 +208,18 @@ pub struct FileMeta {
/// This sequence is the only sequence in this file. And it's retrieved from the max
/// sequence of the rows on generating this file.
pub sequence: Option<NonZeroU64>,
/// Partition expression from the region metadata when the file is created.
///
/// This is stored as a PartitionExpr object in memory for convenience,
/// but serialized as JSON string for manifest compatibility.
/// Compatibility behavior:
/// - None: no partition expr was set when the file was created (legacy files).
/// - Some(expr): partition expression from region metadata.
#[serde(
serialize_with = "serialize_partition_expr",
deserialize_with = "deserialize_partition_expr"
)]
pub partition_expr: Option<PartitionExpr>,
}
impl Debug for FileMeta {
@@ -201,6 +254,7 @@ impl Debug for FileMeta {
write!(f, "{}", seq)
}
})
.field("partition_expr", &self.partition_expr)
.finish()
}
}
@@ -356,6 +410,9 @@ impl FileHandleInner {
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use super::*;
#[test]
@@ -392,6 +449,7 @@ mod tests {
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: None,
}
}
@@ -415,4 +473,137 @@ mod tests {
let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
assert_eq!(file_meta, deserialized_file_meta);
}
#[test]
fn test_file_meta_with_partition_expr() {
let file_id = FileId::random();
let partition_expr = PartitionExpr::new(
col("a"),
partition::expr::RestrictedOp::GtEq,
Value::UInt32(10).into(),
);
let file_meta_with_partition = FileMeta {
region_id: 0.into(),
file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: Some(partition_expr.clone()),
};
// Test serialization/deserialization
let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
assert_eq!(file_meta_with_partition, deserialized);
// Verify the serialized JSON contains the expected partition expression string
let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
assert!(serialized_value["partition_expr"].as_str().is_some());
let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
assert!(partition_expr_json.contains("\"Column\":\"a\""));
assert!(partition_expr_json.contains("\"op\":\"GtEq\""));
// Test with None (legacy files)
let file_meta_none = FileMeta {
partition_expr: None,
..file_meta_with_partition.clone()
};
let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
assert_eq!(file_meta_none, deserialized_none);
}
#[test]
fn test_file_meta_partition_expr_backward_compatibility() {
// Test that we can deserialize old JSON format with partition_expr as string
let json_with_partition_expr = r#"{
"region_id": 0,
"file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
"time_range": [
{"value": 0, "unit": "Millisecond"},
{"value": 0, "unit": "Millisecond"}
],
"level": 0,
"file_size": 0,
"available_indexes": ["InvertedIndex"],
"index_file_size": 0,
"num_rows": 0,
"num_row_groups": 0,
"sequence": null,
"partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
}"#;
let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
assert!(file_meta.partition_expr.is_some());
let expr = file_meta.partition_expr.unwrap();
assert_eq!(format!("{}", expr), "a >= 10");
// Test empty partition expression string
let json_with_empty_expr = r#"{
"region_id": 0,
"file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
"time_range": [
{"value": 0, "unit": "Millisecond"},
{"value": 0, "unit": "Millisecond"}
],
"level": 0,
"file_size": 0,
"available_indexes": [],
"index_file_size": 0,
"num_rows": 0,
"num_row_groups": 0,
"sequence": null,
"partition_expr": ""
}"#;
let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
assert!(file_meta_empty.partition_expr.is_none());
// Test null partition expression
let json_with_null_expr = r#"{
"region_id": 0,
"file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
"time_range": [
{"value": 0, "unit": "Millisecond"},
{"value": 0, "unit": "Millisecond"}
],
"level": 0,
"file_size": 0,
"available_indexes": [],
"index_file_size": 0,
"num_rows": 0,
"num_row_groups": 0,
"sequence": null,
"partition_expr": null
}"#;
let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
assert!(file_meta_null.partition_expr.is_none());
// Test partition expression doesn't exist
let json_with_empty_expr = r#"{
"region_id": 0,
"file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
"time_range": [
{"value": 0, "unit": "Millisecond"},
{"value": 0, "unit": "Millisecond"}
],
"level": 0,
"file_size": 0,
"available_indexes": [],
"index_file_size": 0,
"num_rows": 0,
"num_row_groups": 0,
"sequence": null
}"#;
let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
assert!(file_meta_empty.partition_expr.is_none());
}
}

View File

@@ -280,6 +280,7 @@ mod tests {
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: None,
},
file_purger,
);
@@ -345,6 +346,7 @@ mod tests {
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),
partition_expr: None,
},
file_purger,
);

View File

@@ -258,6 +258,7 @@ mod tests {
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),
partition_expr: None,
};
file_ref_mgr.add_file(&file_meta);

View File

@@ -753,6 +753,11 @@ mod tests {
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
partition_expr: match &metadata.partition_expr {
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.expect("partition expression should be valid JSON"),
None => None,
},
},
Arc::new(NoopFilePurger),
);

View File

@@ -128,6 +128,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
num_rows: 0,
num_row_groups: 0,
sequence: None,
partition_expr: None,
},
file_purger,
)

View File

@@ -106,6 +106,11 @@ impl VersionControlBuilder {
num_rows: 0,
num_row_groups: 0,
sequence: NonZeroU64::new(start_ms as u64),
partition_expr: match &self.metadata.partition_expr {
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.expect("partition expression should be valid JSON"),
None => None,
},
},
);
self
@@ -189,6 +194,11 @@ pub(crate) fn apply_edit(
num_rows: 0,
num_row_groups: 0,
sequence: NonZeroU64::new(*start_ms as u64),
partition_expr: match &version_control.current().version.metadata.partition_expr {
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.expect("partition expression should be valid JSON"),
None => None,
},
}
})
.collect();