diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index ee415be366..48881e5f05 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -57,6 +57,7 @@ use crate::read::Batch; use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue}; use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::parquet::PARQUET_METADATA_KEY; +use crate::sst::parquet::read_columns::ParquetReadColumns; use crate::sst::parquet::reader::MetadataCacheMetrics; /// Metrics type key for sst meta. @@ -1223,24 +1224,27 @@ pub enum SelectorResult { pub struct SelectorResultValue { /// Batches of rows selected by the selector. pub result: SelectorResult, - /// Projection of rows. - pub projection: Vec, + /// The read columns of rows. + pub read_cols: ParquetReadColumns, } impl SelectorResultValue { /// Creates a new selector result value with primary key format. - pub fn new(result: Vec, projection: Vec) -> SelectorResultValue { + pub fn new(result: Vec, read_cols: ParquetReadColumns) -> SelectorResultValue { SelectorResultValue { result: SelectorResult::PrimaryKey(result), - projection, + read_cols, } } /// Creates a new selector result value with flat format. - pub fn new_flat(result: Vec, projection: Vec) -> SelectorResultValue { + pub fn new_flat( + result: Vec, + read_cols: ParquetReadColumns, + ) -> SelectorResultValue { SelectorResultValue { result: SelectorResult::Flat(result), - projection, + read_cols, } } @@ -1289,6 +1293,7 @@ mod tests { use crate::read::range_cache::{ RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder, }; + use crate::read::read_columns::ReadColumns; use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] @@ -1442,7 +1447,10 @@ mod tests { selector: TimeSeriesRowSelector::LastRow, }; assert!(cache.get_selector_result(&key).is_none()); - let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new())); + let result = Arc::new(SelectorResultValue::new( + Vec::new(), + ParquetReadColumns::from_deduped(Vec::new()), + )); cache.put_selector_result(key, result); assert!(cache.get_selector_result(&key).is_some()); } @@ -1459,7 +1467,7 @@ mod tests { region_id: RegionId::new(1, 1), row_groups: vec![(FileId::random(), 0)], scan: ScanRequestFingerprintBuilder { - read_column_ids: vec![], + read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()), read_column_types: vec![], filters: vec!["tag_0 = 1".to_string()], time_filters: vec![], diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d7d3f656c5..b822c17976 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1240,6 +1240,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to cast column"))] + CastColumn { + #[snafu(source)] + error: datafusion::error::DataFusionError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1329,6 +1337,7 @@ impl ErrorExt for Error { | ReadDataPart { .. } | BuildEntry { .. } | Metadata { .. } + | CastColumn { .. } | MitoManifestInfo { .. } => StatusCode::Internal, FetchManifests { source, .. } => source.status_code(), diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 9290fa17b0..3ac8e009fe 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -25,6 +25,7 @@ use store_api::storage::ColumnId; use table::predicate::Predicate; use crate::error::Result; +use crate::read::read_columns::ReadColumns; use crate::sst::parquet::file_range::{PreFilterMode, RangeBase}; use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, build_bulk_filter_plan}; @@ -65,26 +66,23 @@ impl BulkIterContext { ) -> Result { let codec = build_primary_key_codec(®ion_metadata); - let read_format = if let Some(column_ids) = projection { - FlatReadFormat::new( - region_metadata.clone(), - column_ids.iter().copied(), - None, - "memtable", - skip_auto_convert, - )? + let read_cols = if let Some(col_ids) = projection { + ReadColumns::from_deduped_column_ids(col_ids.iter().copied()) } else { - FlatReadFormat::new( - region_metadata.clone(), + ReadColumns::from_deduped_column_ids( region_metadata .column_metadatas .iter() .map(|col| col.column_id), - None, - "memtable", - skip_auto_convert, - )? + ) }; + let read_format = FlatReadFormat::new( + region_metadata.clone(), + read_cols, + None, + "memtable", + skip_auto_convert, + )?; let dyn_filters = predicate .as_ref() diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 11cb5873d5..7853bd9d9c 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -64,10 +64,13 @@ impl EncodedBulkPartIter { let data = encoded_part.data().clone(); let series_count = encoded_part.metadata().num_series as usize; - let projection_mask = ProjectionMask::roots( - parquet_meta.file_metadata().schema_descr(), - context.read_format().projection_indices().iter().copied(), - ); + // TODO(fys): Nested projection pushdown to the memtable layer is not supported yet. + let root_indices = context + .read_format() + .parquet_read_columns() + .root_indices_iter(); + let projection_mask = + ProjectionMask::roots(parquet_meta.file_metadata().schema_descr(), root_indices); let builder = MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?; @@ -276,7 +279,11 @@ impl BulkPartBatchIter { /// Applies projection to the RecordBatch if needed. fn apply_projection(&self, record_batch: RecordBatch) -> error::Result { - let projection_indices = self.context.read_format().projection_indices(); + let projection_indices = self + .context + .read_format() + .parquet_read_columns() + .root_indices(); if projection_indices.len() == record_batch.num_columns() { return Ok(record_batch); } diff --git a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs index ddb6c8d6bd..1b9821e11b 100644 --- a/src/mito2/src/read/batch_adapter.rs +++ b/src/mito2/src/read/batch_adapter.rs @@ -688,7 +688,7 @@ mod tests { BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids); let rb = adapter.into_iter().next().unwrap().unwrap(); - let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 3]).unwrap(); assert_eq!(rb.schema(), mapper.input_arrow_schema(false)); // tag_0 + field_1 + ts + 3 internal columns. assert_eq!(6, rb.num_columns()); diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index d6aa5c52a0..6d6f535068 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -609,6 +609,7 @@ mod tests { use super::*; use crate::read::flat_projection::FlatProjectionMapper; + use crate::read::read_columns::ReadColumns; use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; @@ -713,7 +714,7 @@ mod tests { let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap(); let read_format = FlatReadFormat::new( actual_metadata.clone(), - [0, 1, 2, 3].into_iter(), + ReadColumns::from_deduped_column_ids([0, 1, 2, 3]), None, "test", false, @@ -799,16 +800,15 @@ mod tests { &[1], )); - // Output projection: tag_1, field_2. Read also includes field_3. let mapper = FlatProjectionMapper::new_with_read_columns( &expected_metadata, vec![1, 2], - vec![1, 2, 3], + ReadColumns::from_deduped_column_ids([1, 2, 3]), ) .unwrap(); let read_format = FlatReadFormat::new( actual_metadata.clone(), - [1, 2, 3].into_iter(), + ReadColumns::from_deduped_column_ids([1, 2, 3]), None, "test", false, @@ -899,7 +899,7 @@ mod tests { let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap(); let read_format = FlatReadFormat::new( actual_metadata.clone(), - [0, 1, 2, 3].into_iter(), + ReadColumns::from_deduped_column_ids([0, 1, 2, 3]), None, "test", false, @@ -993,7 +993,7 @@ mod tests { let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap(); let read_format = FlatReadFormat::new( actual_metadata.clone(), - [0, 2, 3].into_iter(), + ReadColumns::from_deduped_column_ids([0, 2, 3]), None, "test", true, diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 294cd8bbb0..d2fef71a01 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -33,6 +33,7 @@ use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache}; +use crate::read::read_columns::ReadColumns; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ @@ -49,11 +50,11 @@ pub struct FlatProjectionMapper { metadata: RegionMetadataRef, /// Schema for converted [RecordBatch] to return. output_schema: SchemaRef, - /// Ids of columns to read from memtables and SSTs. + /// The columns to read from memtables and SSTs. /// The mapper won't deduplicate the column ids. /// /// Note that this doesn't contain the `__table_id` and `__tsid`. - read_column_ids: Vec, + read_cols: ReadColumns, /// Ids and DataTypes of columns of the expected batch. /// We can use this to check if the batch is compatible with the expected schema. /// @@ -74,50 +75,48 @@ impl FlatProjectionMapper { /// empty `RecordBatch` and only use its row count in this query. pub fn new( metadata: &RegionMetadataRef, - projection: impl Iterator, + projection: impl IntoIterator, ) -> Result { - let projection: Vec<_> = projection.collect(); + let projection: Vec<_> = projection.into_iter().collect(); let read_column_ids = read_column_ids_from_projection(metadata, &projection)?; - Self::new_with_read_columns(metadata, projection, read_column_ids) + let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids); + Self::new_with_read_columns(metadata, projection, read_cols) } /// Returns a new mapper with output projection and explicit read columns. pub fn new_with_read_columns( metadata: &RegionMetadataRef, projection: Vec, - read_column_ids: Vec, + read_cols: ReadColumns, ) -> Result { // If the original projection is empty. let is_empty_projection = projection.is_empty(); // Output column schemas for the projection. - let mut column_schemas = Vec::with_capacity(projection.len()); + let mut col_schemas = Vec::with_capacity(projection.len()); // Column ids of the output projection without deduplication. - let mut output_column_ids = Vec::with_capacity(projection.len()); + let mut output_col_ids = Vec::with_capacity(projection.len()); for idx in &projection { - // For each projection index, we get the column id for projection. - let column = - metadata - .column_metadatas - .get(*idx) - .with_context(|| InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!("projection index {} is out of bound", idx), - })?; - - output_column_ids.push(column.column_id); - // Safety: idx is valid. - column_schemas.push(metadata.schema.column_schemas()[*idx].clone()); + let col = metadata + .column_metadatas + .get(*idx) + .with_context(|| InvalidRequestSnafu { + region_id: metadata.region_id, + reason: format!("projection index {} is out of bound", idx), + })?; + output_col_ids.push(col.column_id); + col_schemas.push(col.column_schema.clone()); } // Creates a map to lookup index. let id_to_index = sst_column_id_indices(metadata); + // TODO(yingwen): Support different flat schema options. let format_projection = FormatProjection::compute_format_projection( &id_to_index, // All columns with internal columns. metadata.column_metadatas.len() + 3, - read_column_ids.iter().copied(), + read_cols.clone(), ); let batch_schema = flat_projected_columns(metadata, &format_projection); @@ -130,13 +129,13 @@ impl FlatProjectionMapper { Arc::new(Schema::new(vec![])) } else { // Safety: Columns come from existing schema. - Arc::new(Schema::new(column_schemas)) + Arc::new(Schema::new(col_schemas)) }; let batch_indices = if is_empty_projection { vec![] } else { - output_column_ids + output_col_ids .iter() .map(|id| { // Safety: The map is computed from the read projection. @@ -164,7 +163,7 @@ impl FlatProjectionMapper { Ok(FlatProjectionMapper { metadata: metadata.clone(), output_schema, - read_column_ids, + read_cols, batch_schema, is_empty_projection, batch_indices, @@ -181,11 +180,9 @@ impl FlatProjectionMapper { pub(crate) fn metadata(&self) -> &RegionMetadataRef { &self.metadata } - - /// Returns ids of projected columns that we need to read - /// from memtables and SSTs. - pub(crate) fn column_ids(&self) -> &[ColumnId] { - &self.read_column_ids + /// Returns projected columns that we need to read from memtables and SSTs. + pub(crate) fn read_columns(&self) -> &ReadColumns { + &self.read_cols } /// Returns the field column start index in output batch. @@ -439,15 +436,9 @@ impl CompactionProjectionMapper { .chain([metadata.time_index_column_pos()]) .collect::>(); - let mapper = FlatProjectionMapper::new_with_read_columns( - metadata, - projection, - metadata - .column_metadatas - .iter() - .map(|col| col.column_id) - .collect(), - )?; + let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id); + let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids); + let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?; let assembler = DfBatchAssembler::new(mapper.output_schema()); Ok(Self { mapper, assembler }) diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 140e6eb4c9..301a5cc0b7 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -34,6 +34,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream}; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index}; use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets}; +use crate::sst::parquet::read_columns::ParquetReadColumns; use crate::sst::parquet::reader::FlatRowGroupReader; /// Reader to keep the last row for each time series. @@ -134,7 +135,7 @@ impl FlatRowGroupLastRowCachedReader { file_id: FileId, row_group_idx: usize, cache_strategy: CacheStrategy, - projection: &[usize], + read_cols: &ParquetReadColumns, reader: FlatRowGroupReader, ) -> Self { let key = SelectorResultKey { @@ -145,14 +146,14 @@ impl FlatRowGroupLastRowCachedReader { if let Some(value) = cache_strategy.get_selector_result(&key) { let is_flat = matches!(&value.result, SelectorResult::Flat(_)); - let schema_matches = value.projection == projection; + let schema_matches = value.read_cols == *read_cols; if is_flat && schema_matches { Self::new_hit(value) } else { - Self::new_miss(key, projection, reader, cache_strategy) + Self::new_miss(key, read_cols, reader, cache_strategy) } } else { - Self::new_miss(key, projection, reader, cache_strategy) + Self::new_miss(key, read_cols, reader, cache_strategy) } } @@ -171,14 +172,14 @@ impl FlatRowGroupLastRowCachedReader { fn new_miss( key: SelectorResultKey, - projection: &[usize], + read_cols: &ParquetReadColumns, reader: FlatRowGroupReader, cache_strategy: CacheStrategy, ) -> Self { selector_result_cache_miss(); Self::Miss(FlatRowGroupLastRowReader::new( key, - projection.to_vec(), + read_cols.clone(), reader, cache_strategy, )) @@ -257,7 +258,7 @@ pub(crate) struct FlatRowGroupLastRowReader { selector: FlatLastTimestampSelector, yielded_batches: Vec, cache_strategy: CacheStrategy, - projection: Vec, + read_cols: ParquetReadColumns, /// Accumulates small selector-output batches before concatenating. pending: BatchBuffer, } @@ -265,7 +266,7 @@ pub(crate) struct FlatRowGroupLastRowReader { impl FlatRowGroupLastRowReader { fn new( key: SelectorResultKey, - projection: Vec, + read_cols: ParquetReadColumns, reader: FlatRowGroupReader, cache_strategy: CacheStrategy, ) -> Self { @@ -275,7 +276,7 @@ impl FlatRowGroupLastRowReader { selector: FlatLastTimestampSelector::default(), yielded_batches: vec![], cache_strategy, - projection, + read_cols, pending: BatchBuffer::new(), } } @@ -323,7 +324,7 @@ impl FlatRowGroupLastRowReader { let batches = std::mem::take(&mut self.yielded_batches); let value = Arc::new(SelectorResultValue::new_flat( batches, - self.projection.clone(), + self.read_cols.clone(), )); self.cache_strategy.put_selector_result(self.key, value); } diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 68e5bf3952..c3d8e4323d 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -108,6 +108,7 @@ mod tests { use super::*; use crate::read::flat_projection::FlatProjectionMapper; + use crate::read::read_columns::ReadColumns; fn print_record_batch(record_batch: RecordBatch) -> String { pretty::pretty_format_batches(&[record_batch.into_df_record_batch()]) @@ -221,7 +222,10 @@ mod tests { ); let cache = CacheStrategy::Disabled; let mapper = FlatProjectionMapper::all(&metadata).unwrap(); - assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); + assert_eq!( + &[0, 1, 2, 3, 4], + mapper.read_columns().column_ids().as_slice() + ); assert_eq!( [ (1, ConcreteDataType::int64_datatype()), @@ -255,8 +259,8 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - let mapper = FlatProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap(); - assert_eq!([4, 1], mapper.column_ids()); + let mapper = FlatProjectionMapper::new(&metadata, [4, 1]).unwrap(); + assert_eq!(&[4, 1], mapper.read_columns().column_ids().as_slice()); assert_eq!( [ (1, ConcreteDataType::int64_datatype()), @@ -288,10 +292,13 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - let mapper = - FlatProjectionMapper::new_with_read_columns(&metadata, vec![4, 1], vec![4, 1, 3]) - .unwrap(); - assert_eq!([4, 1, 3], mapper.column_ids()); + let mapper = FlatProjectionMapper::new_with_read_columns( + &metadata, + vec![4, 1], + ReadColumns::from_deduped_column_ids([4, 1, 3]), + ) + .unwrap(); + assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice()); let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3); let record_batch = mapper.convert(&batch, &cache).unwrap(); @@ -315,8 +322,8 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - let mapper = FlatProjectionMapper::new(&metadata, [].into_iter()).unwrap(); - assert_eq!([0], mapper.column_ids()); + let mapper = FlatProjectionMapper::new(&metadata, []).unwrap(); + assert_eq!(&[0], mapper.read_columns().column_ids().as_slice()); assert!(mapper.output_schema().is_empty()); assert_eq!( [(0, ConcreteDataType::timestamp_millisecond_datatype())], diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 93995bd017..208715f57d 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -26,12 +26,13 @@ use datatypes::prelude::ConcreteDataType; use futures::TryStreamExt; use snafu::ResultExt; use store_api::region_engine::PartitionRange; -use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector}; +use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector}; use tokio::sync::{mpsc, oneshot}; use crate::cache::CacheStrategy; use crate::error::{ComputeArrowSnafu, Result}; use crate::read::BoxedRecordBatchStream; +use crate::read::read_columns::ReadColumns; use crate::read::scan_region::StreamContext; use crate::read::scan_util::PartitionMetrics; use crate::region::options::MergeMode; @@ -63,7 +64,7 @@ pub(crate) struct ScanRequestFingerprint { #[derive(Debug)] pub(crate) struct ScanRequestFingerprintBuilder { - pub(crate) read_column_ids: Vec, + pub(crate) read_columns: ReadColumns, pub(crate) read_column_types: Vec>, pub(crate) filters: Vec, pub(crate) time_filters: Vec, @@ -77,7 +78,7 @@ pub(crate) struct ScanRequestFingerprintBuilder { impl ScanRequestFingerprintBuilder { pub(crate) fn build(self) -> ScanRequestFingerprint { let Self { - read_column_ids, + read_columns, read_column_types, filters, time_filters, @@ -90,7 +91,7 @@ impl ScanRequestFingerprintBuilder { ScanRequestFingerprint { inner: Arc::new(SharedScanRequestFingerprint { - read_column_ids, + read_columns, read_column_types, filters, }), @@ -107,8 +108,8 @@ impl ScanRequestFingerprintBuilder { /// Non-copiable struct of the fingerprint. #[derive(Debug, PartialEq, Eq, Hash)] struct SharedScanRequestFingerprint { - /// Column ids of the projection. - read_column_ids: Vec, + /// Logical columns of the projection. + read_columns: ReadColumns, /// Column types of the projection. /// We keep this to ensure we won't reuse the fingerprint after a schema change. read_column_types: Vec>, @@ -118,8 +119,8 @@ struct SharedScanRequestFingerprint { impl ScanRequestFingerprint { #[cfg(test)] - pub(crate) fn read_column_ids(&self) -> &[ColumnId] { - &self.inner.read_column_ids + pub(crate) fn read_columns(&self) -> &ReadColumns { + &self.inner.read_columns } #[cfg(test)] @@ -154,7 +155,7 @@ impl ScanRequestFingerprint { pub(crate) fn estimated_size(&self) -> usize { mem::size_of::() - + self.inner.read_column_ids.capacity() * mem::size_of::() + + self.inner.read_columns.estimated_size() + self.inner.read_column_types.capacity() * mem::size_of::>() + self.inner.filters.capacity() * mem::size_of::() + self @@ -591,7 +592,7 @@ pub fn bench_cache_flat_range_stream( let cache_strategy = CacheStrategy::EnableAll(cache_manager); let fingerprint = ScanRequestFingerprintBuilder { - read_column_ids: vec![], + read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()), read_column_types: vec![], filters: vec![], time_filters: vec![], @@ -654,8 +655,9 @@ mod tests { filter_deleted: bool, partition_expr_version: u64, ) -> ScanRequestFingerprint { + let read_columns = ReadColumns::from_deduped_column_ids([1, 2]); ScanRequestFingerprintBuilder { - read_column_ids: vec![1, 2], + read_columns, read_column_types: vec![None, None], filters, time_filters, @@ -704,7 +706,7 @@ mod tests { ) -> (StreamContext, PartitionRange) { let env = SchedulerEnv::new().await; let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file_id = FileId::random(); let file = sst_file_handle_with_file_id( @@ -848,7 +850,7 @@ mod tests { let reset = fingerprint.without_time_filters(); - assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids()); + assert_eq!(reset.read_columns(), fingerprint.read_columns()); assert_eq!(reset.read_column_types(), fingerprint.read_column_types()); assert_eq!(reset.filters(), fingerprint.filters()); assert!(reset.time_filters().is_empty()); diff --git a/src/mito2/src/read/read_columns.rs b/src/mito2/src/read/read_columns.rs index f601a4377d..ae5876237b 100644 --- a/src/mito2/src/read/read_columns.rs +++ b/src/mito2/src/read/read_columns.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO(fys): remove this once the module is used -#![allow(dead_code)] - use std::collections::{BTreeMap, HashSet}; use std::mem; @@ -65,7 +62,7 @@ use crate::read::scan_region::PredicateGroup; /// If `nested_paths` is empty, the whole column will be read. #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] pub struct ReadColumns { - cols: Vec, + pub cols: Vec, } impl ReadColumns { @@ -85,7 +82,7 @@ impl ReadColumns { } pub fn column_ids_iter(&self) -> impl Iterator + '_ { - self.cols.iter().map(|column| column.column_id()) + self.cols.iter().map(|column| column.column_id) } pub fn column_ids(&self) -> Vec { @@ -108,10 +105,10 @@ impl ReadColumns { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ReadColumn { - column_id: ColumnId, + pub column_id: ColumnId, /// Nested field paths under this column. /// Empty means reading the whole column. - nested_paths: Vec, + pub nested_paths: Vec, } impl ReadColumn { @@ -122,10 +119,6 @@ impl ReadColumn { } } - pub fn column_id(&self) -> ColumnId { - self.column_id - } - pub fn nested_paths(&self) -> &[NestedPath] { &self.nested_paths } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 374a42144e..8d6bde5bad 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -34,11 +34,11 @@ use datafusion_expr::utils::expr_to_columns; use futures::StreamExt; use partition::expr::PartitionExpr; use smallvec::SmallVec; -use snafu::{OptionExt as _, ResultExt}; +use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ - ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, + RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, }; use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr}; @@ -48,7 +48,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::CacheStrategy; use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES; -use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; +use crate::error::{InvalidPartitionExprSnafu, Result}; #[cfg(feature = "enterprise")] use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; use crate::memtable::{MemtableRange, RangesOptions}; @@ -57,6 +57,9 @@ use crate::read::compat::{self, FlatCompatBatch}; use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; use crate::read::range_cache::ScanRequestFingerprint; +use crate::read::read_columns::{ + ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection, +}; use crate::read::seq_scan::SeqScan; use crate::read::series_scan::SeriesScan; use crate::read::stream::ScanBatchStream; @@ -399,23 +402,33 @@ impl ScanRegion { let time_range = self.build_time_range_predicate(); let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; - let read_column_ids = match self.request.projection_indices() { - Some(p) => self.build_read_column_ids(p, &predicate)?, - None => self - .version - .metadata - .column_metadatas - .iter() - .map(|col| col.column_id) - .collect(), + let read_cols = match &self.request.projection_input { + Some(p) => { + // Read columns include the pushed-down projection and columns + // resolved from the predicate. + let metadata = &self.version.metadata; + let from_projection = read_columns_from_projection(p.clone(), metadata)?; + let from_predicate = read_columns_from_predicate(&predicate, metadata); + merge(from_projection, from_predicate) + } + None => { + let read_col_ids = self + .version + .metadata + .column_metadatas + .iter() + .map(|col| col.column_id); + ReadColumns::from_deduped_column_ids(read_col_ids) + } }; + let read_col_ids = read_cols.column_ids(); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match self.request.projection_indices() { Some(p) => FlatProjectionMapper::new_with_read_columns( &self.version.metadata, p.to_vec(), - read_column_ids.clone(), + read_cols, )?, None => FlatProjectionMapper::all(&self.version.metadata)?, }; @@ -464,7 +477,7 @@ impl ScanRegion { continue; } let ranges_in_memtable = m.ranges( - Some(read_column_ids.as_slice()), + Some(&read_col_ids), RangesOptions::default() .with_predicate(predicate.clone()) .with_sequence(SequenceRange::new( @@ -573,72 +586,6 @@ impl ScanRegion { build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters) } - /// Return all columns id to read according to the projection and filters. - fn build_read_column_ids( - &self, - projection: &[usize], - predicate: &PredicateGroup, - ) -> Result> { - let metadata = &self.version.metadata; - // use Vec for read_column_ids to keep the order of columns. - let mut read_column_ids = Vec::new(); - let mut seen = HashSet::new(); - - for idx in projection { - let column = - metadata - .column_metadatas - .get(*idx) - .with_context(|| InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!("projection index {} is out of bound", idx), - })?; - seen.insert(column.column_id); - // keep the projection order - read_column_ids.push(column.column_id); - } - - if projection.is_empty() { - let time_index = metadata.time_index_column().column_id; - if seen.insert(time_index) { - read_column_ids.push(time_index); - } - } - - let mut extra_names = HashSet::new(); - let mut columns = HashSet::new(); - - for expr in &self.request.filters { - columns.clear(); - if expr_to_columns(expr, &mut columns).is_err() { - continue; - } - extra_names.extend(columns.iter().map(|column| column.name.clone())); - } - - if let Some(expr) = predicate.region_partition_expr() { - expr.collect_column_names(&mut extra_names); - } - - if !extra_names.is_empty() { - for column in &metadata.column_metadatas { - if extra_names.contains(column.column_schema.name.as_str()) - && !seen.contains(&column.column_id) - { - read_column_ids.push(column.column_id); - } - extra_names.remove(column.column_schema.name.as_str()); - } - if !extra_names.is_empty() { - warn!( - "Some columns in filters are not found in region {}: {:?}", - metadata.region_id, extra_names - ); - } - } - Ok(read_column_ids) - } - /// Partitions filters into two groups: non-field filters and field filters. /// Returns `(non_field_filters, field_filters)`. fn partition_by_field_filters(&self) -> (Vec, Vec) { @@ -797,10 +744,10 @@ pub struct ScanInput { access_layer: AccessLayerRef, /// Maps projected Batches to RecordBatches. pub(crate) mapper: Arc, - /// Column ids to read from memtables and SSTs. + /// The columns to read from memtables and SSTs. /// Notice this is different from the columns in `mapper` which are projected columns. /// But this read columns might also include non-projected columns needed for filtering. - pub(crate) read_column_ids: Vec, + pub(crate) read_cols: ReadColumns, /// Time range filter for time index. pub(crate) time_range: Option, /// Predicate to push down. @@ -855,7 +802,7 @@ impl ScanInput { pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput { ScanInput { access_layer, - read_column_ids: mapper.column_ids().to_vec(), + read_cols: mapper.read_columns().clone(), mapper: Arc::new(mapper), time_range: None, predicate: PredicateGroup::default(), @@ -1102,14 +1049,14 @@ impl ScanInput { let decode_pk_values = !self.compaction && self .mapper - .column_ids() - .iter() - .any(|column_id| self.mapper.metadata().primary_key.contains(column_id)); + .read_columns() + .column_ids_iter() + .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id)); let reader = self .access_layer .read_sst(file.clone()) .predicate(predicate) - .projection(Some(self.read_column_ids.clone())) + .projection(Some(self.read_cols.clone())) .cache(self.cache_strategy.clone()) .inverted_index_appliers(self.inverted_index_appliers.clone()) .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone()) @@ -1408,19 +1355,18 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option VersionRef { - let mutable = Arc::new(TimePartitions::new( - metadata.clone(), - Arc::new(EmptyMemtableBuilder::default()), - 0, - None, - )); - Arc::new(VersionBuilder::new(metadata, mutable).build()) - } - async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec) -> ScanInput { let env = SchedulerEnv::new().await; - let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file = FileHandle::new( crate::sst::file::FileMeta::default(), @@ -1838,86 +1770,6 @@ mod tests { .with_files(vec![file]) } - #[tokio::test] - async fn test_build_read_column_ids_includes_filters() { - let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let version = new_version(metadata.clone()); - let env = SchedulerEnv::new().await; - let request = ScanRequest { - projection_input: Some(vec![4].into()), - filters: vec![ - col("v0").gt(lit(1)), - col("ts").gt(lit(0)), - col("k0").eq(lit("foo")), - ], - ..Default::default() - }; - let scan_region = ScanRegion::new( - version, - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - let predicate = - PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = &scan_region.request.projection_indices().unwrap(); - let read_ids = scan_region - .build_read_column_ids(projection, &predicate) - .unwrap(); - assert_eq!(vec![4, 0, 2, 3], read_ids); - } - - #[tokio::test] - async fn test_build_read_column_ids_empty_projection() { - let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let version = new_version(metadata.clone()); - let env = SchedulerEnv::new().await; - let request = ScanRequest { - projection_input: Some(ProjectionInput::default()), - ..Default::default() - }; - let scan_region = ScanRegion::new( - version, - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - let predicate = - PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = &scan_region.request.projection_indices().unwrap(); - let read_ids = scan_region - .build_read_column_ids(projection, &predicate) - .unwrap(); - // Empty projection should still read the time index column (id 2 in this test schema). - assert_eq!(vec![2], read_ids); - } - - #[tokio::test] - async fn test_build_read_column_ids_keeps_projection_order() { - let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let version = new_version(metadata.clone()); - let env = SchedulerEnv::new().await; - let request = ScanRequest { - projection_input: Some(vec![4, 1].into()), - filters: vec![col("v0").gt(lit(1))], - ..Default::default() - }; - let scan_region = ScanRegion::new( - version, - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - let predicate = - PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap(); - let projection = &scan_region.request.projection_indices().unwrap(); - let read_ids = scan_region - .build_read_column_ids(projection, &predicate) - .unwrap(); - // Projection order preserved, extra columns appended in schema order. - assert_eq!(vec![4, 1, 3], read_ids); - } - /// Helper to create a timestamp millisecond literal. fn ts_lit(val: i64) -> datafusion_expr::Expr { lit(ScalarValue::TimestampMillisecond(Some(val), None)) @@ -1943,7 +1795,7 @@ mod tests { let fingerprint = build_scan_fingerprint(&input).unwrap(); let expected = ScanRequestFingerprintBuilder { - read_column_ids: input.read_column_ids.clone(), + read_columns: input.read_cols, read_column_types: vec![ metadata .column_by_id(0) @@ -2019,7 +1871,7 @@ mod tests { let fingerprint = build_scan_fingerprint(&input).unwrap(); let expected = ScanRequestFingerprintBuilder { - read_column_ids: input.read_column_ids.clone(), + read_columns: input.read_cols, read_column_types: vec![ metadata .column_by_id(0) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 1a94c274be..5b7e46b0c1 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1368,7 +1368,7 @@ mod split_tests { async fn new_stream_context_with_files(files: Vec) -> StreamContext { let env = SchedulerEnv::new().await; let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files); StreamContext { @@ -1745,7 +1745,7 @@ mod tests { ) -> Arc { let env = SchedulerEnv::new().await; let metadata = metadata_for_test(); - let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); let input = ScanInput::new(env.access_layer.clone(), mapper) .with_cache(CacheStrategy::Disabled) .with_memtables(memtables) diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 0415cdbfb3..ec7e038a08 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -207,7 +207,7 @@ impl FileRange { self.file_handle().file_id().file_id(), self.row_group_idx, cache_strategy, - self.context.read_format().projection_indices(), + self.context.read_format().parquet_read_columns(), flat_row_group_reader, ); FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) @@ -655,14 +655,18 @@ mod tests { use datafusion_expr::{col, lit}; use super::*; + use crate::read::read_columns::ReadColumns; use crate::sst::parquet::flat_format::FlatReadFormat; use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata}; fn new_test_range_base(filters: Vec) -> RangeBase { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", true, @@ -705,7 +709,9 @@ mod tests { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", true, diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index e23d6c85cc..d8111dbe62 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -51,10 +51,12 @@ use crate::error::{ ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, }; +use crate::read::read_columns::ReadColumns; use crate::sst::parquet::format::{ FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, StatValues, column_null_counts, column_values, }; +use crate::sst::parquet::read_columns::ParquetReadColumns; use crate::sst::{ FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, with_field_id, @@ -162,7 +164,7 @@ impl FlatReadFormat { /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding. pub fn new( metadata: RegionMetadataRef, - column_ids: impl Iterator, + read_cols: ReadColumns, num_columns: Option, file_path: &str, skip_auto_convert: bool, @@ -178,16 +180,16 @@ impl FlatReadFormat { // Only skip auto convert when the primary key encoding is sparse. ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new( metadata, - column_ids, + read_cols, skip_auto_convert, )) } else { ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new( - metadata, column_ids, false, + metadata, read_cols, false, )) } } else { - ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids)) + ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols)) }; Ok(FlatReadFormat { @@ -258,9 +260,10 @@ impl FlatReadFormat { /// Gets the projected output schema produced by parquet reading. pub(crate) fn output_arrow_schema(&self) -> Result { + let projection = self.parquet_read_columns().root_indices(); let schema = self .arrow_schema() - .project(self.projection_indices()) + .project(projection) .context(ComputeArrowSnafu)?; Ok(Arc::new(schema)) } @@ -273,11 +276,11 @@ impl FlatReadFormat { } } - /// Gets sorted projection indices to read from the SST file. - pub(crate) fn projection_indices(&self) -> &[usize] { + /// Get the sorted read columns to read from the sst file. + pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns { match &self.parquet_adapter { - ParquetAdapter::Flat(p) => &p.format_projection.projection_indices, - ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(), + ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols, + ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(), } } @@ -413,7 +416,7 @@ impl ParquetPrimaryKeyToFlat { /// Creates a helper with existing `metadata` and `column_ids` to read. fn new( metadata: RegionMetadataRef, - column_ids: impl Iterator, + read_cols: ReadColumns, skip_auto_convert: bool, ) -> ParquetPrimaryKeyToFlat { assert!(if skip_auto_convert { @@ -422,20 +425,18 @@ impl ParquetPrimaryKeyToFlat { true }); - let column_ids: Vec<_> = column_ids.collect(); - // Creates a map to lookup index based on the new format. let id_to_index = sst_column_id_indices(&metadata); let sst_column_num = flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default()); let codec = build_primary_key_codec(&metadata); - let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied()); + let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone()); let (convert_format, format_projection) = if skip_auto_convert { ( None, FormatProjection { - projection_indices: format.projection_indices().to_vec(), + parquet_read_cols: format.parquet_read_columns().clone(), column_id_to_projected_index: format.field_id_to_projected_index().clone(), }, ) @@ -444,7 +445,7 @@ impl ParquetPrimaryKeyToFlat { let format_projection = FormatProjection::compute_format_projection( &id_to_index, sst_column_num, - column_ids.iter().copied(), + read_cols.clone(), ); ( FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec), @@ -482,14 +483,14 @@ struct ParquetFlat { impl ParquetFlat { /// Creates a helper with existing `metadata` and `column_ids` to read. - fn new(metadata: RegionMetadataRef, column_ids: impl Iterator) -> ParquetFlat { + fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> ParquetFlat { // Creates a map to lookup index. let id_to_index = sst_column_id_indices(&metadata); let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); let sst_column_num = flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default()); let format_projection = - FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids); + FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols); Self { metadata, @@ -789,7 +790,9 @@ impl FlatReadFormat { pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat { Self::new( Arc::clone(&metadata), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", false, @@ -810,6 +813,7 @@ mod tests { use store_api::storage::RegionId; use super::{FlatReadFormat, field_column_start}; + use crate::read::read_columns::ReadColumns; use crate::sst::{ FlatSchemaOptions, flat_sst_arrow_schema_column_num, to_flat_sst_arrow_schema, }; @@ -892,7 +896,7 @@ mod tests { let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense)); let read_format = FlatReadFormat::new( metadata.clone(), - [0_u32, 2_u32].into_iter(), + ReadColumns::from_deduped_column_ids([0_u32, 2_u32]), None, "test", false, @@ -900,9 +904,10 @@ mod tests { .unwrap(); let output_schema = read_format.output_arrow_schema().unwrap(); + let projection = read_format.parquet_read_columns().root_indices(); let expected = Arc::new( to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()) - .project(read_format.projection_indices()) + .project(projection) .unwrap(), ); diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index e8b2cfe647..b9afdfaac7 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -52,8 +52,10 @@ use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, }; +use crate::read::read_columns::ReadColumns; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::sst::file::{FileMeta, FileTimeRange}; +use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns}; use crate::sst::to_sst_arrow_schema; /// Arrow array type for the primary key dictionary. @@ -212,7 +214,7 @@ pub struct PrimaryKeyReadFormat { /// In SST schema, fields are stored in the front of the schema. field_id_to_index: HashMap, /// Indices of columns to read from the SST. It contains all internal columns. - projection_indices: Vec, + parquet_read_cols: ParquetReadColumns, /// Field column id to their index in the projected schema ( /// the schema of [Batch]). field_id_to_projected_index: HashMap, @@ -222,10 +224,7 @@ pub struct PrimaryKeyReadFormat { impl PrimaryKeyReadFormat { /// Creates a helper with existing `metadata` and `column_ids` to read. - pub fn new( - metadata: RegionMetadataRef, - column_ids: impl Iterator, - ) -> PrimaryKeyReadFormat { + pub fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> PrimaryKeyReadFormat { let field_id_to_index: HashMap<_, _> = metadata .field_columns() .enumerate() @@ -236,14 +235,14 @@ impl PrimaryKeyReadFormat { let format_projection = FormatProjection::compute_format_projection( &field_id_to_index, arrow_schema.fields.len(), - column_ids, + read_cols, ); PrimaryKeyReadFormat { metadata, arrow_schema, field_id_to_index, - projection_indices: format_projection.projection_indices, + parquet_read_cols: format_projection.parquet_read_cols, field_id_to_projected_index: format_projection.column_id_to_projected_index, primary_key_codec: None, } @@ -262,9 +261,8 @@ impl PrimaryKeyReadFormat { &self.metadata } - /// Gets sorted projection indices to read. - pub(crate) fn projection_indices(&self) -> &[usize] { - &self.projection_indices + pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns { + &self.parquet_read_cols } /// Gets the field id to projected index. @@ -580,8 +578,8 @@ impl PrimaryKeyReadFormat { /// Helper to compute the projection for the SST. pub(crate) struct FormatProjection { - /// Indices of columns to read from the SST. It contains all internal columns. - pub(crate) projection_indices: Vec, + /// The columns to read from the SST. It contains all internal columns. + pub(crate) parquet_read_cols: ParquetReadColumns, /// Column id to their index in the projected schema ( /// the schema after projection). /// @@ -596,50 +594,91 @@ impl FormatProjection { pub(crate) fn compute_format_projection( id_to_index: &HashMap, sst_column_num: usize, - column_ids: impl Iterator, + cols: ReadColumns, ) -> Self { - // Maps column id of a projected column to its index in SST. - // It also ignores columns not in the SST. - // [(column id, index in SST)] - let mut projected_schema: Vec<_> = column_ids - .filter_map(|column_id| { + let mut projected_columns: Vec<_> = cols + .cols + .into_iter() + .filter_map(|col| { id_to_index - .get(&column_id) + .get(&col.column_id) .copied() - .map(|index| (column_id, index)) + .map(|index_of_sst| (col.column_id, index_of_sst, col.nested_paths)) }) .collect(); // Sorts columns by their indices in the SST. SST uses a bitmap for projection. - // This ensures the schema of `projected_schema` is the same as the batch returned from the SST. - projected_schema.sort_unstable_by_key(|x| x.1); - // Dedups the entries to avoid the case that `column_ids` has duplicated columns. - projected_schema.dedup_by_key(|x| x.1); - - // Collects all projected indices. - // It contains the positions of all columns we need to read. - let mut projection_indices: Vec<_> = projected_schema - .iter() - .map(|(_column_id, index)| *index) - // We need to add all fixed position columns. - .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num) - .collect(); - projection_indices.sort_unstable(); - // Removes duplications. - projection_indices.dedup(); + // This ensures the schema of `projected_columns` is the same as the batch returned from the SST. + projected_columns.sort_unstable_by_key(|(_, index, _)| *index); + let mut parquet_read_cols: Vec = + Vec::with_capacity(projected_columns.len() + FIXED_POS_COLUMN_NUM); // Creates a map from column id to the index of that column in the projected record batch. - let column_id_to_projected_index = projected_schema - .into_iter() - .map(|(column_id, _)| column_id) - .enumerate() - .map(|(index, column_id)| (column_id, index)) - .collect(); + let mut column_id_to_projected_index = HashMap::with_capacity(projected_columns.len()); + + for (col_id, index_of_sst, nested_paths) in projected_columns { + Self::merge_or_push_parquet_column(&mut parquet_read_cols, index_of_sst, nested_paths); + + column_id_to_projected_index + .entry(col_id) + .or_insert_with(|| parquet_read_cols.len() - 1); + } + + // In SST schema, fixed-position columns are always in the tail: + // `time index, __primary_key, __sequence, __op_type`. + Self::append_time_index_if_needed(&mut parquet_read_cols, sst_column_num); + Self::append_fixed_internal_columns(&mut parquet_read_cols, sst_column_num); Self { - projection_indices, + parquet_read_cols: ParquetReadColumns::from_deduped(parquet_read_cols), column_id_to_projected_index, } } + + fn merge_or_push_parquet_column( + parquet_read_cols: &mut Vec, + index_of_sst: usize, + nested_paths: Vec>, + ) { + // `projected_columns` is sorted by parquet root index, so repeated reads + // for the same root column are always adjacent. + if let Some(last_col) = parquet_read_cols.last_mut() + && last_col.root_index() == index_of_sst + { + last_col.merge_nested_paths(nested_paths); + return; + } + + let parquet_col = ParquetReadColumn::new(index_of_sst).with_nested_paths(nested_paths); + parquet_read_cols.push(parquet_col); + } + + fn append_time_index_if_needed( + parquet_read_cols: &mut Vec, + sst_column_num: usize, + ) { + let time_index = sst_column_num - FIXED_POS_COLUMN_NUM; + // Existing projected roots are already sorted by SST root index, and may + // already include the time index, so we compare against the last root to + // decide whether we still need to append `time index`. + let needs_time_index = parquet_read_cols + .last() + .map(|col| col.root_index() != time_index) + .unwrap_or(true); + if needs_time_index { + parquet_read_cols.push(ParquetReadColumn::new(time_index)); + } + } + + // Append internal columns in fixed order: `__primary_key`, `__sequence`, + // `__op_type`. + fn append_fixed_internal_columns( + parquet_read_cols: &mut Vec, + sst_column_num: usize, + ) { + for index in sst_column_num - INTERNAL_COLUMN_NUM..sst_column_num { + parquet_read_cols.push(ParquetReadColumn::new(index)); + } + } } /// Values of column statistics of the SST. @@ -671,7 +710,9 @@ impl PrimaryKeyReadFormat { pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat { Self::new( Arc::clone(&metadata), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), ) } } @@ -929,17 +970,33 @@ mod tests { fn test_projection_indices() { let metadata = build_test_region_metadata(); // Only read tag1 - let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [3].iter().copied()); - assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); + let read_format = + PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([3])); + assert_eq!( + &[2, 3, 4, 5], + read_format.parquet_read_columns().root_indices() + ); // Only read field1 - let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [4].iter().copied()); - assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices()); + let read_format = + PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([4])); + assert_eq!( + &[0, 2, 3, 4, 5], + read_format.parquet_read_columns().root_indices() + ); // Only read ts - let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [5].iter().copied()); - assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); + let read_format = + PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([5])); + assert_eq!( + &[2, 3, 4, 5], + read_format.parquet_read_columns().root_indices() + ); // Read field0, tag0, ts - let read_format = PrimaryKeyReadFormat::new(metadata, [2, 1, 5].iter().copied()); - assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices()); + let read_format = + PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids([2, 1, 5])); + assert_eq!( + &[1, 2, 3, 4, 5], + read_format.parquet_read_columns().root_indices() + ); } #[test] @@ -985,7 +1042,8 @@ mod tests { .iter() .map(|col| col.column_id) .collect(); - let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); + let read_format = + PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids)); assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); @@ -1004,7 +1062,8 @@ mod tests { .iter() .map(|col| col.column_id) .collect(); - let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); + let read_format = + PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids)); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 @@ -1030,12 +1089,12 @@ mod tests { #[test] fn test_convert_record_batch_with_override_sequence() { let metadata = build_test_region_metadata(); - let column_ids: Vec<_> = metadata - .column_metadatas - .iter() - .map(|col| col.column_id) - .collect(); - let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); + let read_format = PrimaryKeyReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), + ); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 @@ -1202,27 +1261,60 @@ mod tests { // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7) // Only read tag1 (column_id=3, index=1) + fixed columns - let read_format = - FlatReadFormat::new(metadata.clone(), [3].iter().copied(), None, "test", false) - .unwrap(); - assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices()); + let read_format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids([3]), + None, + "test", + false, + ) + .unwrap(); + assert_eq!( + &[1, 4, 5, 6, 7], + read_format.parquet_read_columns().root_indices() + ); // Only read field1 (column_id=4, index=2) + fixed columns - let read_format = - FlatReadFormat::new(metadata.clone(), [4].iter().copied(), None, "test", false) - .unwrap(); - assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices()); + let read_format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids([4]), + None, + "test", + false, + ) + .unwrap(); + assert_eq!( + &[2, 4, 5, 6, 7], + read_format.parquet_read_columns().root_indices() + ); // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed) - let read_format = - FlatReadFormat::new(metadata.clone(), [5].iter().copied(), None, "test", false) - .unwrap(); - assert_eq!(&[4, 5, 6, 7], read_format.projection_indices()); + let read_format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids([5]), + None, + "test", + false, + ) + .unwrap(); + assert_eq!( + &[4, 5, 6, 7], + read_format.parquet_read_columns().root_indices() + ); // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns - let read_format = - FlatReadFormat::new(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap(); - assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); + let read_format = FlatReadFormat::new( + metadata, + ReadColumns::from_deduped_column_ids([2, 1, 5]), + None, + "test", + false, + ) + .unwrap(); + assert_eq!( + &[0, 3, 4, 5, 6, 7], + read_format.parquet_read_columns().root_indices() + ); } #[test] @@ -1230,7 +1322,7 @@ mod tests { let metadata = build_test_region_metadata(); let mut format = FlatReadFormat::new( metadata, - std::iter::once(1), // Just read tag0 + ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0 Some(8), "test", false, @@ -1447,7 +1539,7 @@ mod tests { .collect(); let format = FlatReadFormat::new( metadata.clone(), - column_ids.into_iter(), + ReadColumns::from_deduped_column_ids(column_ids), Some(6), "test", false, @@ -1513,7 +1605,7 @@ mod tests { .collect(); let format = FlatReadFormat::new( metadata.clone(), - column_ids.clone().into_iter(), + ReadColumns::from_deduped_column_ids(column_ids.clone()), None, "test", false, @@ -1581,9 +1673,14 @@ mod tests { // Compare the actual result with the expected record batch assert_eq!(expected_record_batch, result); - let format = - FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true) - .unwrap(); + let format = FlatReadFormat::new( + metadata.clone(), + ReadColumns::from_deduped_column_ids(column_ids), + None, + "test", + true, + ) + .unwrap(); // Test conversion with sparse encoding and skip convert. let result = format.convert_batch(record_batch.clone(), None).unwrap(); assert_eq!(record_batch, result); diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index 7c6ce9e156..241a56e8ed 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -474,7 +474,7 @@ impl PrefilterContextBuilder { return None; } - let total_count = read_format.projection_indices().len(); + let total_count = read_format.parquet_read_columns().root_indices().len(); let remaining_count = total_count.saturating_sub(prefilter_count); if pk_filters.is_none() && prefilter_count >= total_count { return None; @@ -734,6 +734,7 @@ mod tests { use store_api::codec::PrimaryKeyEncoding; use super::*; + use crate::read::read_columns::ReadColumns; use crate::sst::internal_fields; use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index}; use crate::test_util::sst_util::{ @@ -981,7 +982,9 @@ mod tests { Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)); let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", false, @@ -1017,7 +1020,9 @@ mod tests { )); let legacy_read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "memtable", false, @@ -1044,7 +1049,9 @@ mod tests { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let raw_pk_read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "memtable", true, @@ -1078,7 +1085,9 @@ mod tests { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let full_read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", true, @@ -1108,7 +1117,7 @@ mod tests { let ts = metadata.time_index_column().column_id; let projected_read_format = FlatReadFormat::new( metadata.clone(), - [field_0, ts].into_iter(), + ReadColumns::from_deduped_column_ids([field_0, ts]), None, "test", true, @@ -1161,7 +1170,9 @@ mod tests { Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)); let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", false, diff --git a/src/mito2/src/sst/parquet/read_columns.rs b/src/mito2/src/sst/parquet/read_columns.rs index ee177822d1..bca1873060 100644 --- a/src/mito2/src/sst/parquet/read_columns.rs +++ b/src/mito2/src/sst/parquet/read_columns.rs @@ -23,21 +23,46 @@ pub type ParquetNestedPath = Vec; /// The parquet columns to read. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParquetReadColumns { + /// Root parquet column indices in the same order as `cols`. + /// + /// Most readers need these indices as a borrowed slice for Arrow schema + /// projection or parquet root-column projection. Keeping them here avoids + /// repeatedly collecting `cols.iter().map(|col| col.root_index)`. + root_indices: Vec, cols: Vec, has_nested: bool, } impl ParquetReadColumns { + /// Builds parquet read columns from deduplicated, normalized input. + /// + /// `cols` must not contain duplicate root indices, and nested paths must + /// already be merged. Empty `nested_paths` means reading the whole root column. + /// + /// This constructor does not validate or merge input. + pub fn from_deduped(cols: Vec) -> Self { + let has_nested = cols.iter().any(|col| !col.nested_paths.is_empty()); + let root_indices = cols.iter().map(|col| col.root_index).collect(); + Self { + root_indices, + cols, + has_nested, + } + } + /// Builds root-column projections from root indices that are already /// deduplicated. /// /// Note: this constructor does not check for duplicates. pub fn from_deduped_root_indices(root_indices: impl IntoIterator) -> Self { + let root_indices = root_indices.into_iter().collect::>(); let cols = root_indices - .into_iter() + .iter() + .copied() .map(ParquetReadColumn::new) .collect(); Self { + root_indices, cols, has_nested: false, } @@ -52,7 +77,12 @@ impl ParquetReadColumns { } pub fn root_indices_iter(&self) -> impl Iterator + '_ { - self.cols.iter().map(|col| col.root_index) + self.root_indices.iter().copied() + } + + /// Returns root parquet column indices. + pub fn root_indices(&self) -> &[usize] { + &self.root_indices } } @@ -95,6 +125,17 @@ impl ParquetReadColumn { } } + /// Merges additional nested paths into this root column. + pub fn merge_nested_paths(&mut self, nested_paths: Vec) { + let reads_whole_root = self.nested_paths.is_empty() || nested_paths.is_empty(); + if reads_whole_root { + // Empty nested paths means reading the whole root column. + self.nested_paths = vec![]; + } else { + self.nested_paths.extend(nested_paths); + } + } + pub fn root_index(&self) -> usize { self.root_index } @@ -235,13 +276,10 @@ mod tests { fn test_reads_whole_root() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![], - }], - has_nested: false, - }; + let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![], + }]); let (leaf_indices, matched_roots) = build_parquet_leaves_indices(&parquet_schema_desc, &projection); @@ -253,19 +291,16 @@ mod tests { fn test_filters_nested_paths() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ - ParquetReadColumn { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "b".to_string()]], - }, - ParquetReadColumn { - root_index: 1, - nested_paths: vec![], - }, - ], - has_nested: true, - }; + let projection = ParquetReadColumns::from_deduped(vec![ + ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }, + ParquetReadColumn { + root_index: 1, + nested_paths: vec![], + }, + ]); let (leaf_indices, matched_roots) = build_parquet_leaves_indices(&parquet_schema_desc, &projection); @@ -277,13 +312,10 @@ mod tests { fn test_reads_middle_level_path() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "b".to_string()]], - }], - has_nested: true, - }; + let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string()]], + }]); let (leaf_indices, matched_roots) = build_parquet_leaves_indices(&parquet_schema_desc, &projection); @@ -295,13 +327,10 @@ mod tests { fn test_reads_leaf_level_path() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]], - }], - has_nested: true, - }; + let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]], + }]); let (leaf_indices, matched_roots) = build_parquet_leaves_indices(&parquet_schema_desc, &projection); @@ -313,19 +342,16 @@ mod tests { fn test_build_projection_mask_with_unmatched_roots() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ - ParquetReadColumn { - root_index: 0, - nested_paths: vec![vec!["j".to_string(), "missing".to_string()]], - }, - ParquetReadColumn { - root_index: 1, - nested_paths: vec![], - }, - ], - has_nested: true, - }; + let projection = ParquetReadColumns::from_deduped(vec![ + ParquetReadColumn { + root_index: 0, + nested_paths: vec![vec!["j".to_string(), "missing".to_string()]], + }, + ParquetReadColumn { + root_index: 1, + nested_paths: vec![], + }, + ]); let plan = build_projection_plan(&projection, &parquet_schema_desc); @@ -340,16 +366,13 @@ mod tests { fn test_merges_mixed_paths() { let parquet_schema_desc = build_test_nested_parquet_schema(); - let projection = ParquetReadColumns { - cols: vec![ParquetReadColumn { - root_index: 0, - nested_paths: vec![ - vec!["j".to_string(), "a".to_string()], - vec!["j".to_string(), "b".to_string(), "d".to_string()], - ], - }], - has_nested: true, - }; + let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn { + root_index: 0, + nested_paths: vec![ + vec!["j".to_string(), "a".to_string()], + vec!["j".to_string(), "b".to_string(), "d".to_string()], + ], + }]); let (leaf_indices, matched_roots) = build_parquet_leaves_indices(&parquet_schema_desc, &projection); @@ -357,6 +380,32 @@ mod tests { assert_eq!(HashSet::from([0]), matched_roots); } + #[test] + fn test_merge_nested_paths_extends_paths() { + let mut col = ParquetReadColumn::new(0) + .with_nested_paths(vec![vec!["j".to_string(), "a".to_string()]]); + + col.merge_nested_paths(vec![vec!["j".to_string(), "b".to_string()]]); + + assert_eq!( + &[ + vec!["j".to_string(), "a".to_string()], + vec!["j".to_string(), "b".to_string()], + ], + col.nested_paths() + ); + } + + #[test] + fn test_merge_nested_paths_with_whole_root() { + let mut col = ParquetReadColumn::new(0) + .with_nested_paths(vec![vec!["j".to_string(), "a".to_string()]]); + + col.merge_nested_paths(vec![]); + + assert!(col.nested_paths().is_empty()); + } + // Test schema: // schema // |- j diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5688133c46..5a7b8126ca 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -48,6 +48,7 @@ use store_api::region_request::PathType; use store_api::storage::{ColumnId, FileId}; use table::predicate::Predicate; +use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream}; use crate::cache::index::result_cache::PredicateKey; use crate::cache::{CacheStrategy, CachedSstMeta}; #[cfg(feature = "vector_index")] @@ -59,6 +60,7 @@ use crate::metrics::{ }; use crate::read::flat_projection::CompactionProjectionMapper; use crate::read::prune::FlatPruneReader; +use crate::read::read_columns::ReadColumns; use crate::sst::file::FileHandle; use crate::sst::index::bloom_filter::applier::{ BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics, @@ -82,10 +84,7 @@ use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter, }; -use crate::sst::parquet::read_columns::{ - ParquetReadColumns, ProjectionMaskPlan, build_projection_plan, -}; -use crate::sst::parquet::reader::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream}; +use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan}; use crate::sst::parquet::row_group::ParquetFetchMetrics; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -127,11 +126,11 @@ pub struct ParquetReaderBuilder { object_store: ObjectStore, /// Predicate to push down. predicate: Option, - /// Metadata of columns to read. + /// The columns to read. /// /// `None` reads all columns. Due to schema change, the projection /// can contain columns not in the parquet file. - projection: Option>, + read_cols: Option, /// Strategy to cache SST data. cache_strategy: CacheStrategy, /// Index appliers. @@ -171,7 +170,7 @@ impl ParquetReaderBuilder { file_handle, object_store, predicate: None, - projection: None, + read_cols: None, cache_strategy: CacheStrategy::Disabled, inverted_index_appliers: [None, None], bloom_filter_index_appliers: [None, None], @@ -199,8 +198,8 @@ impl ParquetReaderBuilder { /// /// The reader only applies the projection to fields. #[must_use] - pub fn projection(mut self, projection: Option>) -> ParquetReaderBuilder { - self.projection = projection; + pub fn projection(mut self, read_cols: Option) -> ParquetReaderBuilder { + self.read_cols = read_cols; self } @@ -377,30 +376,25 @@ impl ParquetReaderBuilder { None }; - let mut read_format = if let Some(column_ids) = &self.projection { - FlatReadFormat::new( - region_meta.clone(), - column_ids.iter().copied(), - Some(parquet_meta.file_metadata().schema_descr().num_columns()), - &file_path, - skip_auto_convert, - )? + let read_cols = if let Some(read_cols) = &self.read_cols { + read_cols.clone() } else { - // Lists all column ids to read, we always use the expected metadata if possible. let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta); - let column_ids: Vec<_> = expected_meta - .column_metadatas - .iter() - .map(|col| col.column_id) - .collect(); - FlatReadFormat::new( - region_meta.clone(), - column_ids.iter().copied(), - Some(parquet_meta.file_metadata().schema_descr().num_columns()), - &file_path, - skip_auto_convert, - )? + // Lists all column ids to read, we always use the expected metadata if possible. + ReadColumns::from_deduped_column_ids( + expected_meta + .column_metadatas + .iter() + .map(|col| col.column_id), + ) }; + let mut read_format = FlatReadFormat::new( + region_meta.clone(), + read_cols, + Some(parquet_meta.file_metadata().schema_descr().num_columns()), + &file_path, + skip_auto_convert, + )?; if need_override_sequence(&parquet_meta) { read_format .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get())); @@ -408,12 +402,8 @@ impl ParquetReaderBuilder { // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); - let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices( - read_format.projection_indices().iter().copied(), - ); - - let projection_plan = build_projection_plan(&parquet_read_cols, parquet_schema_desc); - + let parquet_read_cols = read_format.parquet_read_columns(); + let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc); let selection = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; @@ -479,6 +469,7 @@ impl ParquetReaderBuilder { output_schema, object_store: self.object_store.clone(), projection: projection_plan, + has_nested_projection: parquet_read_cols.has_nested(), cache_strategy: self.cache_strategy.clone(), prefilter_builder: filter_plan.prefilter_builder, }; @@ -1627,6 +1618,8 @@ pub(crate) struct RowGroupReaderBuilder { object_store: ObjectStore, /// Projection mask. projection: ProjectionMaskPlan, + /// Whether projected read columns include nested paths. + has_nested_projection: bool, /// Cache. cache_strategy: CacheStrategy, /// Pre-built prefilter state. `None` if prefiltering is not applicable. @@ -1730,11 +1723,16 @@ impl RowGroupReaderBuilder { stream: ParquetRecordBatchStream, ) -> Result { let stream = ParquetErrorAdapter::new(stream, self.file_path.clone()); - ProjectedRecordBatchStream::new( + if !self.has_nested_projection { + return Ok(stream.boxed()); + } + + Ok(NestedSchemaAligner::new( stream, self.projection.projected_root_presence.clone(), self.output_schema.clone(), - ) + )? + .boxed()) } /// Builds a [ParquetRecordBatchStream] with a custom projection mask. @@ -2222,10 +2220,12 @@ mod tests { let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let read_format = FlatReadFormat::new( region_metadata.clone(), - region_metadata - .column_metadatas - .iter() - .map(|column| column.column_id), + ReadColumns::from_deduped_column_ids( + region_metadata + .column_metadatas + .iter() + .map(|column| column.column_id), + ), None, &file_path, false, @@ -2328,7 +2328,9 @@ mod tests { let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref()); let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", true, @@ -2350,7 +2352,9 @@ mod tests { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); let read_format = FlatReadFormat::new( metadata.clone(), - metadata.column_metadatas.iter().map(|c| c.column_id), + ReadColumns::from_deduped_column_ids( + metadata.column_metadatas.iter().map(|c| c.column_id), + ), None, "test", true, diff --git a/src/mito2/src/sst/parquet/reader/stream.rs b/src/mito2/src/sst/parquet/reader/stream.rs index adc0f44112..f6211df475 100644 --- a/src/mito2/src/sst/parquet/reader/stream.rs +++ b/src/mito2/src/sst/parquet/reader/stream.rs @@ -15,75 +15,95 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use datatypes::arrow::array::new_null_array; -use datatypes::arrow::datatypes::SchemaRef; +use datafusion_common::cast_column; +use datafusion_common::format::DEFAULT_CAST_OPTIONS; +use datatypes::arrow::array::{ArrayRef, new_null_array}; +use datatypes::arrow::datatypes::{DataType, FieldRef, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use futures::Stream; +use futures::stream::BoxStream; use parquet::arrow::async_reader::ParquetRecordBatchStream; use snafu::{IntoError, ResultExt, ensure}; -use crate::error::{NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; +use crate::error::{ + CastColumnSnafu, NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu, +}; use crate::sst::parquet::async_reader::SstAsyncFileReader; -/// Wraps a parquet record batch stream and fills missing projected root columns. +/// Aligns projected batches to the expected output schema for nested projections. /// +/// Background +/// ---------- /// Nested projection may ask parquet to read leaves under a root column. If none /// of the requested leaves exists in the current parquet file, parquet decoding -/// omits the whole root from the physical `RecordBatch`. The logical projection -/// still contains that root, so this wrapper restores the output shape by -/// inserting a root-level null array. -pub struct MissingColFiller { - /// Inner stream that yields record batches from parquet reader. +/// omits the whole root from the physical [`RecordBatch`]. +/// +/// In addition, after nested-path filtering, returned struct arrays may contain +/// only a subset of fields. The current output schema is not pruned by nested +/// paths, so physical struct fields can be a subset of the expected struct +/// fields, and their nested schema can differ from the expected output schema. +/// +/// To keep projected batches schema-consistent before entering upper readers: +/// - Root-column presence alignment restores missing projected root columns by +/// inserting root-level null arrays. +/// - Nested struct alignment aligns struct arrays to the expected nested field +/// layout. +pub struct NestedSchemaAligner { inner: S, /// Output schema expected by the upper reader. output_schema: SchemaRef, - /// Whether each projected root exists in the physical batch returned by parquet. - projected_root_matches: Vec, + /// Whether each projected root exists in the physical batch returned by + /// parquet. + projected_root_presence: Vec, /// Number of columns expected from the physical batch returned by parquet. expected_input_col_num: usize, - /// Whether all projected roots are present and the stream can pass batches through. - all_matched: bool, + /// Whether all projected roots are present and the stream can pass batches + /// through. + all_roots_present: bool, + /// The cache for whether incoming batches already match output schema. + is_schema_matched: Option, } -pub(crate) type ProjectedRecordBatchStream = MissingColFiller; +pub(crate) type ProjectedRecordBatchStream = BoxStream<'static, Result>; -impl MissingColFiller +impl NestedSchemaAligner where S: Stream>, { pub fn new( inner: S, - projected_root_matches: Vec, + projected_root_presence: Vec, output_schema: SchemaRef, - ) -> Result> { + ) -> Result> { ensure!( - projected_root_matches.len() == output_schema.fields().len(), + projected_root_presence.len() == output_schema.fields().len(), UnexpectedSnafu { reason: format!( - "MissingColFiller projected root matches len {} does not match output schema columns {}", - projected_root_matches.len(), + "NestedSchemaAligner projected root presence len {} does not match output schema columns {}", + projected_root_presence.len(), output_schema.fields().len() ), } ); - let expected_input_col_num = projected_root_matches + let expected_input_col_num = projected_root_presence .iter() .filter(|matched| **matched) .count(); - let all_matched = projected_root_matches.iter().all(|&m| m); + let all_roots_present = projected_root_presence.iter().all(|&m| m); - Ok(MissingColFiller { + Ok(NestedSchemaAligner { inner, output_schema, - projected_root_matches, + projected_root_presence, expected_input_col_num, - all_matched, + all_roots_present, + is_schema_matched: None, }) } } -impl Stream for MissingColFiller +impl Stream for NestedSchemaAligner where S: Stream> + Unpin, { @@ -94,18 +114,26 @@ where match Pin::new(&mut this.inner).poll_next(cx) { Poll::Ready(Some(Ok(rb))) => { - let output_schema = &this.output_schema; - let rb = if this.all_matched { + let rb = if this.all_roots_present { rb } else { fill_missing_cols( rb, - output_schema, - &this.projected_root_matches, + &this.output_schema, + &this.projected_root_presence, this.expected_input_col_num, )? }; - Poll::Ready(Some(Ok(rb))) + + let is_schema_matched = *this + .is_schema_matched + .get_or_insert_with(|| rb.schema() == this.output_schema); + + if is_schema_matched { + Poll::Ready(Some(Ok(rb))) + } else { + Poll::Ready(Some(align_batch_to_schema(rb, &this.output_schema))) + } } Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), Poll::Ready(None) => Poll::Ready(None), @@ -124,7 +152,7 @@ fn fill_missing_cols( rb.columns().len() == expected_input_col_num, UnexpectedSnafu { reason: format!( - "MissingColFiller expected {} input columns but got {}", + "NestedSchemaAligner expected {} input columns but got {}", expected_input_col_num, rb.columns().len() ), @@ -146,6 +174,40 @@ fn fill_missing_cols( RecordBatch::try_new(output_schema.clone(), cols).context(NewRecordBatchSnafu) } +fn align_batch_to_schema(rb: RecordBatch, output_schema: &SchemaRef) -> Result { + ensure!( + rb.num_columns() == output_schema.fields().len(), + UnexpectedSnafu { + reason: format!( + "NestedSchemaAligner expected {} columns but got {}", + output_schema.fields().len(), + rb.num_columns() + ), + } + ); + + let columns = rb + .columns() + .iter() + .zip(output_schema.fields()) + .map(|(array, field)| align_array(array, field)) + .collect::>>()?; + + RecordBatch::try_new(output_schema.clone(), columns).context(NewRecordBatchSnafu) +} + +fn align_array(array: &ArrayRef, field: &FieldRef) -> Result { + if array.data_type() == field.data_type() { + return Ok(array.clone()); + } + + if !matches!(field.data_type(), DataType::Struct(_)) { + return Ok(array.clone()); + } + + cast_column(array, field.as_ref(), &DEFAULT_CAST_OPTIONS).context(CastColumnSnafu) +} + /// Maps parquet stream errors into mito errors before batches enter the filler. pub(crate) struct ParquetErrorAdapter { inner: ParquetRecordBatchStream, @@ -181,14 +243,14 @@ impl Stream for ParquetErrorAdapter { mod tests { use std::sync::Arc; - use datatypes::arrow::array::{Array, ArrayRef, Int64Array, StringArray}; + use datatypes::arrow::array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; use datatypes::arrow::datatypes::{DataType, Field, Fields, Schema}; use futures::{StreamExt, stream}; use super::*; #[tokio::test] - async fn test_filler_with_all_projected_roots_match() { + async fn test_aligner_with_all_projected_roots_match() { let output_schema = schema([ Field::new("a", DataType::Int64, true), Field::new("b", DataType::Utf8, true), @@ -200,16 +262,16 @@ mod tests { .unwrap(); let stream = stream::iter([Ok(input.clone())]); - let mut filler = - MissingColFiller::new(stream, vec![true, true], output_schema.clone()).unwrap(); - let output = filler.next().await.unwrap().unwrap(); + let mut aligner = + NestedSchemaAligner::new(stream, vec![true, true], output_schema.clone()).unwrap(); + let output = aligner.next().await.unwrap().unwrap(); assert_eq!(input, output); - assert!(filler.next().await.is_none()); + assert!(aligner.next().await.is_none()); } #[tokio::test] - async fn test_filler_with_fills_null_root_columns() { + async fn test_aligner_with_fills_null_root_columns() { let input_schema = schema([Field::new("a", DataType::Int64, true)]); let output_schema = schema([ Field::new("a", DataType::Int64, true), @@ -219,9 +281,10 @@ mod tests { let input = RecordBatch::try_new(input_schema, vec![int_array([10, 20])]).unwrap(); let stream = stream::iter([Ok(input)]); - let mut filler = - MissingColFiller::new(stream, vec![true, false, false], output_schema.clone()).unwrap(); - let output = filler.next().await.unwrap().unwrap(); + let mut aligner = + NestedSchemaAligner::new(stream, vec![true, false, false], output_schema.clone()) + .unwrap(); + let output = aligner.next().await.unwrap().unwrap(); assert_eq!(output_schema, output.schema()); assert_eq!(3, output.num_columns()); @@ -243,7 +306,7 @@ mod tests { } #[tokio::test] - async fn test_filler_with_fills_missing_struct_root_column() { + async fn test_aligner_with_fills_missing_struct_root_column() { let input_schema = schema([Field::new("a", DataType::Int64, true)]); let struct_type = DataType::Struct(Fields::from(vec![ Field::new("x", DataType::Int64, true), @@ -256,9 +319,9 @@ mod tests { let input = RecordBatch::try_new(input_schema, vec![int_array([10, 20])]).unwrap(); let stream = stream::iter([Ok(input)]); - let mut filler = - MissingColFiller::new(stream, vec![true, false], output_schema.clone()).unwrap(); - let output = filler.next().await.unwrap().unwrap(); + let mut aligner = + NestedSchemaAligner::new(stream, vec![true, false], output_schema.clone()).unwrap(); + let output = aligner.next().await.unwrap().unwrap(); assert_eq!(output_schema, output.schema()); assert_eq!(2, output.num_columns()); @@ -267,20 +330,23 @@ mod tests { } #[tokio::test] - async fn test_filler_with_reject_projection_len_mismatch() { + async fn test_aligner_reject_projection_len_mismatch() { let output_schema = schema([Field::new("a", DataType::Int64, true)]); let stream = stream::iter([]); - let err = match MissingColFiller::new(stream, vec![true, false], output_schema) { - Ok(_) => panic!("MissingColFiller should reject projection length mismatch"), + let err = match NestedSchemaAligner::new(stream, vec![true, false], output_schema) { + Ok(_) => panic!("NestedSchemaAligner should reject projection length mismatch"), Err(err) => err, }; - assert!(err.to_string().contains("projected root matches len 2")); + assert!( + err.to_string() + .contains("projected root presence len 2 does not match output schema columns 1") + ); } #[tokio::test] - async fn test_filler_reject_with_input_column_mismatch() { + async fn test_aligner_reject_with_input_column_mismatch() { let input_schema = schema([Field::new("a", DataType::Int64, true)]); let output_schema = schema([ Field::new("a", DataType::Int64, true), @@ -290,9 +356,9 @@ mod tests { let input = RecordBatch::try_new(input_schema, vec![int_array([1, 2])]).unwrap(); let stream = stream::iter([Ok(input)]); - let mut filler = - MissingColFiller::new(stream, vec![true, true, false], output_schema).unwrap(); - let err = filler.next().await.unwrap().unwrap_err(); + let mut aligner = + NestedSchemaAligner::new(stream, vec![true, true, false], output_schema).unwrap(); + let err = aligner.next().await.unwrap().unwrap_err(); assert!( err.to_string() @@ -300,6 +366,44 @@ mod tests { ); } + #[tokio::test] + async fn test_nested_schema_aligner_aligns_struct_field() { + let output_schema = schema([Field::new( + "nested", + DataType::Struct(Fields::from(vec![ + Field::new("x", DataType::Int64, true), + Field::new("y", DataType::Utf8, true), + ])), + true, + )]); + let input = RecordBatch::try_new( + schema([Field::new( + "nested", + DataType::Struct(Fields::from(vec![Field::new("x", DataType::Int64, true)])), + true, + )]), + vec![Arc::new(StructArray::from(vec![( + Arc::new(Field::new("x", DataType::Int64, true)), + int_array([1, 2]), + )]))], + ) + .unwrap(); + + let mut aligner = + NestedSchemaAligner::new(stream::iter([Ok(input)]), vec![true], output_schema.clone()) + .unwrap(); + let output = aligner.next().await.unwrap().unwrap(); + + assert_eq!(output_schema, output.schema()); + let nested = output + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(2, nested.columns().len()); + assert_eq!(2, nested.column(1).null_count()); + } + fn schema(fields: impl IntoIterator) -> SchemaRef { Arc::new(Schema::new(fields.into_iter().collect::>())) }