feat: support nested projection in mito2 read path (#7959)

* feat: support nested projection

* fix: cr

* fix: cr

* fix: code review

* fix: cargo clippy

* fix: ci

* fix: unit test

* avoid repeated evaluation of is_schema_matched

* add comment for append time index

* fix: keep nested schema alignment after filling missing projected roots
This commit is contained in:
fys
2026-05-07 17:50:19 +08:00
committed by GitHub
parent bb58ff1400
commit d0e0c21600
20 changed files with 722 additions and 578 deletions

View File

@@ -57,6 +57,7 @@ use crate::read::Batch;
use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::PARQUET_METADATA_KEY;
use crate::sst::parquet::read_columns::ParquetReadColumns;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
@@ -1223,24 +1224,27 @@ pub enum SelectorResult {
pub struct SelectorResultValue {
/// Batches of rows selected by the selector.
pub result: SelectorResult,
/// Projection of rows.
pub projection: Vec<usize>,
/// The read columns of rows.
pub read_cols: ParquetReadColumns,
}
impl SelectorResultValue {
/// Creates a new selector result value with primary key format.
pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
SelectorResultValue {
result: SelectorResult::PrimaryKey(result),
projection,
read_cols,
}
}
/// Creates a new selector result value with flat format.
pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
pub fn new_flat(
result: Vec<RecordBatch>,
read_cols: ParquetReadColumns,
) -> SelectorResultValue {
SelectorResultValue {
result: SelectorResult::Flat(result),
projection,
read_cols,
}
}
@@ -1289,6 +1293,7 @@ mod tests {
use crate::read::range_cache::{
RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
};
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::row_selection::RowGroupSelection;
#[tokio::test]
@@ -1442,7 +1447,10 @@ mod tests {
selector: TimeSeriesRowSelector::LastRow,
};
assert!(cache.get_selector_result(&key).is_none());
let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
let result = Arc::new(SelectorResultValue::new(
Vec::new(),
ParquetReadColumns::from_deduped(Vec::new()),
));
cache.put_selector_result(key, result);
assert!(cache.get_selector_result(&key).is_some());
}
@@ -1459,7 +1467,7 @@ mod tests {
region_id: RegionId::new(1, 1),
row_groups: vec![(FileId::random(), 0)],
scan: ScanRequestFingerprintBuilder {
read_column_ids: vec![],
read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
read_column_types: vec![],
filters: vec!["tag_0 = 1".to_string()],
time_filters: vec![],

View File

@@ -1240,6 +1240,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast column"))]
CastColumn {
#[snafu(source)]
error: datafusion::error::DataFusionError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1329,6 +1337,7 @@ impl ErrorExt for Error {
| ReadDataPart { .. }
| BuildEntry { .. }
| Metadata { .. }
| CastColumn { .. }
| MitoManifestInfo { .. } => StatusCode::Internal,
FetchManifests { source, .. } => source.status_code(),

View File

@@ -25,6 +25,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, build_bulk_filter_plan};
@@ -65,26 +66,23 @@ impl BulkIterContext {
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
let read_format = if let Some(column_ids) = projection {
FlatReadFormat::new(
region_metadata.clone(),
column_ids.iter().copied(),
None,
"memtable",
skip_auto_convert,
)?
let read_cols = if let Some(col_ids) = projection {
ReadColumns::from_deduped_column_ids(col_ids.iter().copied())
} else {
FlatReadFormat::new(
region_metadata.clone(),
ReadColumns::from_deduped_column_ids(
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
None,
"memtable",
skip_auto_convert,
)?
)
};
let read_format = FlatReadFormat::new(
region_metadata.clone(),
read_cols,
None,
"memtable",
skip_auto_convert,
)?;
let dyn_filters = predicate
.as_ref()

View File

@@ -64,10 +64,13 @@ impl EncodedBulkPartIter {
let data = encoded_part.data().clone();
let series_count = encoded_part.metadata().num_series as usize;
let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
context.read_format().projection_indices().iter().copied(),
);
// TODO(fys): Nested projection pushdown to the memtable layer is not supported yet.
let root_indices = context
.read_format()
.parquet_read_columns()
.root_indices_iter();
let projection_mask =
ProjectionMask::roots(parquet_meta.file_metadata().schema_descr(), root_indices);
let builder =
MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
@@ -276,7 +279,11 @@ impl BulkPartBatchIter {
/// Applies projection to the RecordBatch if needed.
fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
let projection_indices = self.context.read_format().projection_indices();
let projection_indices = self
.context
.read_format()
.parquet_read_columns()
.root_indices();
if projection_indices.len() == record_batch.num_columns() {
return Ok(record_batch);
}

View File

@@ -688,7 +688,7 @@ mod tests {
BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids);
let rb = adapter.into_iter().next().unwrap().unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3]).unwrap();
assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
// tag_0 + field_1 + ts + 3 internal columns.
assert_eq!(6, rb.num_columns());

View File

@@ -609,6 +609,7 @@ mod tests {
use super::*;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
@@ -713,7 +714,7 @@ mod tests {
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 1, 2, 3].into_iter(),
ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
None,
"test",
false,
@@ -799,16 +800,15 @@ mod tests {
&[1],
));
// Output projection: tag_1, field_2. Read also includes field_3.
let mapper = FlatProjectionMapper::new_with_read_columns(
&expected_metadata,
vec![1, 2],
vec![1, 2, 3],
ReadColumns::from_deduped_column_ids([1, 2, 3]),
)
.unwrap();
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[1, 2, 3].into_iter(),
ReadColumns::from_deduped_column_ids([1, 2, 3]),
None,
"test",
false,
@@ -899,7 +899,7 @@ mod tests {
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 1, 2, 3].into_iter(),
ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
None,
"test",
false,
@@ -993,7 +993,7 @@ mod tests {
let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
let read_format = FlatReadFormat::new(
actual_metadata.clone(),
[0, 2, 3].into_iter(),
ReadColumns::from_deduped_column_ids([0, 2, 3]),
None,
"test",
true,

View File

@@ -33,6 +33,7 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -49,11 +50,11 @@ pub struct FlatProjectionMapper {
metadata: RegionMetadataRef,
/// Schema for converted [RecordBatch] to return.
output_schema: SchemaRef,
/// Ids of columns to read from memtables and SSTs.
/// The columns to read from memtables and SSTs.
/// The mapper won't deduplicate the column ids.
///
/// Note that this doesn't contain the `__table_id` and `__tsid`.
read_column_ids: Vec<ColumnId>,
read_cols: ReadColumns,
/// Ids and DataTypes of columns of the expected batch.
/// We can use this to check if the batch is compatible with the expected schema.
///
@@ -74,50 +75,48 @@ impl FlatProjectionMapper {
/// empty `RecordBatch` and only use its row count in this query.
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
projection: impl IntoIterator<Item = usize>,
) -> Result<Self> {
let projection: Vec<_> = projection.collect();
let projection: Vec<_> = projection.into_iter().collect();
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
Self::new_with_read_columns(metadata, projection, read_column_ids)
let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
Self::new_with_read_columns(metadata, projection, read_cols)
}
/// Returns a new mapper with output projection and explicit read columns.
pub fn new_with_read_columns(
metadata: &RegionMetadataRef,
projection: Vec<usize>,
read_column_ids: Vec<ColumnId>,
read_cols: ReadColumns,
) -> Result<Self> {
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
// Output column schemas for the projection.
let mut column_schemas = Vec::with_capacity(projection.len());
let mut col_schemas = Vec::with_capacity(projection.len());
// Column ids of the output projection without deduplication.
let mut output_column_ids = Vec::with_capacity(projection.len());
let mut output_col_ids = Vec::with_capacity(projection.len());
for idx in &projection {
// For each projection index, we get the column id for projection.
let column =
metadata
.column_metadatas
.get(*idx)
.with_context(|| InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
output_column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
let col = metadata
.column_metadatas
.get(*idx)
.with_context(|| InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
output_col_ids.push(col.column_id);
col_schemas.push(col.column_schema.clone());
}
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(metadata);
// TODO(yingwen): Support different flat schema options.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
// All columns with internal columns.
metadata.column_metadatas.len() + 3,
read_column_ids.iter().copied(),
read_cols.clone(),
);
let batch_schema = flat_projected_columns(metadata, &format_projection);
@@ -130,13 +129,13 @@ impl FlatProjectionMapper {
Arc::new(Schema::new(vec![]))
} else {
// Safety: Columns come from existing schema.
Arc::new(Schema::new(column_schemas))
Arc::new(Schema::new(col_schemas))
};
let batch_indices = if is_empty_projection {
vec![]
} else {
output_column_ids
output_col_ids
.iter()
.map(|id| {
// Safety: The map is computed from the read projection.
@@ -164,7 +163,7 @@ impl FlatProjectionMapper {
Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema,
read_column_ids,
read_cols,
batch_schema,
is_empty_projection,
batch_indices,
@@ -181,11 +180,9 @@ impl FlatProjectionMapper {
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.read_column_ids
/// Returns projected columns that we need to read from memtables and SSTs.
pub(crate) fn read_columns(&self) -> &ReadColumns {
&self.read_cols
}
/// Returns the field column start index in output batch.
@@ -439,15 +436,9 @@ impl CompactionProjectionMapper {
.chain([metadata.time_index_column_pos()])
.collect::<Vec<_>>();
let mapper = FlatProjectionMapper::new_with_read_columns(
metadata,
projection,
metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect(),
)?;
let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?;
let assembler = DfBatchAssembler::new(mapper.output_schema());
Ok(Self { mapper, assembler })

View File

@@ -34,6 +34,7 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::{primary_key_column_index, time_index_column_index};
use crate::sst::parquet::format::{PrimaryKeyArray, primary_key_offsets};
use crate::sst::parquet::read_columns::ParquetReadColumns;
use crate::sst::parquet::reader::FlatRowGroupReader;
/// Reader to keep the last row for each time series.
@@ -134,7 +135,7 @@ impl FlatRowGroupLastRowCachedReader {
file_id: FileId,
row_group_idx: usize,
cache_strategy: CacheStrategy,
projection: &[usize],
read_cols: &ParquetReadColumns,
reader: FlatRowGroupReader,
) -> Self {
let key = SelectorResultKey {
@@ -145,14 +146,14 @@ impl FlatRowGroupLastRowCachedReader {
if let Some(value) = cache_strategy.get_selector_result(&key) {
let is_flat = matches!(&value.result, SelectorResult::Flat(_));
let schema_matches = value.projection == projection;
let schema_matches = value.read_cols == *read_cols;
if is_flat && schema_matches {
Self::new_hit(value)
} else {
Self::new_miss(key, projection, reader, cache_strategy)
Self::new_miss(key, read_cols, reader, cache_strategy)
}
} else {
Self::new_miss(key, projection, reader, cache_strategy)
Self::new_miss(key, read_cols, reader, cache_strategy)
}
}
@@ -171,14 +172,14 @@ impl FlatRowGroupLastRowCachedReader {
fn new_miss(
key: SelectorResultKey,
projection: &[usize],
read_cols: &ParquetReadColumns,
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
) -> Self {
selector_result_cache_miss();
Self::Miss(FlatRowGroupLastRowReader::new(
key,
projection.to_vec(),
read_cols.clone(),
reader,
cache_strategy,
))
@@ -257,7 +258,7 @@ pub(crate) struct FlatRowGroupLastRowReader {
selector: FlatLastTimestampSelector,
yielded_batches: Vec<RecordBatch>,
cache_strategy: CacheStrategy,
projection: Vec<usize>,
read_cols: ParquetReadColumns,
/// Accumulates small selector-output batches before concatenating.
pending: BatchBuffer,
}
@@ -265,7 +266,7 @@ pub(crate) struct FlatRowGroupLastRowReader {
impl FlatRowGroupLastRowReader {
fn new(
key: SelectorResultKey,
projection: Vec<usize>,
read_cols: ParquetReadColumns,
reader: FlatRowGroupReader,
cache_strategy: CacheStrategy,
) -> Self {
@@ -275,7 +276,7 @@ impl FlatRowGroupLastRowReader {
selector: FlatLastTimestampSelector::default(),
yielded_batches: vec![],
cache_strategy,
projection,
read_cols,
pending: BatchBuffer::new(),
}
}
@@ -323,7 +324,7 @@ impl FlatRowGroupLastRowReader {
let batches = std::mem::take(&mut self.yielded_batches);
let value = Arc::new(SelectorResultValue::new_flat(
batches,
self.projection.clone(),
self.read_cols.clone(),
));
self.cache_strategy.put_selector_result(self.key, value);
}

View File

@@ -108,6 +108,7 @@ mod tests {
use super::*;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::read_columns::ReadColumns;
fn print_record_batch(record_batch: RecordBatch) -> String {
pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
@@ -221,7 +222,10 @@ mod tests {
);
let cache = CacheStrategy::Disabled;
let mapper = FlatProjectionMapper::all(&metadata).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
&[0, 1, 2, 3, 4],
mapper.read_columns().column_ids().as_slice()
);
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
@@ -255,8 +259,8 @@ mod tests {
.build(),
);
let cache = CacheStrategy::Disabled;
let mapper = FlatProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
assert_eq!([4, 1], mapper.column_ids());
let mapper = FlatProjectionMapper::new(&metadata, [4, 1]).unwrap();
assert_eq!(&[4, 1], mapper.read_columns().column_ids().as_slice());
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
@@ -288,10 +292,13 @@ mod tests {
.build(),
);
let cache = CacheStrategy::Disabled;
let mapper =
FlatProjectionMapper::new_with_read_columns(&metadata, vec![4, 1], vec![4, 1, 3])
.unwrap();
assert_eq!([4, 1, 3], mapper.column_ids());
let mapper = FlatProjectionMapper::new_with_read_columns(
&metadata,
vec![4, 1],
ReadColumns::from_deduped_column_ids([4, 1, 3]),
)
.unwrap();
assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice());
let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, &cache).unwrap();
@@ -315,8 +322,8 @@ mod tests {
.build(),
);
let cache = CacheStrategy::Disabled;
let mapper = FlatProjectionMapper::new(&metadata, [].into_iter()).unwrap();
assert_eq!([0], mapper.column_ids());
let mapper = FlatProjectionMapper::new(&metadata, []).unwrap();
assert_eq!(&[0], mapper.read_columns().column_ids().as_slice());
assert!(mapper.output_schema().is_empty());
assert_eq!(
[(0, ConcreteDataType::timestamp_millisecond_datatype())],

View File

@@ -26,12 +26,13 @@ use datatypes::prelude::ConcreteDataType;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector};
use tokio::sync::{mpsc, oneshot};
use crate::cache::CacheStrategy;
use crate::error::{ComputeArrowSnafu, Result};
use crate::read::BoxedRecordBatchStream;
use crate::read::read_columns::ReadColumns;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::region::options::MergeMode;
@@ -63,7 +64,7 @@ pub(crate) struct ScanRequestFingerprint {
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
pub(crate) read_column_ids: Vec<ColumnId>,
pub(crate) read_columns: ReadColumns,
pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
pub(crate) filters: Vec<String>,
pub(crate) time_filters: Vec<String>,
@@ -77,7 +78,7 @@ pub(crate) struct ScanRequestFingerprintBuilder {
impl ScanRequestFingerprintBuilder {
pub(crate) fn build(self) -> ScanRequestFingerprint {
let Self {
read_column_ids,
read_columns,
read_column_types,
filters,
time_filters,
@@ -90,7 +91,7 @@ impl ScanRequestFingerprintBuilder {
ScanRequestFingerprint {
inner: Arc::new(SharedScanRequestFingerprint {
read_column_ids,
read_columns,
read_column_types,
filters,
}),
@@ -107,8 +108,8 @@ impl ScanRequestFingerprintBuilder {
/// Non-copiable struct of the fingerprint.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
/// Column ids of the projection.
read_column_ids: Vec<ColumnId>,
/// Logical columns of the projection.
read_columns: ReadColumns,
/// Column types of the projection.
/// We keep this to ensure we won't reuse the fingerprint after a schema change.
read_column_types: Vec<Option<ConcreteDataType>>,
@@ -118,8 +119,8 @@ struct SharedScanRequestFingerprint {
impl ScanRequestFingerprint {
#[cfg(test)]
pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
&self.inner.read_column_ids
pub(crate) fn read_columns(&self) -> &ReadColumns {
&self.inner.read_columns
}
#[cfg(test)]
@@ -154,7 +155,7 @@ impl ScanRequestFingerprint {
pub(crate) fn estimated_size(&self) -> usize {
mem::size_of::<SharedScanRequestFingerprint>()
+ self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
+ self.inner.read_columns.estimated_size()
+ self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
+ self.inner.filters.capacity() * mem::size_of::<String>()
+ self
@@ -591,7 +592,7 @@ pub fn bench_cache_flat_range_stream(
let cache_strategy = CacheStrategy::EnableAll(cache_manager);
let fingerprint = ScanRequestFingerprintBuilder {
read_column_ids: vec![],
read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
read_column_types: vec![],
filters: vec![],
time_filters: vec![],
@@ -654,8 +655,9 @@ mod tests {
filter_deleted: bool,
partition_expr_version: u64,
) -> ScanRequestFingerprint {
let read_columns = ReadColumns::from_deduped_column_ids([1, 2]);
ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_columns,
read_column_types: vec![None, None],
filters,
time_filters,
@@ -704,7 +706,7 @@ mod tests {
) -> (StreamContext, PartitionRange) {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
let file_id = FileId::random();
let file = sst_file_handle_with_file_id(
@@ -848,7 +850,7 @@ mod tests {
let reset = fingerprint.without_time_filters();
assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
assert_eq!(reset.read_columns(), fingerprint.read_columns());
assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
assert_eq!(reset.filters(), fingerprint.filters());
assert!(reset.time_filters().is_empty());

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(fys): remove this once the module is used
#![allow(dead_code)]
use std::collections::{BTreeMap, HashSet};
use std::mem;
@@ -65,7 +62,7 @@ use crate::read::scan_region::PredicateGroup;
/// If `nested_paths` is empty, the whole column will be read.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct ReadColumns {
cols: Vec<ReadColumn>,
pub cols: Vec<ReadColumn>,
}
impl ReadColumns {
@@ -85,7 +82,7 @@ impl ReadColumns {
}
pub fn column_ids_iter(&self) -> impl Iterator<Item = ColumnId> + '_ {
self.cols.iter().map(|column| column.column_id())
self.cols.iter().map(|column| column.column_id)
}
pub fn column_ids(&self) -> Vec<ColumnId> {
@@ -108,10 +105,10 @@ impl ReadColumns {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ReadColumn {
column_id: ColumnId,
pub column_id: ColumnId,
/// Nested field paths under this column.
/// Empty means reading the whole column.
nested_paths: Vec<NestedPath>,
pub nested_paths: Vec<NestedPath>,
}
impl ReadColumn {
@@ -122,10 +119,6 @@ impl ReadColumn {
}
}
pub fn column_id(&self) -> ColumnId {
self.column_id
}
pub fn nested_paths(&self) -> &[NestedPath] {
&self.nested_paths
}

View File

@@ -34,11 +34,11 @@ use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
@@ -48,7 +48,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
@@ -57,6 +57,9 @@ use crate::read::compat::{self, FlatCompatBatch};
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::range_cache::ScanRequestFingerprint;
use crate::read::read_columns::{
ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
@@ -399,23 +402,33 @@ impl ScanRegion {
let time_range = self.build_time_range_predicate();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
let read_column_ids = match self.request.projection_indices() {
Some(p) => self.build_read_column_ids(p, &predicate)?,
None => self
.version
.metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect(),
let read_cols = match &self.request.projection_input {
Some(p) => {
// Read columns include the pushed-down projection and columns
// resolved from the predicate.
let metadata = &self.version.metadata;
let from_projection = read_columns_from_projection(p.clone(), metadata)?;
let from_predicate = read_columns_from_predicate(&predicate, metadata);
merge(from_projection, from_predicate)
}
None => {
let read_col_ids = self
.version
.metadata
.column_metadatas
.iter()
.map(|col| col.column_id);
ReadColumns::from_deduped_column_ids(read_col_ids)
}
};
let read_col_ids = read_cols.column_ids();
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match self.request.projection_indices() {
Some(p) => FlatProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.to_vec(),
read_column_ids.clone(),
read_cols,
)?,
None => FlatProjectionMapper::all(&self.version.metadata)?,
};
@@ -464,7 +477,7 @@ impl ScanRegion {
continue;
}
let ranges_in_memtable = m.ranges(
Some(read_column_ids.as_slice()),
Some(&read_col_ids),
RangesOptions::default()
.with_predicate(predicate.clone())
.with_sequence(SequenceRange::new(
@@ -573,72 +586,6 @@ impl ScanRegion {
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
}
/// Return all columns id to read according to the projection and filters.
fn build_read_column_ids(
&self,
projection: &[usize],
predicate: &PredicateGroup,
) -> Result<Vec<ColumnId>> {
let metadata = &self.version.metadata;
// use Vec for read_column_ids to keep the order of columns.
let mut read_column_ids = Vec::new();
let mut seen = HashSet::new();
for idx in projection {
let column =
metadata
.column_metadatas
.get(*idx)
.with_context(|| InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bound", idx),
})?;
seen.insert(column.column_id);
// keep the projection order
read_column_ids.push(column.column_id);
}
if projection.is_empty() {
let time_index = metadata.time_index_column().column_id;
if seen.insert(time_index) {
read_column_ids.push(time_index);
}
}
let mut extra_names = HashSet::new();
let mut columns = HashSet::new();
for expr in &self.request.filters {
columns.clear();
if expr_to_columns(expr, &mut columns).is_err() {
continue;
}
extra_names.extend(columns.iter().map(|column| column.name.clone()));
}
if let Some(expr) = predicate.region_partition_expr() {
expr.collect_column_names(&mut extra_names);
}
if !extra_names.is_empty() {
for column in &metadata.column_metadatas {
if extra_names.contains(column.column_schema.name.as_str())
&& !seen.contains(&column.column_id)
{
read_column_ids.push(column.column_id);
}
extra_names.remove(column.column_schema.name.as_str());
}
if !extra_names.is_empty() {
warn!(
"Some columns in filters are not found in region {}: {:?}",
metadata.region_id, extra_names
);
}
}
Ok(read_column_ids)
}
/// Partitions filters into two groups: non-field filters and field filters.
/// Returns `(non_field_filters, field_filters)`.
fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
@@ -797,10 +744,10 @@ pub struct ScanInput {
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
pub(crate) mapper: Arc<FlatProjectionMapper>,
/// Column ids to read from memtables and SSTs.
/// The columns to read from memtables and SSTs.
/// Notice this is different from the columns in `mapper` which are projected columns.
/// But this read columns might also include non-projected columns needed for filtering.
pub(crate) read_column_ids: Vec<ColumnId>,
pub(crate) read_cols: ReadColumns,
/// Time range filter for time index.
pub(crate) time_range: Option<TimestampRange>,
/// Predicate to push down.
@@ -855,7 +802,7 @@ impl ScanInput {
pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
ScanInput {
access_layer,
read_column_ids: mapper.column_ids().to_vec(),
read_cols: mapper.read_columns().clone(),
mapper: Arc::new(mapper),
time_range: None,
predicate: PredicateGroup::default(),
@@ -1102,14 +1049,14 @@ impl ScanInput {
let decode_pk_values = !self.compaction
&& self
.mapper
.column_ids()
.iter()
.any(|column_id| self.mapper.metadata().primary_key.contains(column_id));
.read_columns()
.column_ids_iter()
.any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
let reader = self
.access_layer
.read_sst(file.clone())
.predicate(predicate)
.projection(Some(self.read_column_ids.clone()))
.projection(Some(self.read_cols.clone()))
.cache(self.cache_strategy.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
@@ -1408,19 +1355,18 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFin
// Ensure the filters are sorted for consistent fingerprinting.
filters.sort_unstable();
time_filters.sort_unstable();
let read_columns = input.read_cols.clone();
Some(
crate::read::range_cache::ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_column_types: input
.read_column_ids
.iter()
read_column_types: read_columns
.column_ids_iter()
.map(|id| {
metadata
.column_by_id(*id)
.column_by_id(id)
.map(|col| col.column_schema.data_type.clone())
})
.collect(),
read_columns,
filters,
time_filters,
series_row_selector: input.series_row_selector,
@@ -1797,31 +1743,17 @@ mod tests {
use datatypes::value::Value;
use partition::expr::col as partition_col;
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::{
ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use super::*;
use crate::cache::CacheManager;
use crate::memtable::time_partition::TimePartitions;
use crate::read::range_cache::ScanRequestFingerprintBuilder;
use crate::region::version::VersionBuilder;
use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
use crate::test_util::memtable_util::metadata_with_primary_key;
use crate::test_util::scheduler_util::SchedulerEnv;
fn new_version(metadata: RegionMetadataRef) -> VersionRef {
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(EmptyMemtableBuilder::default()),
0,
None,
));
Arc::new(VersionBuilder::new(metadata, mutable).build())
}
async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
let env = SchedulerEnv::new().await;
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
let file = FileHandle::new(
crate::sst::file::FileMeta::default(),
@@ -1838,86 +1770,6 @@ mod tests {
.with_files(vec![file])
}
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection_input: Some(vec![4].into()),
filters: vec![
col("v0").gt(lit(1)),
col("ts").gt(lit(0)),
col("k0").eq(lit("foo")),
],
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
assert_eq!(vec![4, 0, 2, 3], read_ids);
}
#[tokio::test]
async fn test_build_read_column_ids_empty_projection() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection_input: Some(ProjectionInput::default()),
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
// Empty projection should still read the time index column (id 2 in this test schema).
assert_eq!(vec![2], read_ids);
}
#[tokio::test]
async fn test_build_read_column_ids_keeps_projection_order() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let version = new_version(metadata.clone());
let env = SchedulerEnv::new().await;
let request = ScanRequest {
projection_input: Some(vec![4, 1].into()),
filters: vec![col("v0").gt(lit(1))],
..Default::default()
};
let scan_region = ScanRegion::new(
version,
env.access_layer.clone(),
request,
CacheStrategy::Disabled,
);
let predicate =
PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
let projection = &scan_region.request.projection_indices().unwrap();
let read_ids = scan_region
.build_read_column_ids(projection, &predicate)
.unwrap();
// Projection order preserved, extra columns appended in schema order.
assert_eq!(vec![4, 1, 3], read_ids);
}
/// Helper to create a timestamp millisecond literal.
fn ts_lit(val: i64) -> datafusion_expr::Expr {
lit(ScalarValue::TimestampMillisecond(Some(val), None))
@@ -1943,7 +1795,7 @@ mod tests {
let fingerprint = build_scan_fingerprint(&input).unwrap();
let expected = ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_columns: input.read_cols,
read_column_types: vec![
metadata
.column_by_id(0)
@@ -2019,7 +1871,7 @@ mod tests {
let fingerprint = build_scan_fingerprint(&input).unwrap();
let expected = ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_columns: input.read_cols,
read_column_types: vec![
metadata
.column_by_id(0)

View File

@@ -1368,7 +1368,7 @@ mod split_tests {
async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);
StreamContext {
@@ -1745,7 +1745,7 @@ mod tests {
) -> Arc<StreamContext> {
let env = SchedulerEnv::new().await;
let metadata = metadata_for_test();
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let input = ScanInput::new(env.access_layer.clone(), mapper)
.with_cache(CacheStrategy::Disabled)
.with_memtables(memtables)

View File

@@ -207,7 +207,7 @@ impl FileRange {
self.file_handle().file_id().file_id(),
self.row_group_idx,
cache_strategy,
self.context.read_format().projection_indices(),
self.context.read_format().parquet_read_columns(),
flat_row_group_reader,
);
FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
@@ -655,14 +655,18 @@ mod tests {
use datafusion_expr::{col, lit};
use super::*;
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};
fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
true,
@@ -705,7 +709,9 @@ mod tests {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
true,

View File

@@ -51,10 +51,12 @@ use crate::error::{
ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
};
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::format::{
FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
};
use crate::sst::parquet::read_columns::ParquetReadColumns;
use crate::sst::{
FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
to_flat_sst_arrow_schema, with_field_id,
@@ -162,7 +164,7 @@ impl FlatReadFormat {
/// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding.
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
read_cols: ReadColumns,
num_columns: Option<usize>,
file_path: &str,
skip_auto_convert: bool,
@@ -178,16 +180,16 @@ impl FlatReadFormat {
// Only skip auto convert when the primary key encoding is sparse.
ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
metadata,
column_ids,
read_cols,
skip_auto_convert,
))
} else {
ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
metadata, column_ids, false,
metadata, read_cols, false,
))
}
} else {
ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols))
};
Ok(FlatReadFormat {
@@ -258,9 +260,10 @@ impl FlatReadFormat {
/// Gets the projected output schema produced by parquet reading.
pub(crate) fn output_arrow_schema(&self) -> Result<SchemaRef> {
let projection = self.parquet_read_columns().root_indices();
let schema = self
.arrow_schema()
.project(self.projection_indices())
.project(projection)
.context(ComputeArrowSnafu)?;
Ok(Arc::new(schema))
}
@@ -273,11 +276,11 @@ impl FlatReadFormat {
}
}
/// Gets sorted projection indices to read from the SST file.
pub(crate) fn projection_indices(&self) -> &[usize] {
/// Get the sorted read columns to read from the sst file.
pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols,
ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(),
}
}
@@ -413,7 +416,7 @@ impl ParquetPrimaryKeyToFlat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
read_cols: ReadColumns,
skip_auto_convert: bool,
) -> ParquetPrimaryKeyToFlat {
assert!(if skip_auto_convert {
@@ -422,20 +425,18 @@ impl ParquetPrimaryKeyToFlat {
true
});
let column_ids: Vec<_> = column_ids.collect();
// Creates a map to lookup index based on the new format.
let id_to_index = sst_column_id_indices(&metadata);
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
let codec = build_primary_key_codec(&metadata);
let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone());
let (convert_format, format_projection) = if skip_auto_convert {
(
None,
FormatProjection {
projection_indices: format.projection_indices().to_vec(),
parquet_read_cols: format.parquet_read_columns().clone(),
column_id_to_projected_index: format.field_id_to_projected_index().clone(),
},
)
@@ -444,7 +445,7 @@ impl ParquetPrimaryKeyToFlat {
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
sst_column_num,
column_ids.iter().copied(),
read_cols.clone(),
);
(
FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
@@ -482,14 +483,14 @@ struct ParquetFlat {
impl ParquetFlat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> ParquetFlat {
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(&metadata);
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
let format_projection =
FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols);
Self {
metadata,
@@ -789,7 +790,9 @@ impl FlatReadFormat {
pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
false,
@@ -810,6 +813,7 @@ mod tests {
use store_api::storage::RegionId;
use super::{FlatReadFormat, field_column_start};
use crate::read::read_columns::ReadColumns;
use crate::sst::{
FlatSchemaOptions, flat_sst_arrow_schema_column_num, to_flat_sst_arrow_schema,
};
@@ -892,7 +896,7 @@ mod tests {
let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense));
let read_format = FlatReadFormat::new(
metadata.clone(),
[0_u32, 2_u32].into_iter(),
ReadColumns::from_deduped_column_ids([0_u32, 2_u32]),
None,
"test",
false,
@@ -900,9 +904,10 @@ mod tests {
.unwrap();
let output_schema = read_format.output_arrow_schema().unwrap();
let projection = read_format.parquet_read_columns().root_indices();
let expected = Arc::new(
to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default())
.project(read_format.projection_indices())
.project(projection)
.unwrap(),
);

View File

@@ -52,8 +52,10 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::read_columns::ReadColumns;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
@@ -212,7 +214,7 @@ pub struct PrimaryKeyReadFormat {
/// In SST schema, fields are stored in the front of the schema.
field_id_to_index: HashMap<ColumnId, usize>,
/// Indices of columns to read from the SST. It contains all internal columns.
projection_indices: Vec<usize>,
parquet_read_cols: ParquetReadColumns,
/// Field column id to their index in the projected schema (
/// the schema of [Batch]).
field_id_to_projected_index: HashMap<ColumnId, usize>,
@@ -222,10 +224,7 @@ pub struct PrimaryKeyReadFormat {
impl PrimaryKeyReadFormat {
/// Creates a helper with existing `metadata` and `column_ids` to read.
pub fn new(
metadata: RegionMetadataRef,
column_ids: impl Iterator<Item = ColumnId>,
) -> PrimaryKeyReadFormat {
pub fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> PrimaryKeyReadFormat {
let field_id_to_index: HashMap<_, _> = metadata
.field_columns()
.enumerate()
@@ -236,14 +235,14 @@ impl PrimaryKeyReadFormat {
let format_projection = FormatProjection::compute_format_projection(
&field_id_to_index,
arrow_schema.fields.len(),
column_ids,
read_cols,
);
PrimaryKeyReadFormat {
metadata,
arrow_schema,
field_id_to_index,
projection_indices: format_projection.projection_indices,
parquet_read_cols: format_projection.parquet_read_cols,
field_id_to_projected_index: format_projection.column_id_to_projected_index,
primary_key_codec: None,
}
@@ -262,9 +261,8 @@ impl PrimaryKeyReadFormat {
&self.metadata
}
/// Gets sorted projection indices to read.
pub(crate) fn projection_indices(&self) -> &[usize] {
&self.projection_indices
pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
&self.parquet_read_cols
}
/// Gets the field id to projected index.
@@ -580,8 +578,8 @@ impl PrimaryKeyReadFormat {
/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
/// Indices of columns to read from the SST. It contains all internal columns.
pub(crate) projection_indices: Vec<usize>,
/// The columns to read from the SST. It contains all internal columns.
pub(crate) parquet_read_cols: ParquetReadColumns,
/// Column id to their index in the projected schema (
/// the schema after projection).
///
@@ -596,50 +594,91 @@ impl FormatProjection {
pub(crate) fn compute_format_projection(
id_to_index: &HashMap<ColumnId, usize>,
sst_column_num: usize,
column_ids: impl Iterator<Item = ColumnId>,
cols: ReadColumns,
) -> Self {
// Maps column id of a projected column to its index in SST.
// It also ignores columns not in the SST.
// [(column id, index in SST)]
let mut projected_schema: Vec<_> = column_ids
.filter_map(|column_id| {
let mut projected_columns: Vec<_> = cols
.cols
.into_iter()
.filter_map(|col| {
id_to_index
.get(&column_id)
.get(&col.column_id)
.copied()
.map(|index| (column_id, index))
.map(|index_of_sst| (col.column_id, index_of_sst, col.nested_paths))
})
.collect();
// Sorts columns by their indices in the SST. SST uses a bitmap for projection.
// This ensures the schema of `projected_schema` is the same as the batch returned from the SST.
projected_schema.sort_unstable_by_key(|x| x.1);
// Dedups the entries to avoid the case that `column_ids` has duplicated columns.
projected_schema.dedup_by_key(|x| x.1);
// Collects all projected indices.
// It contains the positions of all columns we need to read.
let mut projection_indices: Vec<_> = projected_schema
.iter()
.map(|(_column_id, index)| *index)
// We need to add all fixed position columns.
.chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
.collect();
projection_indices.sort_unstable();
// Removes duplications.
projection_indices.dedup();
// This ensures the schema of `projected_columns` is the same as the batch returned from the SST.
projected_columns.sort_unstable_by_key(|(_, index, _)| *index);
let mut parquet_read_cols: Vec<ParquetReadColumn> =
Vec::with_capacity(projected_columns.len() + FIXED_POS_COLUMN_NUM);
// Creates a map from column id to the index of that column in the projected record batch.
let column_id_to_projected_index = projected_schema
.into_iter()
.map(|(column_id, _)| column_id)
.enumerate()
.map(|(index, column_id)| (column_id, index))
.collect();
let mut column_id_to_projected_index = HashMap::with_capacity(projected_columns.len());
for (col_id, index_of_sst, nested_paths) in projected_columns {
Self::merge_or_push_parquet_column(&mut parquet_read_cols, index_of_sst, nested_paths);
column_id_to_projected_index
.entry(col_id)
.or_insert_with(|| parquet_read_cols.len() - 1);
}
// In SST schema, fixed-position columns are always in the tail:
// `time index, __primary_key, __sequence, __op_type`.
Self::append_time_index_if_needed(&mut parquet_read_cols, sst_column_num);
Self::append_fixed_internal_columns(&mut parquet_read_cols, sst_column_num);
Self {
projection_indices,
parquet_read_cols: ParquetReadColumns::from_deduped(parquet_read_cols),
column_id_to_projected_index,
}
}
fn merge_or_push_parquet_column(
parquet_read_cols: &mut Vec<ParquetReadColumn>,
index_of_sst: usize,
nested_paths: Vec<Vec<String>>,
) {
// `projected_columns` is sorted by parquet root index, so repeated reads
// for the same root column are always adjacent.
if let Some(last_col) = parquet_read_cols.last_mut()
&& last_col.root_index() == index_of_sst
{
last_col.merge_nested_paths(nested_paths);
return;
}
let parquet_col = ParquetReadColumn::new(index_of_sst).with_nested_paths(nested_paths);
parquet_read_cols.push(parquet_col);
}
fn append_time_index_if_needed(
parquet_read_cols: &mut Vec<ParquetReadColumn>,
sst_column_num: usize,
) {
let time_index = sst_column_num - FIXED_POS_COLUMN_NUM;
// Existing projected roots are already sorted by SST root index, and may
// already include the time index, so we compare against the last root to
// decide whether we still need to append `time index`.
let needs_time_index = parquet_read_cols
.last()
.map(|col| col.root_index() != time_index)
.unwrap_or(true);
if needs_time_index {
parquet_read_cols.push(ParquetReadColumn::new(time_index));
}
}
// Append internal columns in fixed order: `__primary_key`, `__sequence`,
// `__op_type`.
fn append_fixed_internal_columns(
parquet_read_cols: &mut Vec<ParquetReadColumn>,
sst_column_num: usize,
) {
for index in sst_column_num - INTERNAL_COLUMN_NUM..sst_column_num {
parquet_read_cols.push(ParquetReadColumn::new(index));
}
}
}
/// Values of column statistics of the SST.
@@ -671,7 +710,9 @@ impl PrimaryKeyReadFormat {
pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
Self::new(
Arc::clone(&metadata),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
)
}
}
@@ -929,17 +970,33 @@ mod tests {
fn test_projection_indices() {
let metadata = build_test_region_metadata();
// Only read tag1
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [3].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
let read_format =
PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([3]));
assert_eq!(
&[2, 3, 4, 5],
read_format.parquet_read_columns().root_indices()
);
// Only read field1
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [4].iter().copied());
assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
let read_format =
PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([4]));
assert_eq!(
&[0, 2, 3, 4, 5],
read_format.parquet_read_columns().root_indices()
);
// Only read ts
let read_format = PrimaryKeyReadFormat::new(metadata.clone(), [5].iter().copied());
assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
let read_format =
PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([5]));
assert_eq!(
&[2, 3, 4, 5],
read_format.parquet_read_columns().root_indices()
);
// Read field0, tag0, ts
let read_format = PrimaryKeyReadFormat::new(metadata, [2, 1, 5].iter().copied());
assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
let read_format =
PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids([2, 1, 5]));
assert_eq!(
&[1, 2, 3, 4, 5],
read_format.parquet_read_columns().root_indices()
);
}
#[test]
@@ -985,7 +1042,8 @@ mod tests {
.iter()
.map(|col| col.column_id)
.collect();
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
let read_format =
PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
@@ -1004,7 +1062,8 @@ mod tests {
.iter()
.map(|col| col.column_id)
.collect();
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
let read_format =
PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
@@ -1030,12 +1089,12 @@ mod tests {
#[test]
fn test_convert_record_batch_with_override_sequence() {
let metadata = build_test_region_metadata();
let column_ids: Vec<_> = metadata
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect();
let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
let read_format = PrimaryKeyReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
@@ -1202,27 +1261,60 @@ mod tests {
// The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)
// Only read tag1 (column_id=3, index=1) + fixed columns
let read_format =
FlatReadFormat::new(metadata.clone(), [3].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
let read_format = FlatReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids([3]),
None,
"test",
false,
)
.unwrap();
assert_eq!(
&[1, 4, 5, 6, 7],
read_format.parquet_read_columns().root_indices()
);
// Only read field1 (column_id=4, index=2) + fixed columns
let read_format =
FlatReadFormat::new(metadata.clone(), [4].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
let read_format = FlatReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids([4]),
None,
"test",
false,
)
.unwrap();
assert_eq!(
&[2, 4, 5, 6, 7],
read_format.parquet_read_columns().root_indices()
);
// Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
let read_format =
FlatReadFormat::new(metadata.clone(), [5].iter().copied(), None, "test", false)
.unwrap();
assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
let read_format = FlatReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids([5]),
None,
"test",
false,
)
.unwrap();
assert_eq!(
&[4, 5, 6, 7],
read_format.parquet_read_columns().root_indices()
);
// Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
let read_format =
FlatReadFormat::new(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
let read_format = FlatReadFormat::new(
metadata,
ReadColumns::from_deduped_column_ids([2, 1, 5]),
None,
"test",
false,
)
.unwrap();
assert_eq!(
&[0, 3, 4, 5, 6, 7],
read_format.parquet_read_columns().root_indices()
);
}
#[test]
@@ -1230,7 +1322,7 @@ mod tests {
let metadata = build_test_region_metadata();
let mut format = FlatReadFormat::new(
metadata,
std::iter::once(1), // Just read tag0
ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0
Some(8),
"test",
false,
@@ -1447,7 +1539,7 @@ mod tests {
.collect();
let format = FlatReadFormat::new(
metadata.clone(),
column_ids.into_iter(),
ReadColumns::from_deduped_column_ids(column_ids),
Some(6),
"test",
false,
@@ -1513,7 +1605,7 @@ mod tests {
.collect();
let format = FlatReadFormat::new(
metadata.clone(),
column_ids.clone().into_iter(),
ReadColumns::from_deduped_column_ids(column_ids.clone()),
None,
"test",
false,
@@ -1581,9 +1673,14 @@ mod tests {
// Compare the actual result with the expected record batch
assert_eq!(expected_record_batch, result);
let format =
FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
.unwrap();
let format = FlatReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids(column_ids),
None,
"test",
true,
)
.unwrap();
// Test conversion with sparse encoding and skip convert.
let result = format.convert_batch(record_batch.clone(), None).unwrap();
assert_eq!(record_batch, result);

View File

@@ -474,7 +474,7 @@ impl PrefilterContextBuilder {
return None;
}
let total_count = read_format.projection_indices().len();
let total_count = read_format.parquet_read_columns().root_indices().len();
let remaining_count = total_count.saturating_sub(prefilter_count);
if pk_filters.is_none() && prefilter_count >= total_count {
return None;
@@ -734,6 +734,7 @@ mod tests {
use store_api::codec::PrimaryKeyEncoding;
use super::*;
use crate::read::read_columns::ReadColumns;
use crate::sst::internal_fields;
use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
use crate::test_util::sst_util::{
@@ -981,7 +982,9 @@ mod tests {
Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
false,
@@ -1017,7 +1020,9 @@ mod tests {
));
let legacy_read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"memtable",
false,
@@ -1044,7 +1049,9 @@ mod tests {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let raw_pk_read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"memtable",
true,
@@ -1078,7 +1085,9 @@ mod tests {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let full_read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
true,
@@ -1108,7 +1117,7 @@ mod tests {
let ts = metadata.time_index_column().column_id;
let projected_read_format = FlatReadFormat::new(
metadata.clone(),
[field_0, ts].into_iter(),
ReadColumns::from_deduped_column_ids([field_0, ts]),
None,
"test",
true,
@@ -1161,7 +1170,9 @@ mod tests {
Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense));
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
false,

View File

@@ -23,21 +23,46 @@ pub type ParquetNestedPath = Vec<String>;
/// The parquet columns to read.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParquetReadColumns {
/// Root parquet column indices in the same order as `cols`.
///
/// Most readers need these indices as a borrowed slice for Arrow schema
/// projection or parquet root-column projection. Keeping them here avoids
/// repeatedly collecting `cols.iter().map(|col| col.root_index)`.
root_indices: Vec<usize>,
cols: Vec<ParquetReadColumn>,
has_nested: bool,
}
impl ParquetReadColumns {
/// Builds parquet read columns from deduplicated, normalized input.
///
/// `cols` must not contain duplicate root indices, and nested paths must
/// already be merged. Empty `nested_paths` means reading the whole root column.
///
/// This constructor does not validate or merge input.
pub fn from_deduped(cols: Vec<ParquetReadColumn>) -> Self {
let has_nested = cols.iter().any(|col| !col.nested_paths.is_empty());
let root_indices = cols.iter().map(|col| col.root_index).collect();
Self {
root_indices,
cols,
has_nested,
}
}
/// Builds root-column projections from root indices that are already
/// deduplicated.
///
/// Note: this constructor does not check for duplicates.
pub fn from_deduped_root_indices(root_indices: impl IntoIterator<Item = usize>) -> Self {
let root_indices = root_indices.into_iter().collect::<Vec<_>>();
let cols = root_indices
.into_iter()
.iter()
.copied()
.map(ParquetReadColumn::new)
.collect();
Self {
root_indices,
cols,
has_nested: false,
}
@@ -52,7 +77,12 @@ impl ParquetReadColumns {
}
pub fn root_indices_iter(&self) -> impl Iterator<Item = usize> + '_ {
self.cols.iter().map(|col| col.root_index)
self.root_indices.iter().copied()
}
/// Returns root parquet column indices.
pub fn root_indices(&self) -> &[usize] {
&self.root_indices
}
}
@@ -95,6 +125,17 @@ impl ParquetReadColumn {
}
}
/// Merges additional nested paths into this root column.
pub fn merge_nested_paths(&mut self, nested_paths: Vec<ParquetNestedPath>) {
let reads_whole_root = self.nested_paths.is_empty() || nested_paths.is_empty();
if reads_whole_root {
// Empty nested paths means reading the whole root column.
self.nested_paths = vec![];
} else {
self.nested_paths.extend(nested_paths);
}
}
pub fn root_index(&self) -> usize {
self.root_index
}
@@ -235,13 +276,10 @@ mod tests {
fn test_reads_whole_root() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![],
}],
has_nested: false,
};
let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![],
}]);
let (leaf_indices, matched_roots) =
build_parquet_leaves_indices(&parquet_schema_desc, &projection);
@@ -253,19 +291,16 @@ mod tests {
fn test_filters_nested_paths() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![
ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string()]],
},
ParquetReadColumn {
root_index: 1,
nested_paths: vec![],
},
],
has_nested: true,
};
let projection = ParquetReadColumns::from_deduped(vec![
ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string()]],
},
ParquetReadColumn {
root_index: 1,
nested_paths: vec![],
},
]);
let (leaf_indices, matched_roots) =
build_parquet_leaves_indices(&parquet_schema_desc, &projection);
@@ -277,13 +312,10 @@ mod tests {
fn test_reads_middle_level_path() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string()]],
}],
has_nested: true,
};
let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string()]],
}]);
let (leaf_indices, matched_roots) =
build_parquet_leaves_indices(&parquet_schema_desc, &projection);
@@ -295,13 +327,10 @@ mod tests {
fn test_reads_leaf_level_path() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]],
}],
has_nested: true,
};
let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "b".to_string(), "c".to_string()]],
}]);
let (leaf_indices, matched_roots) =
build_parquet_leaves_indices(&parquet_schema_desc, &projection);
@@ -313,19 +342,16 @@ mod tests {
fn test_build_projection_mask_with_unmatched_roots() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![
ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "missing".to_string()]],
},
ParquetReadColumn {
root_index: 1,
nested_paths: vec![],
},
],
has_nested: true,
};
let projection = ParquetReadColumns::from_deduped(vec![
ParquetReadColumn {
root_index: 0,
nested_paths: vec![vec!["j".to_string(), "missing".to_string()]],
},
ParquetReadColumn {
root_index: 1,
nested_paths: vec![],
},
]);
let plan = build_projection_plan(&projection, &parquet_schema_desc);
@@ -340,16 +366,13 @@ mod tests {
fn test_merges_mixed_paths() {
let parquet_schema_desc = build_test_nested_parquet_schema();
let projection = ParquetReadColumns {
cols: vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![
vec!["j".to_string(), "a".to_string()],
vec!["j".to_string(), "b".to_string(), "d".to_string()],
],
}],
has_nested: true,
};
let projection = ParquetReadColumns::from_deduped(vec![ParquetReadColumn {
root_index: 0,
nested_paths: vec![
vec!["j".to_string(), "a".to_string()],
vec!["j".to_string(), "b".to_string(), "d".to_string()],
],
}]);
let (leaf_indices, matched_roots) =
build_parquet_leaves_indices(&parquet_schema_desc, &projection);
@@ -357,6 +380,32 @@ mod tests {
assert_eq!(HashSet::from([0]), matched_roots);
}
#[test]
fn test_merge_nested_paths_extends_paths() {
let mut col = ParquetReadColumn::new(0)
.with_nested_paths(vec![vec!["j".to_string(), "a".to_string()]]);
col.merge_nested_paths(vec![vec!["j".to_string(), "b".to_string()]]);
assert_eq!(
&[
vec!["j".to_string(), "a".to_string()],
vec!["j".to_string(), "b".to_string()],
],
col.nested_paths()
);
}
#[test]
fn test_merge_nested_paths_with_whole_root() {
let mut col = ParquetReadColumn::new(0)
.with_nested_paths(vec![vec!["j".to_string(), "a".to_string()]]);
col.merge_nested_paths(vec![]);
assert!(col.nested_paths().is_empty());
}
// Test schema:
// schema
// |- j

View File

@@ -48,6 +48,7 @@ use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId};
use table::predicate::Predicate;
use self::stream::{NestedSchemaAligner, ParquetErrorAdapter, ProjectedRecordBatchStream};
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::{CacheStrategy, CachedSstMeta};
#[cfg(feature = "vector_index")]
@@ -59,6 +60,7 @@ use crate::metrics::{
};
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::prune::FlatPruneReader;
use crate::read::read_columns::ReadColumns;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
@@ -82,10 +84,7 @@ use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::prefilter::{
PrefilterContextBuilder, build_reader_filter_plan, execute_prefilter,
};
use crate::sst::parquet::read_columns::{
ParquetReadColumns, ProjectionMaskPlan, build_projection_plan,
};
use crate::sst::parquet::reader::stream::{ParquetErrorAdapter, ProjectedRecordBatchStream};
use crate::sst::parquet::read_columns::{ProjectionMaskPlan, build_projection_plan};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -127,11 +126,11 @@ pub struct ParquetReaderBuilder {
object_store: ObjectStore,
/// Predicate to push down.
predicate: Option<Predicate>,
/// Metadata of columns to read.
/// The columns to read.
///
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
read_cols: Option<ReadColumns>,
/// Strategy to cache SST data.
cache_strategy: CacheStrategy,
/// Index appliers.
@@ -171,7 +170,7 @@ impl ParquetReaderBuilder {
file_handle,
object_store,
predicate: None,
projection: None,
read_cols: None,
cache_strategy: CacheStrategy::Disabled,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
@@ -199,8 +198,8 @@ impl ParquetReaderBuilder {
///
/// The reader only applies the projection to fields.
#[must_use]
pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
self.projection = projection;
pub fn projection(mut self, read_cols: Option<ReadColumns>) -> ParquetReaderBuilder {
self.read_cols = read_cols;
self
}
@@ -377,30 +376,25 @@ impl ParquetReaderBuilder {
None
};
let mut read_format = if let Some(column_ids) = &self.projection {
FlatReadFormat::new(
region_meta.clone(),
column_ids.iter().copied(),
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
)?
let read_cols = if let Some(read_cols) = &self.read_cols {
read_cols.clone()
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
let column_ids: Vec<_> = expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id)
.collect();
FlatReadFormat::new(
region_meta.clone(),
column_ids.iter().copied(),
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
)?
// Lists all column ids to read, we always use the expected metadata if possible.
ReadColumns::from_deduped_column_ids(
expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id),
)
};
let mut read_format = FlatReadFormat::new(
region_meta.clone(),
read_cols,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
&file_path,
skip_auto_convert,
)?;
if need_override_sequence(&parquet_meta) {
read_format
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
@@ -408,12 +402,8 @@ impl ParquetReaderBuilder {
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
let parquet_read_cols = ParquetReadColumns::from_deduped_root_indices(
read_format.projection_indices().iter().copied(),
);
let projection_plan = build_projection_plan(&parquet_read_cols, parquet_schema_desc);
let parquet_read_cols = read_format.parquet_read_columns();
let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
let selection = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
@@ -479,6 +469,7 @@ impl ParquetReaderBuilder {
output_schema,
object_store: self.object_store.clone(),
projection: projection_plan,
has_nested_projection: parquet_read_cols.has_nested(),
cache_strategy: self.cache_strategy.clone(),
prefilter_builder: filter_plan.prefilter_builder,
};
@@ -1627,6 +1618,8 @@ pub(crate) struct RowGroupReaderBuilder {
object_store: ObjectStore,
/// Projection mask.
projection: ProjectionMaskPlan,
/// Whether projected read columns include nested paths.
has_nested_projection: bool,
/// Cache.
cache_strategy: CacheStrategy,
/// Pre-built prefilter state. `None` if prefiltering is not applicable.
@@ -1730,11 +1723,16 @@ impl RowGroupReaderBuilder {
stream: ParquetRecordBatchStream<SstAsyncFileReader>,
) -> Result<ProjectedRecordBatchStream> {
let stream = ParquetErrorAdapter::new(stream, self.file_path.clone());
ProjectedRecordBatchStream::new(
if !self.has_nested_projection {
return Ok(stream.boxed());
}
Ok(NestedSchemaAligner::new(
stream,
self.projection.projected_root_presence.clone(),
self.output_schema.clone(),
)
)?
.boxed())
}
/// Builds a [ParquetRecordBatchStream] with a custom projection mask.
@@ -2222,10 +2220,12 @@ mod tests {
let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|column| column.column_id),
ReadColumns::from_deduped_column_ids(
region_metadata
.column_metadatas
.iter()
.map(|column| column.column_id),
),
None,
&file_path,
false,
@@ -2328,7 +2328,9 @@ mod tests {
let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
true,
@@ -2350,7 +2352,9 @@ mod tests {
let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
let read_format = FlatReadFormat::new(
metadata.clone(),
metadata.column_metadatas.iter().map(|c| c.column_id),
ReadColumns::from_deduped_column_ids(
metadata.column_metadatas.iter().map(|c| c.column_id),
),
None,
"test",
true,

View File

@@ -15,75 +15,95 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use datatypes::arrow::array::new_null_array;
use datatypes::arrow::datatypes::SchemaRef;
use datafusion_common::cast_column;
use datafusion_common::format::DEFAULT_CAST_OPTIONS;
use datatypes::arrow::array::{ArrayRef, new_null_array};
use datatypes::arrow::datatypes::{DataType, FieldRef, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use futures::stream::BoxStream;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use snafu::{IntoError, ResultExt, ensure};
use crate::error::{NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
use crate::error::{
CastColumnSnafu, NewRecordBatchSnafu, ReadParquetSnafu, Result, UnexpectedSnafu,
};
use crate::sst::parquet::async_reader::SstAsyncFileReader;
/// Wraps a parquet record batch stream and fills missing projected root columns.
/// Aligns projected batches to the expected output schema for nested projections.
///
/// Background
/// ----------
/// Nested projection may ask parquet to read leaves under a root column. If none
/// of the requested leaves exists in the current parquet file, parquet decoding
/// omits the whole root from the physical `RecordBatch`. The logical projection
/// still contains that root, so this wrapper restores the output shape by
/// inserting a root-level null array.
pub struct MissingColFiller<S> {
/// Inner stream that yields record batches from parquet reader.
/// omits the whole root from the physical [`RecordBatch`].
///
/// In addition, after nested-path filtering, returned struct arrays may contain
/// only a subset of fields. The current output schema is not pruned by nested
/// paths, so physical struct fields can be a subset of the expected struct
/// fields, and their nested schema can differ from the expected output schema.
///
/// To keep projected batches schema-consistent before entering upper readers:
/// - Root-column presence alignment restores missing projected root columns by
/// inserting root-level null arrays.
/// - Nested struct alignment aligns struct arrays to the expected nested field
/// layout.
pub struct NestedSchemaAligner<S> {
inner: S,
/// Output schema expected by the upper reader.
output_schema: SchemaRef,
/// Whether each projected root exists in the physical batch returned by parquet.
projected_root_matches: Vec<bool>,
/// Whether each projected root exists in the physical batch returned by
/// parquet.
projected_root_presence: Vec<bool>,
/// Number of columns expected from the physical batch returned by parquet.
expected_input_col_num: usize,
/// Whether all projected roots are present and the stream can pass batches through.
all_matched: bool,
/// Whether all projected roots are present and the stream can pass batches
/// through.
all_roots_present: bool,
/// The cache for whether incoming batches already match output schema.
is_schema_matched: Option<bool>,
}
pub(crate) type ProjectedRecordBatchStream = MissingColFiller<ParquetErrorAdapter>;
pub(crate) type ProjectedRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
impl<S> MissingColFiller<S>
impl<S> NestedSchemaAligner<S>
where
S: Stream<Item = Result<RecordBatch>>,
{
pub fn new(
inner: S,
projected_root_matches: Vec<bool>,
projected_root_presence: Vec<bool>,
output_schema: SchemaRef,
) -> Result<MissingColFiller<S>> {
) -> Result<NestedSchemaAligner<S>> {
ensure!(
projected_root_matches.len() == output_schema.fields().len(),
projected_root_presence.len() == output_schema.fields().len(),
UnexpectedSnafu {
reason: format!(
"MissingColFiller projected root matches len {} does not match output schema columns {}",
projected_root_matches.len(),
"NestedSchemaAligner projected root presence len {} does not match output schema columns {}",
projected_root_presence.len(),
output_schema.fields().len()
),
}
);
let expected_input_col_num = projected_root_matches
let expected_input_col_num = projected_root_presence
.iter()
.filter(|matched| **matched)
.count();
let all_matched = projected_root_matches.iter().all(|&m| m);
let all_roots_present = projected_root_presence.iter().all(|&m| m);
Ok(MissingColFiller {
Ok(NestedSchemaAligner {
inner,
output_schema,
projected_root_matches,
projected_root_presence,
expected_input_col_num,
all_matched,
all_roots_present,
is_schema_matched: None,
})
}
}
impl<S> Stream for MissingColFiller<S>
impl<S> Stream for NestedSchemaAligner<S>
where
S: Stream<Item = Result<RecordBatch>> + Unpin,
{
@@ -94,18 +114,26 @@ where
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(rb))) => {
let output_schema = &this.output_schema;
let rb = if this.all_matched {
let rb = if this.all_roots_present {
rb
} else {
fill_missing_cols(
rb,
output_schema,
&this.projected_root_matches,
&this.output_schema,
&this.projected_root_presence,
this.expected_input_col_num,
)?
};
Poll::Ready(Some(Ok(rb)))
let is_schema_matched = *this
.is_schema_matched
.get_or_insert_with(|| rb.schema() == this.output_schema);
if is_schema_matched {
Poll::Ready(Some(Ok(rb)))
} else {
Poll::Ready(Some(align_batch_to_schema(rb, &this.output_schema)))
}
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
@@ -124,7 +152,7 @@ fn fill_missing_cols(
rb.columns().len() == expected_input_col_num,
UnexpectedSnafu {
reason: format!(
"MissingColFiller expected {} input columns but got {}",
"NestedSchemaAligner expected {} input columns but got {}",
expected_input_col_num,
rb.columns().len()
),
@@ -146,6 +174,40 @@ fn fill_missing_cols(
RecordBatch::try_new(output_schema.clone(), cols).context(NewRecordBatchSnafu)
}
fn align_batch_to_schema(rb: RecordBatch, output_schema: &SchemaRef) -> Result<RecordBatch> {
ensure!(
rb.num_columns() == output_schema.fields().len(),
UnexpectedSnafu {
reason: format!(
"NestedSchemaAligner expected {} columns but got {}",
output_schema.fields().len(),
rb.num_columns()
),
}
);
let columns = rb
.columns()
.iter()
.zip(output_schema.fields())
.map(|(array, field)| align_array(array, field))
.collect::<Result<Vec<_>>>()?;
RecordBatch::try_new(output_schema.clone(), columns).context(NewRecordBatchSnafu)
}
fn align_array(array: &ArrayRef, field: &FieldRef) -> Result<ArrayRef> {
if array.data_type() == field.data_type() {
return Ok(array.clone());
}
if !matches!(field.data_type(), DataType::Struct(_)) {
return Ok(array.clone());
}
cast_column(array, field.as_ref(), &DEFAULT_CAST_OPTIONS).context(CastColumnSnafu)
}
/// Maps parquet stream errors into mito errors before batches enter the filler.
pub(crate) struct ParquetErrorAdapter {
inner: ParquetRecordBatchStream<SstAsyncFileReader>,
@@ -181,14 +243,14 @@ impl Stream for ParquetErrorAdapter {
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use datatypes::arrow::array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
use datatypes::arrow::datatypes::{DataType, Field, Fields, Schema};
use futures::{StreamExt, stream};
use super::*;
#[tokio::test]
async fn test_filler_with_all_projected_roots_match() {
async fn test_aligner_with_all_projected_roots_match() {
let output_schema = schema([
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Utf8, true),
@@ -200,16 +262,16 @@ mod tests {
.unwrap();
let stream = stream::iter([Ok(input.clone())]);
let mut filler =
MissingColFiller::new(stream, vec![true, true], output_schema.clone()).unwrap();
let output = filler.next().await.unwrap().unwrap();
let mut aligner =
NestedSchemaAligner::new(stream, vec![true, true], output_schema.clone()).unwrap();
let output = aligner.next().await.unwrap().unwrap();
assert_eq!(input, output);
assert!(filler.next().await.is_none());
assert!(aligner.next().await.is_none());
}
#[tokio::test]
async fn test_filler_with_fills_null_root_columns() {
async fn test_aligner_with_fills_null_root_columns() {
let input_schema = schema([Field::new("a", DataType::Int64, true)]);
let output_schema = schema([
Field::new("a", DataType::Int64, true),
@@ -219,9 +281,10 @@ mod tests {
let input = RecordBatch::try_new(input_schema, vec![int_array([10, 20])]).unwrap();
let stream = stream::iter([Ok(input)]);
let mut filler =
MissingColFiller::new(stream, vec![true, false, false], output_schema.clone()).unwrap();
let output = filler.next().await.unwrap().unwrap();
let mut aligner =
NestedSchemaAligner::new(stream, vec![true, false, false], output_schema.clone())
.unwrap();
let output = aligner.next().await.unwrap().unwrap();
assert_eq!(output_schema, output.schema());
assert_eq!(3, output.num_columns());
@@ -243,7 +306,7 @@ mod tests {
}
#[tokio::test]
async fn test_filler_with_fills_missing_struct_root_column() {
async fn test_aligner_with_fills_missing_struct_root_column() {
let input_schema = schema([Field::new("a", DataType::Int64, true)]);
let struct_type = DataType::Struct(Fields::from(vec![
Field::new("x", DataType::Int64, true),
@@ -256,9 +319,9 @@ mod tests {
let input = RecordBatch::try_new(input_schema, vec![int_array([10, 20])]).unwrap();
let stream = stream::iter([Ok(input)]);
let mut filler =
MissingColFiller::new(stream, vec![true, false], output_schema.clone()).unwrap();
let output = filler.next().await.unwrap().unwrap();
let mut aligner =
NestedSchemaAligner::new(stream, vec![true, false], output_schema.clone()).unwrap();
let output = aligner.next().await.unwrap().unwrap();
assert_eq!(output_schema, output.schema());
assert_eq!(2, output.num_columns());
@@ -267,20 +330,23 @@ mod tests {
}
#[tokio::test]
async fn test_filler_with_reject_projection_len_mismatch() {
async fn test_aligner_reject_projection_len_mismatch() {
let output_schema = schema([Field::new("a", DataType::Int64, true)]);
let stream = stream::iter([]);
let err = match MissingColFiller::new(stream, vec![true, false], output_schema) {
Ok(_) => panic!("MissingColFiller should reject projection length mismatch"),
let err = match NestedSchemaAligner::new(stream, vec![true, false], output_schema) {
Ok(_) => panic!("NestedSchemaAligner should reject projection length mismatch"),
Err(err) => err,
};
assert!(err.to_string().contains("projected root matches len 2"));
assert!(
err.to_string()
.contains("projected root presence len 2 does not match output schema columns 1")
);
}
#[tokio::test]
async fn test_filler_reject_with_input_column_mismatch() {
async fn test_aligner_reject_with_input_column_mismatch() {
let input_schema = schema([Field::new("a", DataType::Int64, true)]);
let output_schema = schema([
Field::new("a", DataType::Int64, true),
@@ -290,9 +356,9 @@ mod tests {
let input = RecordBatch::try_new(input_schema, vec![int_array([1, 2])]).unwrap();
let stream = stream::iter([Ok(input)]);
let mut filler =
MissingColFiller::new(stream, vec![true, true, false], output_schema).unwrap();
let err = filler.next().await.unwrap().unwrap_err();
let mut aligner =
NestedSchemaAligner::new(stream, vec![true, true, false], output_schema).unwrap();
let err = aligner.next().await.unwrap().unwrap_err();
assert!(
err.to_string()
@@ -300,6 +366,44 @@ mod tests {
);
}
#[tokio::test]
async fn test_nested_schema_aligner_aligns_struct_field() {
let output_schema = schema([Field::new(
"nested",
DataType::Struct(Fields::from(vec![
Field::new("x", DataType::Int64, true),
Field::new("y", DataType::Utf8, true),
])),
true,
)]);
let input = RecordBatch::try_new(
schema([Field::new(
"nested",
DataType::Struct(Fields::from(vec![Field::new("x", DataType::Int64, true)])),
true,
)]),
vec![Arc::new(StructArray::from(vec![(
Arc::new(Field::new("x", DataType::Int64, true)),
int_array([1, 2]),
)]))],
)
.unwrap();
let mut aligner =
NestedSchemaAligner::new(stream::iter([Ok(input)]), vec![true], output_schema.clone())
.unwrap();
let output = aligner.next().await.unwrap().unwrap();
assert_eq!(output_schema, output.schema());
let nested = output
.column(0)
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
assert_eq!(2, nested.columns().len());
assert_eq!(2, nested.column(1).null_count());
}
fn schema(fields: impl IntoIterator<Item = Field>) -> SchemaRef {
Arc::new(Schema::new(fields.into_iter().collect::<Vec<_>>()))
}