feat: implements FlatReadFormat to project parquets with flat schema (#6638)

* feat: add plain read format

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: reduce unused code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: reuse code

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: allow dead code

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change ReadFormat to enum

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: as_primary_key() returns option

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove some allow dead_code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename WriteFormat to PrimaryKeyWriteFormat

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for read/write format

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: format code

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: dedup column ids in format

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename plain to flat

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: implements FlatReadFormat based on the new format

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support override sequence

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: new_override_sequence_array for ReadFormat

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comment

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-08-04 20:43:50 +08:00
committed by Zhenchi
parent e0b1ebdfb6
commit 89b661c98a
9 changed files with 793 additions and 184 deletions

View File

@@ -887,7 +887,7 @@ mod tests {
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
@@ -995,7 +995,7 @@ mod tests {
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
.unwrap()
.unwrap();
let read_format = ReadFormat::new_with_all_columns(metadata.clone());
let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
let mut batches = VecDeque::new();
read_format
.convert_record_batch(&batch, None, &mut batches)

View File

@@ -30,7 +30,11 @@ use crate::error::{
ComputeArrowSnafu, CreateDefaultSnafu, InvalidRequestSnafu, NewRecordBatchSnafu, Result,
UnexpectedSnafu,
};
use crate::sst::parquet::plain_format::PLAIN_FIXED_POS_COLUMN_NUM;
/// Number of columns that have fixed positions.
///
/// Contains all internal columns.
pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2;
/// [PlainBatch] represents a batch of rows.
/// It is a wrapper around [RecordBatch].

View File

@@ -24,11 +24,11 @@ use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
pub(crate) mod file_range;
pub mod flat_format;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
@@ -117,7 +117,7 @@ mod tests {
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
@@ -523,7 +523,7 @@ mod tests {
let writer_props = props_builder.build();
let write_format = WriteFormat::new(metadata);
let write_format = PrimaryKeyWriteFormat::new(metadata);
let fields: Vec<_> = write_format
.arrow_schema()
.fields()

View File

@@ -308,8 +308,12 @@ impl RangeBase {
}
}
SemanticType::Field => {
let Some(field_index) =
self.read_format.field_index_by_id(filter_ctx.column_id())
// Safety: Input is Batch so we are using primary key format.
let Some(field_index) = self
.read_format
.as_primary_key()
.unwrap()
.field_index_by_id(filter_ctx.column_id())
else {
continue;
};

View File

@@ -0,0 +1,302 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Format to store in parquet.
//!
//! It can store both encoded primary key and raw key columns.
//!
//! We store three additional internal columns at the end:
//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The format is
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, UInt64Array};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::file::metadata::RowGroupMetaData;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{NewRecordBatchSnafu, Result};
use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
/// Helper for writing the flat SST format.
///
/// It keeps the region metadata together with the arrow schema derived from it
/// and an optional sequence number that overrides the sequence column of the
/// batches it converts.
// NOTE(review): `dead_code` is presumably allowed because the flat write path is
// not wired in yet — confirm and drop the attribute once it has callers.
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
/// Region metadata the SST schema is built from.
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
/// Sequence number written to the sequence column instead of the input one, if set.
override_sequence: Option<SequenceNumber>,
}
impl FlatWriteFormat {
/// Creates a new helper.
#[allow(dead_code)]
pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
FlatWriteFormat {
metadata,
arrow_schema,
override_sequence: None,
}
}
/// Set override sequence.
#[allow(dead_code)]
pub(crate) fn with_override_sequence(
mut self,
override_sequence: Option<SequenceNumber>,
) -> Self {
self.override_sequence = override_sequence;
self
}
/// Gets the arrow schema to store in parquet.
#[allow(dead_code)]
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
/// Convert `batch` to a arrow record batch to store in parquet.
#[allow(dead_code)]
pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
let Some(override_sequence) = self.override_sequence else {
return Ok(batch.clone());
};
let mut columns = batch.columns().to_vec();
let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
columns[sequence_column_index(batch.num_columns())] = sequence_array;
RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
}
/// Returns the position of the sequence column in a batch with `num_columns` columns.
///
/// The sequence column is the second to last column.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    // Distance from the end of the schema to the sequence column.
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
// TODO(yingwen): Add an option to skip reading internal columns.
/// Helper for reading the flat SST format with projection.
///
/// It only supports the flat format that additionally stores the encoded
/// primary key.
pub struct FlatReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema, computed from `metadata`.
arrow_schema: SchemaRef,
/// Sorted indices of columns to read from the SST. It contains all internal columns.
projection_indices: Vec<usize>,
/// Maps a column id to the index of the column in the projected schema
/// (the schema after projection).
column_id_to_projected_index: HashMap<ColumnId, usize>,
/// Maps a column id to the index of the column in the SST.
/// It covers primary key columns, field columns and the time index.
column_id_to_sst_index: HashMap<ColumnId, usize>,
/// Sequence number to override the sequence read from the SST.
override_sequence: Option<SequenceNumber>,
}
impl FlatReadFormat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> FlatReadFormat {
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(&metadata);
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
arrow_schema.fields.len(),
column_ids,
);
FlatReadFormat {
metadata,
arrow_schema,
projection_indices: format_projection.projection_indices,
column_id_to_projected_index: format_projection.column_id_to_projected_index,
column_id_to_sst_index: id_to_index,
override_sequence: None,
}
}
/// Sets the sequence number to override.
#[allow(dead_code)]
pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
self.override_sequence = sequence;
}
/// Index of a column in the projected batch by its column id.
pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.column_id_to_projected_index.get(&column_id).copied()
}
/// Returns min values of specific column in row groups.
pub fn min_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, true)
}
/// Returns max values of specific column in row groups.
pub fn max_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
self.get_stat_values(row_groups, column_id, false)
}
/// Returns null counts of specific column in row groups.
pub fn null_counts(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
let stats = ReadFormat::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
/// as the arrow schema decoded from the file metadata.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
/// Gets the metadata of the SST.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
/// Gets sorted projection indices to read.
pub(crate) fn projection_indices(&self) -> &[usize] {
&self.projection_indices
}
/// Creates a sequence array to override.
#[allow(dead_code)]
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
self.override_sequence
.map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
}
/// Convert a record batch to apply override sequence array.
///
/// Returns a new RecordBatch with the sequence column replaced by the override sequence array.
#[allow(dead_code)]
pub(crate) fn convert_batch(
&self,
record_batch: &RecordBatch,
override_sequence_array: Option<&ArrayRef>,
) -> Result<RecordBatch> {
let Some(override_array) = override_sequence_array else {
return Ok(record_batch.clone());
};
let mut columns = record_batch.columns().to_vec();
let sequence_column_idx = sequence_column_index(record_batch.num_columns());
// Use the provided override sequence array, slicing if necessary to match batch length
let sequence_array = if override_array.len() > record_batch.num_rows() {
override_array.slice(0, record_batch.num_rows())
} else {
override_array.clone()
};
columns[sequence_column_idx] = sequence_array;
RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
}
fn get_stat_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
is_min: bool,
) -> StatValues {
let Some(column) = self.metadata.column_by_id(column_id) else {
// No such column in the SST.
return StatValues::NoColumn;
};
// Safety: `column_id_to_sst_index` is built from `metadata`.
let index = self.column_id_to_sst_index.get(&column_id).unwrap();
let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
StatValues::from_stats_opt(stats)
}
}
/// Builds the mapping from column id to the position of the column in the SST.
///
/// Positions are assigned in storage order: primary key columns (in the order of
/// `metadata.primary_key`), then field columns (in the order they appear in
/// `metadata.column_metadatas`), then the time index.
/// It only supports SSTs with raw primary key columns.
fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
    // keys
    let key_ids = metadata.primary_key.iter().copied();
    // fields
    let field_ids = metadata
        .column_metadatas
        .iter()
        .filter(|column| column.semantic_type == SemanticType::Field)
        .map(|column| column.column_id);
    // time index
    let time_index_id = std::iter::once(metadata.time_index_column().column_id);

    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
    id_to_index.extend(
        key_ids
            .chain(field_ids)
            .chain(time_index_id)
            .enumerate()
            .map(|(sst_index, column_id)| (column_id, sst_index)),
    );
    id_to_index
}
#[cfg(test)]
impl FlatReadFormat {
    /// Builds a read helper for `metadata` that projects every column in the metadata.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        // Collect the ids first so `metadata` can be moved into the helper.
        let all_column_ids: Vec<ColumnId> = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect();

        Self::new(metadata, all_column_ids.into_iter())
    }
}

View File

@@ -50,6 +50,7 @@ use crate::error::{
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
@@ -58,21 +59,21 @@ pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
const FIXED_POS_COLUMN_NUM: usize = 4;
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Helper for writing the SST format.
pub(crate) struct WriteFormat {
/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
override_sequence: Option<SequenceNumber>,
}
impl WriteFormat {
impl PrimaryKeyWriteFormat {
/// Creates a new helper.
pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
let arrow_schema = to_sst_arrow_schema(&metadata);
WriteFormat {
PrimaryKeyWriteFormat {
metadata,
arrow_schema,
override_sequence: None,
@@ -132,8 +133,198 @@ impl WriteFormat {
}
}
/// Helper to read parquet formats.
///
/// Each variant wraps the helper for one SST layout.
pub enum ReadFormat {
/// Helper for the primary key format.
PrimaryKey(PrimaryKeyReadFormat),
/// Helper for the flat format.
Flat(FlatReadFormat),
}
impl ReadFormat {
// TODO(yingwen): Add a flag to choose format type.
pub(crate) fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> Self {
Self::new_primary_key(metadata, column_ids)
}
/// Creates a helper to read the primary key format.
pub fn new_primary_key(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> Self {
ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
}
/// Creates a helper to read the flat format.
pub fn new_flat(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> Self {
ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids))
}
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
match self {
ReadFormat::PrimaryKey(format) => Some(format),
_ => None,
}
}
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
/// as the arrow schema decoded from the file metadata.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
match self {
ReadFormat::PrimaryKey(format) => format.arrow_schema(),
ReadFormat::Flat(format) => format.arrow_schema(),
}
}
/// Gets the metadata of the SST.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
match self {
ReadFormat::PrimaryKey(format) => format.metadata(),
ReadFormat::Flat(format) => format.metadata(),
}
}
/// Gets sorted projection indices to read.
pub(crate) fn projection_indices(&self) -> &[usize] {
match self {
ReadFormat::PrimaryKey(format) => format.projection_indices(),
ReadFormat::Flat(format) => format.projection_indices(),
}
}
/// Returns min values of specific column in row groups.
pub fn min_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
match self {
ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
}
}
/// Returns max values of specific column in row groups.
pub fn max_values(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
match self {
ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
}
}
/// Returns null counts of specific column in row groups.
pub fn null_counts(
&self,
row_groups: &[impl Borrow<RowGroupMetaData>],
column_id: ColumnId,
) -> StatValues {
match self {
ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
}
}
/// Returns min/max values of specific columns.
/// Returns None if the column does not have statistics.
/// The column should not be encoded as a part of a primary key.
pub(crate) fn column_values(
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
column_index: usize,
is_min: bool,
) -> Option<ArrayRef> {
let null_scalar: ScalarValue = column
.column_schema
.data_type
.as_arrow_type()
.try_into()
.ok()?;
let scalar_values = row_groups
.iter()
.map(|meta| {
let stats = meta.borrow().column(column_index).statistics()?;
match stats {
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int96(_) => None,
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::ByteArray(s) => {
let bytes = if is_min {
s.min_bytes_opt()?
} else {
s.max_bytes_opt()?
};
let s = String::from_utf8(bytes.to_vec()).ok();
Some(ScalarValue::Utf8(s))
}
Statistics::FixedLenByteArray(_) => None,
}
})
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
.collect::<Vec<ScalarValue>>();
debug_assert_eq!(scalar_values.len(), row_groups.len());
ScalarValue::iter_to_array(scalar_values).ok()
}
/// Returns null counts of specific columns.
/// The column should not be encoded as a part of a primary key.
pub(crate) fn column_null_counts(
row_groups: &[impl Borrow<RowGroupMetaData>],
column_index: usize,
) -> Option<ArrayRef> {
let values = row_groups.iter().map(|meta| {
let col = meta.borrow().column(column_index);
let stat = col.statistics()?;
stat.null_count_opt()
});
Some(Arc::new(UInt64Array::from_iter(values)))
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
match self {
ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
ReadFormat::Flat(format) => format.new_override_sequence_array(length),
}
}
}
/// Helper for reading the SST format.
pub struct ReadFormat {
pub struct PrimaryKeyReadFormat {
/// The metadata stored in the SST.
metadata: RegionMetadataRef,
/// SST file schema.
@@ -150,12 +341,12 @@ pub struct ReadFormat {
override_sequence: Option<SequenceNumber>,
}
impl ReadFormat {
impl PrimaryKeyReadFormat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> ReadFormat {
) -> PrimaryKeyReadFormat {
let field_id_to_index: HashMap<_, _> = metadata
.field_columns()
.enumerate()
@@ -163,42 +354,18 @@ impl ReadFormat {
.collect();
let arrow_schema = to_sst_arrow_schema(&metadata);
// Maps column id of a projected field to its index in SST.
let mut projected_field_id_index: Vec<_> = column_ids
.filter_map(|column_id| {
// Only apply projection to fields.
field_id_to_index
.get(&column_id)
.copied()
.map(|index| (column_id, index))
})
.collect();
let mut projection_indices: Vec<_> = projected_field_id_index
.iter()
.map(|(_column_id, index)| *index)
// We need to add all fixed position columns.
.chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
.collect();
projection_indices.sort_unstable();
let format_projection = FormatProjection::compute_format_projection(
&field_id_to_index,
arrow_schema.fields.len(),
column_ids,
);
// Sort fields by their indices in the SST. Then the order of fields is their order
// in the Batch.
projected_field_id_index.sort_unstable_by_key(|x| x.1);
// Because the SST put fields before other columns, we don't need to consider other
// columns.
let field_id_to_projected_index = projected_field_id_index
.into_iter()
.map(|(column_id, _)| column_id)
.enumerate()
.map(|(index, column_id)| (column_id, index))
.collect();
ReadFormat {
PrimaryKeyReadFormat {
metadata,
arrow_schema,
field_id_to_index,
projection_indices,
field_id_to_projected_index,
projection_indices: format_projection.projection_indices,
field_id_to_projected_index: format_projection.column_id_to_projected_index,
override_sequence: None,
}
}
@@ -344,12 +511,12 @@ impl ReadFormat {
SemanticType::Field => {
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, true);
let stats = ReadFormat::column_values(row_groups, column, *index, true);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
let stats = Self::column_values(row_groups, column, index, true);
let stats = ReadFormat::column_values(row_groups, column, index, true);
StatValues::from_stats_opt(stats)
}
}
@@ -370,12 +537,12 @@ impl ReadFormat {
SemanticType::Field => {
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_values(row_groups, column, *index, false);
let stats = ReadFormat::column_values(row_groups, column, *index, false);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
let stats = Self::column_values(row_groups, column, index, false);
let stats = ReadFormat::column_values(row_groups, column, index, false);
StatValues::from_stats_opt(stats)
}
}
@@ -396,12 +563,12 @@ impl ReadFormat {
SemanticType::Field => {
// Safety: `field_id_to_index` is initialized by the semantic type.
let index = self.field_id_to_index.get(&column_id).unwrap();
let stats = Self::column_null_counts(row_groups, *index);
let stats = ReadFormat::column_null_counts(row_groups, *index);
StatValues::from_stats_opt(stats)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
let stats = Self::column_null_counts(row_groups, index);
let stats = ReadFormat::column_null_counts(row_groups, index);
StatValues::from_stats_opt(stats)
}
}
@@ -516,84 +683,6 @@ impl ReadFormat {
Some(vector.to_arrow_array())
}
/// Returns min/max values of specific non-tag columns.
/// Returns None if the column does not have statistics.
fn column_values(
row_groups: &[impl Borrow<RowGroupMetaData>],
column: &ColumnMetadata,
column_index: usize,
is_min: bool,
) -> Option<ArrayRef> {
let null_scalar: ScalarValue = column
.column_schema
.data_type
.as_arrow_type()
.try_into()
.ok()?;
let scalar_values = row_groups
.iter()
.map(|meta| {
let stats = meta.borrow().column(column_index).statistics()?;
match stats {
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Int96(_) => None,
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
*s.min_opt()?
} else {
*s.max_opt()?
}))),
Statistics::ByteArray(s) => {
let bytes = if is_min {
s.min_bytes_opt()?
} else {
s.max_bytes_opt()?
};
let s = String::from_utf8(bytes.to_vec()).ok();
Some(ScalarValue::Utf8(s))
}
Statistics::FixedLenByteArray(_) => None,
}
})
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
.collect::<Vec<ScalarValue>>();
debug_assert_eq!(scalar_values.len(), row_groups.len());
ScalarValue::iter_to_array(scalar_values).ok()
}
/// Returns null counts of specific non-tag columns.
fn column_null_counts(
row_groups: &[impl Borrow<RowGroupMetaData>],
column_index: usize,
) -> Option<ArrayRef> {
let values = row_groups.iter().map(|meta| {
let col = meta.borrow().column(column_index);
let stat = col.statistics()?;
stat.null_count_opt()
});
Some(Arc::new(UInt64Array::from_iter(values)))
}
/// Index in SST of the primary key.
fn primary_key_position(&self) -> usize {
self.arrow_schema.fields.len() - 3
@@ -610,6 +699,68 @@ impl ReadFormat {
}
}
/// Helper to compute the projection for the SST.
///
/// It holds the result of the computation: which SST columns to read and
/// where each projected column ends up in the projected schema.
pub(crate) struct FormatProjection {
/// Sorted, deduplicated indices of columns to read from the SST.
/// It contains all internal columns.
pub(crate) projection_indices: Vec<usize>,
/// Maps a column id to the index of the column in the projected schema
/// (the schema after projection).
pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}
impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
    /// `sst_column_num` is the total number of columns in the SST and `column_ids`
    /// are the ids of the columns to read. Ids missing from `id_to_index` are
    /// ignored and duplicated ids are projected once.
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // [(column id, index in SST)] of the projected columns that exist in the SST.
        let mut projected_schema: Vec<(ColumnId, usize)> = column_ids
            .filter_map(|column_id| {
                let sst_index = id_to_index.get(&column_id)?;
                Some((column_id, *sst_index))
            })
            .collect();
        // SST uses a bitmap for projection, so the batch it returns keeps the SST
        // column order. Sorting by SST index keeps `projected_schema` in that order.
        projected_schema.sort_unstable_by_key(|(_, sst_index)| *sst_index);
        // `column_ids` may contain duplicated columns.
        projected_schema.dedup_by_key(|(_, sst_index)| *sst_index);

        // Positions of all columns to read: the projected columns plus all fixed
        // position columns at the end of the SST.
        let fixed_pos_columns = sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num;
        let mut projection_indices: Vec<usize> = projected_schema
            .iter()
            .map(|(_, sst_index)| *sst_index)
            .chain(fixed_pos_columns)
            .collect();
        projection_indices.sort_unstable();
        // A projected column may also be a fixed position column.
        projection_indices.dedup();

        // Column id -> index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .enumerate()
            .map(|(projected_index, (column_id, _))| (column_id, projected_index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}
/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
@@ -634,9 +785,9 @@ impl StatValues {
}
#[cfg(test)]
impl ReadFormat {
impl PrimaryKeyReadFormat {
/// Creates a helper with existing `metadata` and all columns.
pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
@@ -754,6 +905,8 @@ pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
@@ -764,6 +917,8 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::sst::parquet::flat_format::FlatWriteFormat;
use crate::sst::FlatSchemaOptions;
const TEST_SEQUENCE: u64 = 1;
const TEST_OP_TYPE: u8 = OpType::Put as u8;
@@ -870,7 +1025,7 @@ mod tests {
#[test]
fn test_to_sst_arrow_schema() {
let metadata = build_test_region_metadata();
let write_format = WriteFormat::new(metadata);
let write_format = PrimaryKeyWriteFormat::new(metadata);
assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
}
@@ -896,7 +1051,7 @@ mod tests {
#[test]
fn test_convert_batch() {
let metadata = build_test_region_metadata();
let write_format = WriteFormat::new(metadata);
let write_format = PrimaryKeyWriteFormat::new(metadata);
let num_rows = 4;
let batch = new_batch(b"test", 1, 2, num_rows);
@@ -917,7 +1072,8 @@ mod tests {
#[test]
fn test_convert_batch_with_override_sequence() {
let metadata = build_test_region_metadata();
let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411));
let write_format =
PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
let num_rows = 4;
let batch = new_batch(b"test", 1, 2, num_rows);
@@ -939,16 +1095,16 @@ mod tests {
fn test_projection_indices() {
let metadata = build_test_region_metadata();
// Only read tag1
let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
// Only read field1
let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
// Only read ts
let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
// Read field0, tag0, ts
let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
}
@@ -995,7 +1151,7 @@ mod tests {
.iter()
.map(|col| col.column_id)
.collect();
let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
@@ -1014,7 +1170,7 @@ mod tests {
.iter()
.map(|col| col.column_id)
.collect();
let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
@@ -1065,6 +1221,8 @@ mod tests {
let mut batches = VecDeque::new();
read_format
.as_primary_key()
.unwrap()
.convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
.unwrap();
@@ -1077,4 +1235,164 @@ mod tests {
batches.into_iter().collect::<Vec<_>>(),
);
}
/// Builds the Arrow schema the flat SST tests compare against.
///
/// Column order: tags, fields, the time index, then the internal
/// `__primary_key`, `__sequence` and `__op_type` columns.
fn build_test_flat_sst_schema() -> SchemaRef {
    // Tags and fields are all nullable Int64 columns in this schema.
    let nullable_i64 = |name: &str| Field::new(name, ArrowDataType::Int64, true);
    let ts_type = ArrowDataType::Timestamp(TimeUnit::Millisecond, None);
    // The primary key column is a dictionary with UInt32 keys and Binary values.
    let pk_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::UInt32),
        Box::new(ArrowDataType::Binary),
    );

    Arc::new(Schema::new(vec![
        // Primary key columns first.
        nullable_i64("tag0"),
        nullable_i64("tag1"),
        // Then field columns.
        nullable_i64("field1"),
        nullable_i64("field0"),
        Field::new("ts", ts_type, false),
        Field::new("__primary_key", pk_type, false),
        Field::new("__sequence", ArrowDataType::UInt64, false),
        Field::new("__op_type", ArrowDataType::UInt8, false),
    ]))
}
/// The flat write format built with default schema options must expose the
/// expected flat SST schema.
#[test]
fn test_flat_to_sst_arrow_schema() {
    let write_format =
        FlatWriteFormat::new(build_test_region_metadata(), &FlatSchemaOptions::default());
    let expected_schema = build_test_flat_sst_schema();
    assert_eq!(&expected_schema, write_format.arrow_schema());
}
/// Builds the columns of a flat-format test batch with `num_rows` rows.
///
/// Columns are returned in the order of the flat test schema: `tag0`, `tag1`,
/// `field1`, `field0`, `ts`, `__primary_key`, `__sequence`, `__op_type`.
/// Every row shares the same tag/field values and the same primary key
/// (`b"test"`); timestamps are `1, 2, ..., num_rows`.
fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
    // Derive the timestamps from `num_rows` so that every column has the same
    // length for any row count (a fixed 4-element vector only matched
    // `num_rows == 4`). For `num_rows == 4` this is still `[1, 2, 3, 4]`.
    let timestamps: Vec<i64> = (1..).take(num_rows).collect();
    vec![
        Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
        Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
        Arc::new(Int64Array::from(vec![2; num_rows])), // field1
        Arc::new(Int64Array::from(vec![3; num_rows])), // field0
        Arc::new(TimestampMillisecondArray::from(timestamps)), // ts
        build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
        Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
        Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
    ]
}
/// Converting a batch with a write format that has no override sequence must
/// yield a batch equal to the input.
#[test]
fn test_flat_convert_batch() {
    let write_format =
        FlatWriteFormat::new(build_test_region_metadata(), &FlatSchemaOptions::default());

    let row_count = 4;
    let input = RecordBatch::try_new(
        build_test_flat_sst_schema(),
        input_columns_for_flat_batch(row_count),
    )
    .unwrap();
    // The expected batch holds exactly the same schema and columns as the input.
    let expected = input.clone();

    let converted = write_format.convert_batch(&input).unwrap();
    assert_eq!(expected, converted);
}
/// With an override sequence set on the write format, the converted batch must
/// carry that sequence in `__sequence` and leave all other columns untouched.
#[test]
fn test_flat_convert_with_override_sequence() {
    const OVERRIDE_SEQUENCE: u64 = 415411;
    let row_count = 4;

    let write_format =
        FlatWriteFormat::new(build_test_region_metadata(), &FlatSchemaOptions::default())
            .with_override_sequence(Some(OVERRIDE_SEQUENCE));

    let input = RecordBatch::try_new(
        build_test_flat_sst_schema(),
        input_columns_for_flat_batch(row_count),
    )
    .unwrap();

    // Expected output: same columns as the input, except `__sequence`
    // (index 6 in the flat test schema) which holds the overridden value.
    let mut expected_columns = input_columns_for_flat_batch(row_count);
    expected_columns[6] = Arc::new(UInt64Array::from(vec![OVERRIDE_SEQUENCE; row_count]));
    let expected =
        RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

    let converted = write_format.convert_batch(&input).unwrap();
    assert_eq!(expected, converted);
}
/// Checks the parquet column indices projected by the flat read format.
///
/// Flat layout: tag0(0), tag1(1), field1(2), field0(3), ts(4),
/// __primary_key(5), __sequence(6), __op_type(7). Each expected projection
/// contains the fixed-position columns ts(4), __primary_key(5),
/// __sequence(6) and __op_type(7).
#[test]
fn test_flat_projection_indices() {
    let region_metadata = build_test_region_metadata();

    // tag1 only (column_id=3, index=1) + fixed columns.
    let tag1_only = ReadFormat::new_flat(region_metadata.clone(), std::iter::once(3));
    assert_eq!(&[1, 4, 5, 6, 7], tag1_only.projection_indices());

    // field1 only (column_id=4, index=2) + fixed columns.
    let field1_only = ReadFormat::new_flat(region_metadata.clone(), std::iter::once(4));
    assert_eq!(&[2, 4, 5, 6, 7], field1_only.projection_indices());

    // ts only (column_id=5, index=4); ts is already one of the fixed columns.
    let ts_only = ReadFormat::new_flat(region_metadata.clone(), std::iter::once(5));
    assert_eq!(&[4, 5, 6, 7], ts_only.projection_indices());

    // field0 (column_id=2, index=3), tag0 (column_id=1, index=0) and
    // ts (column_id=5, index=4) + fixed columns.
    let requested = [2, 1, 5];
    let mixed = ReadFormat::new_flat(region_metadata, requested.iter().copied());
    assert_eq!(&[0, 3, 4, 5, 6, 7], mixed.projection_indices());
}
/// Verifies `FlatReadFormat::convert_batch` with and without an override
/// sequence array: without one the `__sequence` column keeps its original
/// values, with one it holds the override sequence.
#[test]
fn test_flat_read_format_convert_batch() {
    use crate::sst::parquet::flat_format::sequence_column_index;

    let metadata = build_test_region_metadata();
    let mut format = FlatReadFormat::new(
        metadata,
        std::iter::once(1), // Just read tag0
    );

    let num_rows = 4;
    let original_sequence = 100u64;
    let override_sequence = 200u64;

    // Build the input batch, replacing the sequence column with the original
    // sequence values. The index comes from the same helper used to read the
    // column back below, and the column vector is mutated in place (no clone).
    let mut columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
    let sequence_index = sequence_column_index(columns.len());
    columns[sequence_index] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
    let record_batch = RecordBatch::try_new(format.arrow_schema().clone(), columns).unwrap();

    // Asserts that `column` is a UInt64 array holding `expected` in every row.
    let assert_sequence = |column: &ArrayRef, expected: u64| {
        let actual = column.as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(actual, &UInt64Array::from(vec![expected; num_rows]));
    };

    // Without an override sequence array the original sequence is kept.
    let result = format.convert_batch(&record_batch, None).unwrap();
    assert_sequence(
        result.column(sequence_column_index(result.num_columns())),
        original_sequence,
    );

    // Set the override sequence and convert with the array produced by
    // `new_override_sequence_array`.
    format.set_override_sequence(Some(override_sequence));
    let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
    let result = format
        .convert_batch(&record_batch, Some(&override_sequence_array))
        .unwrap();
    assert_sequence(
        result.column(sequence_column_index(result.num_columns())),
        override_sequence,
    );
}
}

View File

@@ -1,27 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Format to store in parquet.
//!
//! We store two additional internal columns at the end:
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! We store other columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
//!
/// Number of columns that have fixed positions.
///
/// Contains all internal columns.
pub(crate) const PLAIN_FIXED_POS_COLUMN_NUM: usize = 2;

View File

@@ -56,7 +56,7 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
use crate::sst::parquet::format::{need_override_sequence, PrimaryKeyReadFormat, ReadFormat};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
@@ -227,11 +227,11 @@ impl ParquetReaderBuilder {
// Gets the metadata stored in the SST.
let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
let mut read_format = if let Some(column_ids) = &self.projection {
ReadFormat::new(region_meta.clone(), column_ids.iter().copied())
PrimaryKeyReadFormat::new(region_meta.clone(), column_ids.iter().copied())
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
ReadFormat::new(
PrimaryKeyReadFormat::new(
region_meta.clone(),
expected_meta
.column_metadatas
@@ -243,6 +243,7 @@ impl ParquetReaderBuilder {
read_format
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
}
let read_format = ReadFormat::PrimaryKey(read_format);
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
@@ -1254,12 +1255,14 @@ impl<T> RowGroupReaderBase<T>
where
T: RowGroupReaderContext,
{
/// Creates a new reader.
/// Creates a new reader to read the primary key format.
pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
let override_sequence = context
.read_format()
.new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
assert!(context.read_format().as_primary_key().is_some());
Self {
context,
reader,
@@ -1301,11 +1304,16 @@ where
};
self.metrics.num_record_batches += 1;
self.context.read_format().convert_record_batch(
&record_batch,
self.override_sequence.as_ref(),
&mut self.batches,
)?;
// Safety: We ensure the format is primary key in RowGroupReaderBase::create().
self.context
.read_format()
.as_primary_key()
.unwrap()
.convert_record_batch(
&record_batch,
self.override_sequence.as_ref(),
&mut self.batches,
)?;
self.metrics.num_batches += self.batches.len();
}
let batch = self.batches.pop_front();

View File

@@ -44,7 +44,7 @@ use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu
use crate::read::{Batch, Source};
use crate::sst::file::{FileId, RegionFileId};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
@@ -229,8 +229,8 @@ where
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
.with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
while let Some(res) = self
@@ -292,7 +292,7 @@ where
async fn write_next_batch(
&mut self,
source: &mut Source,
write_format: &WriteFormat,
write_format: &PrimaryKeyWriteFormat,
opts: &WriteOptions,
) -> Result<Option<Batch>> {
let start = Instant::now();