feat: Projection mapper for flat schema (#6679)

* feat: plain projection mapper wip

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: change ProjectionMapper to enum

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: convert plain batch

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for the mapper

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: allow dead code

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: format code

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change PlainProjectionMapper to FlatProjectionMapper

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix projection tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comments

Removes some unwrap()

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fmt

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comment

as_plain -> as_flat

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-08-12 15:36:23 +08:00
Committed by: GitHub
Parent: 5cec0d4e3a
Commit: 1977ae50ee
10 changed files with 600 additions and 58 deletions
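In short, this commit turns `ProjectionMapper` into an enum that dispatches to either the existing primary-key mapper or the new `FlatProjectionMapper`, selected by a `flat_format` flag at construction time. A minimal usage sketch, assuming only the signatures introduced in the diff below (`metadata`, `projection`, `batch`, `arrow_batch`, and `cache_strategy` are placeholders):

    // Sketch only: pick the mapper variant by format flag.
    let mapper = ProjectionMapper::new(&metadata, projection.iter().copied(), flat_format)?;
    match &mapper {
        // Converts mito `Batch`es, using the vector cache.
        ProjectionMapper::PrimaryKey(pk) => { /* pk.convert(&batch, &cache_strategy)? */ }
        // Converts Arrow record batches in the flat SST layout.
        ProjectionMapper::Flat(flat) => { /* flat.convert(&arrow_batch)? */ }
    }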


@@ -638,15 +638,18 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use a special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode);
let mut scan_input = ScanInput::new(
self.sst_layer,
ProjectionMapper::all(&self.metadata, false)?,
)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use a special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode);
// This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into a predicate.


@@ -18,6 +18,7 @@ pub mod compat;
pub mod dedup;
pub mod flat_dedup;
pub mod flat_merge;
pub mod flat_projection;
pub mod last_row;
pub mod merge;
pub mod plain_batch;


@@ -28,8 +28,10 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::error::{
CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result, UnexpectedSnafu,
};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
/// Reader to adapt schema of underlying reader to expected schema.
@@ -87,6 +89,9 @@ impl CompatBatch {
pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
reason: "Unexpected format",
})?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;
Ok(Self {
@@ -313,7 +318,7 @@ fn may_compat_primary_key(
/// Creates a [CompatFields] if needed.
fn may_compat_fields(
mapper: &ProjectionMapper,
mapper: &PrimaryKeyProjectionMapper,
actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
let expect_fields = mapper.batch_fields();
@@ -675,7 +680,7 @@ mod tests {
],
&[1],
));
let mapper = ProjectionMapper::all(&reader_meta).unwrap();
let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
}
@@ -707,7 +712,7 @@ mod tests {
],
&[1, 3],
));
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
@@ -756,7 +761,7 @@ mod tests {
],
&[1],
));
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
@@ -801,7 +806,7 @@ mod tests {
],
&[1],
));
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
@@ -858,7 +863,7 @@ mod tests {
&[1],
));
// tag_1, field_2, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
@@ -871,7 +876,7 @@ mod tests {
.await;
// tag_1, field_4, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
@@ -916,7 +921,7 @@ mod tests {
expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let expect_meta = Arc::new(expect_meta);
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[


@@ -0,0 +1,221 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for projection in the flat format.
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
/// Handles projection and converts batches in the flat format to the expected output schema.
///
/// This mapper supports duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
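///
/// A usage sketch (hypothetical indices; duplicate and out-of-order entries
/// are both allowed):
/// ```ignore
/// // Projects field 4, tag 1, then field 4 again.
/// let mapper = FlatProjectionMapper::new(&metadata, [4, 1, 4].into_iter())?;
/// assert_eq!(3, mapper.output_schema().num_columns());
/// ```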
#[allow(dead_code)]
pub struct FlatProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Schema for converted [RecordBatch] to return.
output_schema: SchemaRef,
/// Ids of columns to project. They are kept in the same order as the `projection`
/// indices used to build the mapper.
/// The mapper does not deduplicate the column ids.
column_ids: Vec<ColumnId>,
/// Ids and DataTypes of columns of the expected batch.
/// We can use this to check if the batch is compatible with the expected schema.
batch_schema: Vec<(ColumnId, ConcreteDataType)>,
/// `true` if the original projection is empty.
is_empty_projection: bool,
/// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
batch_indices: Vec<usize>,
}
impl FlatProjectionMapper {
/// Returns a new mapper with projection.
/// If `projection` is empty, it outputs a [RecordBatch] with no columns, only a row count.
/// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
/// an empty `RecordBatch` and only uses its row count in this query.
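///
/// A sketch of the empty-projection path (assuming a prepared `metadata` ref):
/// ```ignore
/// let mapper = FlatProjectionMapper::new(&metadata, [].into_iter())?;
/// assert!(mapper.output_schema().is_empty());
/// // `convert` then only carries the row count through.
/// ```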
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<Self> {
let mut projection: Vec<_> = projection.collect();
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
if is_empty_projection {
// If the projection is empty, we still read the time index column.
projection.push(metadata.time_index_column_pos());
}
// Output column schemas for the projection.
let mut column_schemas = Vec::with_capacity(projection.len());
// Column ids of the projection without deduplication.
let mut column_ids = Vec::with_capacity(projection.len());
for idx in &projection {
// For each projection index, we get the column id for projection.
let column = metadata
.column_metadatas
.get(*idx)
.context(InvalidRequestSnafu {
region_id: metadata.region_id,
reason: format!("projection index {} is out of bounds", idx),
})?;
column_ids.push(column.column_id);
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema: Arc::new(Schema::new(vec![])),
column_ids,
batch_schema: vec![],
is_empty_projection,
batch_indices: vec![],
});
}
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Creates a map to look up the SST index by column id.
let id_to_index = sst_column_id_indices(metadata);
// TODO(yingwen): Support different flat schema options.
let format_projection = FormatProjection::compute_format_projection(
&id_to_index,
// Total column count, including the three internal columns.
metadata.column_metadatas.len() + 3,
column_ids.iter().copied(),
);
let batch_indices: Vec<_> = column_ids
.iter()
.map(|id| {
// Safety: The map is computed from `projection` itself.
format_projection
.column_id_to_projected_index
.get(id)
.copied()
.unwrap()
})
.collect();
let batch_schema = plain_projected_columns(metadata, &format_projection);
Ok(FlatProjectionMapper {
metadata: metadata.clone(),
output_schema,
column_ids,
batch_schema,
is_empty_projection,
batch_indices,
})
}
/// Returns a new mapper without projection.
pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
FlatProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
}
/// Returns the metadata that created the mapper.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.column_ids
}
/// Returns ids of columns of the batch that the mapper expects to convert.
#[allow(dead_code)]
pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
&self.batch_schema
}
/// Returns the schema of the converted [RecordBatch].
/// This is the schema that the stream will output. This schema may contain
/// fewer columns than [FlatProjectionMapper::column_ids()].
pub(crate) fn output_schema(&self) -> SchemaRef {
self.output_schema.clone()
}
/// Returns an empty [RecordBatch].
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
RecordBatch::new_empty(self.output_schema.clone())
}
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` used to build the mapper.
#[allow(dead_code)]
pub(crate) fn convert(
&self,
batch: &datatypes::arrow::record_batch::RecordBatch,
) -> common_recordbatch::error::Result<RecordBatch> {
if self.is_empty_projection {
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
}
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
for index in &self.batch_indices {
let array = batch.column(*index).clone();
let vector = Helper::try_into_vector(array)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
columns.push(vector);
}
RecordBatch::new(self.output_schema.clone(), columns)
}
}
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
pub(crate) fn plain_projected_columns(
metadata: &RegionMetadata,
format_projection: &FormatProjection,
) -> Vec<(ColumnId, ConcreteDataType)> {
let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
for (column_id, index) in &format_projection.column_id_to_projected_index {
// Safety: FormatProjection ensures the id is valid.
schema[*index] = Some((
*column_id,
metadata
.column_by_id(*column_id)
.unwrap()
.column_schema
.data_type
.clone(),
));
}
// Safety: FormatProjection ensures all indices can be unwrapped.
schema.into_iter().map(|id_type| id_type.unwrap()).collect()
}
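Note the ordering distinction that the tests further down rely on: `column_ids()` follows the caller's projection order, while `batch_schema()` (built by `plain_projected_columns`) follows the projected position in the flat SST layout (primary key columns, then fields, then the time index). A hedged illustration for a region with columns ts(0), k0(1), k1(2), v0(3), v1(4) and the projection [4, 1]:

    // column_ids()   -> [4, 1]                    (projection order)
    // batch_schema() -> [(1, Int64), (4, Int64)]  (flat layout order)
    // output schema  -> v1, k0                    (projection order again)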


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for projection.
//! Utilities for projection operations.
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -33,13 +33,102 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::Batch;
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
/// Wrapper enum for different projection mapper implementations.
pub enum ProjectionMapper {
/// Projection mapper for primary key format.
PrimaryKey(PrimaryKeyProjectionMapper),
/// Projection mapper for flat format.
Flat(FlatProjectionMapper),
}
impl ProjectionMapper {
/// Returns a new mapper with projection.
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize> + Clone,
flat_format: bool,
) -> Result<Self> {
if flat_format {
Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
metadata, projection,
)?))
} else {
Ok(ProjectionMapper::PrimaryKey(
PrimaryKeyProjectionMapper::new(metadata, projection)?,
))
}
}
/// Returns a new mapper without projection.
pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
if flat_format {
Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
} else {
Ok(ProjectionMapper::PrimaryKey(
PrimaryKeyProjectionMapper::all(metadata)?,
))
}
}
/// Returns the metadata that created the mapper.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
match self {
ProjectionMapper::PrimaryKey(m) => m.metadata(),
ProjectionMapper::Flat(m) => m.metadata(),
}
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
match self {
ProjectionMapper::PrimaryKey(m) => m.column_ids(),
ProjectionMapper::Flat(m) => m.column_ids(),
}
}
/// Returns the schema of converted [RecordBatch].
pub(crate) fn output_schema(&self) -> SchemaRef {
match self {
ProjectionMapper::PrimaryKey(m) => m.output_schema(),
ProjectionMapper::Flat(m) => m.output_schema(),
}
}
/// Returns the primary key projection mapper or None if this is not a primary key mapper.
pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
match self {
ProjectionMapper::PrimaryKey(m) => Some(m),
ProjectionMapper::Flat(_) => None,
}
}
/// Returns the flat projection mapper or None if this is not a flat mapper.
pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
match self {
ProjectionMapper::PrimaryKey(_) => None,
ProjectionMapper::Flat(m) => Some(m),
}
}
/// Returns an empty [RecordBatch].
// TODO(yingwen): This is unused now. Use it after we finish the flat format.
pub fn empty_record_batch(&self) -> RecordBatch {
match self {
ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
ProjectionMapper::Flat(m) => m.empty_record_batch(),
}
}
}
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub struct ProjectionMapper {
pub struct PrimaryKeyProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Maps column in [RecordBatch] to index in [Batch].
@@ -59,7 +148,7 @@ pub struct ProjectionMapper {
is_empty_projection: bool,
}
impl ProjectionMapper {
impl PrimaryKeyProjectionMapper {
/// Returns a new mapper with projection.
/// If `projection` is empty, it outputs a [RecordBatch] with no columns, only a row count.
/// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
@@ -67,7 +156,7 @@ impl ProjectionMapper {
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<ProjectionMapper> {
) -> Result<PrimaryKeyProjectionMapper> {
let mut projection: Vec<_> = projection.collect();
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
@@ -96,7 +185,7 @@ impl ProjectionMapper {
let codec = build_primary_key_codec(metadata);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(ProjectionMapper {
return Ok(PrimaryKeyProjectionMapper {
metadata: metadata.clone(),
batch_indices: vec![],
has_tags: false,
@@ -146,7 +235,7 @@ impl ProjectionMapper {
batch_indices.push(batch_index);
}
Ok(ProjectionMapper {
Ok(PrimaryKeyProjectionMapper {
metadata: metadata.clone(),
batch_indices,
has_tags,
@@ -159,8 +248,8 @@ impl ProjectionMapper {
}
/// Returns a new mapper without projection.
pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
}
/// Returns the metadata that created the mapper.
@@ -181,7 +270,7 @@ impl ProjectionMapper {
/// Returns the schema of converted [RecordBatch].
/// This is the schema that the stream will output. This schema may contain
/// fewer columns than [ProjectionMapper::column_ids()].
/// fewer columns than [PrimaryKeyProjectionMapper::column_ids()].
pub(crate) fn output_schema(&self) -> SchemaRef {
self.output_schema.clone()
}
@@ -318,10 +407,14 @@ mod tests {
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use mito_codec::test_util::TestRegionMetadataBuilder;
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use super::*;
use crate::cache::CacheManager;
@@ -386,21 +479,26 @@ mod tests {
.num_fields(2)
.build(),
);
let mapper = ProjectionMapper::all(&metadata).unwrap();
// Create the enum wrapper with default format (primary key)
let mapper = ProjectionMapper::all(&metadata, false).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
[
(3, ConcreteDataType::int64_datatype()),
(4, ConcreteDataType::int64_datatype())
],
mapper.batch_fields()
mapper.as_primary_key().unwrap().batch_fields()
);
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, &cache).unwrap();
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
@@ -420,7 +518,11 @@ mod tests {
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
let record_batch = mapper.convert(&batch, &cache).unwrap();
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
@@ -433,17 +535,21 @@ mod tests {
.build(),
);
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
assert_eq!([4, 1], mapper.column_ids());
assert_eq!(
[(4, ConcreteDataType::int64_datatype())],
mapper.batch_fields()
mapper.as_primary_key().unwrap().batch_fields()
);
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper.convert(&batch, &cache).unwrap();
let record_batch = mapper
.as_primary_key()
.unwrap()
.convert(&batch, &cache)
.unwrap();
let expect = "\
+----+----+
| v1 | k0 |
@@ -464,18 +570,213 @@ mod tests {
.build(),
);
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.batch_fields().is_empty());
assert!(!mapper.has_tags);
assert!(mapper.batch_indices.is_empty());
assert!(mapper.output_schema().is_empty());
assert!(mapper.is_empty_projection);
let pk_mapper = mapper.as_primary_key().unwrap();
assert!(pk_mapper.batch_fields().is_empty());
assert!(!pk_mapper.has_tags);
assert!(pk_mapper.batch_indices.is_empty());
assert!(pk_mapper.is_empty_projection);
let batch = new_batch(0, &[1, 2], &[], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper.convert(&batch, &cache).unwrap();
let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
assert_eq!(3, record_batch.num_rows());
assert_eq!(0, record_batch.num_columns());
assert!(record_batch.schema.is_empty());
}
fn new_flat_batch(
ts_start: Option<i64>,
idx_tags: &[(usize, i64)],
idx_fields: &[(usize, i64)],
num_rows: usize,
) -> datatypes::arrow::record_batch::RecordBatch {
let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
// Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type
// Primary key columns first
for (i, tag) in idx_tags {
let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
*tag, num_rows,
))) as _;
columns.push(array);
fields.push(Field::new(
format!("k{i}"),
datatypes::arrow::datatypes::DataType::Int64,
true,
));
}
// Field columns
for (i, field) in idx_fields {
let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
*field, num_rows,
))) as _;
columns.push(array);
fields.push(Field::new(
format!("v{i}"),
datatypes::arrow::datatypes::DataType::Int64,
true,
));
}
// Time index
if let Some(ts_start) = ts_start {
let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
(0..num_rows).map(|i| ts_start + i as i64 * 1000),
)) as _;
columns.push(timestamps);
fields.push(Field::new(
"ts",
datatypes::arrow::datatypes::DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
None,
),
true,
));
}
// __primary_key column (encoded primary key as dictionary)
// Create encoded primary key
let converter = DensePrimaryKeyCodec::with_fields(
(0..idx_tags.len())
.map(|idx| {
(
idx as u32,
SortField::new(ConcreteDataType::int64_datatype()),
)
})
.collect(),
);
let encoded_pk = converter
.encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
.unwrap();
// Create dictionary array for the encoded primary key
let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
let pk_array =
Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
columns.push(pk_array);
fields.push(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
datatypes::arrow::datatypes::DataType::UInt32,
datatypes::arrow::datatypes::DataType::Binary,
false,
));
// __sequence column
columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
fields.push(Field::new(
SEQUENCE_COLUMN_NAME,
datatypes::arrow::datatypes::DataType::UInt64,
false,
));
// __op_type column
columns.push(Arc::new(UInt8Array::from_iter_values(
(0..num_rows).map(|_| OpType::Put as u8),
)) as _);
fields.push(Field::new(
OP_TYPE_COLUMN_NAME,
datatypes::arrow::datatypes::DataType::UInt8,
false,
));
let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
}
#[test]
fn test_plain_projection_mapper_all() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
let mapper = ProjectionMapper::all(&metadata, true).unwrap();
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
(2, ConcreteDataType::int64_datatype()),
(3, ConcreteDataType::int64_datatype()),
(4, ConcreteDataType::int64_datatype()),
(0, ConcreteDataType::timestamp_millisecond_datatype())
],
mapper.as_flat().unwrap().batch_schema()
);
let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_plain_projection_mapper_with_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Columns v1, k0
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
assert_eq!([4, 1], mapper.column_ids());
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
(4, ConcreteDataType::int64_datatype())
],
mapper.as_flat().unwrap().batch_schema()
);
let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_plain_projection_mapper_empty_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.output_schema().is_empty());
let plain_mapper = mapper.as_flat().unwrap();
assert!(plain_mapper.batch_schema().is_empty());
let batch = new_flat_batch(Some(0), &[], &[], 3);
let record_batch = plain_mapper.convert(&batch).unwrap();
assert_eq!(3, record_batch.num_rows());
assert_eq!(0, record_batch.num_columns());
assert!(record_batch.schema.is_empty());


@@ -374,8 +374,8 @@ impl ScanRegion {
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
None => ProjectionMapper::all(&self.version.metadata, false)?,
};
let ssts = &self.version.ssts;
@@ -451,8 +451,8 @@ impl ScanRegion {
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
None => ProjectionMapper::all(&self.version.metadata, false)?,
};
let input = ScanInput::new(self.access_layer, mapper)


@@ -27,7 +27,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use snafu::ensure;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
@@ -35,7 +35,7 @@ use store_api::region_engine::{
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
@@ -274,6 +274,9 @@ impl SeqScan {
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
let mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
reason: "Unexpected format",
})?;
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
@@ -309,7 +312,7 @@ impl SeqScan {
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"SeqScan",
stream_ctx.input.mapper.metadata().region_id,
mapper.metadata().region_id,
partition,
part_range,
&batch,


@@ -23,10 +23,10 @@ use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::error::{Result, UnexpectedSnafu};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
use crate::read::series_scan::SeriesBatch;
@@ -66,12 +66,21 @@ impl ConvertBatchStream {
}
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
let mapper = self
.projection_mapper
.as_primary_key()
.context(UnexpectedSnafu {
reason: "Unexpected format",
})
.map_err(|e| BoxedError::new(e) as _)
.context(ExternalSnafu)?;
match batch {
ScanBatch::Normal(batch) => {
if batch.is_empty() {
Ok(self.projection_mapper.empty_record_batch())
Ok(mapper.empty_record_batch())
} else {
self.projection_mapper.convert(&batch, &self.cache_strategy)
mapper.convert(&batch, &self.cache_strategy)
}
}
ScanBatch::Series(series) => {
@@ -79,13 +88,11 @@ impl ConvertBatchStream {
self.buffer.reserve(series.batches.len());
for batch in series.batches {
let record_batch = self
.projection_mapper
.convert(&batch, &self.cache_strategy)?;
let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
}
let output_schema = self.projection_mapper.output_schema();
let output_schema = mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;


@@ -227,6 +227,7 @@ impl UnorderedScan {
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mapper = &stream_ctx.input.mapper;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
@@ -252,7 +253,7 @@ impl UnorderedScan {
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
"UnorderedScan",
stream_ctx.input.mapper.metadata().region_id,
mapper.metadata().region_id,
partition,
part_range,
&batch,


@@ -284,7 +284,7 @@ impl FlatReadFormat {
/// Returns a map whose key is the column id and whose value is the column position
/// in the SST.
/// It only supports SSTs with raw primary key columns.
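/// A sketch of the expected mapping for a hypothetical region with columns
/// ts(0), k0(1), v0(2); since the flat layout stores primary key columns first,
/// then fields, then the time index: k0 -> 0, v0 -> 1, ts -> 2.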
fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
let mut column_index = 0;
// keys