diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 02ad4e48bb..c46a6aeedc 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -455,11 +455,15 @@ fn flat_merge_iterator_bench(c: &mut Criterion) { } // Pre-create BulkIterContext - let context = Arc::new(BulkIterContext::new( - metadata.clone(), - &None, // No projection - None, // No predicate - )); + let context = Arc::new( + BulkIterContext::new( + metadata.clone(), + None, // No projection + None, // No predicate + false, + ) + .unwrap(), + ); group.bench_with_input( format!("{}_parts_1024_hosts", num_parts), @@ -519,11 +523,15 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) { group.bench_function("4096_rows_with_hostname_filter", |b| { b.iter(|| { // Create context for BulkPartRecordBatchIter with predicate - let context = Arc::new(BulkIterContext::new( - metadata.clone(), - &None, // No projection - Some(predicate.clone()), // With hostname filter - )); + let context = Arc::new( + BulkIterContext::new( + metadata.clone(), + None, // No projection + Some(predicate.clone()), // With hostname filter + false, + ) + .unwrap(), + ); // Create and iterate over BulkPartRecordBatchIter with filter let iter = @@ -540,11 +548,15 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) { group.bench_function("4096_rows_no_filter", |b| { b.iter(|| { // Create context for BulkPartRecordBatchIter without predicate - let context = Arc::new(BulkIterContext::new( - metadata.clone(), - &None, // No projection - None, // No predicate - )); + let context = Arc::new( + BulkIterContext::new( + metadata.clone(), + None, // No projection + None, // No predicate + false, + ) + .unwrap(), + ); // Create and iterate over BulkPartRecordBatchIter let iter = BulkPartRecordBatchIter::new(record_batch_no_filter.clone(), context, None); diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index ce8da2b18c..9aa55893a1 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -126,7 +126,9 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable { } async fn flush(mem: &SimpleBulkMemtable) { - let MemtableRanges { ranges, .. } = mem.ranges(None, PredicateGroup::default(), None).unwrap(); + let MemtableRanges { ranges, .. 
} = mem
+        .ranges(None, PredicateGroup::default(), None, true)
+        .unwrap();
     let mut source = if ranges.len() == 1 {
         let only_range = ranges.into_values().next().unwrap();
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 498239bc7f..3a4c9b1ece 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -118,27 +118,17 @@ impl WriteCache {
         // Write to cache first
         let cache_start = Instant::now();
         let cache_path = self.file_cache.cache_file_path(parquet_key);
-        let mut cache_writer = self
-            .file_cache
-            .local_store()
-            .writer(&cache_path)
+        let store = self.file_cache.local_store();
+        let cleaner = TempFileCleaner::new(region_id, store.clone());
+        let write_res = store
+            .write(&cache_path, data.clone())
             .await
-            .context(crate::error::OpenDalSnafu)?;
+            .context(crate::error::OpenDalSnafu);
+        if let Err(e) = write_res {
+            cleaner.clean_by_file_id(file_id).await;
+            return Err(e);
+        }
 
-        cache_writer
-            .write(data.clone())
-            .await
-            .context(crate::error::OpenDalSnafu)?;
-        cache_writer
-            .close()
-            .await
-            .context(crate::error::OpenDalSnafu)?;
-
-        // Register in file cache
-        let index_value = IndexValue {
-            file_size: data.len() as u32,
-        };
-        self.file_cache.put(parquet_key, index_value).await;
         metrics.write_batch = cache_start.elapsed();
 
         // Upload to remote store
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 45b568fc4e..da9e6b4582 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -638,18 +638,16 @@ struct CompactionSstReaderBuilder<'a> {
 impl CompactionSstReaderBuilder<'_> {
     /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
     async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
-        let scan_input = self.build_scan_input(false)?;
+        let scan_input = self.build_scan_input(false)?.with_compaction(true);
 
-        SeqScan::new(scan_input, true)
-            .build_reader_for_compaction()
-            .await
+        SeqScan::new(scan_input).build_reader_for_compaction().await
     }
 
     /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
-        let scan_input = self.build_scan_input(true)?;
+        let scan_input = self.build_scan_input(true)?.with_compaction(true);
 
-        SeqScan::new(scan_input, true)
+        SeqScan::new(scan_input)
             .build_flat_reader_for_compaction()
             .await
     }
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 27c04851c8..a928706fcd 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -364,13 +364,22 @@ impl RegionFlushTask {
             FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
         }
 
-        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
+        let mut file_ids = Vec::with_capacity(file_metas.len());
+        let mut total_rows = 0;
+        let mut total_bytes = 0;
+        for meta in &file_metas {
+            file_ids.push(meta.file_id);
+            total_rows += meta.num_rows;
+            total_bytes += meta.file_size;
+        }
         info!(
-            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
+            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
             self.region_id,
             self.reason.as_str(),
             file_ids,
             series_count,
+            total_rows,
+            total_bytes,
             timer.stop_and_record(),
             flush_metrics,
         );
@@ -447,7 +456,8 @@ impl RegionFlushTask {
         let compact_cost = compact_start.elapsed();
         flush_metrics.compact_memtable += compact_cost;
 
-        let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
+        // Sets the `for_flush` flag to true.
+        let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
         let num_mem_ranges = mem_ranges.ranges.len();
         let num_mem_rows = mem_ranges.stats.num_rows();
         let memtable_id = mem.id();
@@ -558,8 +568,10 @@ impl RegionFlushTask {
         write_opts: &WriteOptions,
         mem_ranges: MemtableRanges,
     ) -> Result {
-        let batch_schema =
-            to_flat_sst_arrow_schema(&version.metadata, &FlatSchemaOptions::default());
+        let batch_schema = to_flat_sst_arrow_schema(
+            &version.metadata,
+            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
+        );
         let flat_sources = memtable_flat_sources(
             batch_schema,
             mem_ranges,
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 77b7d54f99..2bcfc83545 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -194,12 +194,15 @@ pub trait Memtable: Send + Sync + fmt::Debug {
     ) -> Result<BoxedBatchIterator>;
 
     /// Returns the ranges in the memtable.
+    ///
+    /// The `for_flush` flag is true when the flush job calls this method to flush the memtable.
     /// The returned map contains the range id and the range after applying the predicate.
     fn ranges(
         &self,
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
+        for_flush: bool,
     ) -> Result<MemtableRanges>;
 
     /// Returns true if the memtable is empty.
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index e7a7e0d1c3..31896751de 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -333,6 +333,7 @@ impl Memtable for BulkMemtable {
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
+        for_flush: bool,
     ) -> Result<MemtableRanges> {
         let mut ranges = BTreeMap::new();
         let mut range_id = 0;
 
         // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new(
             self.metadata.clone(),
-            &projection,
+            projection,
             predicate.predicate().cloned(),
-        ));
+            for_flush,
+        )?);
 
         // Adds ranges for regular parts and encoded parts
         {
@@ -919,9 +921,10 @@ impl MemtableCompactor {
 
         let context = Arc::new(BulkIterContext::new(
             metadata.clone(),
-            &None, // No column projection for merging
-            None,  // No predicate for merging
-        ));
+            None, // No column projection for merging
+            None, // No predicate for merging
+            true,
+        )?);
 
         // Creates iterators for all parts to merge.
         let iterators: Vec = parts_to_merge
@@ -1189,7 +1192,7 @@ mod tests {
         assert_eq!(3000, max_ts.value());
 
         let predicate_group = PredicateGroup::new(&metadata, &[]);
-        let ranges = memtable.ranges(None, predicate_group, None).unwrap();
+        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
         assert_eq!(3, ranges.ranges.len());
         assert_eq!(5, ranges.stats.num_rows);
 
@@ -1231,7 +1234,7 @@ mod tests {
         let projection = vec![4u32];
         let predicate_group = PredicateGroup::new(&metadata, &[]);
         let ranges = memtable
-            .ranges(Some(&projection), predicate_group, None)
+            .ranges(Some(&projection), predicate_group, None, false)
             .unwrap();
 
         assert_eq!(1, ranges.ranges.len());
@@ -1347,7 +1350,7 @@ mod tests {
         }
 
         let predicate_group = PredicateGroup::new(&metadata, &[]);
-        let ranges = memtable.ranges(None, predicate_group, None).unwrap();
+        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
         assert_eq!(3, ranges.ranges.len());
         assert_eq!(5, ranges.stats.num_rows);
 
@@ -1380,7 +1383,7 @@ mod tests {
         let predicate_group = PredicateGroup::new(&metadata, &[]);
         let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
         let ranges = memtable
-            .ranges(None, predicate_group, sequence_filter)
+            .ranges(None, predicate_group, sequence_filter, false)
             .unwrap();
 
         assert_eq!(1, ranges.ranges.len());
@@ -1412,7 +1415,7 @@ mod tests {
         memtable.compact(false).unwrap();
 
         let predicate_group = PredicateGroup::new(&metadata, &[]);
-        let ranges = memtable.ranges(None, predicate_group, None).unwrap();
+        let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
 
         // Should have ranges for both bulk parts and encoded parts
         assert_eq!(3, ranges.ranges.len());
diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs
index 6b2da7d645..55d064e3b9 100644
--- a/src/mito2/src/memtable/bulk/context.rs
+++ b/src/mito2/src/memtable/bulk/context.rs
@@ -23,6 +23,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
+use crate::error::Result;
 use crate::sst::parquet::file_range::RangeBase;
 use crate::sst::parquet::flat_format::FlatReadFormat;
 use crate::sst::parquet::format::ReadFormat;
@@ -39,9 +40,10 @@ pub struct BulkIterContext {
 impl BulkIterContext {
     pub fn new(
         region_metadata: RegionMetadataRef,
-        projection: &Option<&[ColumnId]>,
+        projection: Option<&[ColumnId]>,
         predicate: Option<Predicate>,
-    ) -> Self {
+        skip_auto_convert: bool,
+    ) -> Result<Self> {
         let codec = build_primary_key_codec(&region_metadata);
 
         let simple_filters = predicate
@@ -55,9 +57,16 @@ impl BulkIterContext {
             })
             .collect();
 
-        let read_format = build_read_format(region_metadata, projection, true);
+        let read_format = ReadFormat::new(
+            region_metadata,
+            projection,
+            true,
+            None,
+            "memtable",
+            skip_auto_convert,
+        )?;
 
-        Self {
+        Ok(Self {
             base: RangeBase {
                 filters: simple_filters,
                 read_format,
@@ -66,7 +75,7 @@ impl BulkIterContext {
                 compat_batch: None,
             },
             predicate,
-        }
+        })
     }
    /// Prunes row groups by stats.
@@ -96,23 +105,3 @@ impl BulkIterContext {
         &self.base.read_format
     }
 }
-
-fn build_read_format(
-    region_metadata: RegionMetadataRef,
-    projection: &Option<&[ColumnId]>,
-    flat_format: bool,
-) -> ReadFormat {
-    if let Some(column_ids) = &projection {
-        ReadFormat::new(region_metadata, column_ids.iter().copied(), flat_format)
-    } else {
-        // No projection, lists all column ids to read.
-        ReadFormat::new(
-            region_metadata.clone(),
-            region_metadata
-                .column_metadatas
-                .iter()
-                .map(|col| col.column_id),
-            flat_format,
-        )
-    }
-}
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index c3ac1e3245..f2402406a4 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -1370,11 +1370,15 @@ mod tests {
         let projection = &[4u32];
         let mut reader = part
             .read(
-                Arc::new(BulkIterContext::new(
-                    part.metadata.region_metadata.clone(),
-                    &Some(projection.as_slice()),
-                    None,
-                )),
+                Arc::new(
+                    BulkIterContext::new(
+                        part.metadata.region_metadata.clone(),
+                        Some(projection.as_slice()),
+                        None,
+                        false,
+                    )
+                    .unwrap(),
+                ),
                 None,
             )
             .unwrap()
@@ -1425,11 +1429,15 @@ mod tests {
         predicate: Option<Predicate>,
         expected_rows: usize,
     ) {
-        let context = Arc::new(BulkIterContext::new(
-            part.metadata.region_metadata.clone(),
-            &None,
-            predicate,
-        ));
+        let context = Arc::new(
+            BulkIterContext::new(
+                part.metadata.region_metadata.clone(),
+                None,
+                predicate,
+                false,
+            )
+            .unwrap(),
+        );
         let mut reader = part
             .read(context, None)
             .unwrap()
@@ -1453,13 +1461,17 @@ mod tests {
             ("b", 1, (180, 210), 4),
         ]);
 
-        let context = Arc::new(BulkIterContext::new(
-            part.metadata.region_metadata.clone(),
-            &None,
-            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
-                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
-            )])),
-        ));
+        let context = Arc::new(
+            BulkIterContext::new(
+                part.metadata.region_metadata.clone(),
+                None,
+                Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
+                    datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
+                )])),
+                false,
+            )
+            .unwrap(),
+        );
         assert!(part.read(context, None).unwrap().is_none());
 
         check_prune_row_group(&part, None, 310);
diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs
index fb80323619..49a3e6b1b7 100644
--- a/src/mito2/src/memtable/bulk/part_reader.rs
+++ b/src/mito2/src/memtable/bulk/part_reader.rs
@@ -178,14 +178,20 @@ impl Iterator for BulkPartRecordBatchIter {
     }
 }
 
-// TODO(yingwen): Supports sparse encoding which doesn't have decoded primary key columns.
 /// Applies both predicate filtering and sequence filtering in a single pass.
 /// Returns None if the filtered batch is empty.
+///
+/// # Panics
+/// Panics if the format is not flat.
 fn apply_combined_filters(
     context: &BulkIterContext,
     sequence: &Option>,
     record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
+    // Converts the batch to the flat format first.
+    let format = context.read_format().as_flat().unwrap();
+    let record_batch = format.convert_batch(record_batch, None)?;
+
     let num_rows = record_batch.num_rows();
     let mut combined_filter = None;
 
@@ -362,11 +368,15 @@ mod tests {
         let region_metadata = builder.build().unwrap();
 
         // Create context
-        let context = Arc::new(BulkIterContext::new(
-            Arc::new(region_metadata.clone()),
-            &None, // No projection
-            None,  // No predicate
-        ));
+        let context = Arc::new(
+            BulkIterContext::new(
+                Arc::new(region_metadata.clone()),
+                None, // No projection
+                None, // No predicate
+                false,
+            )
+            .unwrap(),
+        );
         // Iterates all rows.
         let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
         let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
@@ -385,11 +395,15 @@ mod tests {
         );
         assert_eq!(6, result[0].num_columns());
 
-        let context = Arc::new(BulkIterContext::new(
-            Arc::new(region_metadata),
-            &Some(&[0, 2]),
-            Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
-        ));
+        let context = Arc::new(
+            BulkIterContext::new(
+                Arc::new(region_metadata),
+                Some(&[0, 2]),
+                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
+                false,
+            )
+            .unwrap(),
+        );
         // Creates iter with projection and predicate.
         let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
         let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 7942eb58df..d20c51a137 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -191,6 +191,7 @@ impl Memtable for PartitionTreeMemtable {
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
+        _for_flush: bool,
     ) -> Result<MemtableRanges> {
         let projection = projection.map(|ids| ids.to_vec());
         let builder = Box::new(PartitionTreeIterBuilder {
diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs
index 31cbf097d6..8ce2018a5d 100644
--- a/src/mito2/src/memtable/simple_bulk_memtable.rs
+++ b/src/mito2/src/memtable/simple_bulk_memtable.rs
@@ -235,6 +235,7 @@ impl Memtable for SimpleBulkMemtable {
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
+        _for_flush: bool,
     ) -> error::Result<MemtableRanges> {
         let start_time = Instant::now();
         let projection = Arc::new(self.build_projection(projection));
@@ -612,7 +613,7 @@ mod tests {
         memtable.write_one(kv).unwrap();
 
         let ranges = memtable
-            .ranges(None, PredicateGroup::default(), None)
+            .ranges(None, PredicateGroup::default(), None, false)
             .unwrap();
         let mut source = vec![];
         for r in ranges.ranges.values() {
@@ -646,7 +647,7 @@ mod tests {
         memtable.freeze().unwrap();
 
         let ranges = memtable
-            .ranges(None, PredicateGroup::default(), None)
+            .ranges(None, PredicateGroup::default(), None, false)
             .unwrap();
         let mut source = vec![];
         for r in ranges.ranges.values() {
@@ -689,7 +690,7 @@ mod tests {
         memtable.freeze().unwrap();
 
         let ranges = memtable
-            .ranges(None, PredicateGroup::default(), None)
+            .ranges(None, PredicateGroup::default(), None, false)
             .unwrap();
         assert_eq!(ranges.ranges.len(), 1);
         let range = ranges.ranges.into_values().next().unwrap();
@@ -903,7 +904,7 @@ mod tests {
             })
             .unwrap();
         let MemtableRanges { ranges, .. } = memtable
-            .ranges(None, PredicateGroup::default(), None)
+            .ranges(None, PredicateGroup::default(), None, false)
             .unwrap();
         let mut source = if ranges.len() == 1 {
             let only_range = ranges.into_values().next().unwrap();
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 9e55f98788..e1c292269f 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -300,6 +300,7 @@ impl Memtable for TimeSeriesMemtable {
         projection: Option<&[ColumnId]>,
         predicate: PredicateGroup,
         sequence: Option<SequenceNumber>,
+        _for_flush: bool,
     ) -> Result<MemtableRanges> {
         let projection = if let Some(projection) = projection {
             projection.iter().copied().collect()
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index 789a6b4192..f9243eb382 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -206,11 +206,14 @@ impl FlatCompatBatch {
         mapper: &FlatProjectionMapper,
         actual: &RegionMetadataRef,
         format_projection: &FormatProjection,
-    ) -> Result<Self> {
+    ) -> Result<Option<Self>> {
         let actual_schema = flat_projected_columns(actual, format_projection);
         let expect_schema = mapper.batch_schema();
-        // has_same_columns_and_pk_encoding() already checks columns and encodings.
-        debug_assert_ne!(expect_schema, actual_schema);
+        if expect_schema == actual_schema {
+            // Although the SST has a different schema, the schema after projection is the same
+            // as the expected schema.
+            return Ok(None);
+        }
 
         // Maps column id to the index and data type in the actual schema.
         let actual_schema_index: HashMap<_, _> = actual_schema
@@ -275,11 +278,11 @@ impl FlatCompatBatch {
 
         let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
 
-        Ok(Self {
+        Ok(Some(Self {
             index_or_defaults,
             arrow_schema: Arc::new(Schema::new(fields)),
             compat_pk,
-        })
+        }))
     }
 
     /// Make columns of the `batch` compatible.
@@ -1443,12 +1446,19 @@ mod tests {
         ));
         let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
 
-        let read_format =
-            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
+        let read_format = FlatReadFormat::new(
+            actual_metadata.clone(),
+            [0, 1, 2, 3].into_iter(),
+            None,
+            "test",
+            false,
+        )
+        .unwrap();
         let format_projection = read_format.format_projection();
 
-        let compat_batch =
-            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
+        let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
+            .unwrap()
+            .unwrap();
 
         let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
         tag_builder.append_value("tag1");
@@ -1527,12 +1537,19 @@ mod tests {
         let expected_metadata = Arc::new(expected_metadata);
         let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
 
-        let read_format =
-            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
+        let read_format = FlatReadFormat::new(
+            actual_metadata.clone(),
+            [0, 1, 2, 3].into_iter(),
+            None,
+            "test",
+            false,
+        )
+        .unwrap();
         let format_projection = read_format.format_projection();
 
-        let compat_batch =
-            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();
+        let compat_batch = FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection)
+            .unwrap()
+            .unwrap();
 
         // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
index 69e82f33d3..ddad8e772f 100644
--- a/src/mito2/src/read/flat_projection.rs
+++ b/src/mito2/src/read/flat_projection.rs
@@ -29,9 +29,11 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;
 
 use crate::error::{InvalidRequestSnafu, Result};
-use crate::sst::internal_fields;
 use crate::sst::parquet::flat_format::sst_column_id_indices;
 use crate::sst::parquet::format::FormatProjection;
+use crate::sst::{
+    FlatSchemaOptions, internal_fields, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema,
+};
 
 /// Handles projection and converts batches in flat format with correct schema.
 ///
@@ -70,6 +72,7 @@ impl FlatProjectionMapper {
         projection: impl Iterator<Item = ColumnId>,
     ) -> Result<FlatProjectionMapper> {
         let mut projection: Vec<_> = projection.collect();
+        // Whether the original projection is empty.
         let is_empty_projection = projection.is_empty();
 
         if is_empty_projection {
@@ -197,8 +200,19 @@ impl FlatProjectionMapper {
     /// Returns the input arrow schema from sources.
     ///
     /// The merge reader can use this schema.
-    pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
-        self.input_arrow_schema.clone()
+    pub(crate) fn input_arrow_schema(
+        &self,
+        compaction: bool,
+    ) -> datatypes::arrow::datatypes::SchemaRef {
+        if !compaction {
+            self.input_arrow_schema.clone()
+        } else {
+            // For compaction, we need to build a different schema from encoding.
+            to_flat_sst_arrow_schema(
+                &self.metadata,
+                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
+            )
+        }
     }
 
     /// Returns the schema of converted [RecordBatch].
@@ -296,21 +310,17 @@ fn compute_input_arrow_schema(
     let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
     for (column_id, _) in batch_schema {
         let column_metadata = metadata.column_by_id(*column_id).unwrap();
+        let field = Arc::new(Field::new(
+            &column_metadata.column_schema.name,
+            column_metadata.column_schema.data_type.as_arrow_type(),
+            column_metadata.column_schema.is_nullable(),
+        ));
         let field = if column_metadata.semantic_type == SemanticType::Tag {
-            Field::new_dictionary(
-                &column_metadata.column_schema.name,
-                datatypes::arrow::datatypes::DataType::UInt32,
-                column_metadata.column_schema.data_type.as_arrow_type(),
-                column_metadata.column_schema.is_nullable(),
-            )
+            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
         } else {
-            Field::new(
-                &column_metadata.column_schema.name,
-                column_metadata.column_schema.data_type.as_arrow_type(),
-                column_metadata.column_schema.is_nullable(),
-            )
+            field
         };
-        new_fields.push(Arc::new(field));
+        new_fields.push(field);
     }
 
     new_fields.extend_from_slice(&internal_fields());
diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs
index b5cc5c9aa0..f8c8f6c4f0 100644
--- a/src/mito2/src/read/range.rs
+++ b/src/mito2/src/read/range.rs
@@ -91,8 +91,8 @@ impl RangeMeta {
     }
 
     /// Creates a list of ranges from the `input` for seq scan.
-    /// If `compaction` is true, it doesn't split the ranges.
-    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
+    /// If `input.compaction` is true, it doesn't split the ranges.
+    pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
         let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
         Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
         Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
@@ -101,7 +101,7 @@ impl RangeMeta {
         Self::push_extension_ranges(input, &mut ranges);
 
         let ranges = group_ranges_for_seq_scan(ranges);
-        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
+        if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
             // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
             return ranges;
         }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 6e2e461c56..42c26aa435 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -345,8 +345,8 @@ impl ScanRegion {
 
     /// Scan sequentially.
     pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
-        let input = self.scan_input().await?;
-        Ok(SeqScan::new(input, false))
+        let input = self.scan_input().await?.with_compaction(false);
+        Ok(SeqScan::new(input))
     }
 
     /// Unordered scan.
@@ -437,6 +437,7 @@ impl ScanRegion {
             Some(mapper.column_ids()),
             predicate.clone(),
             self.request.sequence,
+            false,
         )?;
         mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
             // todo: we should add stats to MemtableRange
@@ -692,6 +693,8 @@ pub struct ScanInput {
     pub(crate) distribution: Option<TimeSeriesDistribution>,
     /// Whether to use flat format.
     pub(crate) flat_format: bool,
+    /// Whether this scan is for compaction.
+    pub(crate) compaction: bool,
     #[cfg(feature = "enterprise")]
     extension_ranges: Vec,
}
@@ -721,6 +724,7 @@ impl ScanInput {
             series_row_selector: None,
             distribution: None,
             flat_format: false,
+            compaction: false,
             #[cfg(feature = "enterprise")]
             extension_ranges: Vec::new(),
         }
     }
@@ -872,6 +876,13 @@ impl ScanInput {
         self
     }
 
+    /// Sets whether this scan is for compaction.
+    #[must_use]
+    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
+        self.compaction = compaction;
+        self
+    }
+
     /// Scans sources in parallel.
     ///
     /// # Panics if the input doesn't allow parallel scan.
@@ -922,6 +933,7 @@ impl ScanInput {
             .fulltext_index_applier(self.fulltext_index_applier.clone())
             .expected_metadata(Some(self.mapper.metadata().clone()))
             .flat_format(self.flat_format)
+            .compaction(self.compaction)
             .build_reader_input(reader_metrics)
             .await;
         let (mut file_range_ctx, selection) = match res {
@@ -945,11 +957,12 @@ impl ScanInput {
         // mapper can convert it.
         let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
             let mapper = self.mapper.as_flat().unwrap();
-            Some(CompatBatch::Flat(FlatCompatBatch::try_new(
+            FlatCompatBatch::try_new(
                 mapper,
                 flat_format.metadata(),
                 flat_format.format_projection(),
-            )?))
+            )?
+            .map(CompatBatch::Flat)
         } else {
             let compact_batch = PrimaryKeyCompatBatch::new(
                 &self.mapper,
@@ -1127,9 +1140,9 @@ pub struct StreamContext {
 
 impl StreamContext {
     /// Creates a new [StreamContext] for [SeqScan].
-    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
+    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
         let query_start = input.query_start.unwrap_or_else(Instant::now);
-        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
+        let ranges = RangeMeta::seq_scan_ranges(&input);
         READ_SST_COUNT.observe(input.num_files() as f64);
 
         Self {
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 8191b9f993..631c40b42a 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -63,27 +63,24 @@ pub struct SeqScan {
     properties: ScannerProperties,
     /// Context of streams.
     stream_ctx: Arc<StreamContext>,
-    /// The scanner is used for compaction.
-    compaction: bool,
     /// Metrics for each partition.
     /// The scanner only sets in query and keeps it empty during compaction.
     metrics_list: PartitionMetricsList,
 }
 
 impl SeqScan {
-    /// Creates a new [SeqScan] with the given input and compaction flag.
-    /// If `compaction` is true, the scanner will not attempt to split ranges.
-    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
+    /// Creates a new [SeqScan] with the given input.
+    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
+    pub(crate) fn new(input: ScanInput) -> Self {
         let mut properties = ScannerProperties::default()
             .with_append_mode(input.append_mode)
             .with_total_rows(input.total_rows());
-        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
+        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
         properties.partitions = vec![stream_ctx.partition_ranges()];
 
         Self {
             properties,
             stream_ctx,
-            compaction,
             metrics_list: PartitionMetricsList::default(),
         }
     }
@@ -123,7 +120,7 @@ impl SeqScan {
     /// # Panics
     /// Panics if the compaction flag is not set.
     pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
-        assert!(self.compaction);
+        assert!(self.stream_ctx.input.compaction);
 
         let metrics_set = ExecutionPlanMetricsSet::new();
         let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
@@ -144,7 +141,7 @@ impl SeqScan {
     /// # Panics
     /// Panics if the compaction flag is not set.
     pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
-        assert!(self.compaction);
+        assert!(self.stream_ctx.input.compaction);
 
         let metrics_set = ExecutionPlanMetricsSet::new();
         let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
@@ -288,7 +285,7 @@ impl SeqScan {
         }
 
         let mapper = stream_ctx.input.mapper.as_flat().unwrap();
-        let schema = mapper.input_arrow_schema();
+        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
 
         let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
 
@@ -379,7 +376,7 @@ impl SeqScan {
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
-        let compaction = self.compaction;
+        let compaction = self.stream_ctx.input.compaction;
         let distinguish_range = self.properties.distinguish_partition_range;
 
         let stream = try_stream! {
@@ -477,7 +474,7 @@ impl SeqScan {
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
-        let compaction = self.compaction;
+        let compaction = self.stream_ctx.input.compaction;
 
         let stream = try_stream!
{ part_metrics.on_first_poll(); @@ -556,13 +553,13 @@ impl SeqScan { let metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, partition, - get_scanner_type(self.compaction), + get_scanner_type(self.stream_ctx.input.compaction), self.stream_ctx.query_start, explain_verbose, metrics_set, ); - if !self.compaction { + if !self.stream_ctx.input.compaction { self.metrics_list.set(partition, metrics.clone()); } diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 6756e91dae..3a006dcb67 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -80,7 +80,7 @@ impl SeriesScan { let mut properties = ScannerProperties::default() .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); - let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false)); + let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input)); properties.partitions = vec![stream_ctx.partition_ranges()]; Self { diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 086a92cb16..b51f9f5fcc 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -79,9 +79,10 @@ impl ConvertBatchStream { } } ScanBatch::Series(series) => { + self.buffer.clear(); + match series { SeriesBatch::PrimaryKey(primary_key_batch) => { - self.buffer.clear(); self.buffer.reserve(primary_key_batch.batches.len()); // Safety: Only primary key format returns this batch. let mapper = self.projection_mapper.as_primary_key().unwrap(); @@ -90,28 +91,25 @@ impl ConvertBatchStream { let record_batch = mapper.convert(&batch, &self.cache_strategy)?; self.buffer.push(record_batch.into_df_record_batch()); } - - let output_schema = mapper.output_schema(); - let record_batch = - compute::concat_batches(output_schema.arrow_schema(), &self.buffer) - .context(ArrowComputeSnafu)?; - - RecordBatch::try_from_df_record_batch(output_schema, record_batch) } SeriesBatch::Flat(flat_batch) => { + self.buffer.reserve(flat_batch.batches.len()); // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); - let output_schema = mapper.output_schema(); - let record_batch = compute::concat_batches( - output_schema.arrow_schema(), - &flat_batch.batches, - ) - .context(ArrowComputeSnafu)?; - - mapper.convert(&record_batch) + for batch in flat_batch.batches { + let record_batch = mapper.convert(&batch)?; + self.buffer.push(record_batch.into_df_record_batch()); + } } } + + let output_schema = self.projection_mapper.output_schema(); + let record_batch = + compute::concat_batches(output_schema.arrow_schema(), &self.buffer) + .context(ArrowComputeSnafu)?; + + RecordBatch::try_from_df_record_batch(output_schema, record_batch) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. 
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index c3248ab0c8..b3e6fdbca2 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -112,11 +112,7 @@ pub fn to_flat_sst_arrow_schema(
     metadata: &RegionMetadata,
     options: &FlatSchemaOptions,
 ) -> SchemaRef {
-    let num_fields = if options.raw_pk_columns {
-        metadata.column_metadatas.len() + 3
-    } else {
-        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
-    };
+    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
     let mut fields = Vec::with_capacity(num_fields);
     let schema = metadata.schema.arrow_schema();
     if options.raw_pk_columns {
@@ -152,6 +148,18 @@ pub fn to_flat_sst_arrow_schema(
     Arc::new(Schema::new(fields))
 }
 
+/// Returns the number of columns in the flat format.
+pub fn flat_sst_arrow_schema_column_num(
+    metadata: &RegionMetadata,
+    options: &FlatSchemaOptions,
+) -> usize {
+    if options.raw_pk_columns {
+        metadata.column_metadatas.len() + 3
+    } else {
+        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
+    }
+}
+
 /// Helper function to create a dictionary field from a field.
 fn to_dictionary_field(field: &Field) -> Field {
     Field::new_dictionary(
diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs
index bdf2e12005..5e8debe3c6 100644
--- a/src/mito2/src/sst/parquet/flat_format.rs
+++ b/src/mito2/src/sst/parquet/flat_format.rs
@@ -43,6 +43,7 @@ use datatypes::prelude::{ConcreteDataType, DataType};
 use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
 use parquet::file::metadata::RowGroupMetaData;
 use snafu::{OptionExt, ResultExt, ensure};
+use store_api::codec::PrimaryKeyEncoding;
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::{ColumnId, SequenceNumber};
 
@@ -51,14 +52,16 @@ use crate::error::{
     NewRecordBatchSnafu, Result,
 };
 use crate::sst::parquet::format::{
-    FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, ReadFormat, StatValues,
+    FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
+    StatValues,
+};
+use crate::sst::{
+    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
+    to_flat_sst_arrow_schema,
 };
-use crate::sst::{FlatSchemaOptions, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema};
 
 /// Helper for writing the SST format.
-#[allow(dead_code)]
 pub(crate) struct FlatWriteFormat {
-    metadata: RegionMetadataRef,
     /// SST file schema.
     arrow_schema: SchemaRef,
     override_sequence: Option<SequenceNumber>,
@@ -66,18 +69,15 @@ impl FlatWriteFormat {
     /// Creates a new helper.
-    #[allow(dead_code)]
     pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
         let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
 
         FlatWriteFormat {
-            metadata,
             arrow_schema,
             override_sequence: None,
         }
     }
 
     /// Set override sequence.
-    #[allow(dead_code)]
     pub(crate) fn with_override_sequence(
         mut self,
         override_sequence: Option<SequenceNumber>,
@@ -87,13 +87,11 @@ impl FlatWriteFormat {
     }
 
     /// Gets the arrow schema to store in parquet.
-    #[allow(dead_code)]
     pub(crate) fn arrow_schema(&self) -> &SchemaRef {
         &self.arrow_schema
     }
 
     /// Convert `batch` to a arrow record batch to store in parquet.
-    #[allow(dead_code)]
     pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
         debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
@@ -134,18 +132,10 @@ pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
 ///
 /// It only supports flat format that stores primary keys additionally.
 pub struct FlatReadFormat {
-    /// The metadata stored in the SST.
-    metadata: RegionMetadataRef,
-    /// SST file schema.
-    arrow_schema: SchemaRef,
-    /// Projection computed for the format.
-    format_projection: FormatProjection,
-    /// Column id to index in SST.
-    column_id_to_sst_index: HashMap<ColumnId, usize>,
     /// Sequence number to override the sequence read from the SST.
     override_sequence: Option<SequenceNumber>,
-    /// Optional format converter for handling flat format conversion.
-    convert_format: Option<FlatConvertFormat>,
+    /// Parquet format adapter.
+    parquet_adapter: ParquetAdapter,
 }
 
 impl FlatReadFormat {
@@ -153,45 +143,40 @@ impl FlatReadFormat {
     /// Creates a helper with existing `metadata` and `column_ids` to read.
     pub fn new(
         metadata: RegionMetadataRef,
         column_ids: impl Iterator<Item = ColumnId>,
-        convert_to_flat: bool,
-    ) -> FlatReadFormat {
-        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
-
-        // Creates a map to lookup index.
-        let id_to_index = sst_column_id_indices(&metadata);
-
-        let format_projection = FormatProjection::compute_format_projection(
-            &id_to_index,
-            arrow_schema.fields.len(),
-            column_ids,
-        );
-
-        let convert_format = if convert_to_flat {
-            let codec = build_primary_key_codec(&metadata);
-            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
-        } else {
-            None
+        num_columns: Option<usize>,
+        file_path: &str,
+        skip_auto_convert: bool,
+    ) -> Result<FlatReadFormat> {
+        let is_legacy = match num_columns {
+            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
+            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
         };
 
-        FlatReadFormat {
-            metadata,
-            arrow_schema,
-            format_projection,
-            column_id_to_sst_index: id_to_index,
+        let parquet_adapter = if is_legacy {
+            // Safety: is_legacy_format() ensures primary_key is not empty.
+            ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
+                metadata,
+                column_ids,
+                skip_auto_convert,
+            ))
+        } else {
+            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
+        };
+
+        Ok(FlatReadFormat {
             override_sequence: None,
-            convert_format,
-        }
+            parquet_adapter,
+        })
     }
 
     /// Sets the sequence number to override.
-    #[allow(dead_code)]
     pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
         self.override_sequence = sequence;
     }
 
     /// Index of a column in the projected batch by its column id.
     pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
-        self.format_projection
+        self.format_projection()
             .column_id_to_projected_index
             .get(&column_id)
             .copied()
@@ -203,7 +188,10 @@ impl FlatReadFormat {
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
     ) -> StatValues {
-        self.get_stat_values(row_groups, column_id, true)
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
+        }
     }
 
     /// Returns max values of specific column in row groups.
@@ -212,7 +200,10 @@ impl FlatReadFormat {
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
     ) -> StatValues {
-        self.get_stat_values(row_groups, column_id, false)
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
+        }
     }
 
     /// Returns null counts of specific column in row groups.
@@ -221,13 +212,10 @@ impl FlatReadFormat {
         row_groups: &[impl Borrow<RowGroupMetaData>],
         column_id: ColumnId,
     ) -> StatValues {
-        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
-            // No such column in the SST.
-            return StatValues::NoColumn;
-        };
-
-        let stats = ReadFormat::column_null_counts(row_groups, *index);
-        StatValues::from_stats_opt(stats)
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
+        }
     }
 
     /// Gets the arrow schema of the SST file.
@@ -235,22 +223,34 @@ impl FlatReadFormat {
     /// This schema is computed from the region metadata but should be the same
     /// as the arrow schema decoded from the file metadata.
     pub(crate) fn arrow_schema(&self) -> &SchemaRef {
-        &self.arrow_schema
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => &p.arrow_schema,
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
+        }
     }
 
     /// Gets the metadata of the SST.
     pub(crate) fn metadata(&self) -> &RegionMetadataRef {
-        &self.metadata
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => &p.metadata,
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
+        }
     }
 
-    /// Gets sorted projection indices to read.
+    /// Gets sorted projection indices to read from the SST file.
     pub(crate) fn projection_indices(&self) -> &[usize] {
-        &self.format_projection.projection_indices
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
+        }
     }
 
-    /// Gets the projection.
+    /// Gets the projection in the flat format.
     pub(crate) fn format_projection(&self) -> &FormatProjection {
-        &self.format_projection
+        match &self.parquet_adapter {
+            ParquetAdapter::Flat(p) => &p.format_projection,
+            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
+        }
     }
 
     /// Creates a sequence array to override.
@@ -263,17 +263,15 @@ impl FlatReadFormat {
     ///
     /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
    /// then the sequence column replaced by the override sequence array.
-    #[allow(dead_code)]
     pub(crate) fn convert_batch(
         &self,
         record_batch: RecordBatch,
         override_sequence_array: Option<&ArrayRef>,
     ) -> Result<RecordBatch> {
-        // First, apply flat format conversion if enabled
-        let batch = if let Some(ref convert_format) = self.convert_format {
-            convert_format.convert(record_batch)?
-        } else {
-            record_batch
+        // First, apply flat format conversion.
+        let batch = match &self.parquet_adapter {
+            ParquetAdapter::Flat(_) => record_batch,
+            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
         };
 
         // Then apply sequence override if provided
@@ -298,15 +296,18 @@ impl FlatReadFormat {
     /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
     ///
-    /// * `file_path` is the path to the parquet file, for error message.
-    /// * `num_columns` is the number of columns in the parquet file.
     /// * `metadata` is the region metadata (always assumes flat format).
-    #[allow(dead_code)]
-    pub(crate) fn need_convert_to_flat(
-        file_path: &str,
-        num_columns: usize,
+    /// * `num_columns` is the number of columns in the parquet file.
+    /// * `file_path` is the path to the parquet file, for error message.
+    pub(crate) fn is_legacy_format(
         metadata: &RegionMetadata,
+        num_columns: usize,
+        file_path: &str,
     ) -> Result<bool> {
+        if metadata.primary_key.is_empty() {
+            return Ok(false);
+        }
+
         // For flat format, compute expected column number:
         // all columns + internal columns (pk, sequence, op_type)
         let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
@@ -344,6 +345,131 @@ impl FlatReadFormat {
             Ok(true)
         }
     }
+}
+
+/// Wraps the parquet helper for different formats.
+enum ParquetAdapter {
+    Flat(ParquetFlat),
+    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
+}
+
+/// Helper that reads the parquet from the primary key format into the flat format.
+struct ParquetPrimaryKeyToFlat {
+    /// The primary key format to read the parquet.
+    format: PrimaryKeyReadFormat,
+    /// Format converter for handling flat format conversion.
+    convert_format: Option<FlatConvertFormat>,
+    /// Projection computed for the flat format.
+    format_projection: FormatProjection,
+}
+
+impl ParquetPrimaryKeyToFlat {
+    /// Creates a helper with existing `metadata` and `column_ids` to read.
+    fn new(
+        metadata: RegionMetadataRef,
+        column_ids: impl Iterator<Item = ColumnId>,
+        skip_auto_convert: bool,
+    ) -> ParquetPrimaryKeyToFlat {
+        let column_ids: Vec<_> = column_ids.collect();
+
+        // Creates a map to lookup index based on the new format.
+        let id_to_index = sst_column_id_indices(&metadata);
+        let sst_column_num =
+            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
+        // Computes the format projection for the new format.
+        let format_projection = FormatProjection::compute_format_projection(
+            &id_to_index,
+            sst_column_num,
+            column_ids.iter().copied(),
+        );
+        let codec = build_primary_key_codec(&metadata);
+        let convert_format = if skip_auto_convert {
+            None
+        } else {
+            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
+        };
+
+        let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
+
+        Self {
+            format,
+            convert_format,
+            format_projection,
+        }
+    }
+
+    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
+        if let Some(convert_format) = &self.convert_format {
+            convert_format.convert(record_batch)
+        } else {
+            Ok(record_batch)
+        }
+    }
+}
+
+/// Helper that reads the parquet in the flat format directly.
+struct ParquetFlat {
+    /// The metadata stored in the SST.
+    metadata: RegionMetadataRef,
+    /// SST file schema.
+    arrow_schema: SchemaRef,
+    /// Projection computed for the flat format.
+    format_projection: FormatProjection,
+    /// Column id to index in SST.
+    column_id_to_sst_index: HashMap<ColumnId, usize>,
+}
+
+impl ParquetFlat {
+    /// Creates a helper with existing `metadata` and `column_ids` to read.
+    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
+        // Creates a map to lookup index.
+        let id_to_index = sst_column_id_indices(&metadata);
+        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+        let sst_column_num =
+            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
+        let format_projection =
+            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
+
+        Self {
+            metadata,
+            arrow_schema,
+            format_projection,
+            column_id_to_sst_index: id_to_index,
+        }
+    }
+
+    /// Returns min values of specific column in row groups.
+    fn min_values(
+        &self,
+        row_groups: &[impl Borrow<RowGroupMetaData>],
+        column_id: ColumnId,
+    ) -> StatValues {
+        self.get_stat_values(row_groups, column_id, true)
+    }
+
+    /// Returns max values of specific column in row groups.
+    fn max_values(
+        &self,
+        row_groups: &[impl Borrow<RowGroupMetaData>],
+        column_id: ColumnId,
+    ) -> StatValues {
+        self.get_stat_values(row_groups, column_id, false)
+    }
+
+    /// Returns null counts of specific column in row groups.
+    fn null_counts(
+        &self,
+        row_groups: &[impl Borrow<RowGroupMetaData>],
+        column_id: ColumnId,
+    ) -> StatValues {
+        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
+            // No such column in the SST.
+            return StatValues::NoColumn;
+        };
+
+        let stats = ReadFormat::column_null_counts(row_groups, *index);
+        StatValues::from_stats_opt(stats)
+    }
 
     fn get_stat_values(
         &self,
@@ -567,7 +693,10 @@ impl FlatReadFormat {
         Self::new(
             Arc::clone(&metadata),
             metadata.column_metadatas.iter().map(|c| c.column_id),
+            None,
+            "test",
             false,
         )
+        .unwrap()
     }
 }
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 9943ec5e62..48eca5879a 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -141,23 +141,13 @@ impl PrimaryKeyWriteFormat {
 
 /// Helper to read parquet formats.
 pub enum ReadFormat {
+    /// The parquet is in the old primary key format.
     PrimaryKey(PrimaryKeyReadFormat),
+    /// The parquet is in the new flat format.
     Flat(FlatReadFormat),
 }
 
 impl ReadFormat {
-    pub(crate) fn new(
-        metadata: RegionMetadataRef,
-        column_ids: impl Iterator<Item = ColumnId>,
-        flat_format: bool,
-    ) -> Self {
-        if flat_format {
-            Self::new_flat(metadata, column_ids, false)
-        } else {
-            Self::new_primary_key(metadata, column_ids)
-        }
-    }
-
     /// Creates a helper to read the primary key format.
     pub fn new_primary_key(
         metadata: RegionMetadataRef,
@@ -170,9 +160,65 @@ impl ReadFormat {
     pub fn new_flat(
         metadata: RegionMetadataRef,
         column_ids: impl Iterator<Item = ColumnId>,
-        convert_to_flat: bool,
-    ) -> Self {
-        ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat))
+        num_columns: Option<usize>,
+        file_path: &str,
+        skip_auto_convert: bool,
+    ) -> Result<ReadFormat> {
+        Ok(ReadFormat::Flat(FlatReadFormat::new(
+            metadata,
+            column_ids,
+            num_columns,
+            file_path,
+            skip_auto_convert,
+        )?))
+    }
+
+    /// Creates a new read format.
+    pub fn new(
+        region_metadata: RegionMetadataRef,
+        projection: Option<&[ColumnId]>,
+        flat_format: bool,
+        num_columns: Option<usize>,
+        file_path: &str,
+        skip_auto_convert: bool,
+    ) -> Result<ReadFormat> {
+        if flat_format {
+            if let Some(column_ids) = projection {
+                ReadFormat::new_flat(
+                    region_metadata,
+                    column_ids.iter().copied(),
+                    num_columns,
+                    file_path,
+                    skip_auto_convert,
+                )
+            } else {
+                // No projection, lists all column ids to read.
+                ReadFormat::new_flat(
+                    region_metadata.clone(),
+                    region_metadata
+                        .column_metadatas
+                        .iter()
+                        .map(|col| col.column_id),
+                    num_columns,
+                    file_path,
+                    skip_auto_convert,
+                )
+            }
+        } else if let Some(column_ids) = projection {
+            Ok(ReadFormat::new_primary_key(
+                region_metadata,
+                column_ids.iter().copied(),
+            ))
+        } else {
+            // No projection, lists all column ids to read.
+            Ok(ReadFormat::new_primary_key(
+                region_metadata.clone(),
+                region_metadata
+                    .column_metadatas
+                    .iter()
+                    .map(|col| col.column_id),
+            ))
+        }
     }
 
     pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
@@ -1238,7 +1284,8 @@ mod tests {
             .iter()
             .map(|col| col.column_id)
             .collect();
-        let read_format = ReadFormat::new(metadata, column_ids.iter().copied(), false);
+        let read_format =
+            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
 
         let columns: Vec<ArrayRef> = vec![
             Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
@@ -1366,19 +1413,26 @@ mod tests {
 
         // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)
         // Only read tag1 (column_id=3, index=1) + fixed columns
-        let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false);
+        let read_format =
+            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
+                .unwrap();
         assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
 
         // Only read field1 (column_id=4, index=2) + fixed columns
-        let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false);
+        let read_format =
+            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
+                .unwrap();
         assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
 
         // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
-        let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false);
+        let read_format =
+            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
+                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
 
         // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
-        let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false);
+        let read_format =
+            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
         assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
     }
 
@@ -1388,8 +1442,11 @@ mod tests {
         let mut format = FlatReadFormat::new(
             metadata,
             std::iter::once(1), // Just read tag0
+            Some(8),
+            "test",
             false,
-        );
+        )
+        .unwrap();
 
         let num_rows = 4;
         let original_sequence = 100u64;
@@ -1438,8 +1495,7 @@ mod tests {
         // For flat format: all columns (5) + internal columns (3)
         let expected_columns = metadata.column_metadatas.len() + 3;
         let result =
-            FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata)
-                .unwrap();
+            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
         assert!(
             !result,
             "Should not need conversion when column counts match"
         );
 
         // Missing primary key columns (2 primary keys in test metadata)
         let num_columns_without_pk = expected_columns - metadata.primary_key.len();
         let result =
-            FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata)
+            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                 .unwrap();
         assert!(
             result,
             "Should need conversion when primary key columns are missing"
         );
 
         // Test case 3: Invalid case - actual columns more than expected
         let too_many_columns = expected_columns + 1;
-        let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata)
+        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
             .unwrap_err();
         assert!(err.to_string().contains("Expected columns"), "{err:?}");
 
         // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
-        let err =
-            FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata)
-                .unwrap_err();
+        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
+            .unwrap_err();
         assert!(
             err.to_string().contains("Column number difference"),
             "{err:?}"
@@ -1601,7 +1656,14 @@ mod tests {
             .iter()
             .map(|c| c.column_id)
             .collect();
-        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
+        let format = FlatReadFormat::new(
+            metadata.clone(),
+            column_ids.into_iter(),
+            Some(6),
+            "test",
+            false,
+        )
+        .unwrap();
 
         let num_rows = 4;
         let original_sequence = 100u64;
@@ -1676,7 +1738,14 @@ mod tests {
             .iter()
             .map(|c| c.column_id)
             .collect();
-        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);
+        let format = FlatReadFormat::new(
+            metadata.clone(),
+            column_ids.into_iter(),
+            None,
+            "test",
+            false,
+        )
+        .unwrap();
 
         let num_rows = 4;
         let original_sequence = 100u64;
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index d1581f4b85..d02786455e 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -115,6 +115,8 @@ pub struct ParquetReaderBuilder {
     expected_metadata: Option<RegionMetadataRef>,
     /// Whether to use flat format for reading.
     flat_format: bool,
+    /// Whether this reader is for compaction.
+    compaction: bool,
 }
 
 impl ParquetReaderBuilder {
@@ -138,6 +140,7 @@ impl ParquetReaderBuilder {
             fulltext_index_applier: None,
             expected_metadata: None,
             flat_format: false,
+            compaction: false,
         }
     }
 
@@ -208,6 +211,13 @@ impl ParquetReaderBuilder {
         self
     }
 
+    /// Sets the compaction flag.
+    #[must_use]
+    pub fn compaction(mut self, compaction: bool) -> Self {
+        self.compaction = compaction;
+        self
+    }
+
     /// Builds a [ParquetReader].
     ///
     /// This needs to perform IO operation.
@@ -239,20 +249,28 @@ impl ParquetReaderBuilder {
         let mut read_format = if let Some(column_ids) = &self.projection {
             ReadFormat::new(
                 region_meta.clone(),
-                column_ids.iter().copied(),
+                Some(column_ids),
                 self.flat_format,
-            )
+                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
+                &file_path,
+                self.compaction,
+            )?
         } else {
             // Lists all column ids to read, we always use the expected metadata if possible.
             let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
+            let column_ids: Vec<_> = expected_meta
+                .column_metadatas
+                .iter()
+                .map(|col| col.column_id)
+                .collect();
             ReadFormat::new(
                 region_meta.clone(),
-                expected_meta
-                    .column_metadatas
-                    .iter()
-                    .map(|col| col.column_id),
+                Some(&column_ids),
                 self.flat_format,
-            )
+                Some(parquet_meta.file_metadata().schema_descr().num_columns()),
+                &file_path,
+                self.compaction,
+            )?
         };
         if need_override_sequence(&parquet_meta) {
             read_format
@@ -1379,17 +1397,10 @@ impl FlatRowGroupReader {
                 let record_batch = batch_result.context(ArrowReaderSnafu {
                     path: self.context.file_path(),
                 })?;
-
-                // Apply override sequence if needed
-                if let (Some(flat_format), Some(override_array)) = (
-                    self.context.read_format().as_flat(),
-                    &self.override_sequence,
-                ) {
-                    let converted =
-                        flat_format.convert_batch(record_batch, Some(override_array))?;
-                    return Ok(Some(converted));
-                }
-
+                // Safety: Only the flat format uses FlatRowGroupReader.
+                let flat_format = self.context.read_format().as_flat().unwrap();
+                let record_batch =
+                    flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
                 Ok(Some(record_batch))
             }
             None => Ok(None),
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 2681bf0a73..75efa0c6f5 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -99,6 +99,7 @@ impl Memtable for EmptyMemtable {
         _projection: Option<&[ColumnId]>,
         _predicate: PredicateGroup,
         _sequence: Option<SequenceNumber>,
+        _for_flush: bool,
     ) -> Result<MemtableRanges> {
         Ok(MemtableRanges::default())
     }
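
For reviewers, a minimal sketch of the migrated call pattern, assembled from the call sites in this diff. The surrounding setup (`metadata`, `memtable`, `predicate_group`, `scan_input`) is assumed to be in scope and error handling is simplified with `unwrap`; this is not an additional change, just an illustration of the new signatures.

```rust
// BulkIterContext::new now takes the projection by value as Option<&[ColumnId]>
// (instead of &Option<&[ColumnId]>), gains a `skip_auto_convert` flag, and is fallible.
let context = Arc::new(
    BulkIterContext::new(
        metadata.clone(),
        None,  // No projection.
        None,  // No predicate.
        false, // Do not skip the automatic flat-format conversion.
    )
    .unwrap(),
);

// Memtable::ranges gains a trailing `for_flush` flag; only the flush job passes true.
let ranges = memtable
    .ranges(None, predicate_group, None, /* for_flush */ false)
    .unwrap();

// The compaction flag moved from SeqScan::new into ScanInput.
let seq_scan = SeqScan::new(scan_input.with_compaction(true));
```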