safe schema evolution

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-19 06:06:30 +08:00
parent 1c7e2bb382
commit 042bb42b86
2 changed files with 159 additions and 23 deletions

View File

@@ -48,7 +48,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
///
/// This struct contains normalized predicate expr. In the form of
/// `col` `op` `literal` where the `col` is provided from input.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SimpleFilterEvaluator {
/// Name of the referenced column.
column_name: String,

View File

@@ -35,7 +35,7 @@ use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
@@ -492,35 +492,75 @@ impl TagDecodeState {
}
impl RangeBase {
fn has_usable_primary_key_filter(&self) -> bool {
fn usable_primary_key_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
let Some(filters) = &self.primary_key_filters else {
return false;
return None;
};
let metadata = self.read_format.metadata();
let sst_metadata = self.read_format.metadata();
let expected_metadata = self.expected_metadata.as_deref();
let usable_filters = filters
.iter()
.filter(|filter| {
Self::is_usable_primary_key_filter(sst_metadata, expected_metadata, filter)
})
.cloned()
.collect::<Vec<_>>();
filters.iter().any(|filter| {
!is_partition_column(filter.column_name())
&& metadata
.column_by_name(filter.column_name())
.is_some_and(|column| {
column.semantic_type == SemanticType::Tag
&& metadata.primary_key_index(column.column_id).is_some()
})
})
(!usable_filters.is_empty()).then_some(Arc::new(usable_filters))
}
fn is_usable_primary_key_filter(
sst_metadata: &RegionMetadataRef,
expected_metadata: Option<&RegionMetadata>,
filter: &SimpleFilterEvaluator,
) -> bool {
if is_partition_column(filter.column_name()) {
return false;
}
let sst_column = match expected_metadata {
Some(expected_metadata) => {
let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
else {
return false;
};
let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
return false;
};
if sst_column.column_schema.name != expected_column.column_schema.name
|| sst_column.semantic_type != expected_column.semantic_type
|| sst_column.column_schema.data_type != expected_column.column_schema.data_type
{
return false;
}
sst_column
}
None => {
let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
return false;
};
sst_column
}
};
sst_column.semantic_type == SemanticType::Tag
&& sst_metadata
.primary_key_index(sst_column.column_id)
.is_some()
}
/// Builds an encoded primary-key filter for flat scan pre-filtering.
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
if self.read_format.metadata().primary_key.is_empty()
|| !self.has_usable_primary_key_filter()
{
if self.read_format.metadata().primary_key.is_empty() {
return None;
}
let filters = self.primary_key_filters.as_ref()?;
let filters = self.usable_primary_key_filters()?;
Some(
self.codec
.primary_key_filter(self.read_format.metadata(), filters.clone()),
.primary_key_filter(self.read_format.metadata(), filters),
)
}
@@ -1090,9 +1130,10 @@ mod tests {
UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::schema::ColumnSchema;
use mito_codec::row_converter::build_primary_key_codec;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use super::*;
use crate::test_util::sst_util::{
@@ -1102,6 +1143,7 @@ mod tests {
fn new_test_range_base_with_metadata(
metadata: Arc<RegionMetadata>,
expected_metadata: Option<Arc<RegionMetadata>>,
exprs: &[Expr],
) -> RangeBase {
let read_format = ReadFormat::new_flat(
@@ -1123,7 +1165,7 @@ mod tests {
.then_some(Arc::new(primary_key_filters)),
dyn_filters: vec![],
read_format,
expected_metadata: None,
expected_metadata,
prune_schema: metadata.schema.clone(),
codec: build_primary_key_codec(metadata.as_ref()),
compat_batch: None,
@@ -1134,7 +1176,61 @@ mod tests {
}
fn new_test_range_base(exprs: &[Expr]) -> RangeBase {
new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), exprs)
new_test_range_base_with_metadata(Arc::new(sst_region_metadata()), None, exprs)
}
fn new_test_range_base_with_expected_metadata(
metadata: Arc<RegionMetadata>,
expected_metadata: Arc<RegionMetadata>,
exprs: &[Expr],
) -> RangeBase {
new_test_range_base_with_metadata(metadata, Some(expected_metadata), exprs)
}
fn expected_metadata_with_reused_tag_name(
old_metadata: &RegionMetadata,
) -> Arc<RegionMetadata> {
let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 10,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_0".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![10, 1]);
Arc::new(builder.build().unwrap())
}
fn new_raw_batch_with_metadata(
@@ -1209,6 +1305,46 @@ mod tests {
assert!(base.new_primary_key_filter().is_none());
}
#[test]
fn test_new_primary_key_filter_skips_reused_expected_tag_name() {
let metadata = Arc::new(sst_region_metadata());
let expected_metadata = expected_metadata_with_reused_tag_name(&metadata);
let base = new_test_range_base_with_expected_metadata(
metadata,
expected_metadata,
&[col("tag_0").eq(lit("b"))],
);
assert!(base.new_primary_key_filter().is_none());
}
#[test]
fn test_prefilter_primary_key_ignores_reused_expected_tag_name() {
let metadata = Arc::new(sst_region_metadata());
let expected_metadata = expected_metadata_with_reused_tag_name(&metadata);
let pk_ax = new_primary_key(&["a", "x"]);
let pk_by = new_primary_key(&["b", "y"]);
let batch = new_raw_batch_with_metadata(
metadata.clone(),
&[pk_ax.as_slice(), pk_by.as_slice()],
&[10, 11],
);
let base = new_test_range_base_with_expected_metadata(
metadata,
expected_metadata,
&[col("tag_0").eq(lit("b")), col("tag_1").eq(lit("x"))],
);
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
let filtered = base
.prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
.unwrap()
.unwrap();
assert_eq!(filtered.num_rows(), 1);
assert_eq!(field_values(&filtered), vec![10]);
}
#[test]
fn test_prefilter_primary_key_drops_single_dictionary_batch() {
let pk_a = new_primary_key(&["a", "x"]);
@@ -1302,7 +1438,7 @@ mod tests {
],
&[10, 11, 12, 13],
);
let base = new_test_range_base_with_metadata(metadata, &[col("tag_0").eq(lit("b"))]);
let base = new_test_range_base_with_metadata(metadata, None, &[col("tag_0").eq(lit("b"))]);
let mut primary_key_filter = base.new_primary_key_filter().unwrap();
let filtered = base