diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 981d4f1d11..296d9ce2b1 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -60,7 +60,7 @@ use crate::error::{ }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::BoxedRecordBatchStream; -use crate::read::projection::ProjectionMapper; +use crate::read::flat_projection::FlatProjectionMapper; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; use crate::region::options::{MergeMode, RegionOptions}; @@ -1002,7 +1002,7 @@ impl CompactionSstReaderBuilder<'_> { } fn build_scan_input(self) -> Result { - let mapper = ProjectionMapper::all(&self.metadata)?; + let mapper = FlatProjectionMapper::all(&self.metadata)?; let mut scan_input = ScanInput::new(self.sst_layer, mapper) .with_files(self.inputs.to_vec()) .with_append_mode(self.append_mode) diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 7551eb33af..9fa3c641a7 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -26,7 +26,7 @@ use table::predicate::Predicate; use crate::error::Result; use crate::sst::parquet::file_range::{PreFilterMode, RangeBase}; -use crate::sst::parquet::format::ReadFormat; +use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter; use crate::sst::parquet::reader::SimpleFilterContext; use crate::sst::parquet::stats::RowGroupPruningStats; @@ -77,14 +77,26 @@ impl BulkIterContext { }) .collect(); - let read_format = ReadFormat::new( - region_metadata.clone(), - projection, - true, - None, - "memtable", - skip_auto_convert, - )?; + let read_format = if let Some(column_ids) = projection { + FlatReadFormat::new( + region_metadata.clone(), + column_ids.iter().copied(), + None, + "memtable", + skip_auto_convert, + )? + } else { + FlatReadFormat::new( + region_metadata.clone(), + region_metadata + .column_metadatas + .iter() + .map(|col| col.column_id), + None, + "memtable", + skip_auto_convert, + )? + }; let dyn_filters = predicate .as_ref() @@ -143,11 +155,10 @@ impl BulkIterContext { /// Extracts PK filters if flat format with dictionary-encoded PKs is used. fn extract_pk_filters( - read_format: &ReadFormat, + read_format: &FlatReadFormat, filters: &[SimpleFilterContext], ) -> Option>> { - let flat_format = read_format.as_flat()?; - if flat_format.batch_has_raw_pk_columns() { + if read_format.batch_has_raw_pk_columns() { return None; } let metadata = read_format.metadata(); @@ -179,7 +190,7 @@ impl BulkIterContext { Some(CachedPrimaryKeyFilter::new(inner)) } - pub(crate) fn read_format(&self) -> &ReadFormat { + pub(crate) fn read_format(&self) -> &FlatReadFormat { &self.base.read_format } diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index cea3304154..ac069f20be 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -60,8 +60,6 @@ impl EncodedBulkPartIter { sequence: Option, mem_scan_metrics: Option, ) -> error::Result { - assert!(context.read_format().as_flat().is_some()); - let parquet_meta = encoded_part.metadata().parquet_metadata.clone(); let data = encoded_part.data().clone(); let series_count = encoded_part.metadata().num_series as usize; @@ -238,8 +236,6 @@ impl BulkPartBatchIter { series_count: usize, mem_scan_metrics: Option, ) -> Self { - assert!(context.read_format().as_flat().is_some()); - let pk_filter = context.build_pk_filter(); Self { @@ -406,8 +402,7 @@ fn apply_combined_filters( }; // Converts the format to the flat format. - let format = context.read_format().as_flat().unwrap(); - let record_batch = format.convert_batch(record_batch, None)?; + let record_batch = context.read_format().convert_batch(record_batch, None)?; let num_rows = record_batch.num_rows(); let mut combined_filter = None; diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 90eb9a3da7..ec44a1da01 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -36,7 +36,7 @@ pub mod series_scan; pub mod stream; pub(crate) mod unordered_scan; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -63,7 +63,6 @@ use futures::TryStreamExt; use futures::stream::BoxStream; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use snafu::{OptionExt, ResultExt, ensure}; -use store_api::metadata::RegionMetadata; use store_api::storage::{ColumnId, SequenceNumber, SequenceRange}; use crate::error::{ @@ -71,8 +70,6 @@ use crate::error::{ Result, }; use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator}; -use crate::read::prune::PruneReader; - /// Storage internal representation of a batch of rows for a primary key (time series). /// /// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields @@ -573,24 +570,6 @@ impl Batch { size } - /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`. - pub(crate) fn projected_fields( - metadata: &RegionMetadata, - projection: &[ColumnId], - ) -> Vec<(ColumnId, ConcreteDataType)> { - let projected_ids: HashSet<_> = projection.iter().copied().collect(); - metadata - .field_columns() - .filter_map(|column| { - if projected_ids.contains(&column.column_id) { - Some((column.column_id, column.column_schema.data_type.clone())) - } else { - None - } - }) - .collect() - } - /// Returns timestamps in a native slice or `None` if the batch is empty. pub(crate) fn timestamps_native(&self) -> Option<&[i64]> { if self.timestamps.is_empty() { @@ -1111,8 +1090,6 @@ pub enum Source { Iter(BoxedBatchIterator), /// Source from a [BoxedBatchStream]. Stream(BoxedBatchStream), - /// Source from a [PruneReader]. - PruneReader(PruneReader), } impl Source { @@ -1122,7 +1099,6 @@ impl Source { Source::Reader(reader) => reader.next_batch().await, Source::Iter(iter) => iter.next().transpose(), Source::Stream(stream) => stream.try_next().await, - Source::PruneReader(reader) => reader.next_batch().await, } } } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 90d664a4bd..679c1d72e7 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -28,7 +28,7 @@ use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; use datatypes::value::Value; -use datatypes::vectors::{Helper, VectorRef}; +use datatypes::vectors::VectorRef; use mito_codec::row_converter::{ CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, build_primary_key_codec_with_fields, @@ -39,127 +39,14 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::error::{ - CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu, - DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu, - UnsupportedOperationSnafu, + CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, + NewRecordBatchSnafu, RecordBatchSnafu, Result, UnsupportedOperationSnafu, }; use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns}; -use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper}; -use crate::read::{Batch, BatchColumn, BatchReader}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray}; use crate::sst::{internal_fields, tag_maybe_to_dictionary_field}; -/// Reader to adapt schema of underlying reader to expected schema. -pub struct CompatReader { - /// Underlying reader. - reader: R, - /// Helper to compat batches. - compat: PrimaryKeyCompatBatch, -} - -impl CompatReader { - /// Creates a new compat reader. - /// - `mapper` is built from the metadata users expect to see. - /// - `reader_meta` is the metadata of the input reader. - /// - `reader` is the input reader. - pub fn new( - mapper: &ProjectionMapper, - reader_meta: RegionMetadataRef, - reader: R, - ) -> Result> { - Ok(CompatReader { - reader, - compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?, - }) - } -} - -#[async_trait::async_trait] -impl BatchReader for CompatReader { - async fn next_batch(&mut self) -> Result> { - let Some(mut batch) = self.reader.next_batch().await? else { - return Ok(None); - }; - - batch = self.compat.compat_batch(batch)?; - - Ok(Some(batch)) - } -} - -/// Helper to adapt schema of the batch to an expected schema. -pub(crate) enum CompatBatch { - /// Adapter for primary key format. - PrimaryKey(PrimaryKeyCompatBatch), - /// Adapter for flat format. - Flat(FlatCompatBatch), -} - -impl CompatBatch { - /// Returns the inner primary key batch adapter if this is a PrimaryKey format. - #[allow(dead_code)] - pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> { - match self { - CompatBatch::PrimaryKey(batch) => Some(batch), - _ => None, - } - } - - /// Returns the inner flat batch adapter if this is a Flat format. - pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> { - match self { - CompatBatch::Flat(batch) => Some(batch), - _ => None, - } - } -} - -/// A helper struct to adapt schema of the batch to an expected schema. -pub(crate) struct PrimaryKeyCompatBatch { - /// Optional primary key adapter. - rewrite_pk: Option, - /// Optional primary key adapter. - compat_pk: Option, - /// Optional fields adapter. - compat_fields: Option, -} - -impl PrimaryKeyCompatBatch { - /// Creates a new [CompatBatch]. - /// - `mapper` is built from the metadata users expect to see. - /// - `reader_meta` is the metadata of the input reader. - pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result { - let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta); - let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?; - let mapper = mapper.as_primary_key().context(UnexpectedSnafu { - reason: "Unexpected format", - })?; - let compat_fields = may_compat_fields(mapper, &reader_meta)?; - - Ok(Self { - rewrite_pk, - compat_pk, - compat_fields, - }) - } - - /// Adapts the `batch` to the expected schema. - pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result { - if let Some(rewrite_pk) = &self.rewrite_pk { - batch = rewrite_pk.compat(batch)?; - } - if let Some(compat_pk) = &self.compat_pk { - batch = compat_pk.compat(batch)?; - } - if let Some(compat_fields) = &self.compat_fields { - batch = compat_fields.compat(batch)?; - } - - Ok(batch) - } -} - /// Returns true if `left` and `right` have same columns and primary key encoding. pub(crate) fn has_same_columns_and_pk_encoding( left: &RegionMetadata, @@ -293,7 +180,6 @@ impl FlatCompatBatch { ), })?; index_or_defaults.push(IndexOrDefault::DefaultValue { - column_id: expect_column.column_id, default_vector, semantic_type: expect_column.semantic_type, }); @@ -367,7 +253,6 @@ impl FlatCompatBatch { } } IndexOrDefault::DefaultValue { - column_id: _, default_vector, semantic_type, } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag), @@ -415,121 +300,6 @@ fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result, - /// Default values to append. - values: Vec<(ColumnId, Value)>, -} - -impl CompatPrimaryKey { - /// Make primary key of the `batch` compatible. - fn compat(&self, mut batch: Batch) -> Result { - let mut buffer = Vec::with_capacity( - batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(), - ); - buffer.extend_from_slice(batch.primary_key()); - self.converter - .encode_values(&self.values, &mut buffer) - .context(EncodeSnafu)?; - - batch.set_primary_key(buffer); - - // update cache - if let Some(pk_values) = &mut batch.pk_values { - pk_values.extend(&self.values); - } - - Ok(batch) - } -} - -/// Helper to make fields compatible. -#[derive(Debug)] -struct CompatFields { - /// Column Ids and DataTypes the reader actually returns. - actual_fields: Vec<(ColumnId, ConcreteDataType)>, - /// Indices to convert actual fields to expect fields. - index_or_defaults: Vec, -} - -impl CompatFields { - /// Make fields of the `batch` compatible. - fn compat(&self, batch: Batch) -> Result { - debug_assert_eq!(self.actual_fields.len(), batch.fields().len()); - debug_assert!( - self.actual_fields - .iter() - .zip(batch.fields()) - .all(|((id, _), batch_column)| *id == batch_column.column_id) - ); - - let len = batch.num_rows(); - self.index_or_defaults - .iter() - .map(|index_or_default| match index_or_default { - IndexOrDefault::Index { pos, cast_type } => { - let old_column = &batch.fields()[*pos]; - - let data = if let Some(ty) = cast_type { - if let Some(json_type) = ty.as_json() { - let json_array = old_column.data.to_arrow_array(); - let json_array = - align_json_array(&json_array, &json_type.as_arrow_type()) - .context(RecordBatchSnafu)?; - Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)? - } else { - old_column.data.cast(ty).with_context(|_| CastVectorSnafu { - from: old_column.data.data_type(), - to: ty.clone(), - })? - } - } else { - old_column.data.clone() - }; - Ok(BatchColumn { - column_id: old_column.column_id, - data, - }) - } - IndexOrDefault::DefaultValue { - column_id, - default_vector, - semantic_type: _, - } => { - let data = default_vector.replicate(&[len]); - Ok(BatchColumn { - column_id: *column_id, - data, - }) - } - }) - .collect::>>() - .and_then(|fields| batch.with_fields(fields)) - } -} - -fn may_rewrite_primary_key( - expect: &RegionMetadata, - actual: &RegionMetadata, -) -> Option { - if expect.primary_key_encoding == actual.primary_key_encoding { - return None; - } - - let fields = expect.primary_key.clone(); - let original = build_primary_key_codec(actual); - let new = build_primary_key_codec(expect); - - Some(RewritePrimaryKey { - original, - new, - fields, - }) -} - /// Returns true if the actual primary keys is the same as expected. fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result { ensure!( @@ -557,113 +327,6 @@ fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Resu Ok(actual.primary_key.len() == expect.primary_key.len()) } -/// Creates a [CompatPrimaryKey] if needed. -fn may_compat_primary_key( - expect: &RegionMetadata, - actual: &RegionMetadata, -) -> Result> { - if is_primary_key_same(expect, actual)? { - return Ok(None); - } - - // We need to append default values to the primary key. - let to_add = &expect.primary_key[actual.primary_key.len()..]; - let mut fields = Vec::with_capacity(to_add.len()); - let mut values = Vec::with_capacity(to_add.len()); - for column_id in to_add { - // Safety: The id comes from expect region metadata. - let column = expect.column_by_id(*column_id).unwrap(); - fields.push(( - *column_id, - SortField::new(column.column_schema.data_type.clone()), - )); - let default_value = column - .column_schema - .create_default() - .context(CreateDefaultSnafu { - region_id: expect.region_id, - column: &column.column_schema.name, - })? - .with_context(|| CompatReaderSnafu { - region_id: expect.region_id, - reason: format!( - "key column {} does not have a default value to read", - column.column_schema.name - ), - })?; - values.push((*column_id, default_value)); - } - // Using expect primary key encoding to build the converter - let converter = - build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter()); - - Ok(Some(CompatPrimaryKey { converter, values })) -} - -/// Creates a [CompatFields] if needed. -fn may_compat_fields( - mapper: &PrimaryKeyProjectionMapper, - actual: &RegionMetadata, -) -> Result> { - let expect_fields = mapper.batch_fields(); - let actual_fields = Batch::projected_fields(actual, mapper.column_ids()); - if expect_fields == actual_fields { - return Ok(None); - } - - let source_field_index: HashMap<_, _> = actual_fields - .iter() - .enumerate() - .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type))) - .collect(); - - let index_or_defaults = expect_fields - .iter() - .map(|(column_id, expect_data_type)| { - if let Some((index, actual_data_type)) = source_field_index.get(column_id) { - let mut cast_type = None; - - if expect_data_type != *actual_data_type { - cast_type = Some(expect_data_type.clone()) - } - // Source has this field. - Ok(IndexOrDefault::Index { - pos: *index, - cast_type, - }) - } else { - // Safety: mapper must have this column. - let column = mapper.metadata().column_by_id(*column_id).unwrap(); - // Create a default vector with 1 element for that column. - let default_vector = column - .column_schema - .create_default_vector(1) - .context(CreateDefaultSnafu { - region_id: mapper.metadata().region_id, - column: &column.column_schema.name, - })? - .with_context(|| CompatReaderSnafu { - region_id: mapper.metadata().region_id, - reason: format!( - "column {} does not have a default value to read", - column.column_schema.name - ), - })?; - Ok(IndexOrDefault::DefaultValue { - column_id: column.column_id, - default_vector, - semantic_type: SemanticType::Field, - }) - } - }) - .collect::>>()?; - - Ok(Some(CompatFields { - actual_fields, - index_or_defaults, - })) -} - /// Index in source batch or a default value to fill a column. #[derive(Debug)] enum IndexOrDefault { @@ -674,8 +337,6 @@ enum IndexOrDefault { }, /// Default value for the column. DefaultValue { - /// Id of the column. - column_id: ColumnId, /// Default value. The vector has only 1 element. default_vector: VectorRef, /// Semantic type of the column. @@ -683,58 +344,6 @@ enum IndexOrDefault { }, } -/// Adapter to rewrite primary key. -struct RewritePrimaryKey { - /// Original primary key codec. - original: Arc, - /// New primary key codec. - new: Arc, - /// Order of the fields in the new primary key. - fields: Vec, -} - -impl RewritePrimaryKey { - /// Make primary key of the `batch` compatible. - fn compat(&self, mut batch: Batch) -> Result { - if batch.pk_values().is_none() { - let new_pk_values = self - .original - .decode(batch.primary_key()) - .context(DecodeSnafu)?; - batch.set_pk_values(new_pk_values); - } - // Safety: We ensure pk_values is not None. - let values = batch.pk_values().unwrap(); - - let mut buffer = Vec::with_capacity( - batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(), - ); - match values { - CompositeValues::Dense(values) => { - self.new - .encode_values(values.as_slice(), &mut buffer) - .context(EncodeSnafu)?; - } - CompositeValues::Sparse(values) => { - let values = self - .fields - .iter() - .map(|id| { - let value = values.get_or_null(*id); - (*id, value.as_value_ref()) - }) - .collect::>(); - self.new - .encode_value_refs(&values, &mut buffer) - .context(EncodeSnafu)?; - } - } - batch.set_primary_key(buffer); - - Ok(batch) - } -} - /// Helper to rewrite primary key to another encoding for flat format. struct FlatRewritePrimaryKey { /// New primary key encoder. @@ -1052,128 +661,6 @@ mod tests { buffer } - #[test] - fn test_invalid_pk_len() { - let reader_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1, 2], - ); - let expect_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - ); - may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err(); - } - - #[test] - fn test_different_pk() { - let reader_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[2, 1], - ); - let expect_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, ConcreteDataType::int64_datatype()), - (4, SemanticType::Tag, ConcreteDataType::string_datatype()), - ], - &[1, 2, 4], - ); - may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err(); - } - - #[test] - fn test_same_pk() { - let reader_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - ); - assert!( - may_compat_primary_key(&reader_meta, &reader_meta) - .unwrap() - .is_none() - ); - } - - #[test] - fn test_same_pk_encoding() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - ], - &[1], - )); - - assert!( - may_compat_primary_key(&reader_meta, &reader_meta) - .unwrap() - .is_none() - ); - } - - #[test] - fn test_same_fields() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap(); - assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none()) - } - /// Creates a primary key array for flat format testing. fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef { let mut builder = BinaryDictionaryBuilder::::new(); diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 02b4c6b3c1..d73bb5a205 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -240,11 +240,6 @@ impl FlatProjectionMapper { self.output_schema.clone() } - /// Returns an empty [RecordBatch]. - pub(crate) fn empty_record_batch(&self) -> RecordBatch { - RecordBatch::new_empty(self.output_schema.clone()) - } - /// Converts a flat format [RecordBatch] to a normal [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index e087e12094..140e6eb4c9 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -20,7 +20,6 @@ use async_trait::async_trait; use datatypes::arrow::array::{Array, BinaryArray}; use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; -use datatypes::vectors::UInt32Vector; use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::{FileId, TimeSeriesRowSelector}; @@ -35,7 +34,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream}; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index}; use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets}; -use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; +use crate::sst::parquet::reader::FlatRowGroupReader; /// Reader to keep the last row for each time series. /// It assumes that batches from the input reader are @@ -81,183 +80,6 @@ impl BatchReader for LastRowReader { } } -/// Cached last row reader for specific row group. -/// If the last rows for current row group are already cached, this reader returns the cached value. -/// If cache misses, [RowGroupLastRowReader] reads last rows from row group and updates the cache -/// upon finish. -pub(crate) enum RowGroupLastRowCachedReader { - /// Cache hit, reads last rows from cached value. - Hit(LastRowCacheReader), - /// Cache miss, reads from row group reader and update cache. - Miss(RowGroupLastRowReader), -} - -impl RowGroupLastRowCachedReader { - pub(crate) fn new( - file_id: FileId, - row_group_idx: usize, - cache_strategy: CacheStrategy, - row_group_reader: RowGroupReader, - ) -> Self { - let key = SelectorResultKey { - file_id, - row_group_idx, - selector: TimeSeriesRowSelector::LastRow, - }; - - if let Some(value) = cache_strategy.get_selector_result(&key) { - let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_)); - let schema_matches = - value.projection == row_group_reader.read_format().projection_indices(); - if is_primary_key && schema_matches { - // Format and schema match, use cache batches. - Self::new_hit(value) - } else { - Self::new_miss(key, row_group_reader, cache_strategy) - } - } else { - Self::new_miss(key, row_group_reader, cache_strategy) - } - } - - /// Gets the underlying reader metrics if uncached. - pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> { - match self { - RowGroupLastRowCachedReader::Hit(_) => None, - RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()), - } - } - - /// Creates new Hit variant and updates metrics. - fn new_hit(value: Arc) -> Self { - selector_result_cache_hit(); - Self::Hit(LastRowCacheReader { value, idx: 0 }) - } - - /// Creates new Miss variant and updates metrics. - fn new_miss( - key: SelectorResultKey, - row_group_reader: RowGroupReader, - cache_strategy: CacheStrategy, - ) -> Self { - selector_result_cache_miss(); - Self::Miss(RowGroupLastRowReader::new( - key, - row_group_reader, - cache_strategy, - )) - } -} - -#[async_trait] -impl BatchReader for RowGroupLastRowCachedReader { - async fn next_batch(&mut self) -> Result> { - match self { - RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await, - RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await, - } - } -} - -/// Last row reader that returns the cached last rows for row group. -pub(crate) struct LastRowCacheReader { - value: Arc, - idx: usize, -} - -impl LastRowCacheReader { - /// Iterates cached last rows. - async fn next_batch(&mut self) -> Result> { - let batches = match &self.value.result { - SelectorResult::PrimaryKey(batches) => batches, - SelectorResult::Flat(_) => unreachable!(), - }; - if self.idx < batches.len() { - let res = Ok(Some(batches[self.idx].clone())); - self.idx += 1; - res - } else { - Ok(None) - } - } -} - -pub(crate) struct RowGroupLastRowReader { - key: SelectorResultKey, - reader: RowGroupReader, - selector: LastRowSelector, - yielded_batches: Vec, - cache_strategy: CacheStrategy, - /// Index buffer to take a new batch from the last row. - take_index: UInt32Vector, -} - -impl RowGroupLastRowReader { - fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self { - Self { - key, - reader, - selector: LastRowSelector::default(), - yielded_batches: vec![], - cache_strategy, - take_index: UInt32Vector::from_vec(vec![0]), - } - } - - async fn next_batch(&mut self) -> Result> { - while let Some(batch) = self.reader.next_batch().await? { - if let Some(yielded) = self.selector.on_next(batch) { - push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?; - return Ok(Some(yielded)); - } - } - let last_batch = if let Some(last_batch) = self.selector.finish() { - push_yielded_batches( - last_batch.clone(), - &self.take_index, - &mut self.yielded_batches, - )?; - Some(last_batch) - } else { - None - }; - - // All last rows in row group are yielded, update cache. - self.maybe_update_cache(); - Ok(last_batch) - } - - /// Updates row group's last row cache if cache manager is present. - fn maybe_update_cache(&mut self) { - if self.yielded_batches.is_empty() { - // we always expect that row groups yields batches. - return; - } - let value = Arc::new(SelectorResultValue::new( - std::mem::take(&mut self.yielded_batches), - self.reader.read_format().projection_indices().to_vec(), - )); - self.cache_strategy.put_selector_result(self.key, value); - } - - fn metrics(&self) -> &ReaderMetrics { - self.reader.metrics() - } -} - -/// Push last row into `yielded_batches`. -fn push_yielded_batches( - mut batch: Batch, - take_index: &UInt32Vector, - yielded_batches: &mut Vec, -) -> Result<()> { - assert_eq!(1, batch.num_rows()); - batch.take_in_place(take_index)?; - yielded_batches.push(batch); - - Ok(()) -} - /// Common struct that selects only the last row of each time series. #[derive(Default)] pub struct LastRowSelector { diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index d22c87bcc2..68e5bf3952 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -12,360 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Utilities for projection operations. +//! Projection helpers shared by flat projection code. use std::cmp::Ordering; -use std::collections::HashMap; -use std::sync::Arc; -use api::v1::SemanticType; -use common_error::ext::BoxedError; -use common_recordbatch::RecordBatch; -use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu}; +use common_recordbatch::error::DataTypesSnafu; use datatypes::prelude::{ConcreteDataType, DataType}; -use datatypes::schema::{Schema, SchemaRef}; use datatypes::value::Value; use datatypes::vectors::VectorRef; -use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, Result}; -use crate::read::Batch; -use crate::read::flat_projection::FlatProjectionMapper; /// Only cache vector when its length `<=` this value. pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384; -/// Wrapper enum for different projection mapper implementations. -pub enum ProjectionMapper { - /// Projection mapper for primary key format. - PrimaryKey(PrimaryKeyProjectionMapper), - /// Projection mapper for flat format. - Flat(FlatProjectionMapper), -} - -impl ProjectionMapper { - /// Returns a new mapper with projection. - pub fn new( - metadata: &RegionMetadataRef, - projection: impl Iterator + Clone, - ) -> Result { - Ok(ProjectionMapper::Flat(FlatProjectionMapper::new( - metadata, projection, - )?)) - } - - /// Returns a new mapper with output projection and explicit read columns. - pub fn new_with_read_columns( - metadata: &RegionMetadataRef, - projection: impl Iterator, - read_column_ids: Vec, - ) -> Result { - let projection: Vec<_> = projection.collect(); - Ok(ProjectionMapper::Flat( - FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?, - )) - } - - /// Returns a new mapper without projection. - pub fn all(metadata: &RegionMetadataRef) -> Result { - Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?)) - } - - /// Returns the metadata that created the mapper. - pub(crate) fn metadata(&self) -> &RegionMetadataRef { - match self { - ProjectionMapper::PrimaryKey(m) => m.metadata(), - ProjectionMapper::Flat(m) => m.metadata(), - } - } - - /// Returns true if the projection includes any tag columns. - pub(crate) fn has_tags(&self) -> bool { - match self { - ProjectionMapper::PrimaryKey(m) => m.has_tags(), - ProjectionMapper::Flat(_) => false, - } - } - - /// Returns ids of projected columns that we need to read - /// from memtables and SSTs. - pub(crate) fn column_ids(&self) -> &[ColumnId] { - match self { - ProjectionMapper::PrimaryKey(m) => m.column_ids(), - ProjectionMapper::Flat(m) => m.column_ids(), - } - } - - /// Returns the schema of converted [RecordBatch]. - pub(crate) fn output_schema(&self) -> SchemaRef { - match self { - ProjectionMapper::PrimaryKey(m) => m.output_schema(), - ProjectionMapper::Flat(m) => m.output_schema(), - } - } - - /// Returns the primary key projection mapper or None if this is not a primary key mapper. - pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> { - match self { - ProjectionMapper::PrimaryKey(m) => Some(m), - ProjectionMapper::Flat(_) => None, - } - } - - /// Returns the flat projection mapper or None if this is not a flat mapper. - pub fn as_flat(&self) -> Option<&FlatProjectionMapper> { - match self { - ProjectionMapper::PrimaryKey(_) => None, - ProjectionMapper::Flat(m) => Some(m), - } - } - - /// Returns an empty [RecordBatch]. - // TODO(yingwen): This is unused now. Use it after we finishing the flat format. - pub fn empty_record_batch(&self) -> RecordBatch { - match self { - ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(), - ProjectionMapper::Flat(m) => m.empty_record_batch(), - } - } -} - -/// Handles projection and converts a projected [Batch] to a projected [RecordBatch]. -#[allow(dead_code)] -pub struct PrimaryKeyProjectionMapper { - /// Metadata of the region. - metadata: RegionMetadataRef, - /// Maps column in [RecordBatch] to index in [Batch]. - batch_indices: Vec, - /// Output record batch contains tags. - has_tags: bool, - /// Decoder for primary key. - codec: Arc, - /// Schema for converted [RecordBatch]. - output_schema: SchemaRef, - /// Ids of columns to read from memtables and SSTs. - read_column_ids: Vec, - /// Ids and DataTypes of field columns in the read [Batch]. - batch_fields: Vec<(ColumnId, ConcreteDataType)>, - /// `true` If the original projection is empty. - is_empty_projection: bool, -} - -#[allow(dead_code)] -impl PrimaryKeyProjectionMapper { - /// Returns a new mapper with projection. - /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count. - /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts - /// empty `RecordBatch` and only use its row count in this query. - pub fn new( - metadata: &RegionMetadataRef, - projection: impl Iterator, - ) -> Result { - let projection: Vec<_> = projection.collect(); - let read_column_ids = read_column_ids_from_projection(metadata, &projection)?; - Self::new_with_read_columns(metadata, projection, read_column_ids) - } - - /// Returns a new mapper with output projection and explicit read columns. - pub fn new_with_read_columns( - metadata: &RegionMetadataRef, - projection: Vec, - read_column_ids: Vec, - ) -> Result { - // If the original projection is empty. - let is_empty_projection = projection.is_empty(); - - let mut column_schemas = Vec::with_capacity(projection.len()); - for idx in &projection { - // For each projection index, we get the column schema for projection - column_schemas.push( - metadata - .schema - .column_schemas() - .get(*idx) - .with_context(|| InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!("projection index {} is out of bound", idx), - })? - .clone(), - ); - } - - let codec = build_primary_key_codec(metadata); - // If projection is empty, we don't output any column. - let output_schema = if is_empty_projection { - Arc::new(Schema::new(vec![])) - } else { - // Safety: Columns come from existing schema. - Arc::new(Schema::new(column_schemas)) - }; - // Get fields in each read batch. - let batch_fields = Batch::projected_fields(metadata, &read_column_ids); - - // Field column id to its index in batch. - let field_id_to_index: HashMap<_, _> = batch_fields - .iter() - .enumerate() - .map(|(index, (column_id, _))| (*column_id, index)) - .collect(); - // For each projected column, compute its index in batches. - let mut batch_indices = Vec::with_capacity(projection.len()); - let mut has_tags = false; - if !is_empty_projection { - for idx in &projection { - // Safety: idx is valid. - let column = &metadata.column_metadatas[*idx]; - // Get column index in a batch by its semantic type and column id. - let batch_index = match column.semantic_type { - SemanticType::Tag => { - // Safety: It is a primary key column. - let index = metadata.primary_key_index(column.column_id).unwrap(); - // We need to output a tag. - has_tags = true; - // We always read all primary key so the column always exists and the tag - // index is always valid. - BatchIndex::Tag((index, column.column_id)) - } - SemanticType::Timestamp => BatchIndex::Timestamp, - SemanticType::Field => { - let index = *field_id_to_index.get(&column.column_id).context( - InvalidRequestSnafu { - region_id: metadata.region_id, - reason: format!( - "field column {} is missing in read projection", - column.column_schema.name - ), - }, - )?; - BatchIndex::Field(index) - } - }; - batch_indices.push(batch_index); - } - } - - Ok(PrimaryKeyProjectionMapper { - metadata: metadata.clone(), - batch_indices, - has_tags, - codec, - output_schema, - read_column_ids, - batch_fields, - is_empty_projection, - }) - } - - /// Returns a new mapper without projection. - pub fn all(metadata: &RegionMetadataRef) -> Result { - PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len()) - } - - /// Returns the metadata that created the mapper. - pub(crate) fn metadata(&self) -> &RegionMetadataRef { - &self.metadata - } - - /// Returns true if the projection includes any tag columns. - pub(crate) fn has_tags(&self) -> bool { - self.has_tags - } - - /// Returns ids of projected columns that we need to read - /// from memtables and SSTs. - pub(crate) fn column_ids(&self) -> &[ColumnId] { - &self.read_column_ids - } - - /// Returns ids of fields in [Batch]es the mapper expects to convert. - pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] { - &self.batch_fields - } - - /// Returns the schema of converted [RecordBatch]. - /// This is the schema that the stream will output. This schema may contain - /// less columns than [PrimaryKeyProjectionMapper::column_ids()]. - pub(crate) fn output_schema(&self) -> SchemaRef { - self.output_schema.clone() - } - - /// Returns an empty [RecordBatch]. - pub(crate) fn empty_record_batch(&self) -> RecordBatch { - RecordBatch::new_empty(self.output_schema.clone()) - } - - /// Converts a [Batch] to a [RecordBatch]. - /// - /// The batch must match the `projection` using to build the mapper. - pub(crate) fn convert( - &self, - batch: &Batch, - cache_strategy: &CacheStrategy, - ) -> common_recordbatch::error::Result { - if self.is_empty_projection { - return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows()); - } - - debug_assert_eq!(self.batch_fields.len(), batch.fields().len()); - debug_assert!( - self.batch_fields - .iter() - .zip(batch.fields()) - .all(|((id, _), batch_col)| *id == batch_col.column_id) - ); - - // Skips decoding pk if we don't need to output it. - let pk_values = if self.has_tags { - match batch.pk_values() { - Some(v) => v.clone(), - None => self - .codec - .decode(batch.primary_key()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?, - } - } else { - CompositeValues::Dense(vec![]) - }; - - let mut columns = Vec::with_capacity(self.output_schema.num_columns()); - let num_rows = batch.num_rows(); - for (index, column_schema) in self - .batch_indices - .iter() - .zip(self.output_schema.column_schemas()) - { - match index { - BatchIndex::Tag((idx, column_id)) => { - let value = match &pk_values { - CompositeValues::Dense(v) => &v[*idx].1, - CompositeValues::Sparse(v) => v.get_or_null(*column_id), - }; - let vector = repeated_vector_with_cache( - &column_schema.data_type, - value, - num_rows, - cache_strategy, - )?; - columns.push(vector); - } - BatchIndex::Timestamp => { - columns.push(batch.timestamps().clone()); - } - BatchIndex::Field(idx) => { - columns.push(batch.fields()[*idx].data.clone()); - } - } - } - - RecordBatch::new(self.output_schema.clone(), columns) - } -} - pub(crate) fn read_column_ids_from_projection( metadata: &RegionMetadataRef, projection: &[usize], @@ -389,18 +53,6 @@ pub(crate) fn read_column_ids_from_projection( Ok(column_ids) } -/// Index of a vector in a [Batch]. -#[derive(Debug, Clone, Copy)] -#[allow(dead_code)] -enum BatchIndex { - /// Index in primary keys. - Tag((usize, ColumnId)), - /// The time index column. - Timestamp, - /// Index in fields. - Field(usize), -} - /// Gets a vector with repeated values from specific cache or creates a new one. pub(crate) fn repeated_vector_with_cache( data_type: &ConcreteDataType, @@ -409,8 +61,6 @@ pub(crate) fn repeated_vector_with_cache( cache_strategy: &CacheStrategy, ) -> common_recordbatch::error::Result { if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) { - // Tries to get the vector from cache manager. If the vector doesn't - // have enough length, creates a new one. match vector.len().cmp(&num_rows) { Ordering::Less => (), Ordering::Equal => return Ok(vector), @@ -418,9 +68,7 @@ pub(crate) fn repeated_vector_with_cache( } } - // Creates a new one. let vector = new_repeated_vector(data_type, value, num_rows)?; - // Updates cache. if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE { cache_strategy.put_repeated_vector(value.clone(), vector.clone()); } @@ -438,7 +86,6 @@ pub(crate) fn new_repeated_vector( mutable_vector .try_push_value_ref(&value.as_value_ref()) .context(DataTypesSnafu)?; - // This requires an additional allocation. let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } @@ -448,6 +95,7 @@ mod tests { use std::sync::Arc; use api::v1::OpType; + use common_recordbatch::RecordBatch; use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array}; use datatypes::arrow::datatypes::Field; use datatypes::arrow::util::pretty; @@ -459,6 +107,7 @@ mod tests { }; use super::*; + use crate::read::flat_projection::FlatProjectionMapper; fn print_record_batch(record_batch: RecordBatch) -> String { pretty::pretty_format_batches(&[record_batch.into_df_record_batch()]) @@ -475,9 +124,6 @@ mod tests { let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3); let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3); - // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type - - // Primary key columns first for (i, tag) in idx_tags { let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n( *tag, num_rows, @@ -490,7 +136,6 @@ mod tests { )); } - // Field columns for (i, field) in idx_fields { let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n( *field, num_rows, @@ -503,7 +148,6 @@ mod tests { )); } - // Time index if let Some(ts_start) = ts_start { let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values( (0..num_rows).map(|i| ts_start + i as i64 * 1000), @@ -519,8 +163,6 @@ mod tests { )); } - // __primary_key column (encoded primary key as dictionary) - // Create encoded primary key let converter = DensePrimaryKeyCodec::with_fields( (0..idx_tags.len()) .map(|idx| { @@ -535,7 +177,6 @@ mod tests { .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v))) .unwrap(); - // Create dictionary array for the encoded primary key let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect(); let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32); let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values)); @@ -549,7 +190,6 @@ mod tests { false, )); - // __sequence column columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _); fields.push(Field::new( SEQUENCE_COLUMN_NAME, @@ -557,7 +197,6 @@ mod tests { false, )); - // __op_type column columns.push(Arc::new(UInt8Array::from_iter_values( (0..num_rows).map(|_| OpType::Put as u8), )) as _); @@ -581,7 +220,7 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - let mapper = ProjectionMapper::all(&metadata).unwrap(); + let mapper = FlatProjectionMapper::all(&metadata).unwrap(); assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!( [ @@ -591,11 +230,11 @@ mod tests { (4, ConcreteDataType::int64_datatype()), (0, ConcreteDataType::timestamp_millisecond_datatype()) ], - mapper.as_flat().unwrap().batch_schema() + mapper.batch_schema() ); let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +---------------------+----+----+----+----+ | ts | k0 | k1 | v0 | v1 | @@ -616,8 +255,7 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - // Columns v1, k0 - let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap(); assert_eq!([4, 1], mapper.column_ids()); assert_eq!( [ @@ -625,11 +263,11 @@ mod tests { (4, ConcreteDataType::int64_datatype()), (0, ConcreteDataType::timestamp_millisecond_datatype()) ], - mapper.as_flat().unwrap().batch_schema() + mapper.batch_schema() ); let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -650,14 +288,13 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - // Output columns v1, k0. Read also includes v0. let mapper = - ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3]) + FlatProjectionMapper::new_with_read_columns(&metadata, vec![4, 1], vec![4, 1, 3]) .unwrap(); assert_eq!([4, 1, 3], mapper.column_ids()); let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | @@ -678,18 +315,16 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - // Empty projection - let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap(); - assert_eq!([0], mapper.column_ids()); // Should still read the time index column + let mapper = FlatProjectionMapper::new(&metadata, [].into_iter()).unwrap(); + assert_eq!([0], mapper.column_ids()); assert!(mapper.output_schema().is_empty()); - let flat_mapper = mapper.as_flat().unwrap(); assert_eq!( [(0, ConcreteDataType::timestamp_millisecond_datatype())], - flat_mapper.batch_schema() + mapper.batch_schema() ); let batch = new_flat_batch(Some(0), &[], &[], 3); - let record_batch = flat_mapper.convert(&batch, &cache).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); assert_eq!(3, record_batch.num_rows()); assert_eq!(0, record_batch.num_columns()); assert!(record_batch.schema.is_empty()); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 55ad504e6f..623f7bbc03 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -24,122 +24,11 @@ use snafu::ResultExt; use crate::error::{RecordBatchSnafu, Result}; use crate::memtable::BoxedBatchIterator; -use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; -use crate::read::{Batch, BatchReader}; +use crate::read::Batch; +use crate::read::last_row::FlatRowGroupLastRowCachedReader; use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; -use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; - -#[allow(dead_code)] -pub enum Source { - RowGroup(RowGroupReader), - LastRow(RowGroupLastRowCachedReader), -} - -#[allow(dead_code)] -impl Source { - async fn next_batch(&mut self) -> Result> { - match self { - Source::RowGroup(r) => r.next_batch().await, - Source::LastRow(r) => r.next_batch().await, - } - } -} - -#[allow(dead_code)] -pub struct PruneReader { - /// Context for file ranges. - context: FileRangeContextRef, - source: Source, - metrics: ReaderMetrics, - /// Whether to skip field filters for this row group. - skip_fields: bool, -} - -#[allow(dead_code)] -impl PruneReader { - pub(crate) fn new_with_row_group_reader( - ctx: FileRangeContextRef, - reader: RowGroupReader, - skip_fields: bool, - ) -> Self { - Self { - context: ctx, - source: Source::RowGroup(reader), - metrics: Default::default(), - skip_fields, - } - } - - pub(crate) fn new_with_last_row_reader( - ctx: FileRangeContextRef, - reader: RowGroupLastRowCachedReader, - skip_fields: bool, - ) -> Self { - Self { - context: ctx, - source: Source::LastRow(reader), - metrics: Default::default(), - skip_fields, - } - } - - /// Merge metrics with the inner reader and return the merged metrics. - pub(crate) fn metrics(&self) -> ReaderMetrics { - let mut metrics = self.metrics.clone(); - match &self.source { - Source::RowGroup(r) => { - metrics.merge_from(r.metrics()); - } - Source::LastRow(r) => { - if let Some(inner_metrics) = r.metrics() { - metrics.merge_from(inner_metrics); - } - } - } - - metrics - } - - pub(crate) async fn next_batch(&mut self) -> Result> { - while let Some(b) = self.source.next_batch().await? { - match self.prune(b)? { - Some(b) => { - return Ok(Some(b)); - } - None => { - continue; - } - } - } - Ok(None) - } - - /// Prunes batches by the pushed down predicate. - fn prune(&mut self, batch: Batch) -> Result> { - // fast path - if self.context.filters().is_empty() && !self.context.has_partition_filter() { - return Ok(Some(batch)); - } - - let num_rows_before_filter = batch.num_rows(); - let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else { - // the entire batch is filtered out - self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter; - return Ok(None); - }; - - // update metric - let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); - self.metrics.filter_metrics.rows_precise_filtered += filtered_rows; - - if !batch_filtered.is_empty() { - Ok(Some(batch_filtered)) - } else { - Ok(None) - } - } -} +use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics}; /// An iterator that prunes batches by time range. pub(crate) struct PruneTimeIterator { diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 1daaa6399b..53d491e716 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -631,7 +631,7 @@ mod tests { use super::*; use crate::cache::CacheManager; - use crate::read::projection::ProjectionMapper; + use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex}; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::test_util::memtable_util::metadata_with_primary_key; @@ -691,7 +691,7 @@ mod tests { ) -> (StreamContext, PartitionRange) { let env = SchedulerEnv::new().await; let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file_id = FileId::random(); let file = sst_file_handle_with_file_id( diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 99a78a5448..374a42144e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -53,8 +53,8 @@ use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; use crate::memtable::{MemtableRange, RangesOptions}; use crate::metrics::READ_SST_COUNT; -use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch}; -use crate::read::projection::ProjectionMapper; +use crate::read::compat::{self, FlatCompatBatch}; +use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; use crate::read::range_cache::ScanRequestFingerprint; use crate::read::seq_scan::SeqScan; @@ -412,12 +412,12 @@ impl ScanRegion { // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match self.request.projection_indices() { - Some(p) => ProjectionMapper::new_with_read_columns( + Some(p) => FlatProjectionMapper::new_with_read_columns( &self.version.metadata, - p.iter().copied(), + p.to_vec(), read_column_ids.clone(), )?, - None => ProjectionMapper::all(&self.version.metadata)?, + None => FlatProjectionMapper::all(&self.version.metadata)?, }; let ssts = &self.version.ssts; @@ -796,7 +796,7 @@ pub struct ScanInput { /// Region SST access layer. access_layer: AccessLayerRef, /// Maps projected Batches to RecordBatches. - pub(crate) mapper: Arc, + pub(crate) mapper: Arc, /// Column ids to read from memtables and SSTs. /// Notice this is different from the columns in `mapper` which are projected columns. /// But this read columns might also include non-projected columns needed for filtering. @@ -852,7 +852,7 @@ pub struct ScanInput { impl ScanInput { /// Creates a new [ScanInput]. #[must_use] - pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput { + pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput { ScanInput { access_layer, read_column_ids: mapper.column_ids().to_vec(), @@ -1099,7 +1099,12 @@ impl ScanInput { reader_metrics: &mut ReaderMetrics, ) -> Result { let predicate = self.predicate_for_file(file); - let decode_pk_values = !self.compaction && self.mapper.has_tags(); + let decode_pk_values = !self.compaction + && self + .mapper + .column_ids() + .iter() + .any(|column_id| self.mapper.metadata().primary_key.contains(column_id)); let reader = self .access_layer .read_sst(file.clone()) @@ -1146,22 +1151,12 @@ impl ScanInput { if need_compat { // They have different schema. We need to adapt the batch first so the // mapper can convert it. - let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() { - let mapper = self.mapper.as_flat().unwrap(); - FlatCompatBatch::try_new( - mapper, - flat_format.metadata(), - flat_format.format_projection(), - self.compaction, - )? - .map(CompatBatch::Flat) - } else { - let compact_batch = PrimaryKeyCompatBatch::new( - &self.mapper, - file_range_ctx.read_format().metadata().clone(), - )?; - Some(CompatBatch::PrimaryKey(compact_batch)) - }; + let compat = FlatCompatBatch::try_new( + &self.mapper, + file_range_ctx.read_format().metadata(), + file_range_ctx.read_format().format_projection(), + self.compaction, + )?; file_range_ctx.set_compat_batch(compat); } Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection)) @@ -1826,7 +1821,7 @@ mod tests { async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec) -> ScanInput { let env = SchedulerEnv::new().await; - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file = FileHandle::new( crate::sst::file::FileMeta::default(), @@ -1994,7 +1989,7 @@ mod tests { let disabled = ScanInput::new( SchedulerEnv::new().await.access_layer.clone(), - ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(), + FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(), ) .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap()); assert!(build_scan_fingerprint(&disabled).is_none()); diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 80563f32a9..af19f42262 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -29,10 +29,9 @@ use datatypes::timestamp::timestamp_array_to_primitive; use futures::Stream; use prometheus::IntGauge; use smallvec::SmallVec; -use snafu::OptionExt; use store_api::storage::RegionId; -use crate::error::{Result, UnexpectedSnafu}; +use crate::error::Result; use crate::memtable::MemScanMetrics; use crate::metrics::{ IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL, @@ -1358,7 +1357,7 @@ mod split_tests { use store_api::storage::FileId; use super::*; - use crate::read::projection::ProjectionMapper; + use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::sst::file::FileHandle; @@ -1369,7 +1368,7 @@ mod split_tests { async fn new_stream_context_with_files(files: Vec) -> StreamContext { let env = SchedulerEnv::new().await; let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files); StreamContext { @@ -1498,7 +1497,7 @@ pub(crate) async fn scan_flat_file_ranges( fields(read_type = read_type, range_count = ranges.len()) )] pub fn build_flat_file_range_scan_stream( - _stream_ctx: Arc, + stream_ctx: Arc, part_metrics: PartitionMetrics, read_type: &'static str, ranges: SmallVec<[FileRange; 2]>, @@ -1516,18 +1515,19 @@ pub fn build_flat_file_range_scan_stream( }; for range in ranges { let build_reader_start = Instant::now(); - let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue}; + let Some(mut reader) = range + .flat_reader( + stream_ctx.input.series_row_selector, + fetch_metrics.as_deref(), + ) + .await? + else { + continue; + }; let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); - let may_compat = range - .compat_batch() - .map(|compat| { - compat.as_flat().context(UnexpectedSnafu { - reason: "Invalid compat for flat format", - }) - }) - .transpose()?; + let may_compat = range.compat_batch(); let mapper = range.compaction_projection_mapper(); while let Some(record_batch) = reader.next_batch().await? { @@ -1707,7 +1707,7 @@ mod tests { BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange, MemtableRangeContext, MemtableStats, }; - use crate::read::projection::ProjectionMapper; + use crate::read::flat_projection::FlatProjectionMapper; use crate::read::range::{MemRangeBuilder, SourceIndex}; use crate::read::scan_region::ScanInput; use crate::sst::file::{FileHandle, FileMeta}; @@ -1741,7 +1741,7 @@ mod tests { ) -> Arc { let env = SchedulerEnv::new().await; let metadata = metadata_for_test(); - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let input = ScanInput::new(env.access_layer.clone(), mapper) .with_cache(CacheStrategy::Disabled) .with_memtables(memtables) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index fecd50564e..39bd0ce842 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -213,7 +213,7 @@ impl SeqScan { } } - let mapper = stream_ctx.input.mapper.as_flat().unwrap(); + let mapper = &stream_ctx.input.mapper; let reader: BoxedRecordBatchStream = if sources.len() == 1 { // Currently, we can't skip dedup when there is only one source because // that source may have duplicate rows. diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index c8547fdf0c..27ae0e8c3a 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -27,7 +27,7 @@ use snafu::ResultExt; use crate::cache::CacheStrategy; use crate::error::Result; -use crate::read::projection::ProjectionMapper; +use crate::read::flat_projection::FlatProjectionMapper; use crate::read::scan_util::PartitionMetrics; use crate::read::series_scan::SeriesBatch; @@ -42,7 +42,7 @@ pub type ScanBatchStream = BoxStream<'static, Result>; /// A stream that takes [`ScanBatch`]es and produces (converts them to) [`RecordBatch`]es. pub(crate) struct ConvertBatchStream { inner: ScanBatchStream, - projection_mapper: Arc, + projection_mapper: Arc, #[allow(dead_code)] cache_strategy: CacheStrategy, partition_metrics: PartitionMetrics, @@ -52,7 +52,7 @@ pub(crate) struct ConvertBatchStream { impl ConvertBatchStream { pub(crate) fn new( inner: ScanBatchStream, - projection_mapper: Arc, + projection_mapper: Arc, cache_strategy: CacheStrategy, partition_metrics: PartitionMetrics, ) -> Self { @@ -75,11 +75,11 @@ impl ConvertBatchStream { let SeriesBatch::Flat(flat_batch) = series; // Safety: Only flat format returns this batch. - let mapper = self.projection_mapper.as_flat().unwrap(); - for batch in flat_batch.batches { - self.pending - .push_back(mapper.convert(&batch, &self.cache_strategy)?); + self.pending.push_back( + self.projection_mapper + .convert(&batch, &self.cache_strategy)?, + ); } let output_schema = self.projection_mapper.output_schema(); @@ -90,9 +90,8 @@ impl ConvertBatchStream { } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. - let mapper = self.projection_mapper.as_flat().unwrap(); - - mapper.convert(&df_record_batch, &self.cache_strategy) + self.projection_mapper + .convert(&df_record_batch, &self.cache_strategy) } } } diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 5863833df4..e8ae1a788a 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -26,9 +26,8 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray}; use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; -use datatypes::prelude::ConcreteDataType; use datatypes::schema::Schema; -use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; +use mito_codec::row_converter::PrimaryKeyCodec; use parquet::arrow::arrow_reader::RowSelection; use parquet::file::metadata::ParquetMetaData; use snafu::{OptionExt, ResultExt}; @@ -39,22 +38,19 @@ use table::predicate::Predicate; use crate::cache::CacheStrategy; use crate::error::{ - ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, - EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu, - UnexpectedSnafu, + ComputeArrowSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, + RecordBatchSnafu, Result, StatsNotPresentSnafu, UnexpectedSnafu, }; -use crate::read::Batch; -use crate::read::compat::CompatBatch; +use crate::read::compat::FlatCompatBatch; use crate::read::flat_projection::CompactionProjectionMapper; -use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader}; -use crate::read::prune::{FlatPruneReader, PruneReader}; +use crate::read::last_row::FlatRowGroupLastRowCachedReader; +use crate::read::prune::FlatPruneReader; use crate::sst::file::FileHandle; use crate::sst::parquet::flat_format::{ - DecodedPrimaryKeys, decode_primary_keys, time_index_column_index, + DecodedPrimaryKeys, FlatReadFormat, decode_primary_keys, time_index_column_index, }; -use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ - FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder, + FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReaderBuilder, SimpleFilterContext, }; use crate::sst::parquet::row_group::ParquetFetchMetrics; @@ -158,69 +154,6 @@ impl FileRange { .unwrap_or(true) // unexpected, not skip just in case } - /// Returns a reader to read the [FileRange]. - #[allow(dead_code)] - pub(crate) async fn reader( - &self, - selector: Option, - fetch_metrics: Option<&ParquetFetchMetrics>, - ) -> Result> { - if !self.in_dynamic_filter_range() { - return Ok(None); - } - // Compute skip_fields once for this row group - let skip_fields = self.context.base.pre_filter_mode.skip_fields(); - let parquet_reader = self - .context - .reader_builder - .build(self.context.build_context( - self.row_group_idx, - self.row_selection.clone(), - fetch_metrics, - skip_fields, - )) - .await?; - - let use_last_row_reader = if selector - .map(|s| s == TimeSeriesRowSelector::LastRow) - .unwrap_or(false) - { - // Only use LastRowReader if row group does not contain DELETE - // and all rows are selected. - let put_only = !self - .context - .contains_delete(self.row_group_idx) - .inspect_err(|e| { - error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader"); - }) - .unwrap_or(true); - put_only && self.select_all() - } else { - // No selector provided, use RowGroupReader - false - }; - - let prune_reader = if use_last_row_reader { - // Row group is PUT only, use LastRowReader to skip unnecessary rows. - let reader = RowGroupLastRowCachedReader::new( - self.file_handle().file_id().file_id(), - self.row_group_idx, - self.context.reader_builder.cache_strategy().clone(), - RowGroupReader::new(self.context.clone(), parquet_reader), - ); - PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields) - } else { - // Row group contains DELETE, fallback to default reader. - PruneReader::new_with_row_group_reader( - self.context.clone(), - RowGroupReader::new(self.context.clone(), parquet_reader), - skip_fields, - ) - }; - - Ok(Some(prune_reader)) - } - /// Creates a flat reader that returns RecordBatch. pub(crate) async fn flat_reader( &self, @@ -293,7 +226,7 @@ impl FileRange { } /// Returns the helper to compat batches. - pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { + pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> { self.context.compat_batch() } @@ -343,7 +276,7 @@ impl FileRangeContext { } /// Returns the format helper. - pub(crate) fn read_format(&self) -> &ReadFormat { + pub(crate) fn read_format(&self) -> &FlatReadFormat { &self.base.read_format } @@ -353,7 +286,7 @@ impl FileRangeContext { } /// Returns the helper to compat batches. - pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { + pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> { self.base.compat_batch.as_ref() } @@ -362,18 +295,11 @@ impl FileRangeContext { self.base.compaction_projection_mapper.as_ref() } - /// Sets the `CompatBatch` to the context. - pub(crate) fn set_compat_batch(&mut self, compat: Option) { + /// Sets the compat helper to the context. + pub(crate) fn set_compat_batch(&mut self, compat: Option) { self.base.compat_batch = compat; } - /// TRY THE BEST to perform pushed down predicate precisely on the input batch. - /// Return the filtered batch. If the entire batch is filtered out, return None. - /// If a partition expr filter is configured, it is also applied. - pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result> { - self.base.precise_filter(input, skip_fields) - } - /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. /// If a partition expr filter is configured, it is also applied. pub(crate) fn precise_filter_flat( @@ -452,14 +378,14 @@ pub(crate) struct RangeBase { /// Dynamic filter physical exprs. pub(crate) dyn_filters: Vec>, /// Helper to read the SST. - pub(crate) read_format: ReadFormat, + pub(crate) read_format: FlatReadFormat, pub(crate) expected_metadata: Option, /// Schema used for pruning with dynamic filters. pub(crate) prune_schema: Arc, /// Decoder for primary keys pub(crate) codec: Arc, /// Optional helper to compat batches. - pub(crate) compat_batch: Option, + pub(crate) compat_batch: Option, /// Optional helper to project batches. pub(crate) compaction_projection_mapper: Option, /// Mode to pre-filter columns. @@ -483,122 +409,6 @@ impl TagDecodeState { } impl RangeBase { - /// TRY THE BEST to perform pushed down predicate precisely on the input batch. - /// Return the filtered batch. If the entire batch is filtered out, return None. - /// - /// Supported filter expr type is defined in [SimpleFilterEvaluator]. - /// - /// When a filter is referencing primary key column, this method will decode - /// the primary key and put it into the batch. - /// - /// # Arguments - /// * `input` - The batch to filter - /// * `skip_fields` - Whether to skip field filters based on PreFilterMode - pub(crate) fn precise_filter( - &self, - mut input: Batch, - skip_fields: bool, - ) -> Result> { - let mut mask = BooleanBuffer::new_set(input.num_rows()); - - // Run filter one by one and combine them result - // TODO(ruihang): run primary key filter first. It may short circuit other filters - for filter_ctx in &self.filters { - let filter = match filter_ctx.filter() { - MaybeFilter::Filter(f) => f, - // Column matches. - MaybeFilter::Matched => continue, - // Column doesn't match, filter the entire batch. - MaybeFilter::Pruned => return Ok(None), - }; - let result = match filter_ctx.semantic_type() { - SemanticType::Tag => { - let pk_values = if let Some(pk_values) = input.pk_values() { - pk_values - } else { - input.set_pk_values( - self.codec - .decode(input.primary_key()) - .context(DecodeSnafu)?, - ); - input.pk_values().unwrap() - }; - let pk_value = match pk_values { - CompositeValues::Dense(v) => { - // Safety: this is a primary key - let pk_index = self - .read_format - .metadata() - .primary_key_index(filter_ctx.column_id()) - .unwrap(); - v[pk_index] - .1 - .try_to_scalar_value(filter_ctx.data_type()) - .context(DataTypeMismatchSnafu)? - } - CompositeValues::Sparse(v) => { - let v = v.get_or_null(filter_ctx.column_id()); - v.try_to_scalar_value(filter_ctx.data_type()) - .context(DataTypeMismatchSnafu)? - } - }; - if filter - .evaluate_scalar(&pk_value) - .context(RecordBatchSnafu)? - { - continue; - } else { - // PK not match means the entire batch is filtered out. - return Ok(None); - } - } - SemanticType::Field => { - // Skip field filters if skip_fields is true - if skip_fields { - continue; - } - // Safety: Input is Batch so we are using primary key format. - let Some(field_index) = self - .read_format - .as_primary_key() - .unwrap() - .field_index_by_id(filter_ctx.column_id()) - else { - continue; - }; - let field_col = &input.fields()[field_index].data; - filter - .evaluate_vector(field_col) - .context(RecordBatchSnafu)? - } - SemanticType::Timestamp => filter - .evaluate_vector(input.timestamps()) - .context(RecordBatchSnafu)?, - }; - - mask = mask.bitand(&result); - } - - if mask.count_set_bits() == 0 { - return Ok(None); - } - - // Apply partition filter - if let Some(partition_filter) = &self.partition_filter { - let record_batch = self - .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?; - let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?; - mask = mask.bitand(&partition_mask); - } - - if mask.count_set_bits() == 0 { - Ok(None) - } else { - input.filter(&BooleanArray::from(mask).into())?; - Ok(Some(input)) - } - } - /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. /// /// It assumes all necessary tags are already decoded from the primary key. @@ -668,13 +478,7 @@ impl RangeBase { ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); - let flat_format = self - .read_format - .as_flat() - .context(crate::error::UnexpectedSnafu { - reason: "Expected flat format for precise_filter_flat", - })?; - let metadata = flat_format.metadata(); + let metadata = self.read_format.metadata(); // Run filter one by one and combine them result for filter_ctx in &self.filters { @@ -700,7 +504,9 @@ impl RangeBase { // Get the column directly by its projected index. // If the column is missing and it's not a tag/time column, this filter is skipped. // Assumes the projection indices align with the input batch schema. - let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id()); + let column_idx = self + .read_format + .projected_index_by_id(filter_ctx.column_id()); if let Some(idx) = column_idx { let column = &input.columns().get(idx).unwrap(); let result = filter.evaluate_array(column).context(RecordBatchSnafu)?; @@ -804,84 +610,6 @@ impl RangeBase { Ok(mask) } - /// Builds a `RecordBatch` from the input `Batch` matching the given schema. - /// - /// This is used for partition expression evaluation. The schema should only contain - /// the columns referenced by the partition expression to minimize overhead. - fn build_record_batch_for_pruning( - &self, - input: &mut Batch, - schema: &Arc, - ) -> Result { - let arrow_schema = schema.arrow_schema(); - let mut columns = Vec::with_capacity(arrow_schema.fields().len()); - - // Decode primary key if necessary. - if input.pk_values().is_none() { - input.set_pk_values( - self.codec - .decode(input.primary_key()) - .context(DecodeSnafu)?, - ); - } - - for field in arrow_schema.fields() { - let metadata = self.read_format.metadata(); - let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id); - - // Partition pruning schema should be a subset of the input batch schema. - let Some(column_id) = column_id else { - return UnexpectedSnafu { - reason: format!( - "Partition pruning schema expects column '{}' but it is missing in \ - region metadata", - field.name() - ), - } - .fail(); - }; - - // 1. Check if it's a tag. - if let Some(pk_index) = metadata.primary_key_index(column_id) { - let pk_values = input.pk_values().unwrap(); - let value = match pk_values { - CompositeValues::Dense(v) => &v[pk_index].1, - CompositeValues::Sparse(v) => v.get_or_null(column_id), - }; - let concrete_type = ConcreteDataType::from_arrow_type(field.data_type()); - let arrow_scalar = value - .try_to_scalar_value(&concrete_type) - .context(DataTypeMismatchSnafu)?; - let array = arrow_scalar - .to_array_of_size(input.num_rows()) - .context(EvalPartitionFilterSnafu)?; - columns.push(array); - } else if metadata.time_index_column().column_id == column_id { - // 2. Check if it's the timestamp column. - columns.push(input.timestamps().to_arrow_array()); - } else if let Some(field_index) = self - .read_format - .as_primary_key() - .and_then(|f| f.field_index_by_id(column_id)) - { - // 3. Check if it's a field column. - columns.push(input.fields()[field_index].data.to_arrow_array()); - } else { - return UnexpectedSnafu { - reason: format!( - "Partition pruning schema expects column '{}' (id {}) but it is not \ - present in input batch", - field.name(), - column_id - ), - } - .fail(); - } - } - - RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu) - } - /// Projects the input `RecordBatch` to match the given schema. /// /// This is used for partition expression evaluation. The schema should only contain @@ -895,13 +623,7 @@ impl RangeBase { let arrow_schema = schema.arrow_schema(); let mut columns = Vec::with_capacity(arrow_schema.fields().len()); - let flat_format = self - .read_format - .as_flat() - .context(crate::error::UnexpectedSnafu { - reason: "Expected flat format for precise_filter_flat", - })?; - let metadata = flat_format.metadata(); + let metadata = self.read_format.metadata(); for field in arrow_schema.fields() { let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id); @@ -917,7 +639,7 @@ impl RangeBase { .fail(); }; - if let Some(idx) = flat_format.projected_index_by_id(column_id) { + if let Some(idx) = self.read_format.projected_index_by_id(column_id) { columns.push(input.column(idx).clone()); continue; } @@ -957,12 +679,12 @@ mod tests { use datafusion_expr::{col, lit}; use super::*; - use crate::sst::parquet::format::ReadFormat; + use crate::sst::parquet::flat_format::FlatReadFormat; use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata}; fn new_test_range_base(filters: Vec) -> RangeBase { let metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); - let read_format = ReadFormat::new_flat( + let read_format = FlatReadFormat::new( metadata.clone(), metadata.column_metadatas.iter().map(|c| c.column_id), None, diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d4d6c11a45..f4c2ea3eca 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -53,7 +53,7 @@ use crate::error::{ }; use crate::sst::parquet::format::{ FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, - PrimaryKeyReadFormat, ReadFormat, StatValues, + PrimaryKeyReadFormat, StatValues, column_null_counts, column_values, }; use crate::sst::{ FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field, @@ -518,7 +518,7 @@ impl ParquetFlat { return StatValues::NoColumn; }; - let stats = ReadFormat::column_null_counts(row_groups, *index); + let stats = column_null_counts(row_groups, *index); StatValues::from_stats_opt(stats) } @@ -535,7 +535,7 @@ impl ParquetFlat { // Safety: `column_id_to_sst_index` is built from `metadata`. let index = self.column_id_to_sst_index.get(&column_id).unwrap(); - let stats = ReadFormat::column_values(row_groups, column, *index, is_min); + let stats = column_values(row_groups, column, *index, is_min); StatValues::from_stats_opt(stats) } } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index ba64eac78b..a8679fc036 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -41,8 +41,7 @@ use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::Helper; use mito_codec::row_converter::{ - CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, - build_primary_key_codec_with_fields, + CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields, }; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::statistics::Statistics; @@ -55,7 +54,6 @@ use crate::error::{ }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::sst::file::{FileMeta, FileTimeRange}; -use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::to_sst_arrow_schema; /// Arrow array type for the primary key dictionary. @@ -125,267 +123,82 @@ impl PrimaryKeyWriteFormat { } } -/// Helper to read parquet formats. -pub enum ReadFormat { - /// The parquet is in the old primary key format. - PrimaryKey(PrimaryKeyReadFormat), - /// The parquet is in the new flat format. - Flat(FlatReadFormat), +/// Returns min/max values of specific columns. +/// Returns None if the column does not have statistics. +/// The column should not be encoded as a part of a primary key. +pub(crate) fn column_values( + row_groups: &[impl Borrow], + column: &ColumnMetadata, + column_index: usize, + is_min: bool, +) -> Option { + let null_scalar: ScalarValue = column + .column_schema + .data_type + .as_arrow_type() + .try_into() + .ok()?; + let scalar_values = row_groups + .iter() + .map(|meta| { + let stats = meta.borrow().column(column_index).statistics()?; + match stats { + Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Int96(_) => None, + Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::ByteArray(s) => { + let bytes = if is_min { + s.min_bytes_opt()? + } else { + s.max_bytes_opt()? + }; + let s = String::from_utf8(bytes.to_vec()).ok(); + Some(ScalarValue::Utf8(s)) + } + Statistics::FixedLenByteArray(_) => None, + } + }) + .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone())) + .collect::>(); + debug_assert_eq!(scalar_values.len(), row_groups.len()); + ScalarValue::iter_to_array(scalar_values).ok() } -impl ReadFormat { - /// Creates a helper to read the primary key format. - pub fn new_primary_key( - metadata: RegionMetadataRef, - column_ids: impl Iterator, - ) -> Self { - ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids)) - } - - /// Creates a helper to read the flat format. - pub fn new_flat( - metadata: RegionMetadataRef, - column_ids: impl Iterator, - num_columns: Option, - file_path: &str, - skip_auto_convert: bool, - ) -> Result { - Ok(ReadFormat::Flat(FlatReadFormat::new( - metadata, - column_ids, - num_columns, - file_path, - skip_auto_convert, - )?)) - } - - /// Creates a new read format. - pub fn new( - region_metadata: RegionMetadataRef, - projection: Option<&[ColumnId]>, - flat_format: bool, - num_columns: Option, - file_path: &str, - skip_auto_convert: bool, - ) -> Result { - if flat_format { - if let Some(column_ids) = projection { - ReadFormat::new_flat( - region_metadata, - column_ids.iter().copied(), - num_columns, - file_path, - skip_auto_convert, - ) - } else { - // No projection, lists all column ids to read. - ReadFormat::new_flat( - region_metadata.clone(), - region_metadata - .column_metadatas - .iter() - .map(|col| col.column_id), - num_columns, - file_path, - skip_auto_convert, - ) - } - } else if let Some(column_ids) = projection { - Ok(ReadFormat::new_primary_key( - region_metadata, - column_ids.iter().copied(), - )) - } else { - // No projection, lists all column ids to read. - Ok(ReadFormat::new_primary_key( - region_metadata.clone(), - region_metadata - .column_metadatas - .iter() - .map(|col| col.column_id), - )) - } - } - - pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> { - match self { - ReadFormat::PrimaryKey(format) => Some(format), - _ => None, - } - } - - pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> { - match self { - ReadFormat::Flat(format) => Some(format), - _ => None, - } - } - - /// Gets the arrow schema of the SST file. - /// - /// This schema is computed from the region metadata but should be the same - /// as the arrow schema decoded from the file metadata. - pub(crate) fn arrow_schema(&self) -> &SchemaRef { - match self { - ReadFormat::PrimaryKey(format) => format.arrow_schema(), - ReadFormat::Flat(format) => format.arrow_schema(), - } - } - - /// Gets the metadata of the SST. - pub(crate) fn metadata(&self) -> &RegionMetadataRef { - match self { - ReadFormat::PrimaryKey(format) => format.metadata(), - ReadFormat::Flat(format) => format.metadata(), - } - } - - /// Gets sorted projection indices to read. - pub(crate) fn projection_indices(&self) -> &[usize] { - match self { - ReadFormat::PrimaryKey(format) => format.projection_indices(), - ReadFormat::Flat(format) => format.projection_indices(), - } - } - - /// Returns min values of specific column in row groups. - pub fn min_values( - &self, - row_groups: &[impl Borrow], - column_id: ColumnId, - ) -> StatValues { - match self { - ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id), - ReadFormat::Flat(format) => format.min_values(row_groups, column_id), - } - } - - /// Returns max values of specific column in row groups. - pub fn max_values( - &self, - row_groups: &[impl Borrow], - column_id: ColumnId, - ) -> StatValues { - match self { - ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id), - ReadFormat::Flat(format) => format.max_values(row_groups, column_id), - } - } - - /// Returns null counts of specific column in row groups. - pub fn null_counts( - &self, - row_groups: &[impl Borrow], - column_id: ColumnId, - ) -> StatValues { - match self { - ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id), - ReadFormat::Flat(format) => format.null_counts(row_groups, column_id), - } - } - - /// Returns min/max values of specific columns. - /// Returns None if the column does not have statistics. - /// The column should not be encoded as a part of a primary key. - pub(crate) fn column_values( - row_groups: &[impl Borrow], - column: &ColumnMetadata, - column_index: usize, - is_min: bool, - ) -> Option { - let null_scalar: ScalarValue = column - .column_schema - .data_type - .as_arrow_type() - .try_into() - .ok()?; - let scalar_values = row_groups - .iter() - .map(|meta| { - let stats = meta.borrow().column(column_index).statistics()?; - match stats { - Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - - Statistics::Int96(_) => None, - Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::ByteArray(s) => { - let bytes = if is_min { - s.min_bytes_opt()? - } else { - s.max_bytes_opt()? - }; - let s = String::from_utf8(bytes.to_vec()).ok(); - Some(ScalarValue::Utf8(s)) - } - - Statistics::FixedLenByteArray(_) => None, - } - }) - .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone())) - .collect::>(); - debug_assert_eq!(scalar_values.len(), row_groups.len()); - ScalarValue::iter_to_array(scalar_values).ok() - } - - /// Returns null counts of specific columns. - /// The column should not be encoded as a part of a primary key. - pub(crate) fn column_null_counts( - row_groups: &[impl Borrow], - column_index: usize, - ) -> Option { - let values = row_groups.iter().map(|meta| { - let col = meta.borrow().column(column_index); - let stat = col.statistics()?; - stat.null_count_opt() - }); - Some(Arc::new(UInt64Array::from_iter(values))) - } - - /// Sets the sequence number to override. - pub(crate) fn set_override_sequence(&mut self, sequence: Option) { - match self { - ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence), - ReadFormat::Flat(format) => format.set_override_sequence(sequence), - } - } - - /// Enables or disables eager decoding of primary key values into batches. - pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) { - if let ReadFormat::PrimaryKey(format) = self { - format.set_decode_primary_key_values(decode); - } - } - - /// Creates a sequence array to override. - pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { - match self { - ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length), - ReadFormat::Flat(format) => format.new_override_sequence_array(length), - } - } +/// Returns null counts of specific columns. +/// The column should not be encoded as a part of a primary key. +pub(crate) fn column_null_counts( + row_groups: &[impl Borrow], + column_index: usize, +) -> Option { + let values = row_groups.iter().map(|meta| { + let col = meta.borrow().column(column_index); + let stat = col.statistics()?; + stat.null_count_opt() + }); + Some(Arc::new(UInt64Array::from_iter(values))) } /// Helper for reading the SST format. @@ -402,8 +215,6 @@ pub struct PrimaryKeyReadFormat { /// Field column id to their index in the projected schema ( /// the schema of [Batch]). field_id_to_projected_index: HashMap, - /// Sequence number to override the sequence read from the SST. - override_sequence: Option, /// Codec used to decode primary key values if eager decoding is enabled. primary_key_codec: Option>, } @@ -433,25 +244,10 @@ impl PrimaryKeyReadFormat { field_id_to_index, projection_indices: format_projection.projection_indices, field_id_to_projected_index: format_projection.column_id_to_projected_index, - override_sequence: None, primary_key_codec: None, } } - /// Sets the sequence number to override. - pub(crate) fn set_override_sequence(&mut self, sequence: Option) { - self.override_sequence = sequence; - } - - /// Enables or disables eager decoding of primary key values into batches. - pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) { - self.primary_key_codec = if decode { - Some(build_primary_key_codec(&self.metadata)) - } else { - None - }; - } - /// Gets the arrow schema of the SST file. /// /// This schema is computed from the region metadata but should be the same @@ -475,12 +271,6 @@ impl PrimaryKeyReadFormat { &self.field_id_to_projected_index } - /// Creates a sequence array to override. - pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { - self.override_sequence - .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef) - } - /// Convert a arrow record batch into `batches`. /// /// The length of `override_sequence_array` must be larger than the length of the record batch. @@ -598,12 +388,12 @@ impl PrimaryKeyReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = ReadFormat::column_values(row_groups, column, *index, true); + let stats = column_values(row_groups, column, *index, true); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = ReadFormat::column_values(row_groups, column, index, true); + let stats = column_values(row_groups, column, index, true); StatValues::from_stats_opt(stats) } } @@ -624,12 +414,12 @@ impl PrimaryKeyReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = ReadFormat::column_values(row_groups, column, *index, false); + let stats = column_values(row_groups, column, *index, false); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = ReadFormat::column_values(row_groups, column, index, false); + let stats = column_values(row_groups, column, index, false); StatValues::from_stats_opt(stats) } } @@ -650,12 +440,12 @@ impl PrimaryKeyReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = ReadFormat::column_null_counts(row_groups, *index); + let stats = column_null_counts(row_groups, *index); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = ReadFormat::column_null_counts(row_groups, index); + let stats = column_null_counts(row_groups, index); StatValues::from_stats_opt(stats) } } @@ -1006,7 +796,9 @@ mod tests { use store_api::storage::consts::ReservedColumnId; use super::*; - use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index}; + use crate::sst::parquet::flat_format::{ + FlatReadFormat, FlatWriteFormat, sequence_column_index, + }; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; const TEST_SEQUENCE: u64 = 1; @@ -1134,16 +926,16 @@ mod tests { fn test_projection_indices() { let metadata = build_test_region_metadata(); // Only read tag1 - let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [3].iter().copied()); assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Only read field1 - let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [4].iter().copied()); assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices()); // Only read ts - let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [5].iter().copied()); assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Read field0, tag0, ts - let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata, [2, 1, 5].iter().copied()); assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices()); } @@ -1240,8 +1032,7 @@ mod tests { .iter() .map(|col| col.column_id) .collect(); - let read_format = - ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap(); + let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 @@ -1261,8 +1052,6 @@ mod tests { let mut batches = VecDeque::new(); read_format - .as_primary_key() - .unwrap() .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches) .unwrap(); @@ -1370,25 +1159,25 @@ mod tests { // Only read tag1 (column_id=3, index=1) + fixed columns let read_format = - ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false) + FlatReadFormat::new(metadata.clone(), [3].iter().copied(), None, "test", false) .unwrap(); assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices()); // Only read field1 (column_id=4, index=2) + fixed columns let read_format = - ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false) + FlatReadFormat::new(metadata.clone(), [4].iter().copied(), None, "test", false) .unwrap(); assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices()); // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed) let read_format = - ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false) + FlatReadFormat::new(metadata.clone(), [5].iter().copied(), None, "test", false) .unwrap(); assert_eq!(&[4, 5, 6, 7], read_format.projection_indices()); // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns let read_format = - ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap(); + FlatReadFormat::new(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap(); assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); } diff --git a/src/mito2/src/sst/parquet/prefilter.rs b/src/mito2/src/sst/parquet/prefilter.rs index 967ddd491b..c722e442cb 100644 --- a/src/mito2/src/sst/parquet/prefilter.rs +++ b/src/mito2/src/sst/parquet/prefilter.rs @@ -34,8 +34,8 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; -use crate::sst::parquet::flat_format::primary_key_column_index; -use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; +use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index}; +use crate::sst::parquet::format::PrimaryKeyArray; use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder}; use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact; @@ -251,7 +251,7 @@ impl PrefilterContextBuilder { /// - The read format doesn't use flat layout with dictionary-encoded PKs /// - The primary key is empty pub(crate) fn new( - read_format: &ReadFormat, + read_format: &FlatReadFormat, codec: &Arc, primary_key_filters: Option<&Arc>>, parquet_schema: &SchemaDescriptor, @@ -267,8 +267,7 @@ impl PrefilterContextBuilder { } // Only perform PK prefiltering for primary-key-to-flat conversion path. - let flat_format = read_format.as_flat()?; - if flat_format.batch_has_raw_pk_columns() { + if read_format.batch_has_raw_pk_columns() { return None; } @@ -404,7 +403,7 @@ mod tests { use super::*; use crate::sst::internal_fields; - use crate::sst::parquet::format::ReadFormat; + use crate::sst::parquet::flat_format::FlatReadFormat; use crate::test_util::sst_util::{ new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding, }; @@ -414,7 +413,7 @@ mod tests { let metadata = Arc::new(sst_region_metadata_with_encoding( PrimaryKeyEncoding::Sparse, )); - let read_format = ReadFormat::new_flat( + let read_format = FlatReadFormat::new( metadata.clone(), metadata.column_metadatas.iter().map(|c| c.column_id), None, @@ -422,7 +421,7 @@ mod tests { true, ) .unwrap(); - assert!(read_format.as_flat().is_some()); + assert!(!read_format.batch_has_raw_pk_columns()); let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap(); assert!(is_usable_primary_key_filter(&metadata, None, &filter)); diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index bcc20542c3..6942c8223d 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -16,7 +16,7 @@ #[cfg(feature = "vector_index")] use std::collections::BTreeSet; -use std::collections::{HashSet, VecDeque}; +use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -55,7 +55,6 @@ use crate::metrics::{ }; use crate::read::flat_projection::CompactionProjectionMapper; use crate::read::prune::FlatPruneReader; -use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; use crate::sst::index::bloom_filter::applier::{ BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics, @@ -73,7 +72,8 @@ use crate::sst::parquet::async_reader::SstAsyncFileReader; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, }; -use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; +use crate::sst::parquet::flat_format::FlatReadFormat; +use crate::sst::parquet::format::need_override_sequence; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::prefilter::{ PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter, @@ -371,10 +371,9 @@ impl ParquetReaderBuilder { }; let mut read_format = if let Some(column_ids) = &self.projection { - ReadFormat::new( + FlatReadFormat::new( region_meta.clone(), - Some(column_ids), - true, // Always reads as flat format. + column_ids.iter().copied(), Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, skip_auto_convert, @@ -387,18 +386,14 @@ impl ParquetReaderBuilder { .iter() .map(|col| col.column_id) .collect(); - ReadFormat::new( + FlatReadFormat::new( region_meta.clone(), - Some(&column_ids), - true, // Always reads as flat format. + column_ids.iter().copied(), Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, skip_auto_convert, )? }; - if self.decode_primary_key_values { - read_format.set_decode_primary_key_values(true); - } if need_override_sequence(&parquet_meta) { read_format .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get())); @@ -540,7 +535,7 @@ impl ParquetReaderBuilder { /// and build a partition filter if they differ. fn build_partition_filter( &self, - read_format: &ReadFormat, + read_format: &FlatReadFormat, prune_schema: &Arc, ) -> Result> { let region_partition_expr_str = self @@ -567,15 +562,13 @@ impl ParquetReaderBuilder { region_partition_expr.collect_column_names(&mut referenced_columns); // Build a partition_schema containing only referenced columns. - let is_flat = read_format.as_flat().is_some(); let partition_schema = Arc::new(datatypes::schema::Schema::new( prune_schema .column_schemas() .iter() .filter(|col| referenced_columns.contains(&col.name)) .map(|col| { - if is_flat - && let Some(column_meta) = read_format.metadata().column_by_name(&col.name) + if let Some(column_meta) = read_format.metadata().column_by_name(&col.name) && column_meta.semantic_type == SemanticType::Tag && col.data_type.is_string() { @@ -656,7 +649,7 @@ impl ParquetReaderBuilder { )] async fn row_groups_to_read( &self, - read_format: &ReadFormat, + read_format: &FlatReadFormat, parquet_meta: &ParquetMetaData, metrics: &mut ReaderFilterMetrics, ) -> RowGroupSelection { @@ -1116,7 +1109,7 @@ impl ParquetReaderBuilder { /// Computes row groups selection after min-max pruning. fn row_groups_by_minmax( &self, - read_format: &ReadFormat, + read_format: &FlatReadFormat, parquet_meta: &ParquetMetaData, row_group_size: usize, total_row_count: usize, @@ -1791,8 +1784,6 @@ pub(crate) struct SimpleFilterContext { column_id: ColumnId, /// Semantic type of the column. semantic_type: SemanticType, - /// The data type of the column. - data_type: ConcreteDataType, /// Whether this filter can be applied by flat parquet primary-key prefiltering. usable_primary_key_filter: bool, } @@ -1849,7 +1840,6 @@ impl SimpleFilterContext { filter: maybe_filter, column_id: column_metadata.column_id, semantic_type: column_metadata.semantic_type, - data_type: column_metadata.column_schema.data_type.clone(), usable_primary_key_filter, }) } @@ -1869,11 +1859,6 @@ impl SimpleFilterContext { self.semantic_type } - /// Returns the data type of the column. - pub(crate) fn data_type(&self) -> &ConcreteDataType { - &self.data_type - } - /// Returns whether this filter is eligible for flat parquet PK prefiltering. pub(crate) fn usable_primary_key_filter(&self) -> bool { self.usable_primary_key_filter @@ -1967,7 +1952,6 @@ impl ParquetReader { context: FileRangeContextRef, mut selection: RowGroupSelection, ) -> Result { - debug_assert!(context.read_format().as_flat().is_some()); let fetch_metrics = ParquetFetchMetrics::default(); let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() { let skip_fields = context.pre_filter_mode().skip_fields(); @@ -2007,140 +1991,6 @@ impl ParquetReader { } } -/// RowGroupReaderContext represents the fields that cannot be shared -/// between different `RowGroupReader`s. -pub(crate) trait RowGroupReaderContext: Send { - fn read_format(&self) -> &ReadFormat; - - fn file_path(&self) -> &str; -} - -impl RowGroupReaderContext for FileRangeContextRef { - fn read_format(&self) -> &ReadFormat { - self.as_ref().read_format() - } - - fn file_path(&self) -> &str { - self.as_ref().file_path() - } -} - -/// [RowGroupReader] that reads from [FileRange]. -pub(crate) type RowGroupReader = RowGroupReaderBase; - -#[allow(dead_code)] -impl RowGroupReader { - /// Creates a new reader from file range. - pub(crate) fn new( - context: FileRangeContextRef, - stream: ParquetRecordBatchStream, - ) -> Self { - Self::create(context, stream) - } -} - -/// Reader to read a row group of a parquet file. -pub(crate) struct RowGroupReaderBase { - /// Context of [RowGroupReader] so adapts to different underlying implementation. - context: T, - /// Inner parquet record batch stream. - stream: ParquetRecordBatchStream, - /// Buffered batches to return. - batches: VecDeque, - /// Local scan metrics. - metrics: ReaderMetrics, - /// Cached sequence array to override sequences. - override_sequence: Option, -} - -#[allow(dead_code)] -impl RowGroupReaderBase -where - T: RowGroupReaderContext, -{ - /// Creates a new reader to read the primary key format. - pub(crate) fn create(context: T, stream: ParquetRecordBatchStream) -> Self { - // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. - let override_sequence = context - .read_format() - .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE); - assert!(context.read_format().as_primary_key().is_some()); - - Self { - context, - stream, - batches: VecDeque::new(), - metrics: ReaderMetrics::default(), - override_sequence, - } - } - - /// Gets the metrics. - pub(crate) fn metrics(&self) -> &ReaderMetrics { - &self.metrics - } - - /// Gets [ReadFormat] of underlying reader. - pub(crate) fn read_format(&self) -> &ReadFormat { - self.context.read_format() - } - - /// Tries to fetch next [RecordBatch] from the stream asynchronously. - async fn fetch_next_record_batch(&mut self) -> Result> { - match self.stream.next().await.transpose() { - Ok(batch) => Ok(batch), - Err(e) => Err(e).context(ReadParquetSnafu { - path: self.context.file_path(), - }), - } - } - - /// Returns the next [Batch]. - pub(crate) async fn next_inner(&mut self) -> Result> { - let scan_start = Instant::now(); - if let Some(batch) = self.batches.pop_front() { - self.metrics.num_rows += batch.num_rows(); - self.metrics.scan_cost += scan_start.elapsed(); - return Ok(Some(batch)); - } - - // We need to fetch next record batch and convert it to batches. - while self.batches.is_empty() { - let Some(record_batch) = self.fetch_next_record_batch().await? else { - self.metrics.scan_cost += scan_start.elapsed(); - return Ok(None); - }; - self.metrics.num_record_batches += 1; - - // Safety: We ensures the format is primary key in the RowGroupReaderBase::create(). - self.context - .read_format() - .as_primary_key() - .unwrap() - .convert_record_batch( - &record_batch, - self.override_sequence.as_ref(), - &mut self.batches, - )?; - self.metrics.num_batches += self.batches.len(); - } - let batch = self.batches.pop_front(); - self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); - self.metrics.scan_cost += scan_start.elapsed(); - Ok(batch) - } -} - -#[async_trait::async_trait] -impl BatchReader for RowGroupReaderBase -where - T: RowGroupReaderContext + Send + Sync, -{ - async fn next_batch(&mut self) -> Result> { - self.next_inner().await - } -} - /// Reader to read a row group of a parquet file in flat format, returning RecordBatch. pub(crate) struct FlatRowGroupReader { /// Context for file ranges. @@ -2177,10 +2027,10 @@ impl FlatRowGroupReader { path: self.context.file_path(), })?; - // Safety: Only flat format use FlatRowGroupReader. - let flat_format = self.context.read_format().as_flat().unwrap(); - let record_batch = - flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?; + let record_batch = self + .context + .read_format() + .convert_batch(record_batch, self.override_sequence.as_ref())?; Ok(Some(record_batch)) } None => Ok(None), @@ -2269,8 +2119,17 @@ mod tests { object_store.write(&file_path, parquet_bytes).await.unwrap(); let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); - let read_format = - ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap(); + let read_format = FlatReadFormat::new( + region_metadata.clone(), + region_metadata + .column_metadatas + .iter() + .map(|column| column.column_id), + None, + &file_path, + false, + ) + .unwrap(); let mut cache_metrics = MetadataCacheMetrics::default(); let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs index 7c5da69d4b..cdedde94e2 100644 --- a/src/mito2/src/sst/parquet/stats.rs +++ b/src/mito2/src/sst/parquet/stats.rs @@ -26,14 +26,15 @@ use parquet::file::metadata::RowGroupMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; -use crate::sst::parquet::format::{ReadFormat, StatValues}; +use crate::sst::parquet::flat_format::FlatReadFormat; +use crate::sst::parquet::format::StatValues; /// Statistics for pruning row groups. pub(crate) struct RowGroupPruningStats<'a, T> { /// Metadata of SST row groups. row_groups: &'a [T], /// Helper to read the SST. - read_format: &'a ReadFormat, + read_format: &'a FlatReadFormat, /// The metadata of the region. /// It contains the schema a query expects to read. If it is not None, we use it instead /// of the metadata in the SST to get the column id of a column as the SST may have @@ -47,7 +48,7 @@ impl<'a, T> RowGroupPruningStats<'a, T> { /// Creates a new statistics to prune specific `row_groups`. pub(crate) fn new( row_groups: &'a [T], - read_format: &'a ReadFormat, + read_format: &'a FlatReadFormat, expected_metadata: Option, skip_fields: bool, ) -> Self {