fix legacy format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-19 12:43:09 +08:00
parent 042bb42b86
commit d7e64419c9
2 changed files with 68 additions and 13 deletions

View File

@@ -553,7 +553,12 @@ impl RangeBase {
/// Builds an encoded primary-key filter for flat scan pre-filtering.
pub(crate) fn new_primary_key_filter(&self) -> Option<Box<dyn PrimaryKeyFilter>> {
if self.read_format.metadata().primary_key.is_empty() {
if self.read_format.metadata().primary_key.is_empty()
|| !self
.read_format
.as_flat()
.is_some_and(|format| format.raw_batch_has_primary_key_dictionary())
{
return None;
}
let filters = self.usable_primary_key_filters()?;
@@ -1129,18 +1134,23 @@ mod tests {
Array, ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array,
UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::arrow::datatypes::{Schema, UInt32Type};
use datatypes::schema::ColumnSchema;
use mito_codec::row_converter::build_primary_key_codec;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use super::*;
use crate::sst::internal_fields;
use crate::test_util::sst_util::{
new_primary_key, new_sparse_primary_key, sst_region_metadata,
sst_region_metadata_with_encoding,
};
fn flat_file_num_columns(metadata: &RegionMetadata) -> usize {
metadata.column_metadatas.len() + 3
}
fn new_test_range_base_with_metadata(
metadata: Arc<RegionMetadata>,
expected_metadata: Option<Arc<RegionMetadata>>,
@@ -1149,7 +1159,7 @@ mod tests {
let read_format = ReadFormat::new_flat(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
Some(5),
Some(flat_file_num_columns(&metadata)),
"test",
false,
)
@@ -1240,16 +1250,20 @@ mod tests {
) -> RecordBatch {
assert_eq!(primary_keys.len(), field_values.len());
let schema = ReadFormat::new_flat(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
Some(5),
"test",
false,
)
.unwrap()
.arrow_schema()
.clone();
let arrow_schema = metadata.schema.arrow_schema();
let field_column = arrow_schema
.field(arrow_schema.index_of("field_0").unwrap())
.clone();
let time_index_column = arrow_schema
.field(arrow_schema.index_of("ts").unwrap())
.clone();
let mut fields = vec![field_column, time_index_column];
fields.extend(
internal_fields()
.into_iter()
.map(|field| field.as_ref().clone()),
);
let schema = Arc::new(Schema::new(fields));
let mut dict_values = Vec::new();
let mut keys = Vec::with_capacity(primary_keys.len());
@@ -1318,6 +1332,38 @@ mod tests {
assert!(base.new_primary_key_filter().is_none());
}
#[test]
fn test_new_primary_key_filter_skips_legacy_primary_key_batches() {
let metadata = Arc::new(sst_region_metadata_with_encoding(
PrimaryKeyEncoding::Sparse,
));
let read_format = ReadFormat::new_flat(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
None,
"test",
true,
)
.unwrap();
let primary_key_filters =
vec![SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap()];
let base = RangeBase {
filters: vec![],
primary_key_filters: Some(Arc::new(primary_key_filters)),
dyn_filters: vec![],
read_format,
expected_metadata: None,
prune_schema: metadata.schema.clone(),
codec: build_primary_key_codec(metadata.as_ref()),
compat_batch: None,
compaction_projection_mapper: None,
pre_filter_mode: PreFilterMode::All,
partition_filter: None,
};
assert!(base.new_primary_key_filter().is_none());
}
#[test]
fn test_prefilter_primary_key_ignores_reused_expected_tag_name() {
let metadata = Arc::new(sst_region_metadata());

View File

@@ -267,6 +267,15 @@ impl FlatReadFormat {
}
}
/// Returns true if raw parquet batches already use the flat layout with an encoded
/// `__primary_key` dictionary column.
pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool {
match &self.parquet_adapter {
ParquetAdapter::Flat(_) => true,
ParquetAdapter::PrimaryKeyToFlat(_) => false,
}
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
self.override_sequence