mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
refactor(mito2): remove PrimaryKey variants (#7982)
* refactor: make scan compat path flat-only
  Signed-off-by: evenyag <realevenyag@gmail.com>
* refactor: use flat projection mapper directly
  Signed-off-by: evenyag <realevenyag@gmail.com>
* refactor: remove dead compat batch enum
  Signed-off-by: evenyag <realevenyag@gmail.com>
* refactor: drop dead primary-key projection helpers
  Signed-off-by: evenyag <realevenyag@gmail.com>
* refactor: use flat read format directly
  Signed-off-by: evenyag <realevenyag@gmail.com>
* chore: fix warnings
  Signed-off-by: evenyag <realevenyag@gmail.com>
* chore: fmt code
  Signed-off-by: evenyag <realevenyag@gmail.com>
---------
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -60,7 +60,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
|
||||
use crate::read::BoxedRecordBatchStream;
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::region::options::{MergeMode, RegionOptions};
|
||||
@@ -1002,7 +1002,7 @@ impl CompactionSstReaderBuilder<'_> {
|
||||
}
|
||||
|
||||
fn build_scan_input(self) -> Result<ScanInput> {
|
||||
let mapper = ProjectionMapper::all(&self.metadata)?;
|
||||
let mapper = FlatProjectionMapper::all(&self.metadata)?;
|
||||
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
|
||||
.with_files(self.inputs.to_vec())
|
||||
.with_append_mode(self.append_mode)
|
||||
|
||||
@@ -26,7 +26,7 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter;
|
||||
use crate::sst::parquet::reader::SimpleFilterContext;
|
||||
use crate::sst::parquet::stats::RowGroupPruningStats;
|
||||
@@ -77,14 +77,26 @@ impl BulkIterContext {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let read_format = ReadFormat::new(
|
||||
region_metadata.clone(),
|
||||
projection,
|
||||
true,
|
||||
None,
|
||||
"memtable",
|
||||
skip_auto_convert,
|
||||
)?;
|
||||
let read_format = if let Some(column_ids) = projection {
|
||||
FlatReadFormat::new(
|
||||
region_metadata.clone(),
|
||||
column_ids.iter().copied(),
|
||||
None,
|
||||
"memtable",
|
||||
skip_auto_convert,
|
||||
)?
|
||||
} else {
|
||||
FlatReadFormat::new(
|
||||
region_metadata.clone(),
|
||||
region_metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|col| col.column_id),
|
||||
None,
|
||||
"memtable",
|
||||
skip_auto_convert,
|
||||
)?
|
||||
};
|
||||
|
||||
let dyn_filters = predicate
|
||||
.as_ref()
|
||||
@@ -143,11 +155,10 @@ impl BulkIterContext {
|
||||
|
||||
/// Extracts PK filters if flat format with dictionary-encoded PKs is used.
|
||||
fn extract_pk_filters(
|
||||
read_format: &ReadFormat,
|
||||
read_format: &FlatReadFormat,
|
||||
filters: &[SimpleFilterContext],
|
||||
) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
|
||||
let flat_format = read_format.as_flat()?;
|
||||
if flat_format.batch_has_raw_pk_columns() {
|
||||
if read_format.batch_has_raw_pk_columns() {
|
||||
return None;
|
||||
}
|
||||
let metadata = read_format.metadata();
|
||||
@@ -179,7 +190,7 @@ impl BulkIterContext {
|
||||
Some(CachedPrimaryKeyFilter::new(inner))
|
||||
}
|
||||
|
||||
pub(crate) fn read_format(&self) -> &ReadFormat {
|
||||
pub(crate) fn read_format(&self) -> &FlatReadFormat {
|
||||
&self.base.read_format
|
||||
}
|
||||
|
||||
|
||||
@@ -60,8 +60,6 @@ impl EncodedBulkPartIter {
|
||||
sequence: Option<SequenceRange>,
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
) -> error::Result<Self> {
|
||||
assert!(context.read_format().as_flat().is_some());
|
||||
|
||||
let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
|
||||
let data = encoded_part.data().clone();
|
||||
let series_count = encoded_part.metadata().num_series as usize;
|
||||
@@ -238,8 +236,6 @@ impl BulkPartBatchIter {
|
||||
series_count: usize,
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
) -> Self {
|
||||
assert!(context.read_format().as_flat().is_some());
|
||||
|
||||
let pk_filter = context.build_pk_filter();
|
||||
|
||||
Self {
|
||||
@@ -406,8 +402,7 @@ fn apply_combined_filters(
|
||||
};
|
||||
|
||||
// Converts the format to the flat format.
|
||||
let format = context.read_format().as_flat().unwrap();
|
||||
let record_batch = format.convert_batch(record_batch, None)?;
|
||||
let record_batch = context.read_format().convert_batch(record_batch, None)?;
|
||||
|
||||
let num_rows = record_batch.num_rows();
|
||||
let mut combined_filter = None;
|
||||
|
||||
@@ -36,7 +36,7 @@ pub mod series_scan;
|
||||
pub mod stream;
|
||||
pub(crate) mod unordered_scan;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -63,7 +63,6 @@ use futures::TryStreamExt;
|
||||
use futures::stream::BoxStream;
|
||||
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
|
||||
|
||||
use crate::error::{
|
||||
@@ -71,8 +70,6 @@ use crate::error::{
|
||||
Result,
|
||||
};
|
||||
use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
|
||||
use crate::read::prune::PruneReader;
|
||||
|
||||
/// Storage internal representation of a batch of rows for a primary key (time series).
|
||||
///
|
||||
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
|
||||
@@ -573,24 +570,6 @@ impl Batch {
|
||||
size
|
||||
}
|
||||
|
||||
/// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
|
||||
pub(crate) fn projected_fields(
|
||||
metadata: &RegionMetadata,
|
||||
projection: &[ColumnId],
|
||||
) -> Vec<(ColumnId, ConcreteDataType)> {
|
||||
let projected_ids: HashSet<_> = projection.iter().copied().collect();
|
||||
metadata
|
||||
.field_columns()
|
||||
.filter_map(|column| {
|
||||
if projected_ids.contains(&column.column_id) {
|
||||
Some((column.column_id, column.column_schema.data_type.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns timestamps in a native slice or `None` if the batch is empty.
|
||||
pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
|
||||
if self.timestamps.is_empty() {
|
||||
@@ -1111,8 +1090,6 @@ pub enum Source {
|
||||
Iter(BoxedBatchIterator),
|
||||
/// Source from a [BoxedBatchStream].
|
||||
Stream(BoxedBatchStream),
|
||||
/// Source from a [PruneReader].
|
||||
PruneReader(PruneReader),
|
||||
}
|
||||
|
||||
impl Source {
|
||||
@@ -1122,7 +1099,6 @@ impl Source {
|
||||
Source::Reader(reader) => reader.next_batch().await,
|
||||
Source::Iter(iter) => iter.next().transpose(),
|
||||
Source::Stream(stream) => stream.try_next().await,
|
||||
Source::PruneReader(reader) => reader.next_batch().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{Helper, VectorRef};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use mito_codec::row_converter::{
|
||||
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
|
||||
build_primary_key_codec_with_fields,
|
||||
@@ -39,127 +39,14 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{
|
||||
CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
|
||||
DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
|
||||
UnsupportedOperationSnafu,
|
||||
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
|
||||
NewRecordBatchSnafu, RecordBatchSnafu, Result, UnsupportedOperationSnafu,
|
||||
};
|
||||
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
|
||||
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
|
||||
use crate::read::{Batch, BatchColumn, BatchReader};
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
|
||||
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
|
||||
|
||||
/// Reader to adapt schema of underlying reader to expected schema.
|
||||
pub struct CompatReader<R> {
|
||||
/// Underlying reader.
|
||||
reader: R,
|
||||
/// Helper to compat batches.
|
||||
compat: PrimaryKeyCompatBatch,
|
||||
}
|
||||
|
||||
impl<R> CompatReader<R> {
|
||||
/// Creates a new compat reader.
|
||||
/// - `mapper` is built from the metadata users expect to see.
|
||||
/// - `reader_meta` is the metadata of the input reader.
|
||||
/// - `reader` is the input reader.
|
||||
pub fn new(
|
||||
mapper: &ProjectionMapper,
|
||||
reader_meta: RegionMetadataRef,
|
||||
reader: R,
|
||||
) -> Result<CompatReader<R>> {
|
||||
Ok(CompatReader {
|
||||
reader,
|
||||
compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<R: BatchReader> BatchReader for CompatReader<R> {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
let Some(mut batch) = self.reader.next_batch().await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
batch = self.compat.compat_batch(batch)?;
|
||||
|
||||
Ok(Some(batch))
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to adapt schema of the batch to an expected schema.
|
||||
pub(crate) enum CompatBatch {
|
||||
/// Adapter for primary key format.
|
||||
PrimaryKey(PrimaryKeyCompatBatch),
|
||||
/// Adapter for flat format.
|
||||
Flat(FlatCompatBatch),
|
||||
}
|
||||
|
||||
impl CompatBatch {
|
||||
/// Returns the inner primary key batch adapter if this is a PrimaryKey format.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
|
||||
match self {
|
||||
CompatBatch::PrimaryKey(batch) => Some(batch),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the inner flat batch adapter if this is a Flat format.
|
||||
pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
|
||||
match self {
|
||||
CompatBatch::Flat(batch) => Some(batch),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper struct to adapt schema of the batch to an expected schema.
|
||||
pub(crate) struct PrimaryKeyCompatBatch {
|
||||
/// Optional primary key adapter.
|
||||
rewrite_pk: Option<RewritePrimaryKey>,
|
||||
/// Optional primary key adapter.
|
||||
compat_pk: Option<CompatPrimaryKey>,
|
||||
/// Optional fields adapter.
|
||||
compat_fields: Option<CompatFields>,
|
||||
}
|
||||
|
||||
impl PrimaryKeyCompatBatch {
|
||||
/// Creates a new [CompatBatch].
|
||||
/// - `mapper` is built from the metadata users expect to see.
|
||||
/// - `reader_meta` is the metadata of the input reader.
|
||||
pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
|
||||
let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
|
||||
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
|
||||
let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
|
||||
reason: "Unexpected format",
|
||||
})?;
|
||||
let compat_fields = may_compat_fields(mapper, &reader_meta)?;
|
||||
|
||||
Ok(Self {
|
||||
rewrite_pk,
|
||||
compat_pk,
|
||||
compat_fields,
|
||||
})
|
||||
}
|
||||
|
||||
/// Adapts the `batch` to the expected schema.
|
||||
pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
|
||||
if let Some(rewrite_pk) = &self.rewrite_pk {
|
||||
batch = rewrite_pk.compat(batch)?;
|
||||
}
|
||||
if let Some(compat_pk) = &self.compat_pk {
|
||||
batch = compat_pk.compat(batch)?;
|
||||
}
|
||||
if let Some(compat_fields) = &self.compat_fields {
|
||||
batch = compat_fields.compat(batch)?;
|
||||
}
|
||||
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if `left` and `right` have same columns and primary key encoding.
|
||||
pub(crate) fn has_same_columns_and_pk_encoding(
|
||||
left: &RegionMetadata,
|
||||
@@ -293,7 +180,6 @@ impl FlatCompatBatch {
|
||||
),
|
||||
})?;
|
||||
index_or_defaults.push(IndexOrDefault::DefaultValue {
|
||||
column_id: expect_column.column_id,
|
||||
default_vector,
|
||||
semantic_type: expect_column.semantic_type,
|
||||
});
|
||||
@@ -367,7 +253,6 @@ impl FlatCompatBatch {
|
||||
}
|
||||
}
|
||||
IndexOrDefault::DefaultValue {
|
||||
column_id: _,
|
||||
default_vector,
|
||||
semantic_type,
|
||||
} => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
|
||||
@@ -415,121 +300,6 @@ fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<Arra
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to make primary key compatible.
|
||||
#[derive(Debug)]
|
||||
struct CompatPrimaryKey {
|
||||
/// Row converter to append values to primary keys.
|
||||
converter: Arc<dyn PrimaryKeyCodec>,
|
||||
/// Default values to append.
|
||||
values: Vec<(ColumnId, Value)>,
|
||||
}
|
||||
|
||||
impl CompatPrimaryKey {
|
||||
/// Make primary key of the `batch` compatible.
|
||||
fn compat(&self, mut batch: Batch) -> Result<Batch> {
|
||||
let mut buffer = Vec::with_capacity(
|
||||
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
|
||||
);
|
||||
buffer.extend_from_slice(batch.primary_key());
|
||||
self.converter
|
||||
.encode_values(&self.values, &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
|
||||
batch.set_primary_key(buffer);
|
||||
|
||||
// update cache
|
||||
if let Some(pk_values) = &mut batch.pk_values {
|
||||
pk_values.extend(&self.values);
|
||||
}
|
||||
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to make fields compatible.
|
||||
#[derive(Debug)]
|
||||
struct CompatFields {
|
||||
/// Column Ids and DataTypes the reader actually returns.
|
||||
actual_fields: Vec<(ColumnId, ConcreteDataType)>,
|
||||
/// Indices to convert actual fields to expect fields.
|
||||
index_or_defaults: Vec<IndexOrDefault>,
|
||||
}
|
||||
|
||||
impl CompatFields {
|
||||
/// Make fields of the `batch` compatible.
|
||||
fn compat(&self, batch: Batch) -> Result<Batch> {
|
||||
debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
|
||||
debug_assert!(
|
||||
self.actual_fields
|
||||
.iter()
|
||||
.zip(batch.fields())
|
||||
.all(|((id, _), batch_column)| *id == batch_column.column_id)
|
||||
);
|
||||
|
||||
let len = batch.num_rows();
|
||||
self.index_or_defaults
|
||||
.iter()
|
||||
.map(|index_or_default| match index_or_default {
|
||||
IndexOrDefault::Index { pos, cast_type } => {
|
||||
let old_column = &batch.fields()[*pos];
|
||||
|
||||
let data = if let Some(ty) = cast_type {
|
||||
if let Some(json_type) = ty.as_json() {
|
||||
let json_array = old_column.data.to_arrow_array();
|
||||
let json_array =
|
||||
align_json_array(&json_array, &json_type.as_arrow_type())
|
||||
.context(RecordBatchSnafu)?;
|
||||
Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
|
||||
} else {
|
||||
old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
|
||||
from: old_column.data.data_type(),
|
||||
to: ty.clone(),
|
||||
})?
|
||||
}
|
||||
} else {
|
||||
old_column.data.clone()
|
||||
};
|
||||
Ok(BatchColumn {
|
||||
column_id: old_column.column_id,
|
||||
data,
|
||||
})
|
||||
}
|
||||
IndexOrDefault::DefaultValue {
|
||||
column_id,
|
||||
default_vector,
|
||||
semantic_type: _,
|
||||
} => {
|
||||
let data = default_vector.replicate(&[len]);
|
||||
Ok(BatchColumn {
|
||||
column_id: *column_id,
|
||||
data,
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.and_then(|fields| batch.with_fields(fields))
|
||||
}
|
||||
}
|
||||
|
||||
fn may_rewrite_primary_key(
|
||||
expect: &RegionMetadata,
|
||||
actual: &RegionMetadata,
|
||||
) -> Option<RewritePrimaryKey> {
|
||||
if expect.primary_key_encoding == actual.primary_key_encoding {
|
||||
return None;
|
||||
}
|
||||
|
||||
let fields = expect.primary_key.clone();
|
||||
let original = build_primary_key_codec(actual);
|
||||
let new = build_primary_key_codec(expect);
|
||||
|
||||
Some(RewritePrimaryKey {
|
||||
original,
|
||||
new,
|
||||
fields,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns true if the actual primary keys is the same as expected.
|
||||
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
|
||||
ensure!(
|
||||
@@ -557,113 +327,6 @@ fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Resu
|
||||
Ok(actual.primary_key.len() == expect.primary_key.len())
|
||||
}
|
||||
|
||||
/// Creates a [CompatPrimaryKey] if needed.
|
||||
fn may_compat_primary_key(
|
||||
expect: &RegionMetadata,
|
||||
actual: &RegionMetadata,
|
||||
) -> Result<Option<CompatPrimaryKey>> {
|
||||
if is_primary_key_same(expect, actual)? {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// We need to append default values to the primary key.
|
||||
let to_add = &expect.primary_key[actual.primary_key.len()..];
|
||||
let mut fields = Vec::with_capacity(to_add.len());
|
||||
let mut values = Vec::with_capacity(to_add.len());
|
||||
for column_id in to_add {
|
||||
// Safety: The id comes from expect region metadata.
|
||||
let column = expect.column_by_id(*column_id).unwrap();
|
||||
fields.push((
|
||||
*column_id,
|
||||
SortField::new(column.column_schema.data_type.clone()),
|
||||
));
|
||||
let default_value = column
|
||||
.column_schema
|
||||
.create_default()
|
||||
.context(CreateDefaultSnafu {
|
||||
region_id: expect.region_id,
|
||||
column: &column.column_schema.name,
|
||||
})?
|
||||
.with_context(|| CompatReaderSnafu {
|
||||
region_id: expect.region_id,
|
||||
reason: format!(
|
||||
"key column {} does not have a default value to read",
|
||||
column.column_schema.name
|
||||
),
|
||||
})?;
|
||||
values.push((*column_id, default_value));
|
||||
}
|
||||
// Using expect primary key encoding to build the converter
|
||||
let converter =
|
||||
build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
|
||||
|
||||
Ok(Some(CompatPrimaryKey { converter, values }))
|
||||
}
|
||||
|
||||
/// Creates a [CompatFields] if needed.
|
||||
fn may_compat_fields(
|
||||
mapper: &PrimaryKeyProjectionMapper,
|
||||
actual: &RegionMetadata,
|
||||
) -> Result<Option<CompatFields>> {
|
||||
let expect_fields = mapper.batch_fields();
|
||||
let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
|
||||
if expect_fields == actual_fields {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let source_field_index: HashMap<_, _> = actual_fields
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
|
||||
.collect();
|
||||
|
||||
let index_or_defaults = expect_fields
|
||||
.iter()
|
||||
.map(|(column_id, expect_data_type)| {
|
||||
if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
|
||||
let mut cast_type = None;
|
||||
|
||||
if expect_data_type != *actual_data_type {
|
||||
cast_type = Some(expect_data_type.clone())
|
||||
}
|
||||
// Source has this field.
|
||||
Ok(IndexOrDefault::Index {
|
||||
pos: *index,
|
||||
cast_type,
|
||||
})
|
||||
} else {
|
||||
// Safety: mapper must have this column.
|
||||
let column = mapper.metadata().column_by_id(*column_id).unwrap();
|
||||
// Create a default vector with 1 element for that column.
|
||||
let default_vector = column
|
||||
.column_schema
|
||||
.create_default_vector(1)
|
||||
.context(CreateDefaultSnafu {
|
||||
region_id: mapper.metadata().region_id,
|
||||
column: &column.column_schema.name,
|
||||
})?
|
||||
.with_context(|| CompatReaderSnafu {
|
||||
region_id: mapper.metadata().region_id,
|
||||
reason: format!(
|
||||
"column {} does not have a default value to read",
|
||||
column.column_schema.name
|
||||
),
|
||||
})?;
|
||||
Ok(IndexOrDefault::DefaultValue {
|
||||
column_id: column.column_id,
|
||||
default_vector,
|
||||
semantic_type: SemanticType::Field,
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
Ok(Some(CompatFields {
|
||||
actual_fields,
|
||||
index_or_defaults,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Index in source batch or a default value to fill a column.
|
||||
#[derive(Debug)]
|
||||
enum IndexOrDefault {
|
||||
@@ -674,8 +337,6 @@ enum IndexOrDefault {
|
||||
},
|
||||
/// Default value for the column.
|
||||
DefaultValue {
|
||||
/// Id of the column.
|
||||
column_id: ColumnId,
|
||||
/// Default value. The vector has only 1 element.
|
||||
default_vector: VectorRef,
|
||||
/// Semantic type of the column.
|
||||
@@ -683,58 +344,6 @@ enum IndexOrDefault {
|
||||
},
|
||||
}
|
||||
|
||||
/// Adapter to rewrite primary key.
|
||||
struct RewritePrimaryKey {
|
||||
/// Original primary key codec.
|
||||
original: Arc<dyn PrimaryKeyCodec>,
|
||||
/// New primary key codec.
|
||||
new: Arc<dyn PrimaryKeyCodec>,
|
||||
/// Order of the fields in the new primary key.
|
||||
fields: Vec<ColumnId>,
|
||||
}
|
||||
|
||||
impl RewritePrimaryKey {
|
||||
/// Make primary key of the `batch` compatible.
|
||||
fn compat(&self, mut batch: Batch) -> Result<Batch> {
|
||||
if batch.pk_values().is_none() {
|
||||
let new_pk_values = self
|
||||
.original
|
||||
.decode(batch.primary_key())
|
||||
.context(DecodeSnafu)?;
|
||||
batch.set_pk_values(new_pk_values);
|
||||
}
|
||||
// Safety: We ensure pk_values is not None.
|
||||
let values = batch.pk_values().unwrap();
|
||||
|
||||
let mut buffer = Vec::with_capacity(
|
||||
batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
|
||||
);
|
||||
match values {
|
||||
CompositeValues::Dense(values) => {
|
||||
self.new
|
||||
.encode_values(values.as_slice(), &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
}
|
||||
CompositeValues::Sparse(values) => {
|
||||
let values = self
|
||||
.fields
|
||||
.iter()
|
||||
.map(|id| {
|
||||
let value = values.get_or_null(*id);
|
||||
(*id, value.as_value_ref())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
self.new
|
||||
.encode_value_refs(&values, &mut buffer)
|
||||
.context(EncodeSnafu)?;
|
||||
}
|
||||
}
|
||||
batch.set_primary_key(buffer);
|
||||
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to rewrite primary key to another encoding for flat format.
|
||||
struct FlatRewritePrimaryKey {
|
||||
/// New primary key encoder.
|
||||
@@ -1052,128 +661,6 @@ mod tests {
|
||||
buffer
|
||||
}
|
||||
|
||||
/// The reader's primary key is longer than the expected one, so building the
/// primary key adapter must fail.
#[test]
fn test_invalid_pk_len() {
    let reader_columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
    ];
    let reader_meta = new_metadata(&reader_columns, &[1, 2]);

    let expect_columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
    ];
    let expect_meta = new_metadata(&expect_columns, &[1]);

    may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
}
|
||||
|
||||
/// The reader's primary key `[2, 1]` is not a prefix of the expected key
/// `[1, 2, 4]`, so building the primary key adapter must fail.
#[test]
fn test_different_pk() {
    let reader_columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
    ];
    let reader_meta = new_metadata(&reader_columns, &[2, 1]);

    let expect_columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
        (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
    ];
    let expect_meta = new_metadata(&expect_columns, &[1, 2, 4]);

    may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
}
|
||||
|
||||
/// Comparing a metadata against itself needs no primary key adapter.
#[test]
fn test_same_pk() {
    let columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
    ];
    let reader_meta = new_metadata(&columns, &[1]);

    let adapter = may_compat_primary_key(&reader_meta, &reader_meta).unwrap();
    assert!(adapter.is_none());
}
|
||||
|
||||
/// A metadata with only a time index and one tag, compared against itself,
/// needs no primary key adapter.
#[test]
fn test_same_pk_encoding() {
    let columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
    ];
    let reader_meta = Arc::new(new_metadata(&columns, &[1]));

    let adapter = may_compat_primary_key(&reader_meta, &reader_meta).unwrap();
    assert!(adapter.is_none());
}
|
||||
|
||||
/// A mapper built from the reader's own metadata needs no field adapter.
#[test]
fn test_same_fields() {
    let columns = [
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        ),
        (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
        (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
    ];
    let reader_meta = Arc::new(new_metadata(&columns, &[1]));

    let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
    let adapter = may_compat_fields(&mapper, &reader_meta).unwrap();
    assert!(adapter.is_none())
}
|
||||
|
||||
/// Creates a primary key array for flat format testing.
|
||||
fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
|
||||
let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
|
||||
|
||||
@@ -240,11 +240,6 @@ impl FlatProjectionMapper {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
/// Returns an empty [RecordBatch].
|
||||
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
|
||||
RecordBatch::new_empty(self.output_schema.clone())
|
||||
}
|
||||
|
||||
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
|
||||
///
|
||||
/// The batch must match the `projection` using to build the mapper.
|
||||
|
||||
@@ -20,7 +20,6 @@ use async_trait::async_trait;
|
||||
use datatypes::arrow::array::{Array, BinaryArray};
|
||||
use datatypes::arrow::compute::concat_batches;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::vectors::UInt32Vector;
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{FileId, TimeSeriesRowSelector};
|
||||
@@ -35,7 +34,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
|
||||
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
|
||||
use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
|
||||
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
|
||||
use crate::sst::parquet::reader::FlatRowGroupReader;
|
||||
|
||||
/// Reader to keep the last row for each time series.
|
||||
/// It assumes that batches from the input reader are
|
||||
@@ -81,183 +80,6 @@ impl BatchReader for LastRowReader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cached last row reader for specific row group.
|
||||
/// If the last rows for current row group are already cached, this reader returns the cached value.
|
||||
/// If cache misses, [RowGroupLastRowReader] reads last rows from row group and updates the cache
|
||||
/// upon finish.
|
||||
pub(crate) enum RowGroupLastRowCachedReader {
|
||||
/// Cache hit, reads last rows from cached value.
|
||||
Hit(LastRowCacheReader),
|
||||
/// Cache miss, reads from row group reader and update cache.
|
||||
Miss(RowGroupLastRowReader),
|
||||
}
|
||||
|
||||
impl RowGroupLastRowCachedReader {
|
||||
pub(crate) fn new(
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
cache_strategy: CacheStrategy,
|
||||
row_group_reader: RowGroupReader,
|
||||
) -> Self {
|
||||
let key = SelectorResultKey {
|
||||
file_id,
|
||||
row_group_idx,
|
||||
selector: TimeSeriesRowSelector::LastRow,
|
||||
};
|
||||
|
||||
if let Some(value) = cache_strategy.get_selector_result(&key) {
|
||||
let is_primary_key = matches!(&value.result, SelectorResult::PrimaryKey(_));
|
||||
let schema_matches =
|
||||
value.projection == row_group_reader.read_format().projection_indices();
|
||||
if is_primary_key && schema_matches {
|
||||
// Format and schema match, use cache batches.
|
||||
Self::new_hit(value)
|
||||
} else {
|
||||
Self::new_miss(key, row_group_reader, cache_strategy)
|
||||
}
|
||||
} else {
|
||||
Self::new_miss(key, row_group_reader, cache_strategy)
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the underlying reader metrics if uncached.
|
||||
pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
|
||||
match self {
|
||||
RowGroupLastRowCachedReader::Hit(_) => None,
|
||||
RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates new Hit variant and updates metrics.
|
||||
fn new_hit(value: Arc<SelectorResultValue>) -> Self {
|
||||
selector_result_cache_hit();
|
||||
Self::Hit(LastRowCacheReader { value, idx: 0 })
|
||||
}
|
||||
|
||||
/// Creates new Miss variant and updates metrics.
|
||||
fn new_miss(
|
||||
key: SelectorResultKey,
|
||||
row_group_reader: RowGroupReader,
|
||||
cache_strategy: CacheStrategy,
|
||||
) -> Self {
|
||||
selector_result_cache_miss();
|
||||
Self::Miss(RowGroupLastRowReader::new(
|
||||
key,
|
||||
row_group_reader,
|
||||
cache_strategy,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BatchReader for RowGroupLastRowCachedReader {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
match self {
|
||||
RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
|
||||
RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Last row reader that returns the cached last rows for row group.
|
||||
pub(crate) struct LastRowCacheReader {
|
||||
value: Arc<SelectorResultValue>,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl LastRowCacheReader {
|
||||
/// Iterates cached last rows.
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
let batches = match &self.value.result {
|
||||
SelectorResult::PrimaryKey(batches) => batches,
|
||||
SelectorResult::Flat(_) => unreachable!(),
|
||||
};
|
||||
if self.idx < batches.len() {
|
||||
let res = Ok(Some(batches[self.idx].clone()));
|
||||
self.idx += 1;
|
||||
res
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RowGroupLastRowReader {
|
||||
key: SelectorResultKey,
|
||||
reader: RowGroupReader,
|
||||
selector: LastRowSelector,
|
||||
yielded_batches: Vec<Batch>,
|
||||
cache_strategy: CacheStrategy,
|
||||
/// Index buffer to take a new batch from the last row.
|
||||
take_index: UInt32Vector,
|
||||
}
|
||||
|
||||
impl RowGroupLastRowReader {
|
||||
fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
|
||||
Self {
|
||||
key,
|
||||
reader,
|
||||
selector: LastRowSelector::default(),
|
||||
yielded_batches: vec![],
|
||||
cache_strategy,
|
||||
take_index: UInt32Vector::from_vec(vec![0]),
|
||||
}
|
||||
}
|
||||
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
while let Some(batch) = self.reader.next_batch().await? {
|
||||
if let Some(yielded) = self.selector.on_next(batch) {
|
||||
push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
|
||||
return Ok(Some(yielded));
|
||||
}
|
||||
}
|
||||
let last_batch = if let Some(last_batch) = self.selector.finish() {
|
||||
push_yielded_batches(
|
||||
last_batch.clone(),
|
||||
&self.take_index,
|
||||
&mut self.yielded_batches,
|
||||
)?;
|
||||
Some(last_batch)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// All last rows in row group are yielded, update cache.
|
||||
self.maybe_update_cache();
|
||||
Ok(last_batch)
|
||||
}
|
||||
|
||||
/// Updates row group's last row cache if cache manager is present.
|
||||
fn maybe_update_cache(&mut self) {
|
||||
if self.yielded_batches.is_empty() {
|
||||
// we always expect that row groups yields batches.
|
||||
return;
|
||||
}
|
||||
let value = Arc::new(SelectorResultValue::new(
|
||||
std::mem::take(&mut self.yielded_batches),
|
||||
self.reader.read_format().projection_indices().to_vec(),
|
||||
));
|
||||
self.cache_strategy.put_selector_result(self.key, value);
|
||||
}
|
||||
|
||||
fn metrics(&self) -> &ReaderMetrics {
|
||||
self.reader.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
/// Push last row into `yielded_batches`.
|
||||
fn push_yielded_batches(
|
||||
mut batch: Batch,
|
||||
take_index: &UInt32Vector,
|
||||
yielded_batches: &mut Vec<Batch>,
|
||||
) -> Result<()> {
|
||||
assert_eq!(1, batch.num_rows());
|
||||
batch.take_in_place(take_index)?;
|
||||
yielded_batches.push(batch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Common struct that selects only the last row of each time series.
|
||||
#[derive(Default)]
|
||||
pub struct LastRowSelector {
|
||||
|
||||
@@ -12,360 +12,24 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Utilities for projection operations.
|
||||
//! Projection helpers shared by flat projection code.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu};
|
||||
use common_recordbatch::error::DataTypesSnafu;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::{InvalidRequestSnafu, Result};
|
||||
use crate::read::Batch;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
|
||||
/// Only cache vector when its length `<=` this value.
|
||||
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
|
||||
|
||||
/// Wrapper enum for different projection mapper implementations.
|
||||
pub enum ProjectionMapper {
|
||||
/// Projection mapper for primary key format.
|
||||
PrimaryKey(PrimaryKeyProjectionMapper),
|
||||
/// Projection mapper for flat format.
|
||||
Flat(FlatProjectionMapper),
|
||||
}
|
||||
|
||||
impl ProjectionMapper {
|
||||
/// Returns a new mapper with projection.
|
||||
pub fn new(
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: impl Iterator<Item = usize> + Clone,
|
||||
) -> Result<Self> {
|
||||
Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
|
||||
metadata, projection,
|
||||
)?))
|
||||
}
|
||||
|
||||
/// Returns a new mapper with output projection and explicit read columns.
|
||||
pub fn new_with_read_columns(
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: impl Iterator<Item = usize>,
|
||||
read_column_ids: Vec<ColumnId>,
|
||||
) -> Result<Self> {
|
||||
let projection: Vec<_> = projection.collect();
|
||||
Ok(ProjectionMapper::Flat(
|
||||
FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns a new mapper without projection.
|
||||
pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
|
||||
Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
|
||||
}
|
||||
|
||||
/// Returns the metadata that created the mapper.
|
||||
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.metadata(),
|
||||
ProjectionMapper::Flat(m) => m.metadata(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the projection includes any tag columns.
|
||||
pub(crate) fn has_tags(&self) -> bool {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.has_tags(),
|
||||
ProjectionMapper::Flat(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns ids of projected columns that we need to read
|
||||
/// from memtables and SSTs.
|
||||
pub(crate) fn column_ids(&self) -> &[ColumnId] {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.column_ids(),
|
||||
ProjectionMapper::Flat(m) => m.column_ids(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the schema of converted [RecordBatch].
|
||||
pub(crate) fn output_schema(&self) -> SchemaRef {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.output_schema(),
|
||||
ProjectionMapper::Flat(m) => m.output_schema(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the primary key projection mapper or None if this is not a primary key mapper.
|
||||
pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => Some(m),
|
||||
ProjectionMapper::Flat(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the flat projection mapper or None if this is not a flat mapper.
|
||||
pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(_) => None,
|
||||
ProjectionMapper::Flat(m) => Some(m),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an empty [RecordBatch].
|
||||
// TODO(yingwen): This is unused now. Use it after we finishing the flat format.
|
||||
pub fn empty_record_batch(&self) -> RecordBatch {
|
||||
match self {
|
||||
ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
|
||||
ProjectionMapper::Flat(m) => m.empty_record_batch(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
|
||||
#[allow(dead_code)]
|
||||
pub struct PrimaryKeyProjectionMapper {
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
/// Maps column in [RecordBatch] to index in [Batch].
|
||||
batch_indices: Vec<BatchIndex>,
|
||||
/// Output record batch contains tags.
|
||||
has_tags: bool,
|
||||
/// Decoder for primary key.
|
||||
codec: Arc<dyn PrimaryKeyCodec>,
|
||||
/// Schema for converted [RecordBatch].
|
||||
output_schema: SchemaRef,
|
||||
/// Ids of columns to read from memtables and SSTs.
|
||||
read_column_ids: Vec<ColumnId>,
|
||||
/// Ids and DataTypes of field columns in the read [Batch].
|
||||
batch_fields: Vec<(ColumnId, ConcreteDataType)>,
|
||||
/// `true` If the original projection is empty.
|
||||
is_empty_projection: bool,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl PrimaryKeyProjectionMapper {
|
||||
/// Returns a new mapper with projection.
|
||||
/// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
|
||||
/// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
|
||||
/// empty `RecordBatch` and only use its row count in this query.
|
||||
pub fn new(
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: impl Iterator<Item = usize>,
|
||||
) -> Result<PrimaryKeyProjectionMapper> {
|
||||
let projection: Vec<_> = projection.collect();
|
||||
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
|
||||
Self::new_with_read_columns(metadata, projection, read_column_ids)
|
||||
}
|
||||
|
||||
/// Returns a new mapper with output projection and explicit read columns.
|
||||
pub fn new_with_read_columns(
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: Vec<usize>,
|
||||
read_column_ids: Vec<ColumnId>,
|
||||
) -> Result<PrimaryKeyProjectionMapper> {
|
||||
// If the original projection is empty.
|
||||
let is_empty_projection = projection.is_empty();
|
||||
|
||||
let mut column_schemas = Vec::with_capacity(projection.len());
|
||||
for idx in &projection {
|
||||
// For each projection index, we get the column schema for projection
|
||||
column_schemas.push(
|
||||
metadata
|
||||
.schema
|
||||
.column_schemas()
|
||||
.get(*idx)
|
||||
.with_context(|| InvalidRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
reason: format!("projection index {} is out of bound", idx),
|
||||
})?
|
||||
.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
let codec = build_primary_key_codec(metadata);
|
||||
// If projection is empty, we don't output any column.
|
||||
let output_schema = if is_empty_projection {
|
||||
Arc::new(Schema::new(vec![]))
|
||||
} else {
|
||||
// Safety: Columns come from existing schema.
|
||||
Arc::new(Schema::new(column_schemas))
|
||||
};
|
||||
// Get fields in each read batch.
|
||||
let batch_fields = Batch::projected_fields(metadata, &read_column_ids);
|
||||
|
||||
// Field column id to its index in batch.
|
||||
let field_id_to_index: HashMap<_, _> = batch_fields
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, (column_id, _))| (*column_id, index))
|
||||
.collect();
|
||||
// For each projected column, compute its index in batches.
|
||||
let mut batch_indices = Vec::with_capacity(projection.len());
|
||||
let mut has_tags = false;
|
||||
if !is_empty_projection {
|
||||
for idx in &projection {
|
||||
// Safety: idx is valid.
|
||||
let column = &metadata.column_metadatas[*idx];
|
||||
// Get column index in a batch by its semantic type and column id.
|
||||
let batch_index = match column.semantic_type {
|
||||
SemanticType::Tag => {
|
||||
// Safety: It is a primary key column.
|
||||
let index = metadata.primary_key_index(column.column_id).unwrap();
|
||||
// We need to output a tag.
|
||||
has_tags = true;
|
||||
// We always read all primary key so the column always exists and the tag
|
||||
// index is always valid.
|
||||
BatchIndex::Tag((index, column.column_id))
|
||||
}
|
||||
SemanticType::Timestamp => BatchIndex::Timestamp,
|
||||
SemanticType::Field => {
|
||||
let index = *field_id_to_index.get(&column.column_id).context(
|
||||
InvalidRequestSnafu {
|
||||
region_id: metadata.region_id,
|
||||
reason: format!(
|
||||
"field column {} is missing in read projection",
|
||||
column.column_schema.name
|
||||
),
|
||||
},
|
||||
)?;
|
||||
BatchIndex::Field(index)
|
||||
}
|
||||
};
|
||||
batch_indices.push(batch_index);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(PrimaryKeyProjectionMapper {
|
||||
metadata: metadata.clone(),
|
||||
batch_indices,
|
||||
has_tags,
|
||||
codec,
|
||||
output_schema,
|
||||
read_column_ids,
|
||||
batch_fields,
|
||||
is_empty_projection,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a new mapper without projection.
|
||||
pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
|
||||
PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
|
||||
}
|
||||
|
||||
/// Returns the metadata that created the mapper.
|
||||
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
/// Returns true if the projection includes any tag columns.
|
||||
pub(crate) fn has_tags(&self) -> bool {
|
||||
self.has_tags
|
||||
}
|
||||
|
||||
/// Returns ids of projected columns that we need to read
|
||||
/// from memtables and SSTs.
|
||||
pub(crate) fn column_ids(&self) -> &[ColumnId] {
|
||||
&self.read_column_ids
|
||||
}
|
||||
|
||||
/// Returns ids of fields in [Batch]es the mapper expects to convert.
|
||||
pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
|
||||
&self.batch_fields
|
||||
}
|
||||
|
||||
/// Returns the schema of converted [RecordBatch].
|
||||
/// This is the schema that the stream will output. This schema may contain
|
||||
/// less columns than [PrimaryKeyProjectionMapper::column_ids()].
|
||||
pub(crate) fn output_schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
/// Returns an empty [RecordBatch].
|
||||
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
|
||||
RecordBatch::new_empty(self.output_schema.clone())
|
||||
}
|
||||
|
||||
/// Converts a [Batch] to a [RecordBatch].
|
||||
///
|
||||
/// The batch must match the `projection` using to build the mapper.
|
||||
pub(crate) fn convert(
|
||||
&self,
|
||||
batch: &Batch,
|
||||
cache_strategy: &CacheStrategy,
|
||||
) -> common_recordbatch::error::Result<RecordBatch> {
|
||||
if self.is_empty_projection {
|
||||
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
|
||||
}
|
||||
|
||||
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
|
||||
debug_assert!(
|
||||
self.batch_fields
|
||||
.iter()
|
||||
.zip(batch.fields())
|
||||
.all(|((id, _), batch_col)| *id == batch_col.column_id)
|
||||
);
|
||||
|
||||
// Skips decoding pk if we don't need to output it.
|
||||
let pk_values = if self.has_tags {
|
||||
match batch.pk_values() {
|
||||
Some(v) => v.clone(),
|
||||
None => self
|
||||
.codec
|
||||
.decode(batch.primary_key())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?,
|
||||
}
|
||||
} else {
|
||||
CompositeValues::Dense(vec![])
|
||||
};
|
||||
|
||||
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
|
||||
let num_rows = batch.num_rows();
|
||||
for (index, column_schema) in self
|
||||
.batch_indices
|
||||
.iter()
|
||||
.zip(self.output_schema.column_schemas())
|
||||
{
|
||||
match index {
|
||||
BatchIndex::Tag((idx, column_id)) => {
|
||||
let value = match &pk_values {
|
||||
CompositeValues::Dense(v) => &v[*idx].1,
|
||||
CompositeValues::Sparse(v) => v.get_or_null(*column_id),
|
||||
};
|
||||
let vector = repeated_vector_with_cache(
|
||||
&column_schema.data_type,
|
||||
value,
|
||||
num_rows,
|
||||
cache_strategy,
|
||||
)?;
|
||||
columns.push(vector);
|
||||
}
|
||||
BatchIndex::Timestamp => {
|
||||
columns.push(batch.timestamps().clone());
|
||||
}
|
||||
BatchIndex::Field(idx) => {
|
||||
columns.push(batch.fields()[*idx].data.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RecordBatch::new(self.output_schema.clone(), columns)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn read_column_ids_from_projection(
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: &[usize],
|
||||
@@ -389,18 +53,6 @@ pub(crate) fn read_column_ids_from_projection(
|
||||
Ok(column_ids)
|
||||
}
|
||||
|
||||
/// Index of a vector in a [Batch].
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[allow(dead_code)]
|
||||
enum BatchIndex {
|
||||
/// Index in primary keys.
|
||||
Tag((usize, ColumnId)),
|
||||
/// The time index column.
|
||||
Timestamp,
|
||||
/// Index in fields.
|
||||
Field(usize),
|
||||
}
|
||||
|
||||
/// Gets a vector with repeated values from specific cache or creates a new one.
|
||||
pub(crate) fn repeated_vector_with_cache(
|
||||
data_type: &ConcreteDataType,
|
||||
@@ -409,8 +61,6 @@ pub(crate) fn repeated_vector_with_cache(
|
||||
cache_strategy: &CacheStrategy,
|
||||
) -> common_recordbatch::error::Result<VectorRef> {
|
||||
if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
|
||||
// Tries to get the vector from cache manager. If the vector doesn't
|
||||
// have enough length, creates a new one.
|
||||
match vector.len().cmp(&num_rows) {
|
||||
Ordering::Less => (),
|
||||
Ordering::Equal => return Ok(vector),
|
||||
@@ -418,9 +68,7 @@ pub(crate) fn repeated_vector_with_cache(
|
||||
}
|
||||
}
|
||||
|
||||
// Creates a new one.
|
||||
let vector = new_repeated_vector(data_type, value, num_rows)?;
|
||||
// Updates cache.
|
||||
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
|
||||
cache_strategy.put_repeated_vector(value.clone(), vector.clone());
|
||||
}
|
||||
@@ -438,7 +86,6 @@ pub(crate) fn new_repeated_vector(
|
||||
mutable_vector
|
||||
.try_push_value_ref(&value.as_value_ref())
|
||||
.context(DataTypesSnafu)?;
|
||||
// This requires an additional allocation.
|
||||
let base_vector = mutable_vector.to_vector();
|
||||
Ok(base_vector.replicate(&[num_rows]))
|
||||
}
|
||||
@@ -448,6 +95,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
|
||||
use datatypes::arrow::datatypes::Field;
|
||||
use datatypes::arrow::util::pretty;
|
||||
@@ -459,6 +107,7 @@ mod tests {
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
|
||||
fn print_record_batch(record_batch: RecordBatch) -> String {
|
||||
pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
|
||||
@@ -475,9 +124,6 @@ mod tests {
|
||||
let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
|
||||
let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
|
||||
|
||||
// Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type
|
||||
|
||||
// Primary key columns first
|
||||
for (i, tag) in idx_tags {
|
||||
let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
|
||||
*tag, num_rows,
|
||||
@@ -490,7 +136,6 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
// Field columns
|
||||
for (i, field) in idx_fields {
|
||||
let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
|
||||
*field, num_rows,
|
||||
@@ -503,7 +148,6 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
// Time index
|
||||
if let Some(ts_start) = ts_start {
|
||||
let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
|
||||
(0..num_rows).map(|i| ts_start + i as i64 * 1000),
|
||||
@@ -519,8 +163,6 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
// __primary_key column (encoded primary key as dictionary)
|
||||
// Create encoded primary key
|
||||
let converter = DensePrimaryKeyCodec::with_fields(
|
||||
(0..idx_tags.len())
|
||||
.map(|idx| {
|
||||
@@ -535,7 +177,6 @@ mod tests {
|
||||
.encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
|
||||
.unwrap();
|
||||
|
||||
// Create dictionary array for the encoded primary key
|
||||
let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
|
||||
let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
|
||||
let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
|
||||
@@ -549,7 +190,6 @@ mod tests {
|
||||
false,
|
||||
));
|
||||
|
||||
// __sequence column
|
||||
columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
|
||||
fields.push(Field::new(
|
||||
SEQUENCE_COLUMN_NAME,
|
||||
@@ -557,7 +197,6 @@ mod tests {
|
||||
false,
|
||||
));
|
||||
|
||||
// __op_type column
|
||||
columns.push(Arc::new(UInt8Array::from_iter_values(
|
||||
(0..num_rows).map(|_| OpType::Put as u8),
|
||||
)) as _);
|
||||
@@ -581,7 +220,7 @@ mod tests {
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
let mapper = ProjectionMapper::all(&metadata).unwrap();
|
||||
let mapper = FlatProjectionMapper::all(&metadata).unwrap();
|
||||
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
|
||||
assert_eq!(
|
||||
[
|
||||
@@ -591,11 +230,11 @@ mod tests {
|
||||
(4, ConcreteDataType::int64_datatype()),
|
||||
(0, ConcreteDataType::timestamp_millisecond_datatype())
|
||||
],
|
||||
mapper.as_flat().unwrap().batch_schema()
|
||||
mapper.batch_schema()
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+---------------------+----+----+----+----+
|
||||
| ts | k0 | k1 | v0 | v1 |
|
||||
@@ -616,8 +255,7 @@ mod tests {
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Columns v1, k0
|
||||
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
|
||||
assert_eq!([4, 1], mapper.column_ids());
|
||||
assert_eq!(
|
||||
[
|
||||
@@ -625,11 +263,11 @@ mod tests {
|
||||
(4, ConcreteDataType::int64_datatype()),
|
||||
(0, ConcreteDataType::timestamp_millisecond_datatype())
|
||||
],
|
||||
mapper.as_flat().unwrap().batch_schema()
|
||||
mapper.batch_schema()
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+----+----+
|
||||
| v1 | k0 |
|
||||
@@ -650,14 +288,13 @@ mod tests {
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Output columns v1, k0. Read also includes v0.
|
||||
let mapper =
|
||||
ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3])
|
||||
FlatProjectionMapper::new_with_read_columns(&metadata, vec![4, 1], vec![4, 1, 3])
|
||||
.unwrap();
|
||||
assert_eq!([4, 1, 3], mapper.column_ids());
|
||||
|
||||
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
|
||||
let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+----+----+
|
||||
| v1 | k0 |
|
||||
@@ -678,18 +315,16 @@ mod tests {
|
||||
.build(),
|
||||
);
|
||||
let cache = CacheStrategy::Disabled;
|
||||
// Empty projection
|
||||
let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
|
||||
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [].into_iter()).unwrap();
|
||||
assert_eq!([0], mapper.column_ids());
|
||||
assert!(mapper.output_schema().is_empty());
|
||||
let flat_mapper = mapper.as_flat().unwrap();
|
||||
assert_eq!(
|
||||
[(0, ConcreteDataType::timestamp_millisecond_datatype())],
|
||||
flat_mapper.batch_schema()
|
||||
mapper.batch_schema()
|
||||
);
|
||||
|
||||
let batch = new_flat_batch(Some(0), &[], &[], 3);
|
||||
let record_batch = flat_mapper.convert(&batch, &cache).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
assert_eq!(3, record_batch.num_rows());
|
||||
assert_eq!(0, record_batch.num_columns());
|
||||
assert!(record_batch.schema.is_empty());
|
||||
|
||||
@@ -24,122 +24,11 @@ use snafu::ResultExt;
|
||||
|
||||
use crate::error::{RecordBatchSnafu, Result};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::read::Batch;
|
||||
use crate::read::last_row::FlatRowGroupLastRowCachedReader;
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::parquet::file_range::FileRangeContextRef;
|
||||
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub enum Source {
|
||||
RowGroup(RowGroupReader),
|
||||
LastRow(RowGroupLastRowCachedReader),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Source {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
match self {
|
||||
Source::RowGroup(r) => r.next_batch().await,
|
||||
Source::LastRow(r) => r.next_batch().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PruneReader {
|
||||
/// Context for file ranges.
|
||||
context: FileRangeContextRef,
|
||||
source: Source,
|
||||
metrics: ReaderMetrics,
|
||||
/// Whether to skip field filters for this row group.
|
||||
skip_fields: bool,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl PruneReader {
|
||||
pub(crate) fn new_with_row_group_reader(
|
||||
ctx: FileRangeContextRef,
|
||||
reader: RowGroupReader,
|
||||
skip_fields: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
context: ctx,
|
||||
source: Source::RowGroup(reader),
|
||||
metrics: Default::default(),
|
||||
skip_fields,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_last_row_reader(
|
||||
ctx: FileRangeContextRef,
|
||||
reader: RowGroupLastRowCachedReader,
|
||||
skip_fields: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
context: ctx,
|
||||
source: Source::LastRow(reader),
|
||||
metrics: Default::default(),
|
||||
skip_fields,
|
||||
}
|
||||
}
|
||||
|
||||
/// Merge metrics with the inner reader and return the merged metrics.
|
||||
pub(crate) fn metrics(&self) -> ReaderMetrics {
|
||||
let mut metrics = self.metrics.clone();
|
||||
match &self.source {
|
||||
Source::RowGroup(r) => {
|
||||
metrics.merge_from(r.metrics());
|
||||
}
|
||||
Source::LastRow(r) => {
|
||||
if let Some(inner_metrics) = r.metrics() {
|
||||
metrics.merge_from(inner_metrics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics
|
||||
}
|
||||
|
||||
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
while let Some(b) = self.source.next_batch().await? {
|
||||
match self.prune(b)? {
|
||||
Some(b) => {
|
||||
return Ok(Some(b));
|
||||
}
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Prunes batches by the pushed down predicate.
|
||||
fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
|
||||
// fast path
|
||||
if self.context.filters().is_empty() && !self.context.has_partition_filter() {
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
let num_rows_before_filter = batch.num_rows();
|
||||
let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
|
||||
// the entire batch is filtered out
|
||||
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// update metric
|
||||
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
|
||||
self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
|
||||
|
||||
if !batch_filtered.is_empty() {
|
||||
Ok(Some(batch_filtered))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics};
|
||||
|
||||
/// An iterator that prunes batches by time range.
|
||||
pub(crate) struct PruneTimeIterator {
|
||||
|
||||
@@ -631,7 +631,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::cache::CacheManager;
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::test_util::memtable_util::metadata_with_primary_key;
|
||||
@@ -691,7 +691,7 @@ mod tests {
|
||||
) -> (StreamContext, PartitionRange) {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
|
||||
let file_id = FileId::random();
|
||||
let file = sst_file_handle_with_file_id(
|
||||
|
||||
@@ -53,8 +53,8 @@ use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
|
||||
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
|
||||
use crate::memtable::{MemtableRange, RangesOptions};
|
||||
use crate::metrics::READ_SST_COUNT;
|
||||
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::compat::{self, FlatCompatBatch};
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
||||
use crate::read::range_cache::ScanRequestFingerprint;
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
@@ -412,12 +412,12 @@ impl ScanRegion {
|
||||
|
||||
// The mapper always computes projected column ids as the schema of SSTs may change.
|
||||
let mapper = match self.request.projection_indices() {
|
||||
Some(p) => ProjectionMapper::new_with_read_columns(
|
||||
Some(p) => FlatProjectionMapper::new_with_read_columns(
|
||||
&self.version.metadata,
|
||||
p.iter().copied(),
|
||||
p.to_vec(),
|
||||
read_column_ids.clone(),
|
||||
)?,
|
||||
None => ProjectionMapper::all(&self.version.metadata)?,
|
||||
None => FlatProjectionMapper::all(&self.version.metadata)?,
|
||||
};
|
||||
|
||||
let ssts = &self.version.ssts;
|
||||
@@ -796,7 +796,7 @@ pub struct ScanInput {
|
||||
/// Region SST access layer.
|
||||
access_layer: AccessLayerRef,
|
||||
/// Maps projected Batches to RecordBatches.
|
||||
pub(crate) mapper: Arc<ProjectionMapper>,
|
||||
pub(crate) mapper: Arc<FlatProjectionMapper>,
|
||||
/// Column ids to read from memtables and SSTs.
|
||||
/// Notice this is different from the columns in `mapper` which are projected columns.
|
||||
/// But this read columns might also include non-projected columns needed for filtering.
|
||||
@@ -852,7 +852,7 @@ pub struct ScanInput {
|
||||
impl ScanInput {
|
||||
/// Creates a new [ScanInput].
|
||||
#[must_use]
|
||||
pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
|
||||
pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
|
||||
ScanInput {
|
||||
access_layer,
|
||||
read_column_ids: mapper.column_ids().to_vec(),
|
||||
@@ -1099,7 +1099,12 @@ impl ScanInput {
|
||||
reader_metrics: &mut ReaderMetrics,
|
||||
) -> Result<FileRangeBuilder> {
|
||||
let predicate = self.predicate_for_file(file);
|
||||
let decode_pk_values = !self.compaction && self.mapper.has_tags();
|
||||
let decode_pk_values = !self.compaction
|
||||
&& self
|
||||
.mapper
|
||||
.column_ids()
|
||||
.iter()
|
||||
.any(|column_id| self.mapper.metadata().primary_key.contains(column_id));
|
||||
let reader = self
|
||||
.access_layer
|
||||
.read_sst(file.clone())
|
||||
@@ -1146,22 +1151,12 @@ impl ScanInput {
|
||||
if need_compat {
|
||||
// They have different schema. We need to adapt the batch first so the
|
||||
// mapper can convert it.
|
||||
let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
|
||||
let mapper = self.mapper.as_flat().unwrap();
|
||||
FlatCompatBatch::try_new(
|
||||
mapper,
|
||||
flat_format.metadata(),
|
||||
flat_format.format_projection(),
|
||||
self.compaction,
|
||||
)?
|
||||
.map(CompatBatch::Flat)
|
||||
} else {
|
||||
let compact_batch = PrimaryKeyCompatBatch::new(
|
||||
&self.mapper,
|
||||
file_range_ctx.read_format().metadata().clone(),
|
||||
)?;
|
||||
Some(CompatBatch::PrimaryKey(compact_batch))
|
||||
};
|
||||
let compat = FlatCompatBatch::try_new(
|
||||
&self.mapper,
|
||||
file_range_ctx.read_format().metadata(),
|
||||
file_range_ctx.read_format().format_projection(),
|
||||
self.compaction,
|
||||
)?;
|
||||
file_range_ctx.set_compat_batch(compat);
|
||||
}
|
||||
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
|
||||
@@ -1826,7 +1821,7 @@ mod tests {
|
||||
|
||||
async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
|
||||
let file = FileHandle::new(
|
||||
crate::sst::file::FileMeta::default(),
|
||||
@@ -1994,7 +1989,7 @@ mod tests {
|
||||
|
||||
let disabled = ScanInput::new(
|
||||
SchedulerEnv::new().await.access_layer.clone(),
|
||||
ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
|
||||
FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
|
||||
)
|
||||
.with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
|
||||
assert!(build_scan_fingerprint(&disabled).is_none());
|
||||
|
||||
@@ -29,10 +29,9 @@ use datatypes::timestamp::timestamp_array_to_primitive;
|
||||
use futures::Stream;
|
||||
use prometheus::IntGauge;
|
||||
use smallvec::SmallVec;
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{Result, UnexpectedSnafu};
|
||||
use crate::error::Result;
|
||||
use crate::memtable::MemScanMetrics;
|
||||
use crate::metrics::{
|
||||
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
|
||||
@@ -1358,7 +1357,7 @@ mod split_tests {
|
||||
use store_api::storage::FileId;
|
||||
|
||||
use super::*;
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
|
||||
use crate::read::scan_region::{ScanInput, StreamContext};
|
||||
use crate::sst::file::FileHandle;
|
||||
@@ -1369,7 +1368,7 @@ mod split_tests {
|
||||
async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);
|
||||
|
||||
StreamContext {
|
||||
@@ -1498,7 +1497,7 @@ pub(crate) async fn scan_flat_file_ranges(
|
||||
fields(read_type = read_type, range_count = ranges.len())
|
||||
)]
|
||||
pub fn build_flat_file_range_scan_stream(
|
||||
_stream_ctx: Arc<StreamContext>,
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
part_metrics: PartitionMetrics,
|
||||
read_type: &'static str,
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
@@ -1516,18 +1515,19 @@ pub fn build_flat_file_range_scan_stream(
|
||||
};
|
||||
for range in ranges {
|
||||
let build_reader_start = Instant::now();
|
||||
let Some(mut reader) = range.flat_reader(_stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else{continue};
|
||||
let Some(mut reader) = range
|
||||
.flat_reader(
|
||||
stream_ctx.input.series_row_selector,
|
||||
fetch_metrics.as_deref(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let build_cost = build_reader_start.elapsed();
|
||||
part_metrics.inc_build_reader_cost(build_cost);
|
||||
|
||||
let may_compat = range
|
||||
.compat_batch()
|
||||
.map(|compat| {
|
||||
compat.as_flat().context(UnexpectedSnafu {
|
||||
reason: "Invalid compat for flat format",
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
let may_compat = range.compat_batch();
|
||||
|
||||
let mapper = range.compaction_projection_mapper();
|
||||
while let Some(record_batch) = reader.next_batch().await? {
|
||||
@@ -1707,7 +1707,7 @@ mod tests {
|
||||
BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
|
||||
MemtableRangeContext, MemtableStats,
|
||||
};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::range::{MemRangeBuilder, SourceIndex};
|
||||
use crate::read::scan_region::ScanInput;
|
||||
use crate::sst::file::{FileHandle, FileMeta};
|
||||
@@ -1741,7 +1741,7 @@ mod tests {
|
||||
) -> Arc<StreamContext> {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let metadata = metadata_for_test();
|
||||
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
|
||||
let input = ScanInput::new(env.access_layer.clone(), mapper)
|
||||
.with_cache(CacheStrategy::Disabled)
|
||||
.with_memtables(memtables)
|
||||
|
||||
@@ -213,7 +213,7 @@ impl SeqScan {
|
||||
}
|
||||
}
|
||||
|
||||
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
|
||||
let mapper = &stream_ctx.input.mapper;
|
||||
let reader: BoxedRecordBatchStream = if sources.len() == 1 {
|
||||
// Currently, we can't skip dedup when there is only one source because
|
||||
// that source may have duplicate rows.
|
||||
|
||||
@@ -27,7 +27,7 @@ use snafu::ResultExt;
|
||||
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::Result;
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::scan_util::PartitionMetrics;
|
||||
use crate::read::series_scan::SeriesBatch;
|
||||
|
||||
@@ -42,7 +42,7 @@ pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
|
||||
/// A stream that takes [`ScanBatch`]es and produces (converts them to) [`RecordBatch`]es.
|
||||
pub(crate) struct ConvertBatchStream {
|
||||
inner: ScanBatchStream,
|
||||
projection_mapper: Arc<ProjectionMapper>,
|
||||
projection_mapper: Arc<FlatProjectionMapper>,
|
||||
#[allow(dead_code)]
|
||||
cache_strategy: CacheStrategy,
|
||||
partition_metrics: PartitionMetrics,
|
||||
@@ -52,7 +52,7 @@ pub(crate) struct ConvertBatchStream {
|
||||
impl ConvertBatchStream {
|
||||
pub(crate) fn new(
|
||||
inner: ScanBatchStream,
|
||||
projection_mapper: Arc<ProjectionMapper>,
|
||||
projection_mapper: Arc<FlatProjectionMapper>,
|
||||
cache_strategy: CacheStrategy,
|
||||
partition_metrics: PartitionMetrics,
|
||||
) -> Self {
|
||||
@@ -75,11 +75,11 @@ impl ConvertBatchStream {
|
||||
|
||||
let SeriesBatch::Flat(flat_batch) = series;
|
||||
// Safety: Only flat format returns this batch.
|
||||
let mapper = self.projection_mapper.as_flat().unwrap();
|
||||
|
||||
for batch in flat_batch.batches {
|
||||
self.pending
|
||||
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
|
||||
self.pending.push_back(
|
||||
self.projection_mapper
|
||||
.convert(&batch, &self.cache_strategy)?,
|
||||
);
|
||||
}
|
||||
|
||||
let output_schema = self.projection_mapper.output_schema();
|
||||
@@ -90,9 +90,8 @@ impl ConvertBatchStream {
|
||||
}
|
||||
ScanBatch::RecordBatch(df_record_batch) => {
|
||||
// Safety: Only flat format returns this batch.
|
||||
let mapper = self.projection_mapper.as_flat().unwrap();
|
||||
|
||||
mapper.convert(&df_record_batch, &self.cache_strategy)
|
||||
self.projection_mapper
|
||||
.convert(&df_record_batch, &self.cache_strategy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,9 +26,8 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
|
||||
use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::Schema;
|
||||
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
|
||||
use mito_codec::row_converter::PrimaryKeyCodec;
|
||||
use parquet::arrow::arrow_reader::RowSelection;
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -39,22 +38,19 @@ use table::predicate::Predicate;
|
||||
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::{
|
||||
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
|
||||
EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
|
||||
UnexpectedSnafu,
|
||||
ComputeArrowSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
|
||||
RecordBatchSnafu, Result, StatsNotPresentSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
use crate::read::compat::CompatBatch;
|
||||
use crate::read::compat::FlatCompatBatch;
|
||||
use crate::read::flat_projection::CompactionProjectionMapper;
|
||||
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
|
||||
use crate::read::prune::{FlatPruneReader, PruneReader};
|
||||
use crate::read::last_row::FlatRowGroupLastRowCachedReader;
|
||||
use crate::read::prune::FlatPruneReader;
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::parquet::flat_format::{
|
||||
DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
|
||||
DecodedPrimaryKeys, FlatReadFormat, decode_primary_keys, time_index_column_index,
|
||||
};
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::reader::{
|
||||
FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder,
|
||||
FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReaderBuilder,
|
||||
SimpleFilterContext,
|
||||
};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
@@ -158,69 +154,6 @@ impl FileRange {
|
||||
.unwrap_or(true) // unexpected, not skip just in case
|
||||
}
|
||||
|
||||
/// Returns a reader to read the [FileRange].
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn reader(
|
||||
&self,
|
||||
selector: Option<TimeSeriesRowSelector>,
|
||||
fetch_metrics: Option<&ParquetFetchMetrics>,
|
||||
) -> Result<Option<PruneReader>> {
|
||||
if !self.in_dynamic_filter_range() {
|
||||
return Ok(None);
|
||||
}
|
||||
// Compute skip_fields once for this row group
|
||||
let skip_fields = self.context.base.pre_filter_mode.skip_fields();
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder
|
||||
.build(self.context.build_context(
|
||||
self.row_group_idx,
|
||||
self.row_selection.clone(),
|
||||
fetch_metrics,
|
||||
skip_fields,
|
||||
))
|
||||
.await?;
|
||||
|
||||
let use_last_row_reader = if selector
|
||||
.map(|s| s == TimeSeriesRowSelector::LastRow)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// Only use LastRowReader if row group does not contain DELETE
|
||||
// and all rows are selected.
|
||||
let put_only = !self
|
||||
.context
|
||||
.contains_delete(self.row_group_idx)
|
||||
.inspect_err(|e| {
|
||||
error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
|
||||
})
|
||||
.unwrap_or(true);
|
||||
put_only && self.select_all()
|
||||
} else {
|
||||
// No selector provided, use RowGroupReader
|
||||
false
|
||||
};
|
||||
|
||||
let prune_reader = if use_last_row_reader {
|
||||
// Row group is PUT only, use LastRowReader to skip unnecessary rows.
|
||||
let reader = RowGroupLastRowCachedReader::new(
|
||||
self.file_handle().file_id().file_id(),
|
||||
self.row_group_idx,
|
||||
self.context.reader_builder.cache_strategy().clone(),
|
||||
RowGroupReader::new(self.context.clone(), parquet_reader),
|
||||
);
|
||||
PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
|
||||
} else {
|
||||
// Row group contains DELETE, fallback to default reader.
|
||||
PruneReader::new_with_row_group_reader(
|
||||
self.context.clone(),
|
||||
RowGroupReader::new(self.context.clone(), parquet_reader),
|
||||
skip_fields,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Some(prune_reader))
|
||||
}
|
||||
|
||||
/// Creates a flat reader that returns RecordBatch.
|
||||
pub(crate) async fn flat_reader(
|
||||
&self,
|
||||
@@ -293,7 +226,7 @@ impl FileRange {
|
||||
}
|
||||
|
||||
/// Returns the helper to compat batches.
|
||||
pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
|
||||
pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
|
||||
self.context.compat_batch()
|
||||
}
|
||||
|
||||
@@ -343,7 +276,7 @@ impl FileRangeContext {
|
||||
}
|
||||
|
||||
/// Returns the format helper.
|
||||
pub(crate) fn read_format(&self) -> &ReadFormat {
|
||||
pub(crate) fn read_format(&self) -> &FlatReadFormat {
|
||||
&self.base.read_format
|
||||
}
|
||||
|
||||
@@ -353,7 +286,7 @@ impl FileRangeContext {
|
||||
}
|
||||
|
||||
/// Returns the helper to compat batches.
|
||||
pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
|
||||
pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
|
||||
self.base.compat_batch.as_ref()
|
||||
}
|
||||
|
||||
@@ -362,18 +295,11 @@ impl FileRangeContext {
|
||||
self.base.compaction_projection_mapper.as_ref()
|
||||
}
|
||||
|
||||
/// Sets the `CompatBatch` to the context.
|
||||
pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
|
||||
/// Sets the compat helper to the context.
|
||||
pub(crate) fn set_compat_batch(&mut self, compat: Option<FlatCompatBatch>) {
|
||||
self.base.compat_batch = compat;
|
||||
}
|
||||
|
||||
/// TRY THE BEST to perform pushed down predicate precisely on the input batch.
|
||||
/// Return the filtered batch. If the entire batch is filtered out, return None.
|
||||
/// If a partition expr filter is configured, it is also applied.
|
||||
pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
|
||||
self.base.precise_filter(input, skip_fields)
|
||||
}
|
||||
|
||||
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
|
||||
/// If a partition expr filter is configured, it is also applied.
|
||||
pub(crate) fn precise_filter_flat(
|
||||
@@ -452,14 +378,14 @@ pub(crate) struct RangeBase {
|
||||
/// Dynamic filter physical exprs.
|
||||
pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
|
||||
/// Helper to read the SST.
|
||||
pub(crate) read_format: ReadFormat,
|
||||
pub(crate) read_format: FlatReadFormat,
|
||||
pub(crate) expected_metadata: Option<RegionMetadataRef>,
|
||||
/// Schema used for pruning with dynamic filters.
|
||||
pub(crate) prune_schema: Arc<Schema>,
|
||||
/// Decoder for primary keys
|
||||
pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
|
||||
/// Optional helper to compat batches.
|
||||
pub(crate) compat_batch: Option<CompatBatch>,
|
||||
pub(crate) compat_batch: Option<FlatCompatBatch>,
|
||||
/// Optional helper to project batches.
|
||||
pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
|
||||
/// Mode to pre-filter columns.
|
||||
@@ -483,122 +409,6 @@ impl TagDecodeState {
|
||||
}
|
||||
|
||||
impl RangeBase {
|
||||
/// TRY THE BEST to perform pushed down predicate precisely on the input batch.
|
||||
/// Return the filtered batch. If the entire batch is filtered out, return None.
|
||||
///
|
||||
/// Supported filter expr type is defined in [SimpleFilterEvaluator].
|
||||
///
|
||||
/// When a filter is referencing primary key column, this method will decode
|
||||
/// the primary key and put it into the batch.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `input` - The batch to filter
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode
|
||||
pub(crate) fn precise_filter(
|
||||
&self,
|
||||
mut input: Batch,
|
||||
skip_fields: bool,
|
||||
) -> Result<Option<Batch>> {
|
||||
let mut mask = BooleanBuffer::new_set(input.num_rows());
|
||||
|
||||
// Run filter one by one and combine them result
|
||||
// TODO(ruihang): run primary key filter first. It may short circuit other filters
|
||||
for filter_ctx in &self.filters {
|
||||
let filter = match filter_ctx.filter() {
|
||||
MaybeFilter::Filter(f) => f,
|
||||
// Column matches.
|
||||
MaybeFilter::Matched => continue,
|
||||
// Column doesn't match, filter the entire batch.
|
||||
MaybeFilter::Pruned => return Ok(None),
|
||||
};
|
||||
let result = match filter_ctx.semantic_type() {
|
||||
SemanticType::Tag => {
|
||||
let pk_values = if let Some(pk_values) = input.pk_values() {
|
||||
pk_values
|
||||
} else {
|
||||
input.set_pk_values(
|
||||
self.codec
|
||||
.decode(input.primary_key())
|
||||
.context(DecodeSnafu)?,
|
||||
);
|
||||
input.pk_values().unwrap()
|
||||
};
|
||||
let pk_value = match pk_values {
|
||||
CompositeValues::Dense(v) => {
|
||||
// Safety: this is a primary key
|
||||
let pk_index = self
|
||||
.read_format
|
||||
.metadata()
|
||||
.primary_key_index(filter_ctx.column_id())
|
||||
.unwrap();
|
||||
v[pk_index]
|
||||
.1
|
||||
.try_to_scalar_value(filter_ctx.data_type())
|
||||
.context(DataTypeMismatchSnafu)?
|
||||
}
|
||||
CompositeValues::Sparse(v) => {
|
||||
let v = v.get_or_null(filter_ctx.column_id());
|
||||
v.try_to_scalar_value(filter_ctx.data_type())
|
||||
.context(DataTypeMismatchSnafu)?
|
||||
}
|
||||
};
|
||||
if filter
|
||||
.evaluate_scalar(&pk_value)
|
||||
.context(RecordBatchSnafu)?
|
||||
{
|
||||
continue;
|
||||
} else {
|
||||
// PK not match means the entire batch is filtered out.
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
SemanticType::Field => {
|
||||
// Skip field filters if skip_fields is true
|
||||
if skip_fields {
|
||||
continue;
|
||||
}
|
||||
// Safety: Input is Batch so we are using primary key format.
|
||||
let Some(field_index) = self
|
||||
.read_format
|
||||
.as_primary_key()
|
||||
.unwrap()
|
||||
.field_index_by_id(filter_ctx.column_id())
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let field_col = &input.fields()[field_index].data;
|
||||
filter
|
||||
.evaluate_vector(field_col)
|
||||
.context(RecordBatchSnafu)?
|
||||
}
|
||||
SemanticType::Timestamp => filter
|
||||
.evaluate_vector(input.timestamps())
|
||||
.context(RecordBatchSnafu)?,
|
||||
};
|
||||
|
||||
mask = mask.bitand(&result);
|
||||
}
|
||||
|
||||
if mask.count_set_bits() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Apply partition filter
|
||||
if let Some(partition_filter) = &self.partition_filter {
|
||||
let record_batch = self
|
||||
.build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
|
||||
let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
|
||||
mask = mask.bitand(&partition_mask);
|
||||
}
|
||||
|
||||
if mask.count_set_bits() == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
input.filter(&BooleanArray::from(mask).into())?;
|
||||
Ok(Some(input))
|
||||
}
|
||||
}
|
||||
|
||||
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
|
||||
///
|
||||
/// It assumes all necessary tags are already decoded from the primary key.
|
||||
@@ -668,13 +478,7 @@ impl RangeBase {
|
||||
) -> Result<Option<BooleanBuffer>> {
|
||||
let mut mask = BooleanBuffer::new_set(input.num_rows());
|
||||
|
||||
let flat_format = self
|
||||
.read_format
|
||||
.as_flat()
|
||||
.context(crate::error::UnexpectedSnafu {
|
||||
reason: "Expected flat format for precise_filter_flat",
|
||||
})?;
|
||||
let metadata = flat_format.metadata();
|
||||
let metadata = self.read_format.metadata();
|
||||
|
||||
// Run filter one by one and combine them result
|
||||
for filter_ctx in &self.filters {
|
||||
@@ -700,7 +504,9 @@ impl RangeBase {
|
||||
// Get the column directly by its projected index.
|
||||
// If the column is missing and it's not a tag/time column, this filter is skipped.
|
||||
// Assumes the projection indices align with the input batch schema.
|
||||
let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
|
||||
let column_idx = self
|
||||
.read_format
|
||||
.projected_index_by_id(filter_ctx.column_id());
|
||||
if let Some(idx) = column_idx {
|
||||
let column = &input.columns().get(idx).unwrap();
|
||||
let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
|
||||
@@ -804,84 +610,6 @@ impl RangeBase {
|
||||
Ok(mask)
|
||||
}
|
||||
|
||||
/// Builds a `RecordBatch` from the input `Batch` matching the given schema.
|
||||
///
|
||||
/// This is used for partition expression evaluation. The schema should only contain
|
||||
/// the columns referenced by the partition expression to minimize overhead.
|
||||
fn build_record_batch_for_pruning(
|
||||
&self,
|
||||
input: &mut Batch,
|
||||
schema: &Arc<Schema>,
|
||||
) -> Result<RecordBatch> {
|
||||
let arrow_schema = schema.arrow_schema();
|
||||
let mut columns = Vec::with_capacity(arrow_schema.fields().len());
|
||||
|
||||
// Decode primary key if necessary.
|
||||
if input.pk_values().is_none() {
|
||||
input.set_pk_values(
|
||||
self.codec
|
||||
.decode(input.primary_key())
|
||||
.context(DecodeSnafu)?,
|
||||
);
|
||||
}
|
||||
|
||||
for field in arrow_schema.fields() {
|
||||
let metadata = self.read_format.metadata();
|
||||
let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
|
||||
|
||||
// Partition pruning schema should be a subset of the input batch schema.
|
||||
let Some(column_id) = column_id else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Partition pruning schema expects column '{}' but it is missing in \
|
||||
region metadata",
|
||||
field.name()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
// 1. Check if it's a tag.
|
||||
if let Some(pk_index) = metadata.primary_key_index(column_id) {
|
||||
let pk_values = input.pk_values().unwrap();
|
||||
let value = match pk_values {
|
||||
CompositeValues::Dense(v) => &v[pk_index].1,
|
||||
CompositeValues::Sparse(v) => v.get_or_null(column_id),
|
||||
};
|
||||
let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
|
||||
let arrow_scalar = value
|
||||
.try_to_scalar_value(&concrete_type)
|
||||
.context(DataTypeMismatchSnafu)?;
|
||||
let array = arrow_scalar
|
||||
.to_array_of_size(input.num_rows())
|
||||
.context(EvalPartitionFilterSnafu)?;
|
||||
columns.push(array);
|
||||
} else if metadata.time_index_column().column_id == column_id {
|
||||
// 2. Check if it's the timestamp column.
|
||||
columns.push(input.timestamps().to_arrow_array());
|
||||
} else if let Some(field_index) = self
|
||||
.read_format
|
||||
.as_primary_key()
|
||||
.and_then(|f| f.field_index_by_id(column_id))
|
||||
{
|
||||
// 3. Check if it's a field column.
|
||||
columns.push(input.fields()[field_index].data.to_arrow_array());
|
||||
} else {
|
||||
return UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Partition pruning schema expects column '{}' (id {}) but it is not \
|
||||
present in input batch",
|
||||
field.name(),
|
||||
column_id
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
|
||||
}
|
||||
|
||||
/// Projects the input `RecordBatch` to match the given schema.
|
||||
///
|
||||
/// This is used for partition expression evaluation. The schema should only contain
|
||||
@@ -895,13 +623,7 @@ impl RangeBase {
|
||||
let arrow_schema = schema.arrow_schema();
|
||||
let mut columns = Vec::with_capacity(arrow_schema.fields().len());
|
||||
|
||||
let flat_format = self
|
||||
.read_format
|
||||
.as_flat()
|
||||
.context(crate::error::UnexpectedSnafu {
|
||||
reason: "Expected flat format for precise_filter_flat",
|
||||
})?;
|
||||
let metadata = flat_format.metadata();
|
||||
let metadata = self.read_format.metadata();
|
||||
|
||||
for field in arrow_schema.fields() {
|
||||
let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);
|
||||
@@ -917,7 +639,7 @@ impl RangeBase {
|
||||
.fail();
|
||||
};
|
||||
|
||||
if let Some(idx) = flat_format.projected_index_by_id(column_id) {
|
||||
if let Some(idx) = self.read_format.projected_index_by_id(column_id) {
|
||||
columns.push(input.column(idx).clone());
|
||||
continue;
|
||||
}
|
||||
@@ -957,12 +679,12 @@ mod tests {
|
||||
use datafusion_expr::{col, lit};
|
||||
|
||||
use super::*;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};
|
||||
|
||||
fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
|
||||
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
|
||||
let read_format = ReadFormat::new_flat(
|
||||
let read_format = FlatReadFormat::new(
|
||||
metadata.clone(),
|
||||
metadata.column_metadatas.iter().map(|c| c.column_id),
|
||||
None,
|
||||
|
||||
@@ -53,7 +53,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::sst::parquet::format::{
|
||||
FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
|
||||
PrimaryKeyReadFormat, ReadFormat, StatValues,
|
||||
PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
|
||||
};
|
||||
use crate::sst::{
|
||||
FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
|
||||
@@ -518,7 +518,7 @@ impl ParquetFlat {
|
||||
return StatValues::NoColumn;
|
||||
};
|
||||
|
||||
let stats = ReadFormat::column_null_counts(row_groups, *index);
|
||||
let stats = column_null_counts(row_groups, *index);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
|
||||
@@ -535,7 +535,7 @@ impl ParquetFlat {
|
||||
// Safety: `column_id_to_sst_index` is built from `metadata`.
|
||||
let index = self.column_id_to_sst_index.get(&column_id).unwrap();
|
||||
|
||||
let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
|
||||
let stats = column_values(row_groups, column, *index, is_min);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,8 +41,7 @@ use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::vectors::Helper;
|
||||
use mito_codec::row_converter::{
|
||||
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
|
||||
build_primary_key_codec_with_fields,
|
||||
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields,
|
||||
};
|
||||
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
|
||||
use parquet::file::statistics::Statistics;
|
||||
@@ -55,7 +54,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::sst::file::{FileMeta, FileTimeRange};
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
/// Arrow array type for the primary key dictionary.
|
||||
@@ -125,267 +123,82 @@ impl PrimaryKeyWriteFormat {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to read parquet formats.
|
||||
pub enum ReadFormat {
|
||||
/// The parquet is in the old primary key format.
|
||||
PrimaryKey(PrimaryKeyReadFormat),
|
||||
/// The parquet is in the new flat format.
|
||||
Flat(FlatReadFormat),
|
||||
/// Returns min/max values of specific columns.
|
||||
/// Returns None if the column does not have statistics.
|
||||
/// The column should not be encoded as a part of a primary key.
|
||||
pub(crate) fn column_values(
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column: &ColumnMetadata,
|
||||
column_index: usize,
|
||||
is_min: bool,
|
||||
) -> Option<ArrayRef> {
|
||||
let null_scalar: ScalarValue = column
|
||||
.column_schema
|
||||
.data_type
|
||||
.as_arrow_type()
|
||||
.try_into()
|
||||
.ok()?;
|
||||
let scalar_values = row_groups
|
||||
.iter()
|
||||
.map(|meta| {
|
||||
let stats = meta.borrow().column(column_index).statistics()?;
|
||||
match stats {
|
||||
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Int96(_) => None,
|
||||
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::ByteArray(s) => {
|
||||
let bytes = if is_min {
|
||||
s.min_bytes_opt()?
|
||||
} else {
|
||||
s.max_bytes_opt()?
|
||||
};
|
||||
let s = String::from_utf8(bytes.to_vec()).ok();
|
||||
Some(ScalarValue::Utf8(s))
|
||||
}
|
||||
Statistics::FixedLenByteArray(_) => None,
|
||||
}
|
||||
})
|
||||
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
|
||||
.collect::<Vec<ScalarValue>>();
|
||||
debug_assert_eq!(scalar_values.len(), row_groups.len());
|
||||
ScalarValue::iter_to_array(scalar_values).ok()
|
||||
}
|
||||
|
||||
impl ReadFormat {
|
||||
/// Creates a helper to read the primary key format.
|
||||
pub fn new_primary_key(
|
||||
metadata: RegionMetadataRef,
|
||||
column_ids: impl Iterator<Item = ColumnId>,
|
||||
) -> Self {
|
||||
ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
|
||||
}
|
||||
|
||||
/// Creates a helper to read the flat format.
|
||||
pub fn new_flat(
|
||||
metadata: RegionMetadataRef,
|
||||
column_ids: impl Iterator<Item = ColumnId>,
|
||||
num_columns: Option<usize>,
|
||||
file_path: &str,
|
||||
skip_auto_convert: bool,
|
||||
) -> Result<Self> {
|
||||
Ok(ReadFormat::Flat(FlatReadFormat::new(
|
||||
metadata,
|
||||
column_ids,
|
||||
num_columns,
|
||||
file_path,
|
||||
skip_auto_convert,
|
||||
)?))
|
||||
}
|
||||
|
||||
/// Creates a new read format.
|
||||
pub fn new(
|
||||
region_metadata: RegionMetadataRef,
|
||||
projection: Option<&[ColumnId]>,
|
||||
flat_format: bool,
|
||||
num_columns: Option<usize>,
|
||||
file_path: &str,
|
||||
skip_auto_convert: bool,
|
||||
) -> Result<ReadFormat> {
|
||||
if flat_format {
|
||||
if let Some(column_ids) = projection {
|
||||
ReadFormat::new_flat(
|
||||
region_metadata,
|
||||
column_ids.iter().copied(),
|
||||
num_columns,
|
||||
file_path,
|
||||
skip_auto_convert,
|
||||
)
|
||||
} else {
|
||||
// No projection, lists all column ids to read.
|
||||
ReadFormat::new_flat(
|
||||
region_metadata.clone(),
|
||||
region_metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|col| col.column_id),
|
||||
num_columns,
|
||||
file_path,
|
||||
skip_auto_convert,
|
||||
)
|
||||
}
|
||||
} else if let Some(column_ids) = projection {
|
||||
Ok(ReadFormat::new_primary_key(
|
||||
region_metadata,
|
||||
column_ids.iter().copied(),
|
||||
))
|
||||
} else {
|
||||
// No projection, lists all column ids to read.
|
||||
Ok(ReadFormat::new_primary_key(
|
||||
region_metadata.clone(),
|
||||
region_metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|col| col.column_id),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => Some(format),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
|
||||
match self {
|
||||
ReadFormat::Flat(format) => Some(format),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the arrow schema of the SST file.
|
||||
///
|
||||
/// This schema is computed from the region metadata but should be the same
|
||||
/// as the arrow schema decoded from the file metadata.
|
||||
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.arrow_schema(),
|
||||
ReadFormat::Flat(format) => format.arrow_schema(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the metadata of the SST.
|
||||
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.metadata(),
|
||||
ReadFormat::Flat(format) => format.metadata(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets sorted projection indices to read.
|
||||
pub(crate) fn projection_indices(&self) -> &[usize] {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.projection_indices(),
|
||||
ReadFormat::Flat(format) => format.projection_indices(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns min values of specific column in row groups.
|
||||
pub fn min_values(
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> StatValues {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
|
||||
ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns max values of specific column in row groups.
|
||||
pub fn max_values(
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> StatValues {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
|
||||
ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns null counts of specific column in row groups.
|
||||
pub fn null_counts(
|
||||
&self,
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_id: ColumnId,
|
||||
) -> StatValues {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
|
||||
ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns min/max values of specific columns.
|
||||
/// Returns None if the column does not have statistics.
|
||||
/// The column should not be encoded as a part of a primary key.
|
||||
pub(crate) fn column_values(
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column: &ColumnMetadata,
|
||||
column_index: usize,
|
||||
is_min: bool,
|
||||
) -> Option<ArrayRef> {
|
||||
let null_scalar: ScalarValue = column
|
||||
.column_schema
|
||||
.data_type
|
||||
.as_arrow_type()
|
||||
.try_into()
|
||||
.ok()?;
|
||||
let scalar_values = row_groups
|
||||
.iter()
|
||||
.map(|meta| {
|
||||
let stats = meta.borrow().column(column_index).statistics()?;
|
||||
match stats {
|
||||
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
|
||||
Statistics::Int96(_) => None,
|
||||
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
|
||||
*s.min_opt()?
|
||||
} else {
|
||||
*s.max_opt()?
|
||||
}))),
|
||||
Statistics::ByteArray(s) => {
|
||||
let bytes = if is_min {
|
||||
s.min_bytes_opt()?
|
||||
} else {
|
||||
s.max_bytes_opt()?
|
||||
};
|
||||
let s = String::from_utf8(bytes.to_vec()).ok();
|
||||
Some(ScalarValue::Utf8(s))
|
||||
}
|
||||
|
||||
Statistics::FixedLenByteArray(_) => None,
|
||||
}
|
||||
})
|
||||
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
|
||||
.collect::<Vec<ScalarValue>>();
|
||||
debug_assert_eq!(scalar_values.len(), row_groups.len());
|
||||
ScalarValue::iter_to_array(scalar_values).ok()
|
||||
}
|
||||
|
||||
/// Returns null counts of specific columns.
|
||||
/// The column should not be encoded as a part of a primary key.
|
||||
pub(crate) fn column_null_counts(
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_index: usize,
|
||||
) -> Option<ArrayRef> {
|
||||
let values = row_groups.iter().map(|meta| {
|
||||
let col = meta.borrow().column(column_index);
|
||||
let stat = col.statistics()?;
|
||||
stat.null_count_opt()
|
||||
});
|
||||
Some(Arc::new(UInt64Array::from_iter(values)))
|
||||
}
|
||||
|
||||
/// Sets the sequence number to override.
|
||||
pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
|
||||
ReadFormat::Flat(format) => format.set_override_sequence(sequence),
|
||||
}
|
||||
}
|
||||
|
||||
/// Enables or disables eager decoding of primary key values into batches.
|
||||
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
|
||||
if let ReadFormat::PrimaryKey(format) = self {
|
||||
format.set_decode_primary_key_values(decode);
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a sequence array to override.
|
||||
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
|
||||
match self {
|
||||
ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
|
||||
ReadFormat::Flat(format) => format.new_override_sequence_array(length),
|
||||
}
|
||||
}
|
||||
/// Returns null counts of specific columns.
|
||||
/// The column should not be encoded as a part of a primary key.
|
||||
pub(crate) fn column_null_counts(
|
||||
row_groups: &[impl Borrow<RowGroupMetaData>],
|
||||
column_index: usize,
|
||||
) -> Option<ArrayRef> {
|
||||
let values = row_groups.iter().map(|meta| {
|
||||
let col = meta.borrow().column(column_index);
|
||||
let stat = col.statistics()?;
|
||||
stat.null_count_opt()
|
||||
});
|
||||
Some(Arc::new(UInt64Array::from_iter(values)))
|
||||
}
|
||||
|
||||
/// Helper for reading the SST format.
|
||||
@@ -402,8 +215,6 @@ pub struct PrimaryKeyReadFormat {
|
||||
/// Field column id to their index in the projected schema (
|
||||
/// the schema of [Batch]).
|
||||
field_id_to_projected_index: HashMap<ColumnId, usize>,
|
||||
/// Sequence number to override the sequence read from the SST.
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
/// Codec used to decode primary key values if eager decoding is enabled.
|
||||
primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
|
||||
}
|
||||
@@ -433,25 +244,10 @@ impl PrimaryKeyReadFormat {
|
||||
field_id_to_index,
|
||||
projection_indices: format_projection.projection_indices,
|
||||
field_id_to_projected_index: format_projection.column_id_to_projected_index,
|
||||
override_sequence: None,
|
||||
primary_key_codec: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the sequence number to override.
|
||||
pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
|
||||
self.override_sequence = sequence;
|
||||
}
|
||||
|
||||
/// Enables or disables eager decoding of primary key values into batches.
|
||||
pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
|
||||
self.primary_key_codec = if decode {
|
||||
Some(build_primary_key_codec(&self.metadata))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
/// Gets the arrow schema of the SST file.
|
||||
///
|
||||
/// This schema is computed from the region metadata but should be the same
|
||||
@@ -475,12 +271,6 @@ impl PrimaryKeyReadFormat {
|
||||
&self.field_id_to_projected_index
|
||||
}
|
||||
|
||||
/// Creates a sequence array to override.
|
||||
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
|
||||
self.override_sequence
|
||||
.map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
|
||||
}
|
||||
|
||||
/// Converts an arrow record batch into `batches`.
|
||||
///
|
||||
/// The length of `override_sequence_array` must be larger than the length of the record batch.
|
||||
@@ -598,12 +388,12 @@ impl PrimaryKeyReadFormat {
|
||||
SemanticType::Field => {
|
||||
// Safety: `field_id_to_index` is initialized by the semantic type.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = ReadFormat::column_values(row_groups, column, *index, true);
|
||||
let stats = column_values(row_groups, column, *index, true);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
let stats = ReadFormat::column_values(row_groups, column, index, true);
|
||||
let stats = column_values(row_groups, column, index, true);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
@@ -624,12 +414,12 @@ impl PrimaryKeyReadFormat {
|
||||
SemanticType::Field => {
|
||||
// Safety: `field_id_to_index` is initialized by the semantic type.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = ReadFormat::column_values(row_groups, column, *index, false);
|
||||
let stats = column_values(row_groups, column, *index, false);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
let stats = ReadFormat::column_values(row_groups, column, index, false);
|
||||
let stats = column_values(row_groups, column, index, false);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
@@ -650,12 +440,12 @@ impl PrimaryKeyReadFormat {
|
||||
SemanticType::Field => {
|
||||
// Safety: `field_id_to_index` is initialized by the semantic type.
|
||||
let index = self.field_id_to_index.get(&column_id).unwrap();
|
||||
let stats = ReadFormat::column_null_counts(row_groups, *index);
|
||||
let stats = column_null_counts(row_groups, *index);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
SemanticType::Timestamp => {
|
||||
let index = self.time_index_position();
|
||||
let stats = ReadFormat::column_null_counts(row_groups, index);
|
||||
let stats = column_null_counts(row_groups, index);
|
||||
StatValues::from_stats_opt(stats)
|
||||
}
|
||||
}
|
||||
@@ -1006,7 +796,9 @@ mod tests {
|
||||
use store_api::storage::consts::ReservedColumnId;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
|
||||
use crate::sst::parquet::flat_format::{
|
||||
FlatReadFormat, FlatWriteFormat, sequence_column_index,
|
||||
};
|
||||
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
|
||||
|
||||
const TEST_SEQUENCE: u64 = 1;
|
||||
@@ -1134,16 +926,16 @@ mod tests {
|
||||
fn test_projection_indices() {
|
||||
let metadata = build_test_region_metadata();
|
||||
// Only read tag1
|
||||
let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
|
||||
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [3].iter().copied());
|
||||
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
|
||||
// Only read field1
|
||||
let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
|
||||
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [4].iter().copied());
|
||||
assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
|
||||
// Only read ts
|
||||
let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
|
||||
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [5].iter().copied());
|
||||
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
|
||||
// Read field0, tag0, ts
|
||||
let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
|
||||
let read_format = PrimaryKeyReadFormat::new(metadata, [2, 1, 5].iter().copied());
|
||||
assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
|
||||
}
|
||||
|
||||
@@ -1240,8 +1032,7 @@ mod tests {
|
||||
.iter()
|
||||
.map(|col| col.column_id)
|
||||
.collect();
|
||||
let read_format =
|
||||
ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
|
||||
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
|
||||
|
||||
let columns: Vec<ArrayRef> = vec![
|
||||
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
|
||||
@@ -1261,8 +1052,6 @@ mod tests {
|
||||
|
||||
let mut batches = VecDeque::new();
|
||||
read_format
|
||||
.as_primary_key()
|
||||
.unwrap()
|
||||
.convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
|
||||
.unwrap();
|
||||
|
||||
@@ -1370,25 +1159,25 @@ mod tests {
|
||||
|
||||
// Only read tag1 (column_id=3, index=1) + fixed columns
|
||||
let read_format =
|
||||
ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
|
||||
FlatReadFormat::new(metadata.clone(), [3].iter().copied(), None, "test", false)
|
||||
.unwrap();
|
||||
assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
|
||||
|
||||
// Only read field1 (column_id=4, index=2) + fixed columns
|
||||
let read_format =
|
||||
ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
|
||||
FlatReadFormat::new(metadata.clone(), [4].iter().copied(), None, "test", false)
|
||||
.unwrap();
|
||||
assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
|
||||
|
||||
// Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
|
||||
let read_format =
|
||||
ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
|
||||
FlatReadFormat::new(metadata.clone(), [5].iter().copied(), None, "test", false)
|
||||
.unwrap();
|
||||
assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
|
||||
|
||||
// Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
|
||||
let read_format =
|
||||
ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
|
||||
FlatReadFormat::new(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
|
||||
assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
|
||||
}
|
||||
|
||||
|
||||
@@ -34,8 +34,8 @@ use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
|
||||
use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
|
||||
use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
|
||||
use crate::sst::parquet::format::PrimaryKeyArray;
|
||||
use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
|
||||
use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;
|
||||
|
||||
@@ -251,7 +251,7 @@ impl PrefilterContextBuilder {
|
||||
/// - The read format doesn't use flat layout with dictionary-encoded PKs
|
||||
/// - The primary key is empty
|
||||
pub(crate) fn new(
|
||||
read_format: &ReadFormat,
|
||||
read_format: &FlatReadFormat,
|
||||
codec: &Arc<dyn PrimaryKeyCodec>,
|
||||
primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
|
||||
parquet_schema: &SchemaDescriptor,
|
||||
@@ -267,8 +267,7 @@ impl PrefilterContextBuilder {
|
||||
}
|
||||
|
||||
// Only perform PK prefiltering for primary-key-to-flat conversion path.
|
||||
let flat_format = read_format.as_flat()?;
|
||||
if flat_format.batch_has_raw_pk_columns() {
|
||||
if read_format.batch_has_raw_pk_columns() {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -404,7 +403,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::sst::internal_fields;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::test_util::sst_util::{
|
||||
new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding,
|
||||
};
|
||||
@@ -414,7 +413,7 @@ mod tests {
|
||||
let metadata = Arc::new(sst_region_metadata_with_encoding(
|
||||
PrimaryKeyEncoding::Sparse,
|
||||
));
|
||||
let read_format = ReadFormat::new_flat(
|
||||
let read_format = FlatReadFormat::new(
|
||||
metadata.clone(),
|
||||
metadata.column_metadatas.iter().map(|c| c.column_id),
|
||||
None,
|
||||
@@ -422,7 +421,7 @@ mod tests {
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(read_format.as_flat().is_some());
|
||||
assert!(!read_format.batch_has_raw_pk_columns());
|
||||
|
||||
let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap();
|
||||
assert!(is_usable_primary_key_filter(&metadata, None, &filter));
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
#[cfg(feature = "vector_index")]
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -55,7 +55,6 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::read::flat_projection::CompactionProjectionMapper;
|
||||
use crate::read::prune::FlatPruneReader;
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::index::bloom_filter::applier::{
|
||||
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
|
||||
@@ -73,7 +72,8 @@ use crate::sst::parquet::async_reader::SstAsyncFileReader;
|
||||
use crate::sst::parquet::file_range::{
|
||||
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
|
||||
};
|
||||
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::sst::parquet::format::need_override_sequence;
|
||||
use crate::sst::parquet::metadata::MetadataLoader;
|
||||
use crate::sst::parquet::prefilter::{
|
||||
PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter,
|
||||
@@ -371,10 +371,9 @@ impl ParquetReaderBuilder {
|
||||
};
|
||||
|
||||
let mut read_format = if let Some(column_ids) = &self.projection {
|
||||
ReadFormat::new(
|
||||
FlatReadFormat::new(
|
||||
region_meta.clone(),
|
||||
Some(column_ids),
|
||||
true, // Always reads as flat format.
|
||||
column_ids.iter().copied(),
|
||||
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
|
||||
&file_path,
|
||||
skip_auto_convert,
|
||||
@@ -387,18 +386,14 @@ impl ParquetReaderBuilder {
|
||||
.iter()
|
||||
.map(|col| col.column_id)
|
||||
.collect();
|
||||
ReadFormat::new(
|
||||
FlatReadFormat::new(
|
||||
region_meta.clone(),
|
||||
Some(&column_ids),
|
||||
true, // Always reads as flat format.
|
||||
column_ids.iter().copied(),
|
||||
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
|
||||
&file_path,
|
||||
skip_auto_convert,
|
||||
)?
|
||||
};
|
||||
if self.decode_primary_key_values {
|
||||
read_format.set_decode_primary_key_values(true);
|
||||
}
|
||||
if need_override_sequence(&parquet_meta) {
|
||||
read_format
|
||||
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
|
||||
@@ -540,7 +535,7 @@ impl ParquetReaderBuilder {
|
||||
/// and build a partition filter if they differ.
|
||||
fn build_partition_filter(
|
||||
&self,
|
||||
read_format: &ReadFormat,
|
||||
read_format: &FlatReadFormat,
|
||||
prune_schema: &Arc<datatypes::schema::Schema>,
|
||||
) -> Result<Option<PartitionFilterContext>> {
|
||||
let region_partition_expr_str = self
|
||||
@@ -567,15 +562,13 @@ impl ParquetReaderBuilder {
|
||||
region_partition_expr.collect_column_names(&mut referenced_columns);
|
||||
|
||||
// Build a partition_schema containing only referenced columns.
|
||||
let is_flat = read_format.as_flat().is_some();
|
||||
let partition_schema = Arc::new(datatypes::schema::Schema::new(
|
||||
prune_schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.filter(|col| referenced_columns.contains(&col.name))
|
||||
.map(|col| {
|
||||
if is_flat
|
||||
&& let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
|
||||
if let Some(column_meta) = read_format.metadata().column_by_name(&col.name)
|
||||
&& column_meta.semantic_type == SemanticType::Tag
|
||||
&& col.data_type.is_string()
|
||||
{
|
||||
@@ -656,7 +649,7 @@ impl ParquetReaderBuilder {
|
||||
)]
|
||||
async fn row_groups_to_read(
|
||||
&self,
|
||||
read_format: &ReadFormat,
|
||||
read_format: &FlatReadFormat,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
) -> RowGroupSelection {
|
||||
@@ -1116,7 +1109,7 @@ impl ParquetReaderBuilder {
|
||||
/// Computes row groups selection after min-max pruning.
|
||||
fn row_groups_by_minmax(
|
||||
&self,
|
||||
read_format: &ReadFormat,
|
||||
read_format: &FlatReadFormat,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
row_group_size: usize,
|
||||
total_row_count: usize,
|
||||
@@ -1791,8 +1784,6 @@ pub(crate) struct SimpleFilterContext {
|
||||
column_id: ColumnId,
|
||||
/// Semantic type of the column.
|
||||
semantic_type: SemanticType,
|
||||
/// The data type of the column.
|
||||
data_type: ConcreteDataType,
|
||||
/// Whether this filter can be applied by flat parquet primary-key prefiltering.
|
||||
usable_primary_key_filter: bool,
|
||||
}
|
||||
@@ -1849,7 +1840,6 @@ impl SimpleFilterContext {
|
||||
filter: maybe_filter,
|
||||
column_id: column_metadata.column_id,
|
||||
semantic_type: column_metadata.semantic_type,
|
||||
data_type: column_metadata.column_schema.data_type.clone(),
|
||||
usable_primary_key_filter,
|
||||
})
|
||||
}
|
||||
@@ -1869,11 +1859,6 @@ impl SimpleFilterContext {
|
||||
self.semantic_type
|
||||
}
|
||||
|
||||
/// Returns the data type of the column.
|
||||
pub(crate) fn data_type(&self) -> &ConcreteDataType {
|
||||
&self.data_type
|
||||
}
|
||||
|
||||
/// Returns whether this filter is eligible for flat parquet PK prefiltering.
|
||||
pub(crate) fn usable_primary_key_filter(&self) -> bool {
|
||||
self.usable_primary_key_filter
|
||||
@@ -1967,7 +1952,6 @@ impl ParquetReader {
|
||||
context: FileRangeContextRef,
|
||||
mut selection: RowGroupSelection,
|
||||
) -> Result<Self> {
|
||||
debug_assert!(context.read_format().as_flat().is_some());
|
||||
let fetch_metrics = ParquetFetchMetrics::default();
|
||||
let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
|
||||
let skip_fields = context.pre_filter_mode().skip_fields();
|
||||
@@ -2007,140 +1991,6 @@ impl ParquetReader {
|
||||
}
|
||||
}
|
||||
|
||||
/// RowGroupReaderContext represents the fields that cannot be shared
|
||||
/// between different `RowGroupReader`s.
|
||||
pub(crate) trait RowGroupReaderContext: Send {
|
||||
fn read_format(&self) -> &ReadFormat;
|
||||
|
||||
fn file_path(&self) -> &str;
|
||||
}
|
||||
|
||||
impl RowGroupReaderContext for FileRangeContextRef {
|
||||
fn read_format(&self) -> &ReadFormat {
|
||||
self.as_ref().read_format()
|
||||
}
|
||||
|
||||
fn file_path(&self) -> &str {
|
||||
self.as_ref().file_path()
|
||||
}
|
||||
}
|
||||
|
||||
/// [RowGroupReader] that reads from [FileRange].
|
||||
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl RowGroupReader {
|
||||
/// Creates a new reader from file range.
|
||||
pub(crate) fn new(
|
||||
context: FileRangeContextRef,
|
||||
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
|
||||
) -> Self {
|
||||
Self::create(context, stream)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reader to read a row group of a parquet file.
|
||||
pub(crate) struct RowGroupReaderBase<T> {
|
||||
/// Context of [RowGroupReader] so adapts to different underlying implementation.
|
||||
context: T,
|
||||
/// Inner parquet record batch stream.
|
||||
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
|
||||
/// Buffered batches to return.
|
||||
batches: VecDeque<Batch>,
|
||||
/// Local scan metrics.
|
||||
metrics: ReaderMetrics,
|
||||
/// Cached sequence array to override sequences.
|
||||
override_sequence: Option<ArrayRef>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl<T> RowGroupReaderBase<T>
|
||||
where
|
||||
T: RowGroupReaderContext,
|
||||
{
|
||||
/// Creates a new reader to read the primary key format.
|
||||
pub(crate) fn create(context: T, stream: ParquetRecordBatchStream<SstAsyncFileReader>) -> Self {
|
||||
// The batch length from the reader should be less than or equal to DEFAULT_READ_BATCH_SIZE.
|
||||
let override_sequence = context
|
||||
.read_format()
|
||||
.new_override_sequence_array(DEFAULT_READ_BATCH_SIZE);
|
||||
assert!(context.read_format().as_primary_key().is_some());
|
||||
|
||||
Self {
|
||||
context,
|
||||
stream,
|
||||
batches: VecDeque::new(),
|
||||
metrics: ReaderMetrics::default(),
|
||||
override_sequence,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the metrics.
|
||||
pub(crate) fn metrics(&self) -> &ReaderMetrics {
|
||||
&self.metrics
|
||||
}
|
||||
|
||||
/// Gets [ReadFormat] of underlying reader.
|
||||
pub(crate) fn read_format(&self) -> &ReadFormat {
|
||||
self.context.read_format()
|
||||
}
|
||||
|
||||
/// Tries to fetch next [RecordBatch] from the stream asynchronously.
|
||||
async fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
|
||||
match self.stream.next().await.transpose() {
|
||||
Ok(batch) => Ok(batch),
|
||||
Err(e) => Err(e).context(ReadParquetSnafu {
|
||||
path: self.context.file_path(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the next [Batch].
|
||||
pub(crate) async fn next_inner(&mut self) -> Result<Option<Batch>> {
|
||||
let scan_start = Instant::now();
|
||||
if let Some(batch) = self.batches.pop_front() {
|
||||
self.metrics.num_rows += batch.num_rows();
|
||||
self.metrics.scan_cost += scan_start.elapsed();
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
// We need to fetch next record batch and convert it to batches.
|
||||
while self.batches.is_empty() {
|
||||
let Some(record_batch) = self.fetch_next_record_batch().await? else {
|
||||
self.metrics.scan_cost += scan_start.elapsed();
|
||||
return Ok(None);
|
||||
};
|
||||
self.metrics.num_record_batches += 1;
|
||||
|
||||
// Safety: We ensures the format is primary key in the RowGroupReaderBase::create().
|
||||
self.context
|
||||
.read_format()
|
||||
.as_primary_key()
|
||||
.unwrap()
|
||||
.convert_record_batch(
|
||||
&record_batch,
|
||||
self.override_sequence.as_ref(),
|
||||
&mut self.batches,
|
||||
)?;
|
||||
self.metrics.num_batches += self.batches.len();
|
||||
}
|
||||
let batch = self.batches.pop_front();
|
||||
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
self.metrics.scan_cost += scan_start.elapsed();
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T> BatchReader for RowGroupReaderBase<T>
|
||||
where
|
||||
T: RowGroupReaderContext + Send + Sync,
|
||||
{
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
self.next_inner().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Reader to read a row group of a parquet file in flat format, returning RecordBatch.
|
||||
pub(crate) struct FlatRowGroupReader {
|
||||
/// Context for file ranges.
|
||||
@@ -2177,10 +2027,10 @@ impl FlatRowGroupReader {
|
||||
path: self.context.file_path(),
|
||||
})?;
|
||||
|
||||
// Safety: Only flat format use FlatRowGroupReader.
|
||||
let flat_format = self.context.read_format().as_flat().unwrap();
|
||||
let record_batch =
|
||||
flat_format.convert_batch(record_batch, self.override_sequence.as_ref())?;
|
||||
let record_batch = self
|
||||
.context
|
||||
.read_format()
|
||||
.convert_batch(record_batch, self.override_sequence.as_ref())?;
|
||||
Ok(Some(record_batch))
|
||||
}
|
||||
None => Ok(None),
|
||||
@@ -2269,8 +2119,17 @@ mod tests {
|
||||
object_store.write(&file_path, parquet_bytes).await.unwrap();
|
||||
|
||||
let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
|
||||
let read_format =
|
||||
ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();
|
||||
let read_format = FlatReadFormat::new(
|
||||
region_metadata.clone(),
|
||||
region_metadata
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(|column| column.column_id),
|
||||
None,
|
||||
&file_path,
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mut cache_metrics = MetadataCacheMetrics::default();
|
||||
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
|
||||
|
||||
@@ -26,14 +26,15 @@ use parquet::file::metadata::RowGroupMetaData;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::sst::parquet::format::{ReadFormat, StatValues};
|
||||
use crate::sst::parquet::flat_format::FlatReadFormat;
|
||||
use crate::sst::parquet::format::StatValues;
|
||||
|
||||
/// Statistics for pruning row groups.
|
||||
pub(crate) struct RowGroupPruningStats<'a, T> {
|
||||
/// Metadata of SST row groups.
|
||||
row_groups: &'a [T],
|
||||
/// Helper to read the SST.
|
||||
read_format: &'a ReadFormat,
|
||||
read_format: &'a FlatReadFormat,
|
||||
/// The metadata of the region.
|
||||
/// It contains the schema a query expects to read. If it is not None, we use it instead
|
||||
/// of the metadata in the SST to get the column id of a column as the SST may have
|
||||
@@ -47,7 +48,7 @@ impl<'a, T> RowGroupPruningStats<'a, T> {
|
||||
/// Creates a new statistics to prune specific `row_groups`.
|
||||
pub(crate) fn new(
|
||||
row_groups: &'a [T],
|
||||
read_format: &'a ReadFormat,
|
||||
read_format: &'a FlatReadFormat,
|
||||
expected_metadata: Option<RegionMetadataRef>,
|
||||
skip_fields: bool,
|
||||
) -> Self {
|
||||
|
||||
Reference in New Issue
Block a user