feat: Region supports reading data with different schema (#342)

* feat(storage): Implement skeleton of ReadResolver

ReadResolver is used to resolve difference between schemas

* feat(storage): Add user_column_end to ReadResover

* feat(storage): Implement Batch::batch_from_parts

Used to construct Batch from parts according to the schema that user
expects to read.

* feat(storage): Compat memtable schema

* feat(storage): Compat parquet file schema

* fix(storage): ReadResolver supports projection under same schema version

Now ReadResolver takes ProjectedSchemaRef as dest schema, and checks
whether a value column is needed by the schema after projection.

* feat(storage): Check whether columns are same columns

is_source_column_readable() takes ColumnMetadata instead of
ColumnSchema, and compares their column id to check whether they are
same columns.

* refactor(storage): Use row_key_end/user_column_end in source_schema

Rename ReadResolver::is_needed to ReadResolver::is_source_needed, and
remove row_key_end/user_column_end from ReadResolver, since they should
be same as source_schema's

* chore(storage): Remove unused codes

* test(storage): Add tests for the resolver

* feat(storage): Returns error on different source and dest column names

* style(storage): Fix clippy

* refactor: Rename ReadResolver to ReadAdapter

* chore(table): Removed unused comment

* refactor: rename to is_source_column_compatible
This commit is contained in:
Yingwen
2022-10-31 11:42:07 +08:00
committed by GitHub
parent 0604eb7509
commit f4e22282a4
9 changed files with 858 additions and 141 deletions

View File

@@ -177,12 +177,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Parquet file schema is invalid, source: {}", source))]
InvalidParquetSchema {
#[snafu(backtrace)]
source: MetadataError,
},
#[snafu(display("Region is under {} state, cannot proceed operation", state))]
InvalidRegionState {
state: &'static str,
@@ -308,6 +302,40 @@ pub enum Error {
#[snafu(backtrace)]
source: MetadataError,
},
#[snafu(display("Incompatible schema to read, reason: {}", reason))]
CompatRead {
reason: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to read column {}, could not create default value, source: {}",
column,
source
))]
CreateDefaultToRead {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to read column {}, no proper default value for it", column))]
NoDefaultToRead {
column: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to convert arrow chunk to batch, name: {}, source: {}",
name,
source
))]
ConvertChunk {
name: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -334,14 +362,16 @@ impl ErrorExt for Error {
| Cancelled { .. }
| DecodeMetaActionList { .. }
| Readline { .. }
| InvalidParquetSchema { .. }
| WalDataCorrupted { .. }
| VersionNotFound { .. }
| SequenceNotMonotonic { .. }
| ConvertStoreSchema { .. }
| InvalidRawRegion { .. }
| FilterColumn { .. }
| AlterMetadata { .. } => StatusCode::Unexpected,
| AlterMetadata { .. }
| CompatRead { .. }
| CreateDefaultToRead { .. }
| NoDefaultToRead { .. } => StatusCode::Unexpected,
FlushIo { .. }
| WriteParquet { .. }
@@ -364,6 +394,7 @@ impl ErrorExt for Error {
| ConvertColumnSchema { source, .. } => source.status_code(),
PushBatch { source, .. } => source.status_code(),
AddDefault { source, .. } => source.status_code(),
ConvertChunk { source, .. } => source.status_code(),
}
}

View File

@@ -18,6 +18,7 @@ use crate::memtable::{
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering,
};
use crate::read::Batch;
use crate::schema::compat::ReadAdapter;
use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef};
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
@@ -69,7 +70,7 @@ impl Memtable for BTreeMemtable {
fn iter(&self, ctx: &IterContext) -> Result<BoxedBatchIterator> {
assert!(ctx.batch_size > 0);
let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone());
let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone())?;
Ok(Box::new(iter))
}
@@ -85,6 +86,7 @@ struct BTreeIterator {
schema: RegionSchemaRef,
/// Projected schema that user expect to read.
projected_schema: ProjectedSchemaRef,
adapter: ReadAdapter,
map: Arc<RwLockMap>,
last_key: Option<InnerKey>,
}
@@ -103,27 +105,33 @@ impl Iterator for BTreeIterator {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Result<Batch>> {
self.next_batch().map(Ok)
self.next_batch().transpose()
}
}
impl BTreeIterator {
fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc<RwLockMap>) -> BTreeIterator {
fn new(
ctx: IterContext,
schema: RegionSchemaRef,
map: Arc<RwLockMap>,
) -> Result<BTreeIterator> {
let projected_schema = ctx
.projected_schema
.clone()
.unwrap_or_else(|| Arc::new(ProjectedSchema::no_projection(schema.clone())));
let adapter = ReadAdapter::new(schema.store_schema().clone(), projected_schema.clone())?;
BTreeIterator {
Ok(BTreeIterator {
ctx,
schema,
projected_schema,
adapter,
map,
last_key: None,
}
})
}
fn next_batch(&mut self) -> Option<Batch> {
fn next_batch(&mut self) -> Result<Option<Batch>> {
let map = self.map.read().unwrap();
let iter = if let Some(last_key) = &self.last_key {
map.range((Bound::Excluded(last_key), Bound::Unbounded))
@@ -139,7 +147,7 @@ impl BTreeIterator {
};
if keys.is_empty() {
return None;
return Ok(None);
}
self.last_key = keys.last().map(|k| {
let mut last_key = (*k).clone();
@@ -151,27 +159,30 @@ impl BTreeIterator {
.schema
.row_key_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
let key_needed = vec![true; self.schema.num_row_key_columns()];
let value_data_types = self
.schema
.value_columns()
.map(|column_meta| column_meta.desc.data_type.clone());
let value_needed: Vec<_> = self
.schema
.value_columns()
.map(|column_meta| self.projected_schema.is_needed(column_meta.id()))
.collect();
let key_columns = rows_to_vectors(key_data_types, &key_needed, keys.as_slice());
let value_columns = rows_to_vectors(value_data_types, &value_needed, values.as_slice());
let batch = self.projected_schema.batch_from_parts(
let key_columns = rows_to_vectors(
key_data_types,
self.adapter.source_key_needed(),
keys.as_slice(),
);
let value_columns = rows_to_vectors(
value_data_types,
self.adapter.source_value_needed(),
values.as_slice(),
);
let batch = self.adapter.batch_from_parts(
key_columns,
value_columns,
Arc::new(sequences),
Arc::new(op_types),
);
)?;
Some(batch)
Ok(Some(batch))
}
}

View File

@@ -1,16 +1,15 @@
use std::sync::Arc;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::prelude::ScalarVector;
use datatypes::vectors::Int64Vector;
use datatypes::vectors::TimestampVector;
use datatypes::prelude::*;
use datatypes::vectors::{Int64Vector, TimestampVector};
use log_store::fs::log::LocalFileLogStore;
use store_api::storage::PutOperation;
use store_api::storage::WriteRequest;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId,
Region, RegionMeta, SchemaRef, WriteResponse,
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot,
WriteResponse,
};
use tempdir::TempDir;
@@ -38,6 +37,7 @@ struct AlterTester {
base: Option<FileTesterBase>,
}
#[derive(Debug, Clone, PartialEq)]
struct DataRow {
key: Option<i64>,
ts: Timestamp,
@@ -145,9 +145,59 @@ impl AlterTester {
metadata.version()
}
async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
async fn full_scan_with_init_schema(&self) -> Vec<(i64, Option<i64>)> {
self.base().full_scan().await
}
async fn full_scan(&self) -> Vec<DataRow> {
let read_ctx = &self.base().read_ctx;
let snapshot = self.base().region.snapshot(read_ctx).unwrap();
let resp = snapshot
.scan(read_ctx, ScanRequest::default())
.await
.unwrap();
let mut reader = resp.reader;
let metadata = self.base().region.in_memory_metadata();
assert_eq!(metadata.schema(), reader.schema());
let mut dst = Vec::new();
while let Some(chunk) = reader.next_chunk().await.unwrap() {
append_chunk_to(&chunk, &mut dst);
}
dst
}
}
fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<DataRow>) {
assert_eq!(4, chunk.columns.len());
let k0_vector = chunk.columns[0]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let ts_vector = chunk.columns[1]
.as_any()
.downcast_ref::<TimestampVector>()
.unwrap();
let v0_vector = chunk.columns[2]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
let v1_vector = chunk.columns[3]
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
for i in 0..k0_vector.len() {
dst.push(DataRow::new(
k0_vector.get_data(i),
ts_vector.get_data(i).unwrap().value(),
v0_vector.get_data(i),
v1_vector.get_data(i),
));
}
}
fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor {
@@ -200,12 +250,8 @@ async fn test_alter_region_with_reopen() {
let mut tester = AlterTester::new(store_dir).await;
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
tester.put_with_init_schema(&data).await;
assert_eq!(3, tester.full_scan().await.len());
let schema = tester.schema();
check_schema_names(&schema, &["timestamp", "v0"]);
assert_eq!(3, tester.full_scan_with_init_schema().await.len());
let req = add_column_req(&[
(new_column_desc(4, "k0"), true), // key column k0
@@ -216,6 +262,7 @@ async fn test_alter_region_with_reopen() {
let schema = tester.schema();
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]);
// Put data after schema altered.
let data = vec![
DataRow::new(Some(10000), 1003, Some(103), Some(201)),
DataRow::new(Some(10001), 1004, Some(104), Some(202)),
@@ -223,14 +270,26 @@ async fn test_alter_region_with_reopen() {
];
tester.put(&data).await;
// Scan with new schema before reopen.
let mut expect = vec![
DataRow::new(None, 1000, Some(100), None),
DataRow::new(None, 1001, Some(101), None),
DataRow::new(None, 1002, Some(102), None),
];
expect.extend_from_slice(&data);
let scanned = tester.full_scan().await;
assert_eq!(expect, scanned);
// Reopen and put more data.
tester.reopen().await;
let data = vec![
DataRow::new(Some(10003), 1006, Some(106), Some(204)),
DataRow::new(Some(10004), 1007, Some(107), Some(205)),
DataRow::new(Some(10005), 1008, Some(108), Some(206)),
];
tester.put(&data).await;
// Extend expected result.
expect.extend_from_slice(&data);
// add columns,then remove them without writing data.
let req = add_column_req(&[
@@ -248,8 +307,12 @@ async fn test_alter_region_with_reopen() {
check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]);
let data = vec![DataRow::new(Some(10006), 1009, Some(109), Some(207))];
tester.put(&data).await;
expect.extend_from_slice(&data);
// Scan with new schema after reopen and write.
let scanned = tester.full_scan().await;
assert_eq!(expect, scanned);
}
#[tokio::test]
@@ -308,11 +371,23 @@ async fn test_put_old_schema_after_alter() {
tester.alter(req).await;
// Put with old schema.
let data = vec![(1003, Some(103)), (1004, Some(104))];
let data = vec![(1005, Some(105)), (1006, Some(106))];
tester.put_with_init_schema(&data).await;
// Put data with old schema directly to the inner writer, to check that the region
// writer could compat the schema of write batch.
let data = vec![(1003, Some(103)), (1004, Some(104))];
tester.put_inner_with_init_schema(&data).await;
let expect = vec![
DataRow::new(None, 1000, Some(100), None),
DataRow::new(None, 1001, Some(101), None),
DataRow::new(None, 1002, Some(102), None),
DataRow::new(None, 1003, Some(103), None),
DataRow::new(None, 1004, Some(104), None),
DataRow::new(None, 1005, Some(105), None),
DataRow::new(None, 1006, Some(106), None),
];
let scanned = tester.full_scan().await;
assert_eq!(expect, scanned);
}

View File

@@ -11,32 +11,41 @@ pub use crate::schema::store::{StoreSchema, StoreSchemaRef};
mod tests {
use std::sync::Arc;
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector, VectorRef};
use super::*;
use crate::metadata::RegionMetadata;
use crate::read::Batch;
use crate::test_util::descriptor_util;
pub const REGION_NAME: &str = "test";
pub(crate) fn new_batch() -> Batch {
new_batch_with_num_values(1)
}
pub(crate) fn new_batch_with_num_values(num_value_columns: usize) -> Batch {
let k0 = Int64Vector::from_slice(&[1, 2, 3]);
let timestamp = Int64Vector::from_slice(&[4, 5, 6]);
let v0 = Int64Vector::from_slice(&[7, 8, 9]);
let mut columns: Vec<VectorRef> = vec![Arc::new(k0), Arc::new(timestamp)];
for i in 0..num_value_columns {
let vi = Int64Vector::from_slice(&[i as i64, i as i64, i as i64]);
columns.push(Arc::new(vi));
}
let sequences = UInt64Vector::from_slice(&[100, 100, 100]);
let op_types = UInt8Vector::from_slice(&[0, 0, 0]);
Batch::new(vec![
Arc::new(k0),
Arc::new(timestamp),
Arc::new(v0),
Arc::new(sequences),
Arc::new(op_types),
])
columns.push(Arc::new(sequences));
columns.push(Arc::new(op_types));
Batch::new(columns)
}
pub(crate) fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema {
let metadata: RegionMetadata =
descriptor_util::desc_with_value_columns("test", num_value_columns)
descriptor_util::desc_with_value_columns(REGION_NAME, num_value_columns)
.try_into()
.unwrap();

View File

@@ -1,8 +1,18 @@
//! Utilities for resolving schema compatibility problems.
use datatypes::schema::SchemaRef;
use std::sync::Arc;
use crate::error::Result;
use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::datatypes::Field;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::metadata::ColumnMetadata;
use crate::read::Batch;
use crate::schema::{ProjectedSchemaRef, StoreSchemaRef};
/// Make schema compatible to write to target with another schema.
pub trait CompatWrite {
@@ -14,3 +24,631 @@ pub trait CompatWrite {
/// If there are columns not in `dest_schema`, an error would be returned.
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()>;
}
/// Checks whether column with `source_column` could be read as a column with `dest_column`.
///
/// Returns
/// - `Ok(true)` if `source_column` is compatible to read using `dest_column` as schema.
/// - `Ok(false)` if they are considered different columns.
/// - `Err` if there is incompatible issue that could not be resolved.
fn is_source_column_compatible(
source_column: &ColumnMetadata,
dest_column: &ColumnMetadata,
) -> Result<bool> {
ensure!(
source_column.name() == dest_column.name(),
error::CompatReadSnafu {
reason: format!(
"try to use column in {} for column {}",
source_column.name(),
dest_column.name()
),
}
);
if source_column.id() != dest_column.id() {
return Ok(false);
}
ensure!(
source_column.desc.data_type == dest_column.desc.data_type,
error::CompatReadSnafu {
reason: format!(
"could not read column {} from {:?} type as {:?} type",
dest_column.name(),
source_column.desc.data_type,
dest_column.desc.data_type
),
}
);
ensure!(
dest_column.desc.is_nullable() || !source_column.desc.is_nullable(),
error::CompatReadSnafu {
reason: format!(
"unable to read nullable data for non null column {}",
dest_column.name()
),
}
);
Ok(true)
}
/// Adapter to help reading data with source schema as data with dest schema.
#[derive(Debug)]
pub struct ReadAdapter {
/// Schema of data source.
source_schema: StoreSchemaRef,
/// Schema user expects to read.
dest_schema: ProjectedSchemaRef,
/// For each column in dest schema, stores the index in read result for
/// this column, or None if the column is not in result.
///
/// This vec would be left empty if `source_version == dest_version`.
indices_in_result: Vec<Option<usize>>,
/// For each column in source schema, stores whether we need to read that column. All
/// columns are needed by default.
is_source_needed: Vec<bool>,
}
impl ReadAdapter {
/// Creates a new [ReadAdapter] that could convert data with `source_schema` into data
/// with `dest_schema`.
pub fn new(
source_schema: StoreSchemaRef,
dest_schema: ProjectedSchemaRef,
) -> Result<ReadAdapter> {
if source_schema.version() == dest_schema.schema_to_read().version() {
ReadAdapter::from_same_version(source_schema, dest_schema)
} else {
ReadAdapter::from_different_version(source_schema, dest_schema)
}
}
fn from_same_version(
source_schema: StoreSchemaRef,
dest_schema: ProjectedSchemaRef,
) -> Result<ReadAdapter> {
let mut is_source_needed = vec![true; source_schema.num_columns()];
if source_schema.num_columns() != dest_schema.schema_to_read().num_columns() {
// `dest_schema` might be projected, so we need to find out value columns that not be read
// by the `dest_schema`.
for (offset, value_column) in source_schema.value_columns().iter().enumerate() {
// Iterate value columns in source and mark those not in destination as unneeded.
if !dest_schema.is_needed(value_column.id()) {
is_source_needed[source_schema.value_column_index_by_offset(offset)] = false;
}
}
}
Ok(ReadAdapter {
source_schema,
dest_schema,
indices_in_result: Vec::new(),
is_source_needed,
})
}
fn from_different_version(
source_schema: StoreSchemaRef,
dest_schema: ProjectedSchemaRef,
) -> Result<ReadAdapter> {
let schema_to_read = dest_schema.schema_to_read();
let mut indices_in_result = vec![None; schema_to_read.num_columns()];
let mut is_source_needed = vec![true; source_schema.num_columns()];
// Number of columns in result from source data.
let mut num_columns_in_result = 0;
for (idx, source_column) in source_schema.columns().iter().enumerate() {
// For each column in source schema, check whether we need to read it.
if let Some(dest_idx) = schema_to_read
.schema()
.column_index_by_name(source_column.name())
{
let dest_column = &schema_to_read.columns()[dest_idx];
// Check whether we could read this column.
if is_source_column_compatible(source_column, dest_column)? {
// Mark that this column could be read from source data, since some
// columns in source schema would be skipped, we should not use
// the source column's index directly.
indices_in_result[dest_idx] = Some(num_columns_in_result);
num_columns_in_result += 1;
} else {
// This column is not the same column in dest schema, should be fill by default value
// instead of reading from source data.
is_source_needed[idx] = false;
}
} else {
// The column is not in `dest_schema`, we don't need to read it.
is_source_needed[idx] = false;
}
}
Ok(ReadAdapter {
source_schema,
dest_schema,
indices_in_result,
is_source_needed,
})
}
/// Returns a bool slice to denote which key column in source is needed.
#[inline]
pub fn source_key_needed(&self) -> &[bool] {
&self.is_source_needed[..self.source_schema.row_key_end()]
}
/// Returns a bool slice to denote which value column in source is needed.
#[inline]
pub fn source_value_needed(&self) -> &[bool] {
&self.is_source_needed
[self.source_schema.row_key_end()..self.source_schema.user_column_end()]
}
/// Construct a new [Batch] from row key, value, sequence and op_type.
///
/// # Panics
/// Panics if input `VectorRef` is empty.
pub fn batch_from_parts(
&self,
row_key_columns: Vec<VectorRef>,
mut value_columns: Vec<VectorRef>,
sequences: VectorRef,
op_types: VectorRef,
) -> Result<Batch> {
// Each vector should has same length, so here we just use the length of `sequence`.
let num_rows = sequences.len();
let mut source = row_key_columns;
// Reserve space for value, sequence and op_type
source.reserve(value_columns.len() + 2);
source.append(&mut value_columns);
// Internal columns are push in sequence, op_type order.
source.push(sequences);
source.push(op_types);
if !self.need_compat() {
return Ok(Batch::new(source));
}
self.source_columns_to_batch(source, num_rows)
}
/// Returns list of fields need to read from the parquet file.
pub fn fields_to_read(&self) -> Vec<Field> {
if !self.need_compat() {
return self
.dest_schema
.schema_to_read()
.arrow_schema()
.fields
.clone();
}
self.source_schema
.arrow_schema()
.fields
.iter()
.zip(self.is_source_needed.iter())
.filter_map(|(field, is_needed)| {
if *is_needed {
Some(field.clone())
} else {
None
}
})
.collect()
}
/// Convert chunk read from the parquet file into [Batch].
///
/// The chunk should have the same schema as [`ReadAdapter::fields_to_read()`].
pub fn arrow_chunk_to_batch(&self, chunk: &Chunk<Arc<dyn Array>>) -> Result<Batch> {
let names = self
.source_schema
.schema()
.column_schemas()
.iter()
.zip(self.is_source_needed.iter())
.filter_map(|(column_schema, is_needed)| {
if *is_needed {
Some(&column_schema.name)
} else {
None
}
});
let source = chunk
.iter()
.zip(names)
.map(|(column, name)| {
Helper::try_into_vector(column.clone()).context(error::ConvertChunkSnafu { name })
})
.collect::<Result<_>>()?;
if !self.need_compat() || chunk.is_empty() {
return Ok(Batch::new(source));
}
let num_rows = chunk.len();
self.source_columns_to_batch(source, num_rows)
}
#[inline]
fn need_compat(&self) -> bool {
self.source_schema.version() != self.dest_schema.schema_to_read().version()
}
fn source_columns_to_batch(&self, source: Vec<VectorRef>, num_rows: usize) -> Result<Batch> {
let column_schemas = self.dest_schema.schema_to_read().schema().column_schemas();
let columns = self
.indices_in_result
.iter()
.zip(column_schemas)
.map(|(index_opt, column_schema)| {
if let Some(idx) = index_opt {
Ok(source[*idx].clone())
} else {
let vector = column_schema
.create_default_vector(num_rows)
.context(error::CreateDefaultToReadSnafu {
column: &column_schema.name,
})?
.context(error::NoDefaultToReadSnafu {
column: &column_schema.name,
})?;
Ok(vector)
}
})
.collect::<Result<Vec<_>>>()?;
Ok(Batch::new(columns))
}
}
#[cfg(test)]
mod tests {
use datatypes::data_type::ConcreteDataType;
use store_api::storage::consts;
use store_api::storage::ColumnDescriptorBuilder;
use super::*;
use crate::error::Error;
use crate::metadata::RegionMetadata;
use crate::schema::tests;
use crate::schema::{ProjectedSchema, RegionSchema};
use crate::test_util::descriptor_util;
fn check_fields(fields: &[Field], names: &[&str]) {
for (field, name) in fields.iter().zip(names) {
assert_eq!(&field.name, name);
}
}
fn call_batch_from_parts(
adapter: &ReadAdapter,
batch: &Batch,
num_value_columns: usize,
) -> Batch {
let key = batch.columns()[0..2].to_vec();
let value = batch.columns()[2..2 + num_value_columns].to_vec();
let sequence = batch.column(2 + num_value_columns).clone();
let op_type = batch.column(2 + num_value_columns + 1).clone();
adapter
.batch_from_parts(key, value, sequence, op_type)
.unwrap()
}
fn check_batch_from_parts_without_padding(
adapter: &ReadAdapter,
batch: &Batch,
num_value_columns: usize,
) {
let new_batch = call_batch_from_parts(adapter, batch, num_value_columns);
assert_eq!(*batch, new_batch);
}
fn call_arrow_chunk_to_batch(adapter: &ReadAdapter, batch: &Batch) -> Batch {
let arrays = batch.columns().iter().map(|v| v.to_arrow_array()).collect();
let chunk = Chunk::new(arrays);
adapter.arrow_chunk_to_batch(&chunk).unwrap()
}
fn check_arrow_chunk_to_batch_without_padding(adapter: &ReadAdapter, batch: &Batch) {
let new_batch = call_arrow_chunk_to_batch(adapter, batch);
assert_eq!(*batch, new_batch);
}
fn check_batch_with_null_padding(batch: &Batch, new_batch: &Batch, null_columns: &[usize]) {
assert_eq!(
batch.num_columns() + null_columns.len(),
new_batch.num_columns()
);
let columns_from_source = new_batch
.columns()
.iter()
.enumerate()
.filter_map(|(i, v)| {
if null_columns.contains(&i) {
None
} else {
Some(v.clone())
}
})
.collect::<Vec<_>>();
assert_eq!(batch.columns(), &columns_from_source);
for idx in null_columns {
assert!(new_batch.column(*idx).only_null());
}
}
#[test]
fn test_compat_same_schema() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema = Arc::new(tests::new_region_schema(0, 2));
let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema.clone()));
let source_schema = region_schema.store_schema().clone();
let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap();
assert_eq!(&[true, true], adapter.source_key_needed());
assert_eq!(&[true, true], adapter.source_value_needed());
let batch = tests::new_batch_with_num_values(2);
check_batch_from_parts_without_padding(&adapter, &batch, 2);
check_fields(
&adapter.fields_to_read(),
&[
"k0",
"timestamp",
"v0",
"v1",
consts::SEQUENCE_COLUMN_NAME,
consts::OP_TYPE_COLUMN_NAME,
],
);
check_arrow_chunk_to_batch_without_padding(&adapter, &batch);
}
#[test]
fn test_compat_same_version_with_projection() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema = Arc::new(tests::new_region_schema(0, 2));
// Just read v0, k0.
let projected_schema =
Arc::new(ProjectedSchema::new(region_schema.clone(), Some(vec![2, 0])).unwrap());
let source_schema = region_schema.store_schema().clone();
let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap();
assert_eq!(&[true, true], adapter.source_key_needed());
assert_eq!(&[true, false], adapter.source_value_needed());
// One value column has been filtered out, so the result batch should only contains one value column.
let batch = tests::new_batch_with_num_values(1);
check_batch_from_parts_without_padding(&adapter, &batch, 1);
check_fields(
&adapter.fields_to_read(),
&[
"k0",
"timestamp",
"v0",
consts::SEQUENCE_COLUMN_NAME,
consts::OP_TYPE_COLUMN_NAME,
],
);
check_arrow_chunk_to_batch_without_padding(&adapter, &batch);
}
#[test]
fn test_compat_old_column() {
// (k0, timestamp, v0) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 1));
// (k0, timestamp, v0, v1) with version 1.
let region_schema_new = Arc::new(tests::new_region_schema(1, 1));
// Just read v0, k0
let projected_schema =
Arc::new(ProjectedSchema::new(region_schema_new, Some(vec![2, 0])).unwrap());
let source_schema = region_schema_old.store_schema().clone();
let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap();
assert_eq!(&[true, true], adapter.source_key_needed());
assert_eq!(&[true], adapter.source_value_needed());
let batch = tests::new_batch_with_num_values(1);
check_batch_from_parts_without_padding(&adapter, &batch, 1);
check_fields(
&adapter.fields_to_read(),
&[
"k0",
"timestamp",
"v0",
consts::SEQUENCE_COLUMN_NAME,
consts::OP_TYPE_COLUMN_NAME,
],
);
check_arrow_chunk_to_batch_without_padding(&adapter, &batch);
}
#[test]
fn test_compat_new_column() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 2));
// (k0, timestamp, v0, v1, v2) with version 1.
let region_schema_new = Arc::new(tests::new_region_schema(1, 3));
// Just read v2, v0, k0
let projected_schema =
Arc::new(ProjectedSchema::new(region_schema_new, Some(vec![4, 2, 0])).unwrap());
let source_schema = region_schema_old.store_schema().clone();
let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap();
assert_eq!(&[true, true], adapter.source_key_needed());
assert_eq!(&[true, false], adapter.source_value_needed());
// Only read one value column from source.
let batch = tests::new_batch_with_num_values(1);
// New batch should contains k0, timestamp, v0, sequence, op_type.
let new_batch = call_batch_from_parts(&adapter, &batch, 1);
// v2 is filled by null.
check_batch_with_null_padding(&batch, &new_batch, &[3]);
check_fields(
&adapter.fields_to_read(),
&[
"k0",
"timestamp",
"v0",
consts::SEQUENCE_COLUMN_NAME,
consts::OP_TYPE_COLUMN_NAME,
],
);
let new_batch = call_arrow_chunk_to_batch(&adapter, &batch);
check_batch_with_null_padding(&batch, &new_batch, &[3]);
}
#[test]
fn test_compat_different_column() {
// (k0, timestamp, v0, v1) with version 0.
let region_schema_old = Arc::new(tests::new_region_schema(0, 2));
let mut descriptor = descriptor_util::desc_with_value_columns(tests::REGION_NAME, 2);
// Assign a much larger column id to v0.
descriptor.default_cf.columns[0].id = descriptor.default_cf.columns.last().unwrap().id + 10;
let metadata: RegionMetadata = descriptor.try_into().unwrap();
let columns = metadata.columns;
// (k0, timestamp, v0, v1) with version 2, and v0 has different column id.
let region_schema_new = Arc::new(RegionSchema::new(columns, 2).unwrap());
let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema_new));
let source_schema = region_schema_old.store_schema().clone();
let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap();
assert_eq!(&[true, true], adapter.source_key_needed());
// v0 is discarded as it has different column id than new schema's.
assert_eq!(&[false, true], adapter.source_value_needed());
// New batch should contains k0, timestamp, v1, sequence, op_type, so we need to remove v0
// from the created batch.
let batch = tests::new_batch_with_num_values(2);
let mut columns = batch.columns().to_vec();
// Remove v0.
columns.remove(2);
let batch = Batch::new(columns);
let new_batch = call_batch_from_parts(&adapter, &batch, 1);
// v0 is filled by null.
check_batch_with_null_padding(&batch, &new_batch, &[2]);
check_fields(
&adapter.fields_to_read(),
&[
"k0",
"timestamp",
"v1",
consts::SEQUENCE_COLUMN_NAME,
consts::OP_TYPE_COLUMN_NAME,
],
);
let new_batch = call_arrow_chunk_to_batch(&adapter, &batch);
check_batch_with_null_padding(&batch, &new_batch, &[2]);
}
#[inline]
fn new_column_desc_builder() -> ColumnDescriptorBuilder {
ColumnDescriptorBuilder::new(10, "test", ConcreteDataType::int32_datatype())
}
#[test]
fn test_is_source_column_compatible() {
let desc = new_column_desc_builder().build().unwrap();
let source = ColumnMetadata { cf_id: 1, desc };
// Same column is always compatible, also tests read nullable column
// as a nullable column.
assert!(is_source_column_compatible(&source, &source).unwrap());
// Different id.
let desc = new_column_desc_builder()
.id(source.desc.id + 1)
.build()
.unwrap();
let dest = ColumnMetadata { cf_id: 1, desc };
assert!(!is_source_column_compatible(&source, &dest).unwrap());
}
#[test]
fn test_nullable_column_read_by_not_null() {
let desc = new_column_desc_builder().build().unwrap();
assert!(desc.is_nullable());
let source = ColumnMetadata { cf_id: 1, desc };
let desc = new_column_desc_builder()
.is_nullable(false)
.build()
.unwrap();
let dest = ColumnMetadata { cf_id: 1, desc };
let err = is_source_column_compatible(&source, &dest).unwrap_err();
assert!(
matches!(err, Error::CompatRead { .. }),
"{:?} is not CompatRead",
err
);
}
#[test]
fn test_read_not_null_column() {
let desc = new_column_desc_builder()
.is_nullable(false)
.build()
.unwrap();
let source = ColumnMetadata { cf_id: 1, desc };
let desc = new_column_desc_builder()
.is_nullable(false)
.build()
.unwrap();
let not_null_dest = ColumnMetadata { cf_id: 1, desc };
assert!(is_source_column_compatible(&source, &not_null_dest).unwrap());
let desc = new_column_desc_builder().build().unwrap();
let null_dest = ColumnMetadata { cf_id: 1, desc };
assert!(is_source_column_compatible(&source, &null_dest).unwrap());
}
#[test]
fn test_read_column_with_different_name() {
let desc = new_column_desc_builder().build().unwrap();
let source = ColumnMetadata { cf_id: 1, desc };
let desc = new_column_desc_builder()
.name(format!("{}_other", source.desc.name))
.build()
.unwrap();
let dest = ColumnMetadata { cf_id: 1, desc };
let err = is_source_column_compatible(&source, &dest).unwrap_err();
assert!(
matches!(err, Error::CompatRead { .. }),
"{:?} is not CompatRead",
err
);
}
}

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use common_error::prelude::*;
use datatypes::arrow::bitmap::MutableBitmap;
use datatypes::schema::{SchemaBuilder, SchemaRef};
use datatypes::vectors::{BooleanVector, VectorRef};
use datatypes::vectors::BooleanVector;
use store_api::storage::{Chunk, ColumnId};
use crate::error;
@@ -184,36 +184,6 @@ impl ProjectedSchema {
.unwrap_or(true)
}
/// Construct a new [Batch] from row key, value, sequence and op_type.
///
/// # Panics
/// Panics if number of columns are not the same as this schema.
pub fn batch_from_parts(
&self,
row_key_columns: Vec<VectorRef>,
mut value_columns: Vec<VectorRef>,
sequences: VectorRef,
op_types: VectorRef,
) -> Batch {
// sequence and op_type
let num_internal_columns = 2;
assert_eq!(
self.schema_to_read.num_columns(),
row_key_columns.len() + value_columns.len() + num_internal_columns
);
let mut columns = row_key_columns;
// Reserve space for value, sequence and op_type
columns.reserve(value_columns.len() + num_internal_columns);
columns.append(&mut value_columns);
// Internal columns are push in sequence, op_type order.
columns.push(sequences);
columns.push(op_types);
Batch::new(columns)
}
fn build_schema_to_read(
region_schema: &RegionSchema,
projection: &Projection,
@@ -369,7 +339,7 @@ impl BatchOp for ProjectedSchema {
mod tests {
use datatypes::prelude::ScalarVector;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::TimestampVector;
use datatypes::vectors::{TimestampVector, VectorRef};
use store_api::storage::OpType;
use super::*;
@@ -474,17 +444,6 @@ mod tests {
assert_eq!(2, chunk.columns.len());
assert_eq!(&chunk.columns[0], batch.column(2));
assert_eq!(&chunk.columns[1], batch.column(1));
// Test batch_from_parts
let keys = batch.columns()[0..2].to_vec();
let values = batch.columns()[2..3].to_vec();
let created = projected_schema.batch_from_parts(
keys,
values,
batch.column(3).clone(),
batch.column(4).clone(),
);
assert_eq!(batch, created);
}
#[test]

View File

@@ -5,7 +5,6 @@ use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk as ArrowChunk;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::{Metadata, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::Helper;
use store_api::storage::consts;
use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, Error, Result};
@@ -50,22 +49,6 @@ impl StoreSchema {
ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect())
}
pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk<Arc<dyn Array>>) -> Result<Batch> {
assert_eq!(self.schema.num_columns(), chunk.columns().len());
let columns = chunk
.iter()
.enumerate()
.map(|(i, column)| {
Helper::try_into_vector(column.clone()).context(metadata::ConvertChunkSnafu {
name: self.column_name(i),
})
})
.collect::<Result<_>>()?;
Ok(Batch::new(columns))
}
pub(crate) fn contains_column(&self, name: &str) -> bool {
self.schema.column_schema_by_name(name).is_some()
}
@@ -159,6 +142,32 @@ impl StoreSchema {
pub(crate) fn num_columns(&self) -> usize {
self.schema.num_columns()
}
#[inline]
pub(crate) fn row_key_end(&self) -> usize {
self.row_key_end
}
#[inline]
pub(crate) fn user_column_end(&self) -> usize {
self.user_column_end
}
#[inline]
pub(crate) fn value_columns(&self) -> &[ColumnMetadata] {
&self.columns[self.row_key_end..self.user_column_end]
}
/// Returns the index of the value column according its `offset`.
#[inline]
pub(crate) fn value_column_index_by_offset(&self, offset: usize) -> usize {
self.row_key_end + offset
}
#[inline]
pub(crate) fn columns(&self) -> &[ColumnMetadata] {
&self.columns
}
}
impl TryFrom<ArrowSchema> for StoreSchema {
@@ -262,9 +271,5 @@ mod tests {
// Convert batch to chunk.
let chunk = store_schema.batch_to_arrow_chunk(&batch);
check_chunk_batch(&chunk, &batch);
// Convert chunk to batch.
let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap();
check_chunk_batch(&chunk, &converted_batch);
}
}

View File

@@ -9,7 +9,7 @@ use async_trait::async_trait;
use common_telemetry::debug;
use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::datatypes::{DataType, Schema};
use datatypes::arrow::io::parquet::read::{
infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer,
};
@@ -27,6 +27,7 @@ use table::predicate::Predicate;
use crate::error::{self, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchReader};
use crate::schema::compat::ReadAdapter;
use crate::schema::{ProjectedSchemaRef, StoreSchema};
use crate::sst;
@@ -213,17 +214,18 @@ impl<'a> ParquetReader<'a> {
let arrow_schema =
infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;
let store_schema = Arc::new(
StoreSchema::try_from(arrow_schema)
.context(error::ConvertStoreSchemaSnafu { file: &file_path })?,
);
// Now the StoreSchema is only used to validate metadata of the parquet file, but this schema
// would be useful once we support altering schema, as this is the actual schema of the SST.
let store_schema = StoreSchema::try_from(arrow_schema)
.context(error::ConvertStoreSchemaSnafu { file: &file_path })?;
let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
let pruned_row_groups = self
.predicate
.prune_row_groups(store_schema.schema().clone(), &metadata.row_groups);
let projected_fields = self.projected_fields().to_vec();
let projected_fields = adapter.fields_to_read();
let chunk_stream = try_stream!({
for (idx, valid) in pruned_row_groups.iter().enumerate() {
if !valid {
@@ -250,27 +252,20 @@ impl<'a> ParquetReader<'a> {
}
});
ChunkStream::new(self.projected_schema.clone(), Box::pin(chunk_stream))
}
fn projected_fields(&self) -> &[Field] {
&self.projected_schema.schema_to_read().arrow_schema().fields
ChunkStream::new(adapter, Box::pin(chunk_stream))
}
}
pub type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk<Arc<dyn Array>>>> + Send>>;
pub struct ChunkStream {
projected_schema: ProjectedSchemaRef,
adapter: ReadAdapter,
stream: SendableChunkStream,
}
impl ChunkStream {
pub fn new(projected_schema: ProjectedSchemaRef, stream: SendableChunkStream) -> Result<Self> {
Ok(Self {
projected_schema,
stream,
})
pub fn new(adapter: ReadAdapter, stream: SendableChunkStream) -> Result<Self> {
Ok(Self { adapter, stream })
}
}
@@ -280,12 +275,7 @@ impl BatchReader for ChunkStream {
self.stream
.try_next()
.await?
.map(|chunk| {
self.projected_schema
.schema_to_read()
.arrow_chunk_to_batch(&chunk)
.context(error::InvalidParquetSchemaSnafu)
})
.map(|chunk| self.adapter.arrow_chunk_to_batch(&chunk))
.transpose()
}
}

View File

@@ -6,7 +6,6 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlanAdapter, Ph
use common_query::DfPhysicalPlan;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
/// Datafusion table adpaters
use datafusion::datasource::{
datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
TableType as DfTableType,