diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 284ca5a3d9..12fbdcbfaf 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -887,7 +887,7 @@ mod tests { use super::*; use crate::memtable::bulk::context::BulkIterContext; - use crate::sst::parquet::format::ReadFormat; + use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat}; use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; use crate::test_util::memtable_util::{ build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key, @@ -995,7 +995,7 @@ mod tests { let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup) .unwrap() .unwrap(); - let read_format = ReadFormat::new_with_all_columns(metadata.clone()); + let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone()); let mut batches = VecDeque::new(); read_format .convert_record_batch(&batch, None, &mut batches) diff --git a/src/mito2/src/read/plain_batch.rs b/src/mito2/src/read/plain_batch.rs index 627e93789e..e227154700 100644 --- a/src/mito2/src/read/plain_batch.rs +++ b/src/mito2/src/read/plain_batch.rs @@ -30,7 +30,11 @@ use crate::error::{ ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu, }; -use crate::sst::parquet::plain_format::PLAIN_FIXED_POS_COLUMN_NUM; + +/// Number of columns that have fixed positions. +/// +/// Contains all internal columns. +pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2; /// [PlainBatch] represents a batch of rows. /// It is a wrapper around [RecordBatch]. diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index c591fb233c..cd3a991757 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -24,11 +24,11 @@ use crate::sst::index::IndexOutput; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; pub(crate) mod file_range; +pub mod flat_format; pub mod format; pub(crate) mod helper; pub(crate) mod metadata; pub(crate) mod page_reader; -pub mod plain_format; pub mod reader; pub mod row_group; pub mod row_selection; @@ -117,7 +117,7 @@ mod tests { use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl}; - use crate::sst::parquet::format::WriteFormat; + use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics}; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; @@ -523,7 +523,7 @@ mod tests { let writer_props = props_builder.build(); - let write_format = WriteFormat::new(metadata); + let write_format = PrimaryKeyWriteFormat::new(metadata); let fields: Vec<_> = write_format .arrow_schema() .fields() diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 5c4486a393..24697c68ec 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -308,8 +308,12 @@ impl RangeBase { } } SemanticType::Field => { - let Some(field_index) = - self.read_format.field_index_by_id(filter_ctx.column_id()) + // Safety: Input is Batch so we are using primary key format. + let Some(field_index) = self + .read_format + .as_primary_key() + .unwrap() + .field_index_by_id(filter_ctx.column_id()) else { continue; }; diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs new file mode 100644 index 0000000000..59cbd341e8 --- /dev/null +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -0,0 +1,302 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Format to store in parquet. +//! +//! It can store both encoded primary key and raw key columns. +//! +//! We store two additional internal columns at last: +//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary) +//! - `__sequence`, the sequence number of a row. Type: uint64 +//! - `__op_type`, the op type of the row. Type: uint8 +//! +//! The format is +//! ```text +//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type. +//! +//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()) +//! and stores primary key columns in the same order as [RegionMetadata::primary_key]. + +use std::borrow::Borrow; +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::SemanticType; +use datatypes::arrow::array::{ArrayRef, UInt64Array}; +use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::record_batch::RecordBatch; +use parquet::file::metadata::RowGroupMetaData; +use snafu::ResultExt; +use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::storage::{ColumnId, SequenceNumber}; + +use crate::error::{NewRecordBatchSnafu, Result}; +use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues}; +use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions}; + +/// Helper for writing the SST format. +#[allow(dead_code)] +pub(crate) struct FlatWriteFormat { + metadata: RegionMetadataRef, + /// SST file schema. + arrow_schema: SchemaRef, + override_sequence: Option, +} + +impl FlatWriteFormat { + /// Creates a new helper. + #[allow(dead_code)] + pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat { + let arrow_schema = to_flat_sst_arrow_schema(&metadata, options); + FlatWriteFormat { + metadata, + arrow_schema, + override_sequence: None, + } + } + + /// Set override sequence. + #[allow(dead_code)] + pub(crate) fn with_override_sequence( + mut self, + override_sequence: Option, + ) -> Self { + self.override_sequence = override_sequence; + self + } + + /// Gets the arrow schema to store in parquet. + #[allow(dead_code)] + pub(crate) fn arrow_schema(&self) -> &SchemaRef { + &self.arrow_schema + } + + /// Convert `batch` to a arrow record batch to store in parquet. + #[allow(dead_code)] + pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result { + debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len()); + + let Some(override_sequence) = self.override_sequence else { + return Ok(batch.clone()); + }; + + let mut columns = batch.columns().to_vec(); + let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); + columns[sequence_column_index(batch.num_columns())] = sequence_array; + + RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) + } +} + +/// Returns the position of the sequence column. +pub(crate) fn sequence_column_index(num_columns: usize) -> usize { + num_columns - 2 +} + +// TODO(yingwen): Add an option to skip reading internal columns. +/// Helper for reading the flat SST format with projection. +/// +/// It only supports flat format that stores primary keys additionally. +pub struct FlatReadFormat { + /// The metadata stored in the SST. + metadata: RegionMetadataRef, + /// SST file schema. + arrow_schema: SchemaRef, + /// Indices of columns to read from the SST. It contains all internal columns. + projection_indices: Vec, + /// Column id to their index in the projected schema ( + /// the schema after projection). + column_id_to_projected_index: HashMap, + /// Column id to index in SST. + column_id_to_sst_index: HashMap, + /// Sequence number to override the sequence read from the SST. + override_sequence: Option, +} + +impl FlatReadFormat { + /// Creates a helper with existing `metadata` and `column_ids` to read. + pub fn new( + metadata: RegionMetadataRef, + column_ids: impl Iterator, + ) -> FlatReadFormat { + let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + + // Creates a map to lookup index. + let id_to_index = sst_column_id_indices(&metadata); + + let format_projection = FormatProjection::compute_format_projection( + &id_to_index, + arrow_schema.fields.len(), + column_ids, + ); + + FlatReadFormat { + metadata, + arrow_schema, + projection_indices: format_projection.projection_indices, + column_id_to_projected_index: format_projection.column_id_to_projected_index, + column_id_to_sst_index: id_to_index, + override_sequence: None, + } + } + + /// Sets the sequence number to override. + #[allow(dead_code)] + pub(crate) fn set_override_sequence(&mut self, sequence: Option) { + self.override_sequence = sequence; + } + + /// Index of a column in the projected batch by its column id. + pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option { + self.column_id_to_projected_index.get(&column_id).copied() + } + + /// Returns min values of specific column in row groups. + pub fn min_values( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + self.get_stat_values(row_groups, column_id, true) + } + + /// Returns max values of specific column in row groups. + pub fn max_values( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + self.get_stat_values(row_groups, column_id, false) + } + + /// Returns null counts of specific column in row groups. + pub fn null_counts( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + let Some(index) = self.column_id_to_sst_index.get(&column_id) else { + // No such column in the SST. + return StatValues::NoColumn; + }; + + let stats = ReadFormat::column_null_counts(row_groups, *index); + StatValues::from_stats_opt(stats) + } + + /// Gets the arrow schema of the SST file. + /// + /// This schema is computed from the region metadata but should be the same + /// as the arrow schema decoded from the file metadata. + pub(crate) fn arrow_schema(&self) -> &SchemaRef { + &self.arrow_schema + } + + /// Gets the metadata of the SST. + pub(crate) fn metadata(&self) -> &RegionMetadataRef { + &self.metadata + } + + /// Gets sorted projection indices to read. + pub(crate) fn projection_indices(&self) -> &[usize] { + &self.projection_indices + } + + /// Creates a sequence array to override. + #[allow(dead_code)] + pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { + self.override_sequence + .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef) + } + + /// Convert a record batch to apply override sequence array. + /// + /// Returns a new RecordBatch with the sequence column replaced by the override sequence array. + #[allow(dead_code)] + pub(crate) fn convert_batch( + &self, + record_batch: &RecordBatch, + override_sequence_array: Option<&ArrayRef>, + ) -> Result { + let Some(override_array) = override_sequence_array else { + return Ok(record_batch.clone()); + }; + + let mut columns = record_batch.columns().to_vec(); + let sequence_column_idx = sequence_column_index(record_batch.num_columns()); + + // Use the provided override sequence array, slicing if necessary to match batch length + let sequence_array = if override_array.len() > record_batch.num_rows() { + override_array.slice(0, record_batch.num_rows()) + } else { + override_array.clone() + }; + + columns[sequence_column_idx] = sequence_array; + + RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu) + } + + fn get_stat_values( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + is_min: bool, + ) -> StatValues { + let Some(column) = self.metadata.column_by_id(column_id) else { + // No such column in the SST. + return StatValues::NoColumn; + }; + // Safety: `column_id_to_sst_index` is built from `metadata`. + let index = self.column_id_to_sst_index.get(&column_id).unwrap(); + + let stats = ReadFormat::column_values(row_groups, column, *index, is_min); + StatValues::from_stats_opt(stats) + } +} + +/// Returns a map that the key is the column id and the value is the column position +/// in the SST. +/// It only supports SSTs with raw primary key columns. +fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap { + let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len()); + let mut column_index = 0; + // keys + for pk_id in &metadata.primary_key { + id_to_index.insert(*pk_id, column_index); + column_index += 1; + } + // fields + for column in &metadata.column_metadatas { + if column.semantic_type == SemanticType::Field { + id_to_index.insert(column.column_id, column_index); + column_index += 1; + } + } + // time index + id_to_index.insert(metadata.time_index_column().column_id, column_index); + + id_to_index +} + +#[cfg(test)] +impl FlatReadFormat { + /// Creates a helper with existing `metadata` and all columns. + pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat { + Self::new( + Arc::clone(&metadata), + metadata.column_metadatas.iter().map(|c| c.column_id), + ) + } +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 58dc35540d..437c21c43f 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -50,6 +50,7 @@ use crate::error::{ }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::sst::file::{FileMeta, FileTimeRange}; +use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::to_sst_arrow_schema; /// Arrow array type for the primary key dictionary. @@ -58,21 +59,21 @@ pub(crate) type PrimaryKeyArray = DictionaryArray; /// Number of columns that have fixed positions. /// /// Contains: time index and internal columns. -const FIXED_POS_COLUMN_NUM: usize = 4; +pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4; -/// Helper for writing the SST format. -pub(crate) struct WriteFormat { +/// Helper for writing the SST format with primary key. +pub(crate) struct PrimaryKeyWriteFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, override_sequence: Option, } -impl WriteFormat { +impl PrimaryKeyWriteFormat { /// Creates a new helper. - pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat { + pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat { let arrow_schema = to_sst_arrow_schema(&metadata); - WriteFormat { + PrimaryKeyWriteFormat { metadata, arrow_schema, override_sequence: None, @@ -132,8 +133,198 @@ impl WriteFormat { } } +/// Helper to read parquet formats. +pub enum ReadFormat { + PrimaryKey(PrimaryKeyReadFormat), + Flat(FlatReadFormat), +} + +impl ReadFormat { + // TODO(yingwen): Add a flag to choose format type. + pub(crate) fn new( + metadata: RegionMetadataRef, + column_ids: impl Iterator, + ) -> Self { + Self::new_primary_key(metadata, column_ids) + } + + /// Creates a helper to read the primary key format. + pub fn new_primary_key( + metadata: RegionMetadataRef, + column_ids: impl Iterator, + ) -> Self { + ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids)) + } + + /// Creates a helper to read the flat format. + pub fn new_flat( + metadata: RegionMetadataRef, + column_ids: impl Iterator, + ) -> Self { + ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids)) + } + + pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> { + match self { + ReadFormat::PrimaryKey(format) => Some(format), + _ => None, + } + } + + /// Gets the arrow schema of the SST file. + /// + /// This schema is computed from the region metadata but should be the same + /// as the arrow schema decoded from the file metadata. + pub(crate) fn arrow_schema(&self) -> &SchemaRef { + match self { + ReadFormat::PrimaryKey(format) => format.arrow_schema(), + ReadFormat::Flat(format) => format.arrow_schema(), + } + } + + /// Gets the metadata of the SST. + pub(crate) fn metadata(&self) -> &RegionMetadataRef { + match self { + ReadFormat::PrimaryKey(format) => format.metadata(), + ReadFormat::Flat(format) => format.metadata(), + } + } + + /// Gets sorted projection indices to read. + pub(crate) fn projection_indices(&self) -> &[usize] { + match self { + ReadFormat::PrimaryKey(format) => format.projection_indices(), + ReadFormat::Flat(format) => format.projection_indices(), + } + } + + /// Returns min values of specific column in row groups. + pub fn min_values( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + match self { + ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id), + ReadFormat::Flat(format) => format.min_values(row_groups, column_id), + } + } + + /// Returns max values of specific column in row groups. + pub fn max_values( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + match self { + ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id), + ReadFormat::Flat(format) => format.max_values(row_groups, column_id), + } + } + + /// Returns null counts of specific column in row groups. + pub fn null_counts( + &self, + row_groups: &[impl Borrow], + column_id: ColumnId, + ) -> StatValues { + match self { + ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id), + ReadFormat::Flat(format) => format.null_counts(row_groups, column_id), + } + } + + /// Returns min/max values of specific columns. + /// Returns None if the column does not have statistics. + /// The column should not be encoded as a part of a primary key. + pub(crate) fn column_values( + row_groups: &[impl Borrow], + column: &ColumnMetadata, + column_index: usize, + is_min: bool, + ) -> Option { + let null_scalar: ScalarValue = column + .column_schema + .data_type + .as_arrow_type() + .try_into() + .ok()?; + let scalar_values = row_groups + .iter() + .map(|meta| { + let stats = meta.borrow().column(column_index).statistics()?; + match stats { + Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + + Statistics::Int96(_) => None, + Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min { + *s.min_opt()? + } else { + *s.max_opt()? + }))), + Statistics::ByteArray(s) => { + let bytes = if is_min { + s.min_bytes_opt()? + } else { + s.max_bytes_opt()? + }; + let s = String::from_utf8(bytes.to_vec()).ok(); + Some(ScalarValue::Utf8(s)) + } + + Statistics::FixedLenByteArray(_) => None, + } + }) + .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone())) + .collect::>(); + debug_assert_eq!(scalar_values.len(), row_groups.len()); + ScalarValue::iter_to_array(scalar_values).ok() + } + + /// Returns null counts of specific columns. + /// The column should not be encoded as a part of a primary key. + pub(crate) fn column_null_counts( + row_groups: &[impl Borrow], + column_index: usize, + ) -> Option { + let values = row_groups.iter().map(|meta| { + let col = meta.borrow().column(column_index); + let stat = col.statistics()?; + stat.null_count_opt() + }); + Some(Arc::new(UInt64Array::from_iter(values))) + } + + /// Creates a sequence array to override. + pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { + match self { + ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length), + ReadFormat::Flat(format) => format.new_override_sequence_array(length), + } + } +} + /// Helper for reading the SST format. -pub struct ReadFormat { +pub struct PrimaryKeyReadFormat { /// The metadata stored in the SST. metadata: RegionMetadataRef, /// SST file schema. @@ -150,12 +341,12 @@ pub struct ReadFormat { override_sequence: Option, } -impl ReadFormat { +impl PrimaryKeyReadFormat { /// Creates a helper with existing `metadata` and `column_ids` to read. pub fn new( metadata: RegionMetadataRef, column_ids: impl Iterator, - ) -> ReadFormat { + ) -> PrimaryKeyReadFormat { let field_id_to_index: HashMap<_, _> = metadata .field_columns() .enumerate() @@ -163,42 +354,18 @@ impl ReadFormat { .collect(); let arrow_schema = to_sst_arrow_schema(&metadata); - // Maps column id of a projected field to its index in SST. - let mut projected_field_id_index: Vec<_> = column_ids - .filter_map(|column_id| { - // Only apply projection to fields. - field_id_to_index - .get(&column_id) - .copied() - .map(|index| (column_id, index)) - }) - .collect(); - let mut projection_indices: Vec<_> = projected_field_id_index - .iter() - .map(|(_column_id, index)| *index) - // We need to add all fixed position columns. - .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len()) - .collect(); - projection_indices.sort_unstable(); + let format_projection = FormatProjection::compute_format_projection( + &field_id_to_index, + arrow_schema.fields.len(), + column_ids, + ); - // Sort fields by their indices in the SST. Then the order of fields is their order - // in the Batch. - projected_field_id_index.sort_unstable_by_key(|x| x.1); - // Because the SST put fields before other columns, we don't need to consider other - // columns. - let field_id_to_projected_index = projected_field_id_index - .into_iter() - .map(|(column_id, _)| column_id) - .enumerate() - .map(|(index, column_id)| (column_id, index)) - .collect(); - - ReadFormat { + PrimaryKeyReadFormat { metadata, arrow_schema, field_id_to_index, - projection_indices, - field_id_to_projected_index, + projection_indices: format_projection.projection_indices, + field_id_to_projected_index: format_projection.column_id_to_projected_index, override_sequence: None, } } @@ -344,12 +511,12 @@ impl ReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = Self::column_values(row_groups, column, *index, true); + let stats = ReadFormat::column_values(row_groups, column, *index, true); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = Self::column_values(row_groups, column, index, true); + let stats = ReadFormat::column_values(row_groups, column, index, true); StatValues::from_stats_opt(stats) } } @@ -370,12 +537,12 @@ impl ReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = Self::column_values(row_groups, column, *index, false); + let stats = ReadFormat::column_values(row_groups, column, *index, false); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = Self::column_values(row_groups, column, index, false); + let stats = ReadFormat::column_values(row_groups, column, index, false); StatValues::from_stats_opt(stats) } } @@ -396,12 +563,12 @@ impl ReadFormat { SemanticType::Field => { // Safety: `field_id_to_index` is initialized by the semantic type. let index = self.field_id_to_index.get(&column_id).unwrap(); - let stats = Self::column_null_counts(row_groups, *index); + let stats = ReadFormat::column_null_counts(row_groups, *index); StatValues::from_stats_opt(stats) } SemanticType::Timestamp => { let index = self.time_index_position(); - let stats = Self::column_null_counts(row_groups, index); + let stats = ReadFormat::column_null_counts(row_groups, index); StatValues::from_stats_opt(stats) } } @@ -516,84 +683,6 @@ impl ReadFormat { Some(vector.to_arrow_array()) } - /// Returns min/max values of specific non-tag columns. - /// Returns None if the column does not have statistics. - fn column_values( - row_groups: &[impl Borrow], - column: &ColumnMetadata, - column_index: usize, - is_min: bool, - ) -> Option { - let null_scalar: ScalarValue = column - .column_schema - .data_type - .as_arrow_type() - .try_into() - .ok()?; - let scalar_values = row_groups - .iter() - .map(|meta| { - let stats = meta.borrow().column(column_index).statistics()?; - match stats { - Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - - Statistics::Int96(_) => None, - Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min { - *s.min_opt()? - } else { - *s.max_opt()? - }))), - Statistics::ByteArray(s) => { - let bytes = if is_min { - s.min_bytes_opt()? - } else { - s.max_bytes_opt()? - }; - let s = String::from_utf8(bytes.to_vec()).ok(); - Some(ScalarValue::Utf8(s)) - } - - Statistics::FixedLenByteArray(_) => None, - } - }) - .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone())) - .collect::>(); - debug_assert_eq!(scalar_values.len(), row_groups.len()); - ScalarValue::iter_to_array(scalar_values).ok() - } - - /// Returns null counts of specific non-tag columns. - fn column_null_counts( - row_groups: &[impl Borrow], - column_index: usize, - ) -> Option { - let values = row_groups.iter().map(|meta| { - let col = meta.borrow().column(column_index); - let stat = col.statistics()?; - stat.null_count_opt() - }); - Some(Arc::new(UInt64Array::from_iter(values))) - } - /// Index in SST of the primary key. fn primary_key_position(&self) -> usize { self.arrow_schema.fields.len() - 3 @@ -610,6 +699,68 @@ impl ReadFormat { } } +/// Helper to compute the projection for the SST. +pub(crate) struct FormatProjection { + /// Indices of columns to read from the SST. It contains all internal columns. + pub(crate) projection_indices: Vec, + /// Column id to their index in the projected schema ( + /// the schema after projection). + pub(crate) column_id_to_projected_index: HashMap, +} + +impl FormatProjection { + /// Computes the projection. + /// + /// `id_to_index` is a mapping from column id to the index of the column in the SST. + pub(crate) fn compute_format_projection( + id_to_index: &HashMap, + sst_column_num: usize, + column_ids: impl Iterator, + ) -> Self { + // Maps column id of a projected column to its index in SST. + // It also ignores columns not in the SST. + // [(column id, index in SST)] + let mut projected_schema: Vec<_> = column_ids + .filter_map(|column_id| { + id_to_index + .get(&column_id) + .copied() + .map(|index| (column_id, index)) + }) + .collect(); + // Sorts columns by their indices in the SST. SST uses a bitmap for projection. + // This ensures the schema of `projected_schema` is the same as the batch returned from the SST. + projected_schema.sort_unstable_by_key(|x| x.1); + // Dedups the entries to avoid the case that `column_ids` has duplicated columns. + projected_schema.dedup_by_key(|x| x.1); + + // Collects all projected indices. + // It contains the positions of all columns we need to read. + let mut projection_indices: Vec<_> = projected_schema + .iter() + .map(|(_column_id, index)| *index) + // We need to add all fixed position columns. + .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num) + .collect(); + projection_indices.sort_unstable(); + // Removes duplications. + projection_indices.dedup(); + + // Creates a map from column id to the index of that column in the projected record batch. + let column_id_to_projected_index = projected_schema + .into_iter() + .map(|(column_id, _)| column_id) + .enumerate() + .map(|(index, column_id)| (column_id, index)) + .collect(); + + Self { + projection_indices, + column_id_to_projected_index, + } + } +} + /// Values of column statistics of the SST. /// /// It also distinguishes the case that a column is not found and @@ -634,9 +785,9 @@ impl StatValues { } #[cfg(test)] -impl ReadFormat { +impl PrimaryKeyReadFormat { /// Creates a helper with existing `metadata` and all columns. - pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat { + pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat { Self::new( Arc::clone(&metadata), metadata.column_metadatas.iter().map(|c| c.column_id), @@ -754,6 +905,8 @@ pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool { #[cfg(test)] mod tests { + use std::sync::Arc; + use api::v1::OpType; use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit}; @@ -764,6 +917,8 @@ mod tests { use store_api::storage::RegionId; use super::*; + use crate::sst::parquet::flat_format::FlatWriteFormat; + use crate::sst::FlatSchemaOptions; const TEST_SEQUENCE: u64 = 1; const TEST_OP_TYPE: u8 = OpType::Put as u8; @@ -870,7 +1025,7 @@ mod tests { #[test] fn test_to_sst_arrow_schema() { let metadata = build_test_region_metadata(); - let write_format = WriteFormat::new(metadata); + let write_format = PrimaryKeyWriteFormat::new(metadata); assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema()); } @@ -896,7 +1051,7 @@ mod tests { #[test] fn test_convert_batch() { let metadata = build_test_region_metadata(); - let write_format = WriteFormat::new(metadata); + let write_format = PrimaryKeyWriteFormat::new(metadata); let num_rows = 4; let batch = new_batch(b"test", 1, 2, num_rows); @@ -917,7 +1072,8 @@ mod tests { #[test] fn test_convert_batch_with_override_sequence() { let metadata = build_test_region_metadata(); - let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411)); + let write_format = + PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411)); let num_rows = 4; let batch = new_batch(b"test", 1, 2, num_rows); @@ -939,16 +1095,16 @@ mod tests { fn test_projection_indices() { let metadata = build_test_region_metadata(); // Only read tag1 - let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied()); + let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied()); assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Only read field1 - let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied()); + let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied()); assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices()); // Only read ts - let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied()); + let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied()); assert_eq!(&[2, 3, 4, 5], read_format.projection_indices()); // Read field0, tag0, ts - let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied()); + let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied()); assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices()); } @@ -995,7 +1151,7 @@ mod tests { .iter() .map(|col| col.column_id) .collect(); - let read_format = ReadFormat::new(metadata, column_ids.iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); @@ -1014,7 +1170,7 @@ mod tests { .iter() .map(|col| col.column_id) .collect(); - let read_format = ReadFormat::new(metadata, column_ids.iter().copied()); + let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied()); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 @@ -1065,6 +1221,8 @@ mod tests { let mut batches = VecDeque::new(); read_format + .as_primary_key() + .unwrap() .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches) .unwrap(); @@ -1077,4 +1235,164 @@ mod tests { batches.into_iter().collect::>(), ); } + + fn build_test_flat_sst_schema() -> SchemaRef { + let fields = vec![ + Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first + Field::new("tag1", ArrowDataType::Int64, true), + Field::new("field1", ArrowDataType::Int64, true), // then field columns + Field::new("field0", ArrowDataType::Int64, true), + Field::new( + "ts", + ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new( + "__primary_key", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt32), + Box::new(ArrowDataType::Binary), + ), + false, + ), + Field::new("__sequence", ArrowDataType::UInt64, false), + Field::new("__op_type", ArrowDataType::UInt8, false), + ]; + Arc::new(Schema::new(fields)) + } + + #[test] + fn test_flat_to_sst_arrow_schema() { + let metadata = build_test_region_metadata(); + let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); + assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema()); + } + + fn input_columns_for_flat_batch(num_rows: usize) -> Vec { + vec![ + Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 + Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key + Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ] + } + + #[test] + fn test_flat_convert_batch() { + let metadata = build_test_region_metadata(); + let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); + + let num_rows = 4; + let columns: Vec = input_columns_for_flat_batch(num_rows); + let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap(); + let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap(); + + let actual = format.convert_batch(&batch).unwrap(); + assert_eq!(expect_record, actual); + } + + #[test] + fn test_flat_convert_with_override_sequence() { + let metadata = build_test_region_metadata(); + let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()) + .with_override_sequence(Some(415411)); + + let num_rows = 4; + let columns: Vec = input_columns_for_flat_batch(num_rows); + let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap(); + + let expected_columns: Vec = vec![ + Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 + Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 + Arc::new(Int64Array::from(vec![2; num_rows])), // field1 + Arc::new(Int64Array::from(vec![3; num_rows])), // field0 + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts + build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key + Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence + Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type + ]; + let expected_record = + RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap(); + + let actual = format.convert_batch(&batch).unwrap(); + assert_eq!(expected_record, actual); + } + + #[test] + fn test_flat_projection_indices() { + let metadata = build_test_region_metadata(); + // Based on flat format: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7) + // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7) + + // Only read tag1 (column_id=3, index=1) + fixed columns + let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied()); + assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices()); + + // Only read field1 (column_id=4, index=2) + fixed columns + let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied()); + assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices()); + + // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed) + let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied()); + assert_eq!(&[4, 5, 6, 7], read_format.projection_indices()); + + // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns + let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied()); + assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices()); + } + + #[test] + fn test_flat_read_format_convert_batch() { + let metadata = build_test_region_metadata(); + let mut format = FlatReadFormat::new( + metadata, + std::iter::once(1), // Just read tag0 + ); + + let num_rows = 4; + let original_sequence = 100u64; + let override_sequence = 200u64; + + // Create a test record batch + let columns: Vec = input_columns_for_flat_batch(num_rows); + let mut test_columns = columns.clone(); + // Replace sequence column with original sequence values + test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows])); + let record_batch = + RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap(); + + // Test without override sequence - should return clone + let result = format.convert_batch(&record_batch, None).unwrap(); + let sequence_column = result.column( + crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()), + ); + let sequence_array = sequence_column + .as_any() + .downcast_ref::() + .unwrap(); + + let expected_original = UInt64Array::from(vec![original_sequence; num_rows]); + assert_eq!(sequence_array, &expected_original); + + // Set override sequence and test with new_override_sequence_array + format.set_override_sequence(Some(override_sequence)); + let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap(); + let result = format + .convert_batch(&record_batch, Some(&override_sequence_array)) + .unwrap(); + let sequence_column = result.column( + crate::sst::parquet::flat_format::sequence_column_index(result.num_columns()), + ); + let sequence_array = sequence_column + .as_any() + .downcast_ref::() + .unwrap(); + + let expected_override = UInt64Array::from(vec![override_sequence; num_rows]); + assert_eq!(sequence_array, &expected_override); + } } diff --git a/src/mito2/src/sst/parquet/plain_format.rs b/src/mito2/src/sst/parquet/plain_format.rs deleted file mode 100644 index 812065a1fa..0000000000 --- a/src/mito2/src/sst/parquet/plain_format.rs +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Format to store in parquet. -//! -//! We store two additional internal columns at last: -//! - `__sequence`, the sequence number of a row. Type: uint64 -//! - `__op_type`, the op type of the row. Type: uint8 -//! -//! We store other columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()). -//! - -/// Number of columns that have fixed positions. -/// -/// Contains all internal columns. -pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 90f97bef1f..f70beeac1b 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -56,7 +56,7 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; -use crate::sst::parquet::format::{need_override_sequence, ReadFormat}; +use crate::sst::parquet::format::{need_override_sequence, PrimaryKeyReadFormat, ReadFormat}; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::row_selection::RowGroupSelection; @@ -227,11 +227,11 @@ impl ParquetReaderBuilder { // Gets the metadata stored in the SST. let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); let mut read_format = if let Some(column_ids) = &self.projection { - ReadFormat::new(region_meta.clone(), column_ids.iter().copied()) + PrimaryKeyReadFormat::new(region_meta.clone(), column_ids.iter().copied()) } else { // Lists all column ids to read, we always use the expected metadata if possible. let expected_meta = self.expected_metadata.as_ref().unwrap_or(®ion_meta); - ReadFormat::new( + PrimaryKeyReadFormat::new( region_meta.clone(), expected_meta .column_metadatas @@ -243,6 +243,7 @@ impl ParquetReaderBuilder { read_format .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get())); } + let read_format = ReadFormat::PrimaryKey(read_format); // Computes the projection mask. let parquet_schema_desc = parquet_meta.file_metadata().schema_descr(); @@ -1254,12 +1255,14 @@ impl RowGroupReaderBase where T: RowGroupReaderContext, { - /// Creates a new reader. + /// Creates a new reader to read the primary key format. pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self { // The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE. let override_sequence = context .read_format() .new_override_sequence_array(DEFAULT_READ_BATCH_SIZE); + assert!(context.read_format().as_primary_key().is_some()); + Self { context, reader, @@ -1301,11 +1304,16 @@ where }; self.metrics.num_record_batches += 1; - self.context.read_format().convert_record_batch( - &record_batch, - self.override_sequence.as_ref(), - &mut self.batches, - )?; + // Safety: We ensures the format is primary key in the RowGroupReaderBase::create(). + self.context + .read_format() + .as_primary_key() + .unwrap() + .convert_record_batch( + &record_batch, + self.override_sequence.as_ref(), + &mut self.batches, + )?; self.metrics.num_batches += self.batches.len(); } let batch = self.batches.pop_front(); diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index eb79bbd5fe..bec76cf128 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -44,7 +44,7 @@ use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu use crate::read::{Batch, Source}; use crate::sst::file::{FileId, RegionFileId}; use crate::sst::index::{Indexer, IndexerBuilder}; -use crate::sst::parquet::format::WriteFormat; +use crate::sst::parquet::format::PrimaryKeyWriteFormat; use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; @@ -229,8 +229,8 @@ where opts: &WriteOptions, ) -> Result { let mut results = smallvec![]; - let write_format = - WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence); + let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone()) + .with_override_sequence(override_sequence); let mut stats = SourceStats::default(); while let Some(res) = self @@ -292,7 +292,7 @@ where async fn write_next_batch( &mut self, source: &mut Source, - write_format: &WriteFormat, + write_format: &PrimaryKeyWriteFormat, opts: &WriteOptions, ) -> Result> { let start = Instant::now();