diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 9d86830856..086e965e92 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -638,15 +638,18 @@ struct CompactionSstReaderBuilder<'a> {
 impl CompactionSstReaderBuilder<'_> {
     /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
     async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
-        let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
-            .with_files(self.inputs.to_vec())
-            .with_append_mode(self.append_mode)
-            // We use special cache strategy for compaction.
-            .with_cache(CacheStrategy::Compaction(self.cache))
-            .with_filter_deleted(self.filter_deleted)
-            // We ignore file not found error during compaction.
-            .with_ignore_file_not_found(true)
-            .with_merge_mode(self.merge_mode);
+        let mut scan_input = ScanInput::new(
+            self.sst_layer,
+            ProjectionMapper::all(&self.metadata, false)?,
+        )
+        .with_files(self.inputs.to_vec())
+        .with_append_mode(self.append_mode)
+        // We use a special cache strategy for compaction.
+        .with_cache(CacheStrategy::Compaction(self.cache))
+        .with_filter_deleted(self.filter_deleted)
+        // We ignore file-not-found errors during compaction.
+        .with_ignore_file_not_found(true)
+        .with_merge_mode(self.merge_mode);
 
         // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
         // by converting time ranges into a predicate.
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 5d1580dd17..76e43a41f3 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -18,6 +18,7 @@ pub mod compat;
 pub mod dedup;
 pub mod flat_dedup;
 pub mod flat_merge;
+pub mod flat_projection;
 pub mod last_row;
 pub mod merge;
 pub mod plain_batch;
diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs
index 4bd23987b7..839a68a8f0 100644
--- a/src/mito2/src/read/compat.rs
+++ b/src/mito2/src/read/compat.rs
@@ -28,8 +28,10 @@ use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;
 
-use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
-use crate::read::projection::ProjectionMapper;
+use crate::error::{
+    CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result, UnexpectedSnafu,
+};
+use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
 use crate::read::{Batch, BatchColumn, BatchReader};
 
 /// Reader to adapt schema of underlying reader to expected schema.
@@ -87,6 +89,9 @@ impl CompatBatch {
     pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
         let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
         let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
+        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
+            reason: "Unexpected format",
+        })?;
         let compat_fields = may_compat_fields(mapper, &reader_meta)?;
 
         Ok(Self {
@@ -313,7 +318,7 @@ fn may_compat_primary_key(
 
 /// Creates a [CompatFields] if needed.
 fn may_compat_fields(
-    mapper: &ProjectionMapper,
+    mapper: &PrimaryKeyProjectionMapper,
     actual: &RegionMetadata,
 ) -> Result<Option<CompatFields>> {
     let expect_fields = mapper.batch_fields();
@@ -675,7 +680,7 @@ mod tests {
             ],
             &[1],
         ));
-        let mapper = ProjectionMapper::all(&reader_meta).unwrap();
+        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
         assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
     }
 
@@ -707,7 +712,7 @@ mod tests {
             ],
            &[1, 3],
         ));
-        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let k2 = encode_key(&[Some("b")]);
         let source_reader = VecBatchReader::new(&[
@@ -756,7 +761,7 @@ mod tests {
             ],
             &[1],
         ));
-        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let k2 = encode_key(&[Some("b")]);
         let source_reader = VecBatchReader::new(&[
@@ -801,7 +806,7 @@ mod tests {
             ],
             &[1],
         ));
-        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let k2 = encode_key(&[Some("b")]);
         let source_reader = VecBatchReader::new(&[
@@ -858,7 +863,7 @@ mod tests {
             &[1],
         ));
         // tag_1, field_2, field_3
-        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
+        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
@@ -871,7 +876,7 @@ mod tests {
             .await;
 
         // tag_1, field_4, field_3
-        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
+        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
@@ -916,7 +921,7 @@ mod tests {
         expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
         let expect_meta = Arc::new(expect_meta);
 
-        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
         let k1 = encode_key(&[Some("a")]);
         let k2 = encode_key(&[Some("b")]);
         let source_reader = VecBatchReader::new(&[
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
new file mode 100644
index 0000000000..ddf8271013
--- /dev/null
+++ b/src/mito2/src/read/flat_projection.rs
@@ -0,0 +1,221 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utilities for projection on flat format.
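+//!
+//! In the flat format, a batch is an Arrow [RecordBatch] whose columns are laid
+//! out as: primary key columns, field columns, the time index column, and then
+//! the internal `__primary_key`, `__sequence` and `__op_type` columns.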
+
+use std::sync::Arc;
+
+use common_error::ext::BoxedError;
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::RecordBatch;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::{Schema, SchemaRef};
+use datatypes::vectors::Helper;
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::ColumnId;
+
+use crate::error::{InvalidRequestSnafu, Result};
+use crate::sst::parquet::flat_format::sst_column_id_indices;
+use crate::sst::parquet::format::FormatProjection;
+
+/// Handles projection and converts batches in the flat format into [RecordBatch]es
+/// with the correct schema.
+///
+/// This mapper supports duplicate and unsorted projection indices.
+/// The output schema is determined by the projection indices.
+#[allow(dead_code)]
+pub struct FlatProjectionMapper {
+    /// Metadata of the region.
+    metadata: RegionMetadataRef,
+    /// Schema for the converted [RecordBatch] to return.
+    output_schema: SchemaRef,
+    /// Ids of columns to project. It keeps the ids in the same order as the
+    /// `projection` indices used to build the mapper.
+    /// The mapper won't deduplicate the column ids.
+    column_ids: Vec<ColumnId>,
+    /// Ids and data types of columns in the expected batch.
+    /// We can use this to check if a batch is compatible with the expected schema.
+    batch_schema: Vec<(ColumnId, ConcreteDataType)>,
+    /// `true` if the original projection is empty.
+    is_empty_projection: bool,
+    /// The index in the flat format [RecordBatch] for each column in the output [RecordBatch].
+    batch_indices: Vec<usize>,
+}
+
+impl FlatProjectionMapper {
+    /// Returns a new mapper with projection.
+    /// If `projection` is empty, it outputs a [RecordBatch] without any column, only a row count.
+    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
+    /// an empty `RecordBatch` and only uses its row count for such a query.
+    pub fn new(
+        metadata: &RegionMetadataRef,
+        projection: impl Iterator<Item = usize>,
+    ) -> Result<FlatProjectionMapper> {
+        let mut projection: Vec<_> = projection.collect();
+        // Whether the original projection is empty.
+        let is_empty_projection = projection.is_empty();
+        if is_empty_projection {
+            // If the projection is empty, we still read the time index column.
+            projection.push(metadata.time_index_column_pos());
+        }
+
+        // Output column schemas for the projection.
+        let mut column_schemas = Vec::with_capacity(projection.len());
+        // Column ids of the projection without deduplication.
+        let mut column_ids = Vec::with_capacity(projection.len());
+        for idx in &projection {
+            // For each projection index, we get the column id for projection.
+            let column = metadata
+                .column_metadatas
+                .get(*idx)
+                .context(InvalidRequestSnafu {
+                    region_id: metadata.region_id,
+                    reason: format!("projection index {} is out of bounds", idx),
+                })?;
+
+            column_ids.push(column.column_id);
+            // Safety: idx is valid.
+            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
+        }
+
+        if is_empty_projection {
+            // If the projection is empty, we don't output any column.
+            return Ok(FlatProjectionMapper {
+                metadata: metadata.clone(),
+                output_schema: Arc::new(Schema::new(vec![])),
+                column_ids,
+                batch_schema: vec![],
+                is_empty_projection,
+                batch_indices: vec![],
+            });
+        }
+
+        // Safety: Columns come from the existing schema.
+        let output_schema = Arc::new(Schema::new(column_schemas));
+
+        // Creates a map to look up the column index by column id.
+        let id_to_index = sst_column_id_indices(metadata);
+        // TODO(yingwen): Support different flat schema options.
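+        // Note: the `+ 3` below accounts for the three internal columns
+        // (`__primary_key`, `__sequence` and `__op_type`) that the flat format
+        // stores after the user-visible columns.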
+        let format_projection = FormatProjection::compute_format_projection(
+            &id_to_index,
+            // Total number of columns, including the internal columns.
+            metadata.column_metadatas.len() + 3,
+            column_ids.iter().copied(),
+        );
+
+        let batch_indices: Vec<_> = column_ids
+            .iter()
+            .map(|id| {
+                // Safety: The map is computed from `projection` itself.
+                format_projection
+                    .column_id_to_projected_index
+                    .get(id)
+                    .copied()
+                    .unwrap()
+            })
+            .collect();
+
+        let batch_schema = plain_projected_columns(metadata, &format_projection);
+
+        Ok(FlatProjectionMapper {
+            metadata: metadata.clone(),
+            output_schema,
+            column_ids,
+            batch_schema,
+            is_empty_projection,
+            batch_indices,
+        })
+    }
+
+    /// Returns a new mapper without projection.
+    pub fn all(metadata: &RegionMetadataRef) -> Result<FlatProjectionMapper> {
+        FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
+    }
+
+    /// Returns the metadata that created the mapper.
+    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
+        &self.metadata
+    }
+
+    /// Returns ids of projected columns that we need to read
+    /// from memtables and SSTs.
+    pub(crate) fn column_ids(&self) -> &[ColumnId] {
+        &self.column_ids
+    }
+
+    /// Returns ids and data types of the batch columns that the mapper expects to convert.
+    #[allow(dead_code)]
+    pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
+        &self.batch_schema
+    }
+
+    /// Returns the schema of the converted [RecordBatch].
+    /// This is the schema that the stream will output. This schema may contain
+    /// fewer columns than [FlatProjectionMapper::column_ids()].
+    pub(crate) fn output_schema(&self) -> SchemaRef {
+        self.output_schema.clone()
+    }
+
+    /// Returns an empty [RecordBatch].
+    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
+        RecordBatch::new_empty(self.output_schema.clone())
+    }
+
+    /// Converts a flat format [RecordBatch] to a normal [RecordBatch].
+    ///
+    /// The batch must match the `projection` used to build the mapper.
+    #[allow(dead_code)]
+    pub(crate) fn convert(
+        &self,
+        batch: &datatypes::arrow::record_batch::RecordBatch,
+    ) -> common_recordbatch::error::Result<RecordBatch> {
+        if self.is_empty_projection {
+            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
+        }
+
+        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
+        for index in &self.batch_indices {
+            let array = batch.column(*index).clone();
+            let vector = Helper::try_into_vector(array)
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+            columns.push(vector);
+        }
+
+        RecordBatch::new(self.output_schema.clone(), columns)
+    }
+}
+
+/// Returns ids and data types of the output batch's columns after applying the `projection`.
+pub(crate) fn plain_projected_columns(
+    metadata: &RegionMetadata,
+    format_projection: &FormatProjection,
+) -> Vec<(ColumnId, ConcreteDataType)> {
+    let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
+    for (column_id, index) in &format_projection.column_id_to_projected_index {
+        // Safety: FormatProjection ensures the id is valid.
+        schema[*index] = Some((
+            *column_id,
+            metadata
+                .column_by_id(*column_id)
+                .unwrap()
+                .column_schema
+                .data_type
+                .clone(),
+        ));
+    }
+
+    // Safety: FormatProjection ensures all indices can be unwrapped.
+    schema.into_iter().map(|id_type| id_type.unwrap()).collect()
+}
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 1837b8e18e..1f8bb0ed20 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Utilities for projection.
+//! Utilities for projection operations.
 
 use std::cmp::Ordering;
 use std::collections::HashMap;
@@ -33,13 +33,102 @@ use store_api::storage::ColumnId;
 
 use crate::cache::CacheStrategy;
 use crate::error::{InvalidRequestSnafu, Result};
+use crate::read::flat_projection::FlatProjectionMapper;
 use crate::read::Batch;
 
 /// Only cache vector when its length `<=` this value.
 const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
 
+/// Wrapper enum for different projection mapper implementations.
+pub enum ProjectionMapper {
+    /// Projection mapper for the primary key format.
+    PrimaryKey(PrimaryKeyProjectionMapper),
+    /// Projection mapper for the flat format.
+    Flat(FlatProjectionMapper),
+}
+
+impl ProjectionMapper {
+    /// Returns a new mapper with projection.
+    pub fn new(
+        metadata: &RegionMetadataRef,
+        projection: impl Iterator<Item = usize> + Clone,
+        flat_format: bool,
+    ) -> Result<ProjectionMapper> {
+        if flat_format {
+            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
+                metadata, projection,
+            )?))
+        } else {
+            Ok(ProjectionMapper::PrimaryKey(
+                PrimaryKeyProjectionMapper::new(metadata, projection)?,
+            ))
+        }
+    }
+
+    /// Returns a new mapper without projection.
+    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<ProjectionMapper> {
+        if flat_format {
+            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
+        } else {
+            Ok(ProjectionMapper::PrimaryKey(
+                PrimaryKeyProjectionMapper::all(metadata)?,
+            ))
+        }
+    }
+
+    /// Returns the metadata that created the mapper.
+    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
+        match self {
+            ProjectionMapper::PrimaryKey(m) => m.metadata(),
+            ProjectionMapper::Flat(m) => m.metadata(),
+        }
+    }
+
+    /// Returns ids of projected columns that we need to read
+    /// from memtables and SSTs.
+    pub(crate) fn column_ids(&self) -> &[ColumnId] {
+        match self {
+            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
+            ProjectionMapper::Flat(m) => m.column_ids(),
+        }
+    }
+
+    /// Returns the schema of the converted [RecordBatch].
+    pub(crate) fn output_schema(&self) -> SchemaRef {
+        match self {
+            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
+            ProjectionMapper::Flat(m) => m.output_schema(),
+        }
+    }
+
+    /// Returns the primary key projection mapper, or `None` if this is not a primary key mapper.
+    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
+        match self {
+            ProjectionMapper::PrimaryKey(m) => Some(m),
+            ProjectionMapper::Flat(_) => None,
+        }
+    }
+
+    /// Returns the flat projection mapper, or `None` if this is not a flat mapper.
+    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
+        match self {
+            ProjectionMapper::PrimaryKey(_) => None,
+            ProjectionMapper::Flat(m) => Some(m),
+        }
+    }
+
+    /// Returns an empty [RecordBatch].
+    // TODO(yingwen): This is unused now. Use it after we finish the flat format.
+    pub fn empty_record_batch(&self) -> RecordBatch {
+        match self {
+            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
+            ProjectionMapper::Flat(m) => m.empty_record_batch(),
+        }
+    }
+}
+
 /// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
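+/// This mapper serves the primary key format, where tag columns are stored in an
+/// encoded primary key and have to be decoded (via the region's primary key codec)
+/// back into individual vectors.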
-pub struct ProjectionMapper {
+pub struct PrimaryKeyProjectionMapper {
     /// Metadata of the region.
     metadata: RegionMetadataRef,
     /// Maps column in [RecordBatch] to index in [Batch].
@@ -59,7 +148,7 @@ pub struct ProjectionMapper {
     is_empty_projection: bool,
 }
 
-impl ProjectionMapper {
+impl PrimaryKeyProjectionMapper {
     /// Returns a new mapper with projection.
     /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
     /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
@@ -67,7 +156,7 @@ impl ProjectionMapper {
     pub fn new(
         metadata: &RegionMetadataRef,
         projection: impl Iterator<Item = usize>,
-    ) -> Result<ProjectionMapper> {
+    ) -> Result<PrimaryKeyProjectionMapper> {
         let mut projection: Vec<_> = projection.collect();
         // If the original projection is empty.
         let is_empty_projection = projection.is_empty();
@@ -96,7 +185,7 @@ impl ProjectionMapper {
         let codec = build_primary_key_codec(metadata);
         if is_empty_projection {
             // If projection is empty, we don't output any column.
-            return Ok(ProjectionMapper {
+            return Ok(PrimaryKeyProjectionMapper {
                 metadata: metadata.clone(),
                 batch_indices: vec![],
                 has_tags: false,
@@ -146,7 +235,7 @@ impl ProjectionMapper {
             batch_indices.push(batch_index);
         }
 
-        Ok(ProjectionMapper {
+        Ok(PrimaryKeyProjectionMapper {
             metadata: metadata.clone(),
             batch_indices,
             has_tags,
@@ -159,8 +248,8 @@ impl ProjectionMapper {
     }
 
     /// Returns a new mapper without projection.
-    pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
-        ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
+    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
+        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
     }
 
     /// Returns the metadata that created the mapper.
@@ -181,7 +270,7 @@ impl ProjectionMapper {
 
     /// Returns the schema of converted [RecordBatch].
     /// This is the schema that the stream will output. This schema may contain
-    /// less columns than [ProjectionMapper::column_ids()].
+    /// fewer columns than [PrimaryKeyProjectionMapper::column_ids()].
     pub(crate) fn output_schema(&self) -> SchemaRef {
         self.output_schema.clone()
     }
@@ -318,10 +407,14 @@ mod tests {
 
     use api::v1::OpType;
     use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
+    use datatypes::arrow::datatypes::Field;
     use datatypes::arrow::util::pretty;
     use datatypes::value::ValueRef;
     use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
     use mito_codec::test_util::TestRegionMetadataBuilder;
+    use store_api::storage::consts::{
+        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
+    };
 
     use super::*;
     use crate::cache::CacheManager;
@@ -386,21 +479,26 @@ mod tests {
                 .num_fields(2)
                 .build(),
         );
-        let mapper = ProjectionMapper::all(&metadata).unwrap();
+        // Create the enum wrapper with the default format (primary key).
+        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
         assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
         assert_eq!(
             [
                 (3, ConcreteDataType::int64_datatype()),
                 (4, ConcreteDataType::int64_datatype())
             ],
-            mapper.batch_fields()
+            mapper.as_primary_key().unwrap().batch_fields()
         );
 
         // With vector cache.
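+        // Repeated tag vectors no longer than MAX_VECTOR_LENGTH_TO_CACHE rows are
+        // cached, so converting another batch with the same tag values can reuse them.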
         let cache = CacheManager::builder().vector_cache_size(1024).build();
         let cache = CacheStrategy::EnableAll(Arc::new(cache));
         let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper
+            .as_primary_key()
+            .unwrap()
+            .convert(&batch, &cache)
+            .unwrap();
         let expect = "\
 +---------------------+----+----+----+----+
 | ts                  | k0 | k1 | v0 | v1 |
 +---------------------+----+----+----+----+
@@ -420,7 +518,11 @@ mod tests {
         assert!(cache
             .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
             .is_none());
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper
+            .as_primary_key()
+            .unwrap()
+            .convert(&batch, &cache)
+            .unwrap();
         assert_eq!(expect, print_record_batch(record_batch));
     }
 
@@ -433,17 +535,21 @@ mod tests {
                 .build(),
         );
         // Columns v1, k0
-        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
+        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
         assert_eq!([4, 1], mapper.column_ids());
         assert_eq!(
             [(4, ConcreteDataType::int64_datatype())],
-            mapper.batch_fields()
+            mapper.as_primary_key().unwrap().batch_fields()
         );
 
         let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
         let cache = CacheManager::builder().vector_cache_size(1024).build();
         let cache = CacheStrategy::EnableAll(Arc::new(cache));
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper
+            .as_primary_key()
+            .unwrap()
+            .convert(&batch, &cache)
+            .unwrap();
         let expect = "\
 +----+----+
 | v1 | k0 |
 +----+----+
@@ -464,18 +570,213 @@ mod tests {
                 .build(),
         );
         // Empty projection
-        let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
+        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
         assert_eq!([0], mapper.column_ids()); // Should still read the time index column
-        assert!(mapper.batch_fields().is_empty());
-        assert!(!mapper.has_tags);
-        assert!(mapper.batch_indices.is_empty());
         assert!(mapper.output_schema().is_empty());
-        assert!(mapper.is_empty_projection);
+        let pk_mapper = mapper.as_primary_key().unwrap();
+        assert!(pk_mapper.batch_fields().is_empty());
+        assert!(!pk_mapper.has_tags);
+        assert!(pk_mapper.batch_indices.is_empty());
+        assert!(pk_mapper.is_empty_projection);
 
         let batch = new_batch(0, &[1, 2], &[], 3);
         let cache = CacheManager::builder().vector_cache_size(1024).build();
         let cache = CacheStrategy::EnableAll(Arc::new(cache));
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
+        assert_eq!(3, record_batch.num_rows());
+        assert_eq!(0, record_batch.num_columns());
+        assert!(record_batch.schema.is_empty());
+    }
+
+    fn new_flat_batch(
+        ts_start: Option<i64>,
+        idx_tags: &[(usize, i64)],
+        idx_fields: &[(usize, i64)],
+        num_rows: usize,
+    ) -> datatypes::arrow::record_batch::RecordBatch {
+        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
+        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
+
+        // Flat format: primary key columns, field columns, time index,
+        // __primary_key, __sequence, __op_type.
+
+        // Primary key columns first.
+        for (i, tag) in idx_tags {
+            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
+                *tag, num_rows,
+            ))) as _;
+            columns.push(array);
+            fields.push(Field::new(
+                format!("k{i}"),
+                datatypes::arrow::datatypes::DataType::Int64,
+                true,
+            ));
+        }
+
+        // Field columns.
+        for (i, field) in idx_fields {
+            let array =
+                Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
+                    *field, num_rows,
+                ))) as _;
+            columns.push(array);
+            fields.push(Field::new(
+                format!("v{i}"),
+                datatypes::arrow::datatypes::DataType::Int64,
+                true,
+            ));
+        }
+
+        // Time index.
+        if let Some(ts_start) = ts_start {
+            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
+                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
+            )) as _;
+            columns.push(timestamps);
+            fields.push(Field::new(
+                "ts",
+                datatypes::arrow::datatypes::DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
+                    None,
+                ),
+                true,
+            ));
+        }
+
+        // __primary_key column (the encoded primary key as a dictionary).
+        // Create the encoded primary key.
+        let converter = DensePrimaryKeyCodec::with_fields(
+            (0..idx_tags.len())
+                .map(|idx| {
+                    (
+                        idx as u32,
+                        SortField::new(ConcreteDataType::int64_datatype()),
+                    )
+                })
+                .collect(),
+        );
+        let encoded_pk = converter
+            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
+            .unwrap();
+
+        // Create a dictionary array for the encoded primary key.
+        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
+        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
+        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
+        let pk_array =
+            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
+        columns.push(pk_array);
+        fields.push(Field::new_dictionary(
+            PRIMARY_KEY_COLUMN_NAME,
+            datatypes::arrow::datatypes::DataType::UInt32,
+            datatypes::arrow::datatypes::DataType::Binary,
+            false,
+        ));
+
+        // __sequence column.
+        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
+        fields.push(Field::new(
+            SEQUENCE_COLUMN_NAME,
+            datatypes::arrow::datatypes::DataType::UInt64,
+            false,
+        ));
+
+        // __op_type column.
+        columns.push(Arc::new(UInt8Array::from_iter_values(
+            (0..num_rows).map(|_| OpType::Put as u8),
+        )) as _);
+        fields.push(Field::new(
+            OP_TYPE_COLUMN_NAME,
+            datatypes::arrow::datatypes::DataType::UInt8,
+            false,
+        ));
+
+        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
+
+        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
+    }
+
+    #[test]
+    fn test_plain_projection_mapper_all() {
+        let metadata = Arc::new(
+            TestRegionMetadataBuilder::default()
+                .num_tags(2)
+                .num_fields(2)
+                .build(),
+        );
+        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
+        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
+        assert_eq!(
+            [
+                (1, ConcreteDataType::int64_datatype()),
+                (2, ConcreteDataType::int64_datatype()),
+                (3, ConcreteDataType::int64_datatype()),
+                (4, ConcreteDataType::int64_datatype()),
+                (0, ConcreteDataType::timestamp_millisecond_datatype())
+            ],
+            mapper.as_flat().unwrap().batch_schema()
+        );
+
+        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
+        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
+        let expect = "\
++---------------------+----+----+----+----+
+| ts                  | k0 | k1 | v0 | v1 |
++---------------------+----+----+----+----+
+| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
+| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
+| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
++---------------------+----+----+----+----+";
+        assert_eq!(expect, print_record_batch(record_batch));
+    }
+
+    #[test]
+    fn test_plain_projection_mapper_with_projection() {
+        let metadata = Arc::new(
+            TestRegionMetadataBuilder::default()
+                .num_tags(2)
+                .num_fields(2)
+                .build(),
+        );
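+        // `column_ids` keeps the projection order (v1, k0) while `batch_schema`
+        // follows the flat format's stored order (k0 before v1).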
+        // Columns v1, k0
+        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
+        assert_eq!([4, 1], mapper.column_ids());
+        assert_eq!(
+            [
+                (1, ConcreteDataType::int64_datatype()),
+                (4, ConcreteDataType::int64_datatype())
+            ],
+            mapper.as_flat().unwrap().batch_schema()
+        );
+
+        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
+        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
+        let expect = "\
++----+----+
+| v1 | k0 |
++----+----+
+| 4  | 1  |
+| 4  | 1  |
+| 4  | 1  |
++----+----+";
+        assert_eq!(expect, print_record_batch(record_batch));
+    }
+
+    #[test]
+    fn test_plain_projection_mapper_empty_projection() {
+        let metadata = Arc::new(
+            TestRegionMetadataBuilder::default()
+                .num_tags(2)
+                .num_fields(2)
+                .build(),
+        );
+        // Empty projection
+        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
+        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
+        assert!(mapper.output_schema().is_empty());
+        let plain_mapper = mapper.as_flat().unwrap();
+        assert!(plain_mapper.batch_schema().is_empty());
+
+        let batch = new_flat_batch(Some(0), &[], &[], 3);
+        let record_batch = plain_mapper.convert(&batch).unwrap();
         assert_eq!(3, record_batch.num_rows());
         assert_eq!(0, record_batch.num_columns());
         assert!(record_batch.schema.is_empty());
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index e442f3e924..363a6ea503 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -374,8 +374,8 @@ impl ScanRegion {
 
         // The mapper always computes projected column ids as the schema of SSTs may change.
         let mapper = match &self.request.projection {
-            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
-            None => ProjectionMapper::all(&self.version.metadata)?,
+            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
+            None => ProjectionMapper::all(&self.version.metadata, false)?,
         };
 
         let ssts = &self.version.ssts;
@@ -451,8 +451,8 @@ impl ScanRegion {
         let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
 
         // The mapper always computes projected column ids as the schema of SSTs may change.
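+        // The flat format is not wired into these scan paths yet, so `false`
+        // selects the primary key mapper for now.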
         let mapper = match &self.request.projection {
-            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
-            None => ProjectionMapper::all(&self.version.metadata)?,
+            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
+            None => ProjectionMapper::all(&self.version.metadata, false)?,
         };
 
         let input = ScanInput::new(self.access_layer, mapper)
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index a2ed0bda37..3cec65b710 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -27,7 +27,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::StreamExt;
-use snafu::ensure;
+use snafu::{ensure, OptionExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
@@ -35,7 +35,7 @@ use store_api::region_engine::{
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;
 
-use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
+use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
 use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
 use crate::read::last_row::LastRowReader;
 use crate::read::merge::MergeReaderBuilder;
@@ -274,6 +274,9 @@ impl SeqScan {
             stream_ctx.input.num_memtables(),
             stream_ctx.input.num_files(),
         ));
+        let mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
+            reason: "Unexpected format",
+        })?;
         // Scans each part.
         for part_range in partition_ranges {
             let mut sources = Vec::new();
@@ -309,7 +312,7 @@ impl SeqScan {
                 #[cfg(debug_assertions)]
                 checker.ensure_part_range_batch(
                     "SeqScan",
-                    stream_ctx.input.mapper.metadata().region_id,
+                    mapper.metadata().region_id,
                     partition,
                     part_range,
                     &batch,
diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs
index 7bde4be6f6..58befd0711 100644
--- a/src/mito2/src/read/stream.rs
+++ b/src/mito2/src/read/stream.rs
@@ -23,10 +23,10 @@ use common_recordbatch::{DfRecordBatch, RecordBatch};
 use datatypes::compute;
 use futures::stream::BoxStream;
 use futures::{Stream, StreamExt};
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 
 use crate::cache::CacheStrategy;
-use crate::error::Result;
+use crate::error::{Result, UnexpectedSnafu};
 use crate::read::projection::ProjectionMapper;
 use crate::read::scan_util::PartitionMetrics;
 use crate::read::series_scan::SeriesBatch;
@@ -66,12 +66,21 @@ impl ConvertBatchStream {
     }
 
     fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
+        let mapper = self
+            .projection_mapper
+            .as_primary_key()
+            .context(UnexpectedSnafu {
+                reason: "Unexpected format",
+            })
+            .map_err(|e| BoxedError::new(e) as _)
+            .context(ExternalSnafu)?;
+
         match batch {
             ScanBatch::Normal(batch) => {
                 if batch.is_empty() {
-                    Ok(self.projection_mapper.empty_record_batch())
+                    Ok(mapper.empty_record_batch())
                 } else {
-                    self.projection_mapper.convert(&batch, &self.cache_strategy)
+                    mapper.convert(&batch, &self.cache_strategy)
                 }
             }
             ScanBatch::Series(series) => {
@@ -79,13 +88,11 @@ impl ConvertBatchStream {
                 self.buffer.reserve(series.batches.len());
 
                 for batch in series.batches {
-                    let record_batch = self
-                        .projection_mapper
-                        .convert(&batch, &self.cache_strategy)?;
+                    let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
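+                    // Converted batches are buffered and concatenated into a
+                    // single RecordBatch below.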
                     self.buffer.push(record_batch.into_df_record_batch());
                 }
 
-                let output_schema = self.projection_mapper.output_schema();
+                let output_schema = mapper.output_schema();
                 let record_batch =
                     compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
                         .context(ArrowComputeSnafu)?;
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 91e1e36612..e9ac497962 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -227,6 +227,7 @@ impl UnorderedScan {
         for part_range in part_ranges {
             let mut metrics = ScannerMetrics::default();
             let mut fetch_start = Instant::now();
+            let mapper = &stream_ctx.input.mapper;
             #[cfg(debug_assertions)]
             let mut checker = crate::read::BatchChecker::default()
                 .with_start(Some(part_range.start))
@@ -252,7 +253,7 @@ impl UnorderedScan {
                 #[cfg(debug_assertions)]
                 checker.ensure_part_range_batch(
                     "UnorderedScan",
-                    stream_ctx.input.mapper.metadata().region_id,
+                    mapper.metadata().region_id,
                     partition,
                     part_range,
                     &batch,
diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs
index a0b4bc4cad..5e5b2d033f 100644
--- a/src/mito2/src/sst/parquet/flat_format.rs
+++ b/src/mito2/src/sst/parquet/flat_format.rs
@@ -284,7 +284,7 @@ impl FlatReadFormat {
 /// Returns a map whose key is the column id and whose value is the column's position
 /// in the SST.
 /// It only supports SSTs with raw primary key columns.
-fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
+pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
     let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
     let mut column_index = 0;
     // keys