feat: converts batches in the old format to the flat format at query time (#6987)

* feat: use correct projection index for old format

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove allow dead_code from format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: check and convert old format to flat format
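
Roughly, the reader decides whether a parquet file is in the old format by comparing its column count against the flat schema's column count. A minimal, self-contained sketch of that decision with stand-in types (error handling for mismatched counts elided; the real check is FlatReadFormat::is_legacy_format() in the diff below):

// Stand-in type; the real check takes RegionMetadata and also returns
// errors for column counts that match neither format.
struct RegionMeta {
    /// Number of columns defined by the region (tags, fields, time index).
    column_num: usize,
    /// Number of primary key (tag) columns.
    primary_key_num: usize,
}

/// Internal columns: __primary_key, __sequence, __op_type.
const INTERNAL_COLUMN_NUM: usize = 3;

fn is_legacy_format(metadata: &RegionMeta, num_columns: usize) -> bool {
    if metadata.primary_key_num == 0 {
        // Without primary keys the two formats store the same columns.
        return false;
    }
    let expected = metadata.column_num + INTERNAL_COLUMN_NUM;
    // A legacy (primary key format) file omits the raw tag columns.
    num_columns == expected - metadata.primary_key_num
}

fn main() {
    let metadata = RegionMeta { column_num: 5, primary_key_num: 2 };
    assert!(!is_legacy_format(&metadata, 8)); // flat format file
    assert!(is_legacy_format(&metadata, 6)); // old primary key format file
}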

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: subtract primary key num from projection

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: always convert the batch in FlatRowGroupReader

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: Change &Option<&[]> to Option<&[]>

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: only build arrow schema once

Adds a method flat_sst_arrow_schema_column_num() to get the field count
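
The count mirrors the arithmetic already used by to_flat_sst_arrow_schema(): all region columns plus the three internal columns, minus the primary key columns when raw tag columns are not stored. A small self-contained sketch of that arithmetic with stand-in types (not the real structs):

// Stand-in types; the real function takes RegionMetadata and FlatSchemaOptions.
struct RegionMeta {
    /// Number of columns defined by the region (tags, fields, time index).
    column_num: usize,
    /// Number of primary key (tag) columns.
    primary_key_num: usize,
}

struct SchemaOptions {
    /// Whether the SST stores raw tag columns besides the encoded __primary_key.
    raw_pk_columns: bool,
}

/// All columns plus __primary_key, __sequence and __op_type; tag columns
/// are dropped when raw primary key columns are not stored.
fn flat_schema_column_num(metadata: &RegionMeta, options: &SchemaOptions) -> usize {
    if options.raw_pk_columns {
        metadata.column_num + 3
    } else {
        metadata.column_num + 3 - metadata.primary_key_num
    }
}

fn main() {
    let metadata = RegionMeta { column_num: 5, primary_key_num: 2 };
    assert_eq!(8, flat_schema_column_num(&metadata, &SchemaOptions { raw_pk_columns: true }));
    assert_eq!(6, flat_schema_column_num(&metadata, &SchemaOptions { raw_pk_columns: false }));
}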

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: Handle flat format and old format separately

Adds two structs ParquetFlat and ParquetPrimaryKeyToFlat.
ParquetPrimaryKeyToFlat delegates stats and projection to the
PrimaryKeyReadFormat.
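
A minimal sketch of that delegation with stand-in types (the real structs, fields, and signatures are in the flat_format.rs diff below):

// Stand-in unit types; the real stats methods take row group metadata
// and a ColumnId and return StatValues.
struct ParquetFlat;
struct PrimaryKeyReadFormat;

struct ParquetPrimaryKeyToFlat {
    // Delegates stats and projection to the primary key read format.
    format: PrimaryKeyReadFormat,
}

enum ParquetAdapter {
    Flat(ParquetFlat),
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}

impl ParquetFlat {
    fn min_values(&self, _column_id: u32) -> Option<i64> {
        // Reads stats directly from the flat SST layout.
        None
    }
}

impl PrimaryKeyReadFormat {
    fn min_values(&self, _column_id: u32) -> Option<i64> {
        // Reads stats using the old primary key layout.
        None
    }
}

impl ParquetAdapter {
    // Callers dispatch on the variant, so they don't need to know which
    // on-disk format the SST file uses.
    fn min_values(&self, column_id: u32) -> Option<i64> {
        match self {
            ParquetAdapter::Flat(p) => p.min_values(column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(column_id),
        }
    }
}

fn main() {
    let flat = ParquetAdapter::Flat(ParquetFlat);
    let legacy = ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat {
        format: PrimaryKeyReadFormat,
    });
    // Both variants answer the same stats queries.
    assert!(flat.min_values(1).is_none());
    assert!(legacy.min_values(1).is_none());
}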

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle non-string tags correctly

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not register file cache twice

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: clean temp files

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add rows and bytes to flush success log

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: convert format in memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: add compaction flag to ScanInput

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: compaction should use old format for sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: merge schema uses the old format for sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: reads the legacy format but does not convert it if skip_auto_convert is set
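
A rough sketch, with placeholder types, of how the flag gates the conversion inside the primary-key-to-flat helper (the real converter is FlatConvertFormat and the real batch type is arrow's RecordBatch; see the diff below):

// Placeholder types for the sketch.
struct RecordBatch;
struct FlatConvertFormat;

impl FlatConvertFormat {
    fn convert(&self, batch: RecordBatch) -> RecordBatch {
        // Decodes the encoded primary key and rewrites the batch into the flat layout.
        batch
    }
}

struct PrimaryKeyToFlat {
    // None when skip_auto_convert is true: the legacy batch is returned as-is,
    // e.g. when compaction reads sparse-encoded regions in the old format.
    convert_format: Option<FlatConvertFormat>,
}

impl PrimaryKeyToFlat {
    fn new(skip_auto_convert: bool) -> Self {
        let convert_format = if skip_auto_convert {
            None
        } else {
            Some(FlatConvertFormat)
        };
        Self { convert_format }
    }

    fn convert_batch(&self, batch: RecordBatch) -> RecordBatch {
        match &self.convert_format {
            Some(convert) => convert.convert(batch),
            None => batch,
        }
    }
}

fn main() {
    // With skip_auto_convert set, batches pass through untouched.
    let reader = PrimaryKeyToFlat::new(true);
    let _batch = reader.convert_batch(RecordBatch);
}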

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: support sparse encoding in bulk parts

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-09-25 14:42:22 +08:00
Committed by: GitHub
Parent: 964dc254aa
Commit: cff9cb6327
25 changed files with 598 additions and 307 deletions

View File

@@ -455,11 +455,15 @@ fn flat_merge_iterator_bench(c: &mut Criterion) {
}
// Pre-create BulkIterContext
let context = Arc::new(BulkIterContext::new(
metadata.clone(),
&None, // No projection
None, // No predicate
));
let context = Arc::new(
BulkIterContext::new(
metadata.clone(),
None, // No projection
None, // No predicate
false,
)
.unwrap(),
);
group.bench_with_input(
format!("{}_parts_1024_hosts", num_parts),
@@ -519,11 +523,15 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
group.bench_function("4096_rows_with_hostname_filter", |b| {
b.iter(|| {
// Create context for BulkPartRecordBatchIter with predicate
let context = Arc::new(BulkIterContext::new(
metadata.clone(),
&None, // No projection
Some(predicate.clone()), // With hostname filter
));
let context = Arc::new(
BulkIterContext::new(
metadata.clone(),
None, // No projection
Some(predicate.clone()), // With hostname filter
false,
)
.unwrap(),
);
// Create and iterate over BulkPartRecordBatchIter with filter
let iter =
@@ -540,11 +548,15 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
group.bench_function("4096_rows_no_filter", |b| {
b.iter(|| {
// Create context for BulkPartRecordBatchIter without predicate
let context = Arc::new(BulkIterContext::new(
metadata.clone(),
&None, // No projection
None, // No predicate
));
let context = Arc::new(
BulkIterContext::new(
metadata.clone(),
None, // No projection
None, // No predicate
false,
)
.unwrap(),
);
// Create and iterate over BulkPartRecordBatchIter
let iter = BulkPartRecordBatchIter::new(record_batch_no_filter.clone(), context, None);

View File

@@ -126,7 +126,9 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable {
}
async fn flush(mem: &SimpleBulkMemtable) {
let MemtableRanges { ranges, .. } = mem.ranges(None, PredicateGroup::default(), None).unwrap();
let MemtableRanges { ranges, .. } = mem
.ranges(None, PredicateGroup::default(), None, true)
.unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();

View File

@@ -118,27 +118,17 @@ impl WriteCache {
// Write to cache first
let cache_start = Instant::now();
let cache_path = self.file_cache.cache_file_path(parquet_key);
let mut cache_writer = self
.file_cache
.local_store()
.writer(&cache_path)
let store = self.file_cache.local_store();
let cleaner = TempFileCleaner::new(region_id, store.clone());
let write_res = store
.write(&cache_path, data.clone())
.await
.context(crate::error::OpenDalSnafu)?;
.context(crate::error::OpenDalSnafu);
if let Err(e) = write_res {
cleaner.clean_by_file_id(file_id).await;
return Err(e);
}
cache_writer
.write(data.clone())
.await
.context(crate::error::OpenDalSnafu)?;
cache_writer
.close()
.await
.context(crate::error::OpenDalSnafu)?;
// Register in file cache
let index_value = IndexValue {
file_size: data.len() as u32,
};
self.file_cache.put(parquet_key, index_value).await;
metrics.write_batch = cache_start.elapsed();
// Upload to remote store

View File

@@ -638,18 +638,16 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
let scan_input = self.build_scan_input(false)?;
let scan_input = self.build_scan_input(false)?.with_compaction(true);
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
.await
SeqScan::new(scan_input).build_reader_for_compaction().await
}
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
let scan_input = self.build_scan_input(true)?;
let scan_input = self.build_scan_input(true)?.with_compaction(true);
SeqScan::new(scan_input, true)
SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
}

View File

@@ -364,13 +364,22 @@ impl RegionFlushTask {
FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
}
let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
let mut file_ids = Vec::with_capacity(file_metas.len());
let mut total_rows = 0;
let mut total_bytes = 0;
for meta in &file_metas {
file_ids.push(meta.file_id);
total_rows += meta.num_rows;
total_bytes += meta.file_size;
}
info!(
"Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
"Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
self.region_id,
self.reason.as_str(),
file_ids,
series_count,
total_rows,
total_bytes,
timer.stop_and_record(),
flush_metrics,
);
@@ -447,7 +456,8 @@ impl RegionFlushTask {
let compact_cost = compact_start.elapsed();
flush_metrics.compact_memtable += compact_cost;
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
let num_mem_ranges = mem_ranges.ranges.len();
let num_mem_rows = mem_ranges.stats.num_rows();
let memtable_id = mem.id();
@@ -558,8 +568,10 @@ impl RegionFlushTask {
write_opts: &WriteOptions,
mem_ranges: MemtableRanges,
) -> Result<FlushFlatMemResult> {
let batch_schema =
to_flat_sst_arrow_schema(&version.metadata, &FlatSchemaOptions::default());
let batch_schema = to_flat_sst_arrow_schema(
&version.metadata,
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
);
let flat_sources = memtable_flat_sources(
batch_schema,
mem_ranges,

View File

@@ -194,12 +194,15 @@ pub trait Memtable: Send + Sync + fmt::Debug {
) -> Result<BoxedBatchIterator>;
/// Returns the ranges in the memtable.
///
/// The `for_flush` flag is true when the flush job calls this method to flush the memtable.
/// The returned map contains the range id and the range after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
for_flush: bool,
) -> Result<MemtableRanges>;
/// Returns true if the memtable is empty.

View File

@@ -333,6 +333,7 @@ impl Memtable for BulkMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
for_flush: bool,
) -> Result<MemtableRanges> {
let mut ranges = BTreeMap::new();
let mut range_id = 0;
@@ -340,9 +341,10 @@ impl Memtable for BulkMemtable {
// TODO(yingwen): Filter ranges by sequence.
let context = Arc::new(BulkIterContext::new(
self.metadata.clone(),
&projection,
projection,
predicate.predicate().cloned(),
));
for_flush,
)?);
// Adds ranges for regular parts and encoded parts
{
@@ -919,9 +921,10 @@ impl MemtableCompactor {
let context = Arc::new(BulkIterContext::new(
metadata.clone(),
&None, // No column projection for merging
None, // No predicate for merging
));
None, // No column projection for merging
None, // No predicate for merging
true,
)?);
// Creates iterators for all parts to merge.
let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
@@ -1189,7 +1192,7 @@ mod tests {
assert_eq!(3000, max_ts.value());
let predicate_group = PredicateGroup::new(&metadata, &[]);
let ranges = memtable.ranges(None, predicate_group, None).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1231,7 +1234,7 @@ mod tests {
let projection = vec![4u32];
let predicate_group = PredicateGroup::new(&metadata, &[]);
let ranges = memtable
.ranges(Some(&projection), predicate_group, None)
.ranges(Some(&projection), predicate_group, None, false)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1347,7 +1350,7 @@ mod tests {
}
let predicate_group = PredicateGroup::new(&metadata, &[]);
let ranges = memtable.ranges(None, predicate_group, None).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1380,7 +1383,7 @@ mod tests {
let predicate_group = PredicateGroup::new(&metadata, &[]);
let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
let ranges = memtable
.ranges(None, predicate_group, sequence_filter)
.ranges(None, predicate_group, sequence_filter, false)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1412,7 +1415,7 @@ mod tests {
memtable.compact(false).unwrap();
let predicate_group = PredicateGroup::new(&metadata, &[]);
let ranges = memtable.ranges(None, predicate_group, None).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
// Should have ranges for both bulk parts and encoded parts
assert_eq!(3, ranges.ranges.len());

View File

@@ -23,6 +23,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::ReadFormat;
@@ -39,9 +40,10 @@ pub struct BulkIterContext {
impl BulkIterContext {
pub fn new(
region_metadata: RegionMetadataRef,
projection: &Option<&[ColumnId]>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Self {
skip_auto_convert: bool,
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
let simple_filters = predicate
@@ -55,9 +57,16 @@ impl BulkIterContext {
})
.collect();
let read_format = build_read_format(region_metadata, projection, true);
let read_format = ReadFormat::new(
region_metadata,
projection,
true,
None,
"memtable",
skip_auto_convert,
)?;
Self {
Ok(Self {
base: RangeBase {
filters: simple_filters,
read_format,
@@ -66,7 +75,7 @@ impl BulkIterContext {
compat_batch: None,
},
predicate,
}
})
}
/// Prunes row groups by stats.
@@ -96,23 +105,3 @@ impl BulkIterContext {
&self.base.read_format
}
}
fn build_read_format(
region_metadata: RegionMetadataRef,
projection: &Option<&[ColumnId]>,
flat_format: bool,
) -> ReadFormat {
if let Some(column_ids) = &projection {
ReadFormat::new(region_metadata, column_ids.iter().copied(), flat_format)
} else {
// No projection, lists all column ids to read.
ReadFormat::new(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
flat_format,
)
}
}

View File

@@ -1370,11 +1370,15 @@ mod tests {
let projection = &[4u32];
let mut reader = part
.read(
Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&Some(projection.as_slice()),
None,
)),
Arc::new(
BulkIterContext::new(
part.metadata.region_metadata.clone(),
Some(projection.as_slice()),
None,
false,
)
.unwrap(),
),
None,
)
.unwrap()
@@ -1425,11 +1429,15 @@ mod tests {
predicate: Option<Predicate>,
expected_rows: usize,
) {
let context = Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&None,
predicate,
));
let context = Arc::new(
BulkIterContext::new(
part.metadata.region_metadata.clone(),
None,
predicate,
false,
)
.unwrap(),
);
let mut reader = part
.read(context, None)
.unwrap()
@@ -1453,13 +1461,17 @@ mod tests {
("b", 1, (180, 210), 4),
]);
let context = Arc::new(BulkIterContext::new(
part.metadata.region_metadata.clone(),
&None,
Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
)])),
));
let context = Arc::new(
BulkIterContext::new(
part.metadata.region_metadata.clone(),
None,
Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
)])),
false,
)
.unwrap(),
);
assert!(part.read(context, None).unwrap().is_none());
check_prune_row_group(&part, None, 310);

View File

@@ -178,14 +178,20 @@ impl Iterator for BulkPartRecordBatchIter {
}
}
// TODO(yingwen): Supports sparse encoding which doesn't have decoded primary key columns.
/// Applies both predicate filtering and sequence filtering in a single pass.
/// Returns None if the filtered batch is empty.
///
/// # Panics
/// Panics if the format is not flat.
fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<Scalar<UInt64Array>>,
record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
// Converts the format to the flat format first.
let format = context.read_format().as_flat().unwrap();
let record_batch = format.convert_batch(record_batch, None)?;
let num_rows = record_batch.num_rows();
let mut combined_filter = None;
@@ -362,11 +368,15 @@ mod tests {
let region_metadata = builder.build().unwrap();
// Create context
let context = Arc::new(BulkIterContext::new(
Arc::new(region_metadata.clone()),
&None, // No projection
None, // No predicate
));
let context = Arc::new(
BulkIterContext::new(
Arc::new(region_metadata.clone()),
None, // No projection
None, // No predicate
false,
)
.unwrap(),
);
// Iterates all rows.
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
@@ -385,11 +395,15 @@ mod tests {
);
assert_eq!(6, result[0].num_columns());
let context = Arc::new(BulkIterContext::new(
Arc::new(region_metadata),
&Some(&[0, 2]),
Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
));
let context = Arc::new(
BulkIterContext::new(
Arc::new(region_metadata),
Some(&[0, 2]),
Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
false,
)
.unwrap(),
);
// Creates iter with projection and predicate.
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();

View File

@@ -191,6 +191,7 @@ impl Memtable for PartitionTreeMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {

View File

@@ -235,6 +235,7 @@ impl Memtable for SimpleBulkMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
_for_flush: bool,
) -> error::Result<MemtableRanges> {
let start_time = Instant::now();
let projection = Arc::new(self.build_projection(projection));
@@ -612,7 +613,7 @@ mod tests {
memtable.write_one(kv).unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None)
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
@@ -646,7 +647,7 @@ mod tests {
memtable.freeze().unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None)
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
@@ -689,7 +690,7 @@ mod tests {
memtable.freeze().unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None)
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
assert_eq!(ranges.ranges.len(), 1);
let range = ranges.ranges.into_values().next().unwrap();
@@ -903,7 +904,7 @@ mod tests {
})
.unwrap();
let MemtableRanges { ranges, .. } = memtable
.ranges(None, PredicateGroup::default(), None)
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();

View File

@@ -300,6 +300,7 @@ impl Memtable for TimeSeriesMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()

View File

@@ -206,11 +206,14 @@ impl FlatCompatBatch {
mapper: &FlatProjectionMapper,
actual: &RegionMetadataRef,
format_projection: &FormatProjection,
) -> Result<Self> {
) -> Result<Option<Self>> {
let actual_schema = flat_projected_columns(actual, format_projection);
let expect_schema = mapper.batch_schema();
// has_same_columns_and_pk_encoding() already checks columns and encodings.
debug_assert_ne!(expect_schema, actual_schema);
if expect_schema == actual_schema {
// Although the SST has a different schema, the schema after projection is the same
// as the expected schema.
return Ok(None);
}
// Maps column id to the index and data type in the actual schema.
let actual_schema_index: HashMap<_, _> = actual_schema
@@ -275,11 +278,11 @@ impl FlatCompatBatch {
let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
Ok(Self {
Ok(Some(Self {
index_or_defaults,
arrow_schema: Arc::new(Schema::new(fields)),
compat_pk,
})
}))
}
/// Make columns of the `batch` compatible.
@@ -1443,12 +1446,19 @@ mod tests {
));
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format =
FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 1, 2, 3].into_iter(),
None,
"test",
false,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
.unwrap()
.unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
@@ -1527,12 +1537,19 @@ mod tests {
let expected_metadata = Arc::new(expected_metadata);
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format =
FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 1, 2, 3].into_iter(),
None,
"test",
false,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
.unwrap()
.unwrap();
// Tag array.
let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();

View File

@@ -29,9 +29,11 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
use crate::sst::internal_fields;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
};
/// Handles projection and converts batches in flat format with correct schema.
///
@@ -70,6 +72,7 @@ impl FlatProjectionMapper {
projection: impl Iterator<Item = usize>,
) -> Result<Self> {
let mut projection: Vec<_> = projection.collect();
// Whether the original projection is empty.
let is_empty_projection = projection.is_empty();
if is_empty_projection {
@@ -197,8 +200,19 @@ impl FlatProjectionMapper {
/// Returns the input arrow schema from sources.
///
/// The merge reader can use this schema.
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
pub(crate) fn input_arrow_schema(
&self,
compaction: bool,
) -> datatypes::arrow::datatypes::SchemaRef {
if !compaction {
self.input_arrow_schema.clone()
} else {
// For compaction, we need to build the schema from the primary key encoding.
to_flat_sst_arrow_schema(
&self.metadata,
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
}
}
/// Returns the schema of converted [RecordBatch].
@@ -296,21 +310,17 @@ fn compute_input_arrow_schema(
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
for (column_id, _) in batch_schema {
let column_metadata = metadata.column_by_id(*column_id).unwrap();
let field = Arc::new(Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
));
let field = if column_metadata.semantic_type == SemanticType::Tag {
Field::new_dictionary(
&column_metadata.column_schema.name,
datatypes::arrow::datatypes::DataType::UInt32,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
)
tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
} else {
Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
)
field
};
new_fields.push(Arc::new(field));
new_fields.push(field);
}
new_fields.extend_from_slice(&internal_fields());

View File

@@ -91,8 +91,8 @@ impl RangeMeta {
}
/// Creates a list of ranges from the `input` for seq scan.
/// If `compaction` is true, it doesn't split the ranges.
pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
/// If `input.compaction` is true, it doesn't split the ranges.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
@@ -101,7 +101,7 @@ impl RangeMeta {
Self::push_extension_ranges(input, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
// We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
return ranges;
}

View File

@@ -345,8 +345,8 @@ impl ScanRegion {
/// Scan sequentially.
pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input().await?;
Ok(SeqScan::new(input, false))
let input = self.scan_input().await?.with_compaction(false);
Ok(SeqScan::new(input))
}
/// Unordered scan.
@@ -437,6 +437,7 @@ impl ScanRegion {
Some(mapper.column_ids()),
predicate.clone(),
self.request.sequence,
false,
)?;
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
// todo: we should add stats to MemtableRange
@@ -692,6 +693,8 @@ pub struct ScanInput {
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Whether to use flat format.
pub(crate) flat_format: bool,
/// Whether this scan is for compaction.
pub(crate) compaction: bool,
#[cfg(feature = "enterprise")]
extension_ranges: Vec<BoxedExtensionRange>,
}
@@ -721,6 +724,7 @@ impl ScanInput {
series_row_selector: None,
distribution: None,
flat_format: false,
compaction: false,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
}
@@ -872,6 +876,13 @@ impl ScanInput {
self
}
/// Sets whether this scan is for compaction.
#[must_use]
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
self.compaction = compaction;
self
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
@@ -922,6 +933,7 @@ impl ScanInput {
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {
@@ -945,11 +957,12 @@ impl ScanInput {
// mapper can convert it.
let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
let mapper = self.mapper.as_flat().unwrap();
Some(CompatBatch::Flat(FlatCompatBatch::try_new(
FlatCompatBatch::try_new(
mapper,
flat_format.metadata(),
flat_format.format_projection(),
)?))
)?
.map(CompatBatch::Flat)
} else {
let compact_batch = PrimaryKeyCompatBatch::new(
&self.mapper,
@@ -1127,9 +1140,9 @@ pub struct StreamContext {
impl StreamContext {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
Self {

View File

@@ -63,27 +63,24 @@ pub struct SeqScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// The scanner is used for compaction.
compaction: bool,
/// Metrics for each partition.
/// The scanner only sets this for queries and keeps it empty during compaction.
metrics_list: PartitionMetricsList,
}
impl SeqScan {
/// Creates a new [SeqScan] with the given input and compaction flag.
/// If `compaction` is true, the scanner will not attempt to split ranges.
pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
/// Creates a new [SeqScan] with the given input.
/// If `input.compaction` is true, the scanner will not attempt to split ranges.
pub(crate) fn new(input: ScanInput) -> Self {
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
stream_ctx,
compaction,
metrics_list: PartitionMetricsList::default(),
}
}
@@ -123,7 +120,7 @@ impl SeqScan {
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.compaction);
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
@@ -144,7 +141,7 @@ impl SeqScan {
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
assert!(self.compaction);
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
@@ -288,7 +285,7 @@ impl SeqScan {
}
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.input_arrow_schema();
let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
@@ -379,7 +376,7 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let compaction = self.stream_ctx.input.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let stream = try_stream! {
@@ -477,7 +474,7 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let compaction = self.stream_ctx.input.compaction;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -556,13 +553,13 @@ impl SeqScan {
let metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
get_scanner_type(self.compaction),
get_scanner_type(self.stream_ctx.input.compaction),
self.stream_ctx.query_start,
explain_verbose,
metrics_set,
);
if !self.compaction {
if !self.stream_ctx.input.compaction {
self.metrics_list.set(partition, metrics.clone());
}

View File

@@ -80,7 +80,7 @@ impl SeriesScan {
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {

View File

@@ -79,9 +79,10 @@ impl ConvertBatchStream {
}
}
ScanBatch::Series(series) => {
self.buffer.clear();
match series {
SeriesBatch::PrimaryKey(primary_key_batch) => {
self.buffer.clear();
self.buffer.reserve(primary_key_batch.batches.len());
// Safety: Only primary key format returns this batch.
let mapper = self.projection_mapper.as_primary_key().unwrap();
@@ -90,28 +91,25 @@ impl ConvertBatchStream {
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
}
let output_schema = mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
}
SeriesBatch::Flat(flat_batch) => {
self.buffer.reserve(flat_batch.batches.len());
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
let output_schema = mapper.output_schema();
let record_batch = compute::concat_batches(
output_schema.arrow_schema(),
&flat_batch.batches,
)
.context(ArrowComputeSnafu)?;
mapper.convert(&record_batch)
for batch in flat_batch.batches {
let record_batch = mapper.convert(&batch)?;
self.buffer.push(record_batch.into_df_record_batch());
}
}
}
let output_schema = self.projection_mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.

View File

@@ -112,11 +112,7 @@ pub fn to_flat_sst_arrow_schema(
metadata: &RegionMetadata,
options: &FlatSchemaOptions,
) -> SchemaRef {
let num_fields = if options.raw_pk_columns {
metadata.column_metadatas.len() + 3
} else {
metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
};
let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
let mut fields = Vec::with_capacity(num_fields);
let schema = metadata.schema.arrow_schema();
if options.raw_pk_columns {
@@ -152,6 +148,18 @@ pub fn to_flat_sst_arrow_schema(
Arc::new(Schema::new(fields))
}
/// Returns the number of columns in the flat format.
pub fn flat_sst_arrow_schema_column_num(
metadata: &RegionMetadata,
options: &FlatSchemaOptions,
) -> usize {
if options.raw_pk_columns {
metadata.column_metadatas.len() + 3
} else {
metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
}
}
/// Helper function to create a dictionary field from a field.
fn to_dictionary_field(field: &Field) -> Field {
Field::new_dictionary(

View File

@@ -43,6 +43,7 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
@@ -51,14 +52,16 @@ use crate::error::{
NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, ReadFormat, StatValues,
FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
StatValues,
};
use crate::sst::{
FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
to_flat_sst_arrow_schema,
};
use crate::sst::{FlatSchemaOptions, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema};
/// Helper for writing the SST format.
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
override_sequence: Option<SequenceNumber>,
@@ -66,18 +69,15 @@ pub(crate) struct FlatWriteFormat {
impl FlatWriteFormat {
/// Creates a new helper.
#[allow(dead_code)]
pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
FlatWriteFormat {
metadata,
arrow_schema,
override_sequence: None,
}
}
/// Set override sequence.
#[allow(dead_code)]
pub(crate) fn with_override_sequence(
mut self,
override_sequence: Option<SequenceNumber>,
@@ -87,13 +87,11 @@ impl FlatWriteFormat {
}
/// Gets the arrow schema to store in parquet.
#[allow(dead_code)]
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
/// Convert `batch` to an arrow record batch to store in parquet.
#[allow(dead_code)]
pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
@@ -134,18 +132,10 @@ pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
///
/// It only supports flat format that stores primary keys additionally.
pub struct FlatReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
/// Projection computed for the format.
format_projection: FormatProjection,
/// Column id to index in SST.
column_id_to_sst_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
override_sequence: Option<SequenceNumber>,
/// Optional format converter for handling flat format conversion.
convert_format: Option<FlatConvertFormat>,
/// Parquet format adapter.
parquet_adapter: ParquetAdapter,
}
impl FlatReadFormat {
@@ -153,45 +143,40 @@ impl FlatReadFormat {
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
convert_to_flat: bool,
) -> FlatReadFormat {
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(&metadata);
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
arrow_schema.fields.len(),
column_ids,
);
let convert_format = if convert_to_flat {
let codec = build_primary_key_codec(&metadata);
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
} else {
None
num_columns: Option<usize>,
file_path: &str,
skip_auto_convert: bool,
) -> Result<FlatReadFormat> {
let is_legacy = match num_columns {
Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
};
FlatReadFormat {
metadata,
arrow_schema,
format_projection,
column_id_to_sst_index: id_to_index,
let parquet_adapter = if is_legacy {
// Safety: is_legacy_format() ensures primary_key is not empty.
ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
metadata,
column_ids,
skip_auto_convert,
))
} else {
ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
};
Ok(FlatReadFormat {
override_sequence: None,
convert_format,
}
parquet_adapter,
})
}
/// Sets the sequence number to override.
#[allow(dead_code)]
pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
self.override_sequence = sequence;
}
/// Index of a column in the projected batch by its column id.
pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.format_projection
self.format_projection()
.column_id_to_projected_index
.get(&column_id)
.copied()
@@ -203,7 +188,10 @@ impl FlatReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, true)
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
}
}
/// Returns max values of specific column in row groups.
@@ -212,7 +200,10 @@ impl FlatReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, false)
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
}
}
/// Returns null counts of specific column in row groups.
@@ -221,13 +212,10 @@ impl FlatReadFormat {
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
let stats = ReadFormat::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
}
}
/// Gets the arrow schema of the SST file.
@@ -235,22 +223,34 @@ impl FlatReadFormat {
/// This schema is computed from the region metadata but should be the same
/// as the arrow schema decoded from the file metadata.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.arrow_schema,
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
}
}
/// Gets the metadata of the SST.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.metadata,
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
}
}
/// Gets sorted projection indices to read.
/// Gets sorted projection indices to read from the SST file.
pub(crate) fn projection_indices(&self) -> &[usize] {
&self.format_projection.projection_indices
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
}
}
/// Gets the projection.
/// Gets the projection in the flat format.
pub(crate) fn format_projection(&self) -> &FormatProjection {
&self.format_projection
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.format_projection,
ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
}
}
/// Creates a sequence array to override.
@@ -263,17 +263,15 @@ impl FlatReadFormat {
///
/// Returns a new RecordBatch with flat format conversion applied first (if enabled),
/// then the sequence column replaced by the override sequence array.
#[allow(dead_code)]
pub(crate) fn convert_batch(
&self,
record_batch: RecordBatch,
override_sequence_array: Option<&ArrayRef>,
) -> Result<RecordBatch> {
// First, apply flat format conversion if enabled
let batch = if let Some(ref convert_format) = self.convert_format {
convert_format.convert(record_batch)?
} else {
record_batch
// First, apply flat format conversion.
let batch = match &self.parquet_adapter {
ParquetAdapter::Flat(_) => record_batch,
ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
};
// Then apply sequence override if provided
@@ -298,15 +296,18 @@ impl FlatReadFormat {
/// Checks whether the batch from the parquet file needs to be converted to match the flat format.
///
/// * `file_path` is the path to the parquet file, for error message.
/// * `num_columns` is the number of columns in the parquet file.
/// * `metadata` is the region metadata (always assumes flat format).
#[allow(dead_code)]
pub(crate) fn need_convert_to_flat(
file_path: &str,
num_columns: usize,
/// * `num_columns` is the number of columns in the parquet file.
/// * `file_path` is the path to the parquet file, for error message.
pub(crate) fn is_legacy_format(
metadata: &RegionMetadata,
num_columns: usize,
file_path: &str,
) -> Result<bool> {
if metadata.primary_key.is_empty() {
return Ok(false);
}
// For flat format, compute expected column number:
// all columns + internal columns (pk, sequence, op_type)
let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
@@ -344,6 +345,131 @@ impl FlatReadFormat {
Ok(true)
}
}
}
/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
Flat(ParquetFlat),
PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
/// Helper to read the parquet from the primary key format into the flat format.
struct ParquetPrimaryKeyToFlat {
/// The primary key format to read the parquet.
format: PrimaryKeyReadFormat,
/// Format converter for handling flat format conversion.
convert_format: Option<FlatConvertFormat>,
/// Projection computed for the flat format.
format_projection: FormatProjection,
}
impl ParquetPrimaryKeyToFlat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
skip_auto_convert: bool,
) -> ParquetPrimaryKeyToFlat {
let column_ids: Vec<_> = column_ids.collect();
// Creates a map to lookup index based on the new format.
let id_to_index = sst_column_id_indices(&metadata);
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
// Computes the format projection for the new format.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
sst_column_num,
column_ids.iter().copied(),
);
let codec = build_primary_key_codec(&metadata);
let convert_format = if skip_auto_convert {
None
} else {
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
};
let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
Self {
format,
convert_format,
format_projection,
}
}
fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
if let Some(convert_format) = &self.convert_format {
convert_format.convert(record_batch)
} else {
Ok(record_batch)
}
}
}
/// Helper to read the parquet in the flat format directly.
struct ParquetFlat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
/// Projection computed for the flat format.
format_projection: FormatProjection,
/// Column id to index in SST.
column_id_to_sst_index: HashMap<ColumnId, usize>,
}
impl ParquetFlat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(&metadata);
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
let format_projection =
FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
Self {
metadata,
arrow_schema,
format_projection,
column_id_to_sst_index: id_to_index,
}
}
/// Returns min values of specific column in row groups.
fn min_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, true)
}
/// Returns max values of specific column in row groups.
fn max_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, false)
}
/// Returns null counts of specific column in row groups.
fn null_counts(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
let stats = ReadFormat::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
fn get_stat_values(
&self,
@@ -567,7 +693,10 @@ impl FlatReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
None,
"test",
false,
)
.unwrap()
}
}

View File

@@ -141,23 +141,13 @@ impl PrimaryKeyWriteFormat {
/// Helper to read parquet formats.
pub enum ReadFormat {
/// The parquet is in the old primary key format.
PrimaryKey(PrimaryKeyReadFormat),
/// The parquet is in the new flat format.
Flat(FlatReadFormat),
}
impl ReadFormat {
pub(crate) fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
flat_format: bool,
) -> Self {
if flat_format {
Self::new_flat(metadata, column_ids, false)
} else {
Self::new_primary_key(metadata, column_ids)
}
}
/// Creates a helper to read the primary key format.
pub fn new_primary_key(
metadata: RegionMetadataRef,
@@ -170,9 +160,65 @@ impl ReadFormat {
pub fn new_flat(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
convert_to_flat: bool,
) -> Self {
ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat))
num_columns: Option<usize>,
file_path: &str,
skip_auto_convert: bool,
) -> Result<Self> {
Ok(ReadFormat::Flat(FlatReadFormat::new(
metadata,
column_ids,
num_columns,
file_path,
skip_auto_convert,
)?))
}
/// Creates a new read format.
pub fn new(
region_metadata: RegionMetadataRef,
projection: Option<&[ColumnId]>,
flat_format: bool,
num_columns: Option<usize>,
file_path: &str,
skip_auto_convert: bool,
) -> Result<ReadFormat> {
if flat_format {
if let Some(column_ids) = projection {
ReadFormat::new_flat(
region_metadata,
column_ids.iter().copied(),
num_columns,
file_path,
skip_auto_convert,
)
} else {
// No projection, lists all column ids to read.
ReadFormat::new_flat(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
num_columns,
file_path,
skip_auto_convert,
)
}
} else if let Some(column_ids) = projection {
Ok(ReadFormat::new_primary_key(
region_metadata,
column_ids.iter().copied(),
))
} else {
// No projection, lists all column ids to read.
Ok(ReadFormat::new_primary_key(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
))
}
}
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
@@ -1238,7 +1284,8 @@ mod tests {
.iter()
.map(|col| col.column_id)
.collect();
let read_format = ReadFormat::new(metadata, column_ids.iter().copied(), false);
let read_format =
ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
@@ -1366,19 +1413,26 @@ mod tests {
// The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)
// Only read tag1 (column_id=3, index=1) + fixed columns
let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false);
let read_format =
ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
// Only read field1 (column_id=4, index=2) + fixed columns
let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false);
let read_format =
ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
// Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false);
let read_format =
ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
// Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false);
let read_format =
ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
}
@@ -1388,8 +1442,11 @@ mod tests {
let mut format = FlatReadFormat::new(
metadata,
std::iter::once(1), // Just read tag0
Some(8),
"test",
false,
);
)
.unwrap();
let num_rows = 4;
let original_sequence = 100u64;
@@ -1438,8 +1495,7 @@ mod tests {
// For flat format: all columns (5) + internal columns (3)
let expected_columns = metadata.column_metadatas.len() + 3;
let result =
FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata)
.unwrap();
FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
assert!(
!result,
"Should not need conversion when column counts match"
@@ -1449,7 +1505,7 @@ mod tests {
// Missing primary key columns (2 primary keys in test metadata)
let num_columns_without_pk = expected_columns - metadata.primary_key.len();
let result =
FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata)
FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
.unwrap();
assert!(
result,
@@ -1458,15 +1514,14 @@ mod tests {
// Test case 3: Invalid case - actual columns more than expected
let too_many_columns = expected_columns + 1;
let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata)
let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
.unwrap_err();
assert!(err.to_string().contains("Expected columns"), "{err:?}");
// Test case 4: Invalid case - column difference doesn't match primary key count
let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
let err =
FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata)
.unwrap_err();
let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
.unwrap_err();
assert!(
err.to_string().contains("Column number difference"),
"{err:?}"
@@ -1601,7 +1656,14 @@ mod tests {
.iter()
.map(|c| c.column_id)
.collect();
let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
let format = FlatReadFormat::new(
metadata.clone(),
column_ids.into_iter(),
Some(6),
"test",
false,
)
.unwrap();
let num_rows = 4;
let original_sequence = 100u64;
@@ -1676,7 +1738,14 @@ mod tests {
.iter()
.map(|c| c.column_id)
.collect();
let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
let format = FlatReadFormat::new(
metadata.clone(),
column_ids.into_iter(),
None,
"test",
false,
)
.unwrap();
let num_rows = 4;
let original_sequence = 100u64;

View File

@@ -115,6 +115,8 @@ pub struct ParquetReaderBuilder {
expected_metadata: Option<RegionMetadataRef>,
/// Whether to use flat format for reading.
flat_format: bool,
/// Whether this reader is for compaction.
compaction: bool,
}
impl ParquetReaderBuilder {
@@ -138,6 +140,7 @@ impl ParquetReaderBuilder {
fulltext_index_applier: None,
expected_metadata: None,
flat_format: false,
compaction: false,
}
}
@@ -208,6 +211,13 @@ impl ParquetReaderBuilder {
self
}
/// Sets the compaction flag.
#[must_use]
pub fn compaction(mut self, compaction: bool) -> Self {
self.compaction = compaction;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -239,20 +249,28 @@ impl ParquetReaderBuilder {
let mut read_format = if let Some(column_ids) = &self.projection {
ReadFormat::new(
region_meta.clone(),
column_ids.iter().copied(),
Some(column_ids),
self.flat_format,
)
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
self.compaction,
)?
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
let column_ids: Vec<_> = expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect();
ReadFormat::new(
region_meta.clone(),
expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id),
Some(&column_ids),
self.flat_format,
)
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
self.compaction,
)?
};
if need_override_sequence(&parquet_meta) {
read_format
@@ -1379,17 +1397,10 @@ impl FlatRowGroupReader {
let record_batch = batch_result.context(ArrowReaderSnafu {
path: self.context.file_path(),
})?;
// Apply override sequence if needed
if let (Some(flat_format), Some(override_array)) = (
self.context.read_format().as_flat(),
&self.override_sequence,
) {
let converted =
flat_format.convert_batch(record_batch, Some(override_array))?;
return Ok(Some(converted));
}
// Safety: Only the flat format uses FlatRowGroupReader.
let flat_format = self.context.read_format().as_flat().unwrap();
let record_batch =
flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
Ok(Some(record_batch))
}
None => Ok(None),

View File

@@ -99,6 +99,7 @@ impl Memtable for EmptyMemtable {
_projection: Option<&[ColumnId]>,
_predicate: PredicateGroup,
_sequence: Option<SequenceNumber>,
_for_flush: bool,
) -> Result<MemtableRanges> {
Ok(MemtableRanges::default())
}