feat: Implements a reader to make schema compatible (#2326)

* docs: update comment

* feat: Add compat reader to SeqScan

* feat: add struct to compat pk and fields

* refactor: remove unused fields from ParquetReader

* feat: compat framework

* feat: Implement CompatPrimaryKey and CompatFields

* feat: implement compat reader

* feat: Test compat reader

* test: test compat reader

* feat: add more checks to concat

* style: fix clippy

* test: more tests for compat reader

* test: test reader with projection
This commit is contained in:
Yingwen
2023-09-07 13:11:56 +08:00
committed by Ruihang Xia
parent 6215f124f7
commit 3f6d557b8d
10 changed files with 797 additions and 52 deletions

View File

@@ -434,6 +434,18 @@ pub enum Error {
region_id: RegionId,
location: Location,
},
#[snafu(display(
"Failed to compat readers for region {}, reason: {}, location: {}",
region_id,
reason,
location
))]
CompatReader {
region_id: RegionId,
reason: String,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -500,6 +512,7 @@ impl ErrorExt for Error {
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompatReader { .. } => StatusCode::Unexpected,
}
}

View File

@@ -14,11 +14,13 @@
//! Common structs and utilities for reading data.
pub mod compat;
pub mod merge;
pub(crate) mod projection;
pub mod projection;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::OpType;
@@ -34,6 +36,7 @@ use datatypes::vectors::{
BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
@@ -41,10 +44,10 @@ use crate::error::{
};
use crate::memtable::BoxedBatchIterator;
/// Storage internal representation of a batch of rows
/// for a primary key (time series).
/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc.
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
/// always keep the same relative order as fields in [RegionMetadata](store_api::metadata::RegionMetadata).
#[derive(Debug, PartialEq, Clone)]
pub struct Batch {
/// Primary key encoded in a comparable form.
@@ -77,6 +80,17 @@ impl Batch {
.build()
}
/// Tries to set fields for the batch.
pub fn with_fields(self, fields: Vec<BatchColumn>) -> Result<Batch> {
Batch::new(
self.primary_key,
self.timestamps,
self.sequences,
self.op_types,
fields,
)
}
/// Returns primary key of the batch.
pub fn primary_key(&self) -> &[u8] {
&self.primary_key
@@ -150,6 +164,11 @@ impl Batch {
Some(self.get_sequence(self.sequences.len() - 1))
}
/// Replaces the primary key of the batch.
pub fn set_primary_key(&mut self, primary_key: Vec<u8>) {
self.primary_key = primary_key;
}
/// Slice the batch, returning a new batch.
///
/// # Panics
@@ -202,15 +221,22 @@ impl Batch {
reason: "batches have different primary key",
}
);
ensure!(
batches
.iter()
.skip(1)
.all(|b| b.fields().len() == first.fields().len()),
InvalidBatchSnafu {
reason: "batches have different field num",
for b in batches.iter().skip(1) {
ensure!(
b.fields.len() == first.fields.len(),
InvalidBatchSnafu {
reason: "batches have different field num",
}
);
for (l, r) in b.fields.iter().zip(&first.fields) {
ensure!(
l.column_id == r.column_id,
InvalidBatchSnafu {
reason: "batches have different fields",
}
);
}
);
}
// We take the primary key from the first batch.
let mut builder = BatchBuilder::new(primary_key);
@@ -311,6 +337,24 @@ impl Batch {
self.take_in_place(&indices)
}
/// Returns ids of fields in the [Batch] after applying the `projection`.
pub(crate) fn projected_fields(
metadata: &RegionMetadata,
projection: &[ColumnId],
) -> Vec<ColumnId> {
let projected_ids: HashSet<_> = projection.iter().copied().collect();
metadata
.field_columns()
.filter_map(|column| {
if projected_ids.contains(&column.column_id) {
Some(column.column_id)
} else {
None
}
})
.collect()
}
/// Takes the batch in place.
fn take_in_place(&mut self, indices: &UInt32Vector) -> Result<()> {
self.timestamps = self.timestamps.take(indices).context(ComputeVectorSnafu)?;
@@ -566,8 +610,6 @@ impl Source {
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.
/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`
@@ -729,6 +771,37 @@ mod tests {
);
}
#[test]
fn test_concat_different_fields() {
let batch1 = new_batch(&[1], &[1], &[OpType::Put], &[1]);
let fields = vec![
batch1.fields()[0].clone(),
BatchColumn {
column_id: 2,
data: Arc::new(UInt64Vector::from_slice([2])),
},
];
// Batch 2 has more fields.
let batch2 = batch1.clone().with_fields(fields).unwrap();
let err = Batch::concat(vec![batch1.clone(), batch2]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);
// Batch 2 has different field.
let fields = vec![BatchColumn {
column_id: 2,
data: Arc::new(UInt64Vector::from_slice([2])),
}];
let batch2 = batch1.clone().with_fields(fields).unwrap();
let err = Batch::concat(vec![batch1, batch2]).unwrap_err();
assert!(
matches!(err, Error::InvalidBatch { .. }),
"unexpected err: {err}"
);
}
#[test]
fn test_filter_deleted_empty() {
let mut batch = new_batch(&[], &[], &[], &[]);

View File

@@ -0,0 +1,600 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to adapt readers with different schema.
use std::collections::HashMap;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
/// Underlying reader.
reader: R,
/// Optional primary key adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter.
compat_fields: Option<CompatFields>,
}
impl<R> CompatReader<R> {
/// Creates a new compat reader.
/// - `mapper` is built from the metadata users expect to see.
/// - `reader_meta` is the metadata of the input reader.
/// - `reader` is the input reader.
pub fn new(
mapper: &ProjectionMapper,
reader_meta: RegionMetadataRef,
reader: R,
) -> Result<CompatReader<R>> {
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;
Ok(CompatReader {
reader,
compat_pk,
compat_fields,
})
}
}
#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let Some(mut batch) = self.reader.next_batch().await? else {
return Ok(None);
};
if let Some(compat_pk) = &self.compat_pk {
batch = compat_pk.compat(batch)?;
}
if let Some(compat_fields) = &self.compat_fields {
batch = compat_fields.compat(batch);
}
Ok(Some(batch))
}
}
/// Returns true if `left` and `right` have same columns to read.
///
/// It only consider column ids.
pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> bool {
if left.column_metadatas.len() != right.column_metadatas.len() {
return false;
}
for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
if left_col.column_id != right_col.column_id {
return false;
}
debug_assert_eq!(
left_col.column_schema.data_type,
right_col.column_schema.data_type
);
debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
}
true
}
/// Helper to make primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
/// Row converter to append values to primary keys.
converter: McmpRowCodec,
/// Default values to append.
values: Vec<Value>,
}
impl CompatPrimaryKey {
/// Make primary key of the `batch` compatible.
fn compat(&self, mut batch: Batch) -> Result<Batch> {
let mut buffer =
Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
buffer.extend_from_slice(batch.primary_key());
self.converter.encode_to_vec(
self.values.iter().map(|value| value.as_value_ref()),
&mut buffer,
)?;
batch.set_primary_key(buffer);
Ok(batch)
}
}
/// Helper to make fields compatible.
#[derive(Debug)]
struct CompatFields {
/// Column Ids the reader actually returns.
actual_fields: Vec<ColumnId>,
/// Indices to convert actual fields to expect fields.
index_or_defaults: Vec<IndexOrDefault>,
}
impl CompatFields {
/// Make fields of the `batch` compatible.
#[must_use]
fn compat(&self, batch: Batch) -> Batch {
debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
debug_assert!(self
.actual_fields
.iter()
.zip(batch.fields())
.all(|(id, batch_column)| *id == batch_column.column_id));
let len = batch.num_rows();
let fields = self
.index_or_defaults
.iter()
.map(|index_or_default| match index_or_default {
IndexOrDefault::Index(index) => batch.fields()[*index].clone(),
IndexOrDefault::DefaultValue {
column_id,
default_vector,
} => {
let data = default_vector.replicate(&[len]);
BatchColumn {
column_id: *column_id,
data,
}
}
})
.collect();
// Safety: We ensure all columns have the same length and the new batch should be valid.
batch.with_fields(fields).unwrap()
}
}
/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
expect: &RegionMetadata,
actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
ensure!(
actual.primary_key.len() <= expect.primary_key.len(),
CompatReaderSnafu {
region_id: expect.region_id,
reason: format!(
"primary key has more columns {} than exepct {}",
actual.primary_key.len(),
expect.primary_key.len()
),
}
);
ensure!(
actual.primary_key == expect.primary_key[..actual.primary_key.len()],
CompatReaderSnafu {
region_id: expect.region_id,
reason: format!(
"primary key has different prefix, expect: {:?}, actual: {:?}",
expect.primary_key, actual.primary_key
),
}
);
if actual.primary_key.len() == expect.primary_key.len() {
return Ok(None);
}
// We need to append default values to the primary key.
let to_add = &expect.primary_key[actual.primary_key.len()..];
let mut fields = Vec::with_capacity(to_add.len());
let mut values = Vec::with_capacity(to_add.len());
for column_id in to_add {
// Safety: The id comes from expect region metadata.
let column = expect.column_by_id(*column_id).unwrap();
fields.push(SortField::new(column.column_schema.data_type.clone()));
let default_value = column
.column_schema
.create_default()
.context(CreateDefaultSnafu {
region_id: expect.region_id,
column: &column.column_schema.name,
})?
.with_context(|| CompatReaderSnafu {
region_id: expect.region_id,
reason: format!(
"key column {} does not have a default value to read",
column.column_schema.name
),
})?;
values.push(default_value);
}
let converter = McmpRowCodec::new(fields);
Ok(Some(CompatPrimaryKey { converter, values }))
}
/// Creates a [CompatFields] if needed.
fn may_compat_fields(
mapper: &ProjectionMapper,
actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
let expect_fields = mapper.batch_fields();
let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
if expect_fields == actual_fields {
return Ok(None);
}
let source_field_index: HashMap<_, _> = actual_fields
.iter()
.enumerate()
.map(|(idx, column_id)| (*column_id, idx))
.collect();
let index_or_defaults = expect_fields
.iter()
.map(|column_id| {
if let Some(index) = source_field_index.get(column_id) {
// Source has this field.
Ok(IndexOrDefault::Index(*index))
} else {
// Safety: mapper must have this column.
let column = mapper.metadata().column_by_id(*column_id).unwrap();
// Create a default vector with 1 element for that column.
let default_vector = column
.column_schema
.create_default_vector(1)
.context(CreateDefaultSnafu {
region_id: mapper.metadata().region_id,
column: &column.column_schema.name,
})?
.with_context(|| CompatReaderSnafu {
region_id: mapper.metadata().region_id,
reason: format!(
"column {} does not have a default value to read",
column.column_schema.name
),
})?;
Ok(IndexOrDefault::DefaultValue {
column_id: column.column_id,
default_vector,
})
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(CompatFields {
actual_fields,
index_or_defaults,
}))
}
/// Index in source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
/// Index of the column in source batch.
Index(usize),
/// Default value for the column.
DefaultValue {
/// Id of the column.
column_id: ColumnId,
/// Default value. The vector has only 1 element.
default_vector: VectorRef,
},
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].
fn new_metadata(
semantic_types: &[(ColumnId, SemanticType)],
primary_key: &[ColumnId],
) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
for (id, semantic_type) in semantic_types {
let column_schema = match semantic_type {
SemanticType::Tag => ColumnSchema::new(
format!("tag_{id}"),
ConcreteDataType::string_datatype(),
true,
),
SemanticType::Field => ColumnSchema::new(
format!("field_{id}"),
ConcreteDataType::int64_datatype(),
true,
),
SemanticType::Timestamp => ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
};
builder.push_column_metadata(ColumnMetadata {
column_schema,
semantic_type: *semantic_type,
column_id: *id,
});
}
builder.primary_key(primary_key.to_vec());
builder.build().unwrap()
}
/// Encode primary key.
fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
let fields = (0..keys.len())
.map(|_| SortField::new(ConcreteDataType::string_datatype()))
.collect();
let converter = McmpRowCodec::new(fields);
let row = keys.iter().map(|str_opt| match str_opt {
Some(v) => ValueRef::String(v),
None => ValueRef::Null,
});
converter.encode(row).unwrap()
}
/// Creates a batch for specific primary `key`.
///
/// `fields`: [(column_id of the field, is null)]
fn new_batch(
primary_key: &[u8],
fields: &[(ColumnId, bool)],
start_ts: i64,
num_rows: usize,
) -> Batch {
let timestamps = Arc::new(TimestampMillisecondVector::from_values(
start_ts..start_ts + num_rows as i64,
));
let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
let field_columns = fields
.iter()
.map(|(id, is_null)| {
let data = if *is_null {
Arc::new(Int64Vector::from(vec![None; num_rows]))
} else {
Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
};
BatchColumn {
column_id: *id,
data,
}
})
.collect();
Batch::new(
primary_key.to_vec(),
timestamps,
sequences,
op_types,
field_columns,
)
.unwrap()
}
#[test]
fn test_invalid_pk_len() {
let reader_meta = new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Tag),
(3, SemanticType::Field),
],
&[1, 2],
);
let expect_meta = new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
);
may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
}
#[test]
fn test_different_pk() {
let reader_meta = new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Tag),
(3, SemanticType::Field),
],
&[2, 1],
);
let expect_meta = new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Tag),
(3, SemanticType::Field),
(4, SemanticType::Tag),
],
&[1, 2, 4],
);
may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
}
#[test]
fn test_same_pk() {
let reader_meta = new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
);
assert!(may_compat_primary_key(&reader_meta, &reader_meta)
.unwrap()
.is_none());
}
#[test]
fn test_same_fields() {
let reader_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
));
let mapper = ProjectionMapper::all(&reader_meta).unwrap();
assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
}
#[tokio::test]
async fn test_compat_reader() {
let reader_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
(3, SemanticType::Tag),
(4, SemanticType::Field),
],
&[1, 3],
));
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
let k1 = encode_key(&[Some("a"), None]);
let k2 = encode_key(&[Some("b"), None]);
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_order() {
let reader_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(3, SemanticType::Field),
(2, SemanticType::Field),
(4, SemanticType::Field),
],
&[1],
));
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
],
)
.await;
}
#[tokio::test]
async fn test_compat_reader_projection() {
let reader_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(2, SemanticType::Field),
],
&[1],
));
let expect_meta = Arc::new(new_metadata(
&[
(0, SemanticType::Timestamp),
(1, SemanticType::Tag),
(3, SemanticType::Field),
(2, SemanticType::Field),
(4, SemanticType::Field),
],
&[1],
));
// tag_1, field_2, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
let mut compat_reader =
CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
)
.await;
// tag_1, field_4, field_3
let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
let k1 = encode_key(&[Some("a")]);
let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
check_reader_result(
&mut compat_reader,
&[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
)
.await;
}
}

View File

@@ -341,7 +341,7 @@ mod tests {
use api::v1::OpType;
use super::*;
use crate::test_util::{new_batch, VecBatchReader};
use crate::test_util::{check_reader_result, new_batch, VecBatchReader};
#[tokio::test]
async fn test_merge_reader_empty() {
@@ -350,15 +350,6 @@ mod tests {
assert!(reader.next_batch().await.unwrap().is_none());
}
async fn check_merge_result(reader: &mut MergeReader, expect: &[Batch]) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
result.push(batch);
}
assert_eq!(expect, result);
}
#[tokio::test]
async fn test_merge_non_overlapping() {
let reader1 = VecBatchReader::new(&[
@@ -397,7 +388,7 @@ mod tests {
.build()
.await
.unwrap();
check_merge_result(
check_reader_result(
&mut reader,
&[
new_batch(
@@ -468,7 +459,7 @@ mod tests {
.build()
.await
.unwrap();
check_merge_result(
check_reader_result(
&mut reader,
&[
new_batch(

View File

@@ -25,7 +25,7 @@ use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::ValueRef;
use datatypes::vectors::VectorRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
@@ -33,21 +33,26 @@ use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub(crate) struct ProjectionMapper {
pub struct ProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Maps column in [RecordBatch] to index in [Batch].
batch_indices: Vec<BatchIndex>,
/// Decoder for primary key.
codec: McmpRowCodec,
/// Schema for converted [RecordBatch].
output_schema: SchemaRef,
/// Id of columns to project.
/// Ids of columns to project. It keeps ids in the same order as the `projection`
/// indices to build the mapper.
column_ids: Vec<ColumnId>,
/// Ids of field columns in the [Batch].
batch_fields: Vec<ColumnId>,
}
impl ProjectionMapper {
/// Returns a new mapper with projection.
pub(crate) fn new(
metadata: &RegionMetadata,
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<ProjectionMapper> {
let projection_len = projection.size_hint().0;
@@ -92,25 +97,38 @@ impl ProjectionMapper {
);
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
let batch_fields = Batch::projected_fields(metadata, &column_ids);
Ok(ProjectionMapper {
metadata: metadata.clone(),
batch_indices,
codec,
output_schema,
column_ids,
batch_fields,
})
}
/// Returns a new mapper without projection.
pub(crate) fn all(metadata: &RegionMetadata) -> Result<ProjectionMapper> {
pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
}
/// Returns the metadata that created the mapper.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
/// Returns ids of projected columns.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.column_ids
}
/// Returns ids of fields in [Batch]es the mapper expects to convert.
pub(crate) fn batch_fields(&self) -> &[ColumnId] {
&self.batch_fields
}
/// Returns the schema of converted [RecordBatch].
pub(crate) fn output_schema(&self) -> SchemaRef {
self.output_schema.clone()
@@ -120,6 +138,13 @@ impl ProjectionMapper {
///
/// The batch must match the `projection` using to build the mapper.
pub(crate) fn convert(&self, batch: &Batch) -> common_recordbatch::error::Result<RecordBatch> {
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
.batch_fields
.iter()
.zip(batch.fields())
.all(|(id, batch_col)| *id == batch_col.column_id));
let pk_values = self
.codec
.decode(batch.primary_key())
@@ -179,3 +204,5 @@ fn new_repeated_vector(
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
}
// TODO(yingwen): Add tests for mapper.

View File

@@ -28,6 +28,7 @@ use table::predicate::Predicate;
use crate::access_layer::AccessLayerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::read::compat::{self, CompatReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::BatchReader;
@@ -119,7 +120,15 @@ impl SeqScan {
.projection(Some(self.mapper.column_ids().to_vec()))
.build()
.await?;
builder.push_batch_reader(Box::new(reader));
if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
builder.push_batch_reader(Box::new(reader));
} else {
// They have different schema. We need to adapt the batch first so the
// mapper can convert the it.
let compat_reader =
CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?;
builder.push_batch_reader(Box::new(compat_reader));
}
}
let mut reader = builder.build().await?;
// Creates a stream to poll the batch reader and convert batch into record batch.

View File

@@ -36,10 +36,18 @@ pub trait RowCodec {
where
I: Iterator<Item = ValueRef<'a>>;
/// Encodes rows to specific vec.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>;
/// Decode row values from bytes.
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}
#[derive(Debug)]
pub struct SortField {
data_type: ConcreteDataType,
}
@@ -199,6 +207,7 @@ impl SortField {
}
/// A memory-comparable row [Value] encoder/decoder.
#[derive(Debug)]
pub struct McmpRowCodec {
fields: Vec<SortField>,
}
@@ -223,12 +232,21 @@ impl RowCodec for McmpRowCodec {
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut bytes = Vec::with_capacity(self.estimated_size());
let mut serializer = Serializer::new(&mut bytes);
let mut buffer = Vec::new();
self.encode_to_vec(row, &mut buffer)?;
Ok(buffer)
}
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
buffer.reserve(self.estimated_size());
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
field.serialize(&mut serializer, &value)?;
}
Ok(bytes)
Ok(())
}
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {

View File

@@ -138,6 +138,11 @@ impl ReadFormat {
&self.arrow_schema
}
/// Gets the metadata of the SST.
pub(crate) fn metadata(&self) -> &RegionMetadataRef {
&self.metadata
}
/// Gets sorted projection indices to read `columns` from parquet files.
///
/// This function ignores columns not in `metadata` to for compatibility between

View File

@@ -27,7 +27,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::ParquetError;
use parquet::format::KeyValue;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::io::BufReader;
@@ -42,11 +42,18 @@ use crate::sst::parquet::PARQUET_METADATA_KEY;
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
/// SST directory.
file_dir: String,
file_handle: FileHandle,
object_store: ObjectStore,
/// Predicate to push down.
predicate: Option<Predicate>,
/// Time range to filter.
time_range: Option<TimestampRange>,
/// Metadata of columns to read.
///
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
}
@@ -97,10 +104,6 @@ impl ParquetReaderBuilder {
Ok(ParquetReader {
file_path,
file_handle: self.file_handle,
object_store: self.object_store,
predicate: self.predicate,
time_range: self.time_range,
projection: self.projection,
stream,
read_format,
batches: Vec::new(),
@@ -206,17 +209,6 @@ pub struct ParquetReader {
///
/// Holds the file handle to avoid the file purge purge it.
file_handle: FileHandle,
object_store: ObjectStore,
/// Predicate to push down.
predicate: Option<Predicate>,
/// Time range to filter.
time_range: Option<TimestampRange>,
/// Metadata of columns to read.
///
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
/// Inner parquet record batch stream.
stream: BoxedRecordBatchStream,
/// Helper to read record batches.
@@ -250,3 +242,10 @@ impl BatchReader for ParquetReader {
Ok(self.batches.pop())
}
}
impl ParquetReader {
/// Returns the metadata of the SST.
pub fn metadata(&self) -> &RegionMetadataRef {
self.read_format.metadata()
}
}

View File

@@ -384,6 +384,16 @@ pub fn new_batch(
.unwrap()
}
/// Ensure the reader returns batch as `expect`.
pub async fn check_reader_result<R: BatchReader>(reader: &mut R, expect: &[Batch]) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
result.push(batch);
}
assert_eq!(expect, result);
}
/// A mock [WriteBufferManager] that supports controlling whether to flush/stall.
#[derive(Debug, Default)]
pub struct MockWriteBufferManager {