From f4e22282a43d0ce2d8026d3c0ac08ffa6d14cef1 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 31 Oct 2022 11:42:07 +0800 Subject: [PATCH] feat: Region supports reading data with different schema (#342) * feat(storage): Implement skeleton of ReadResolver ReadResolver is used to resolve difference between schemas * feat(storage): Add user_column_end to ReadResover * feat(storage): Implement Batch::batch_from_parts Used to construct Batch from parts according to the schema that user expects to read. * feat(storage): Compat memtable schema * feat(storage): Compat parquet file schema * fix(storage): ReadResolver supports projection under same schema version Now ReadResolver takes ProjectedSchemaRef as dest schema, and checks whether a value column is needed by the schema after projection. * feat(storage): Check whether columns are same columns is_source_column_readable() takes ColumnMetadata instead of ColumnSchema, and compares their column id to check whether they are same columns. * refactor(storage): Use row_key_end/user_column_end in source_schema Rename ReadResolver::is_needed to ReadResolver::is_source_needed, and remove row_key_end/user_column_end from ReadResolver, since they should be same as source_schema's * chore(storage): Remove unused codes * test(storage): Add tests for the resolver * feat(storage): Returns error on different source and dest column names * style(storage): Fix clippy * refactor: Rename ReadResolver to ReadAdapter * chore(table): Removed unused comment * refactor: rename to is_source_column_compatible --- src/storage/src/error.rs | 47 +- src/storage/src/memtable/btree.rs | 47 +- src/storage/src/region/tests/alter.rs | 105 ++++- src/storage/src/schema.rs | 29 +- src/storage/src/schema/compat.rs | 642 +++++++++++++++++++++++++- src/storage/src/schema/projected.rs | 45 +- src/storage/src/schema/store.rs | 47 +- src/storage/src/sst/parquet.rs | 36 +- src/table/src/table/adapter.rs | 1 - 9 files changed, 858 insertions(+), 141 deletions(-) diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 851f0d11b3..7de6a4e0b7 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -177,12 +177,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Parquet file schema is invalid, source: {}", source))] - InvalidParquetSchema { - #[snafu(backtrace)] - source: MetadataError, - }, - #[snafu(display("Region is under {} state, cannot proceed operation", state))] InvalidRegionState { state: &'static str, @@ -308,6 +302,40 @@ pub enum Error { #[snafu(backtrace)] source: MetadataError, }, + + #[snafu(display("Incompatible schema to read, reason: {}", reason))] + CompatRead { + reason: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to read column {}, could not create default value, source: {}", + column, + source + ))] + CreateDefaultToRead { + column: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to read column {}, no proper default value for it", column))] + NoDefaultToRead { + column: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert arrow chunk to batch, name: {}, source: {}", + name, + source + ))] + ConvertChunk { + name: String, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -334,14 +362,16 @@ impl ErrorExt for Error { | Cancelled { .. } | DecodeMetaActionList { .. } | Readline { .. } - | InvalidParquetSchema { .. } | WalDataCorrupted { .. } | VersionNotFound { .. 
} | SequenceNotMonotonic { .. } | ConvertStoreSchema { .. } | InvalidRawRegion { .. } | FilterColumn { .. } - | AlterMetadata { .. } => StatusCode::Unexpected, + | AlterMetadata { .. } + | CompatRead { .. } + | CreateDefaultToRead { .. } + | NoDefaultToRead { .. } => StatusCode::Unexpected, FlushIo { .. } | WriteParquet { .. } @@ -364,6 +394,7 @@ impl ErrorExt for Error { | ConvertColumnSchema { source, .. } => source.status_code(), PushBatch { source, .. } => source.status_code(), AddDefault { source, .. } => source.status_code(), + ConvertChunk { source, .. } => source.status_code(), } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 945d4e2df2..efd5fe1cd9 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -18,6 +18,7 @@ use crate::memtable::{ BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering, }; use crate::read::Batch; +use crate::schema::compat::ReadAdapter; use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; type RwLockMap = RwLock>; @@ -69,7 +70,7 @@ impl Memtable for BTreeMemtable { fn iter(&self, ctx: &IterContext) -> Result { assert!(ctx.batch_size > 0); - let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone()); + let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone())?; Ok(Box::new(iter)) } @@ -85,6 +86,7 @@ struct BTreeIterator { schema: RegionSchemaRef, /// Projected schema that user expect to read. projected_schema: ProjectedSchemaRef, + adapter: ReadAdapter, map: Arc, last_key: Option, } @@ -103,27 +105,33 @@ impl Iterator for BTreeIterator { type Item = Result; fn next(&mut self) -> Option> { - self.next_batch().map(Ok) + self.next_batch().transpose() } } impl BTreeIterator { - fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc) -> BTreeIterator { + fn new( + ctx: IterContext, + schema: RegionSchemaRef, + map: Arc, + ) -> Result { let projected_schema = ctx .projected_schema .clone() .unwrap_or_else(|| Arc::new(ProjectedSchema::no_projection(schema.clone()))); + let adapter = ReadAdapter::new(schema.store_schema().clone(), projected_schema.clone())?; - BTreeIterator { + Ok(BTreeIterator { ctx, schema, projected_schema, + adapter, map, last_key: None, - } + }) } - fn next_batch(&mut self) -> Option { + fn next_batch(&mut self) -> Result> { let map = self.map.read().unwrap(); let iter = if let Some(last_key) = &self.last_key { map.range((Bound::Excluded(last_key), Bound::Unbounded)) @@ -139,7 +147,7 @@ impl BTreeIterator { }; if keys.is_empty() { - return None; + return Ok(None); } self.last_key = keys.last().map(|k| { let mut last_key = (*k).clone(); @@ -151,27 +159,30 @@ impl BTreeIterator { .schema .row_key_columns() .map(|column_meta| column_meta.desc.data_type.clone()); - let key_needed = vec![true; self.schema.num_row_key_columns()]; let value_data_types = self .schema .value_columns() .map(|column_meta| column_meta.desc.data_type.clone()); - let value_needed: Vec<_> = self - .schema - .value_columns() - .map(|column_meta| self.projected_schema.is_needed(column_meta.id())) - .collect(); - let key_columns = rows_to_vectors(key_data_types, &key_needed, keys.as_slice()); - let value_columns = rows_to_vectors(value_data_types, &value_needed, values.as_slice()); - let batch = self.projected_schema.batch_from_parts( + let key_columns = rows_to_vectors( + key_data_types, + self.adapter.source_key_needed(), + keys.as_slice(), + ); + let value_columns = rows_to_vectors( + 
value_data_types, + self.adapter.source_value_needed(), + values.as_slice(), + ); + + let batch = self.adapter.batch_from_parts( key_columns, value_columns, Arc::new(sequences), Arc::new(op_types), - ); + )?; - Some(batch) + Ok(Some(batch)) } } diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs index 94923546ac..6f44f6ac96 100644 --- a/src/storage/src/region/tests/alter.rs +++ b/src/storage/src/region/tests/alter.rs @@ -1,16 +1,15 @@ use std::sync::Arc; use common_time::Timestamp; -use datatypes::prelude::ConcreteDataType; -use datatypes::prelude::ScalarVector; -use datatypes::vectors::Int64Vector; -use datatypes::vectors::TimestampVector; +use datatypes::prelude::*; +use datatypes::vectors::{Int64Vector, TimestampVector}; use log_store::fs::log::LocalFileLogStore; use store_api::storage::PutOperation; use store_api::storage::WriteRequest; use store_api::storage::{ - AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, - Region, RegionMeta, SchemaRef, WriteResponse, + AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor, + ColumnDescriptorBuilder, ColumnId, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, + WriteResponse, }; use tempdir::TempDir; @@ -38,6 +37,7 @@ struct AlterTester { base: Option, } +#[derive(Debug, Clone, PartialEq)] struct DataRow { key: Option, ts: Timestamp, @@ -145,9 +145,59 @@ impl AlterTester { metadata.version() } - async fn full_scan(&self) -> Vec<(i64, Option)> { + async fn full_scan_with_init_schema(&self) -> Vec<(i64, Option)> { self.base().full_scan().await } + + async fn full_scan(&self) -> Vec { + let read_ctx = &self.base().read_ctx; + let snapshot = self.base().region.snapshot(read_ctx).unwrap(); + + let resp = snapshot + .scan(read_ctx, ScanRequest::default()) + .await + .unwrap(); + let mut reader = resp.reader; + + let metadata = self.base().region.in_memory_metadata(); + assert_eq!(metadata.schema(), reader.schema()); + + let mut dst = Vec::new(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + append_chunk_to(&chunk, &mut dst); + } + + dst + } +} + +fn append_chunk_to(chunk: &Chunk, dst: &mut Vec) { + assert_eq!(4, chunk.columns.len()); + + let k0_vector = chunk.columns[0] + .as_any() + .downcast_ref::() + .unwrap(); + let ts_vector = chunk.columns[1] + .as_any() + .downcast_ref::() + .unwrap(); + let v0_vector = chunk.columns[2] + .as_any() + .downcast_ref::() + .unwrap(); + let v1_vector = chunk.columns[3] + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..k0_vector.len() { + dst.push(DataRow::new( + k0_vector.get_data(i), + ts_vector.get_data(i).unwrap().value(), + v0_vector.get_data(i), + v1_vector.get_data(i), + )); + } } fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor { @@ -200,12 +250,8 @@ async fn test_alter_region_with_reopen() { let mut tester = AlterTester::new(store_dir).await; let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; - tester.put_with_init_schema(&data).await; - assert_eq!(3, tester.full_scan().await.len()); - - let schema = tester.schema(); - check_schema_names(&schema, &["timestamp", "v0"]); + assert_eq!(3, tester.full_scan_with_init_schema().await.len()); let req = add_column_req(&[ (new_column_desc(4, "k0"), true), // key column k0 @@ -216,6 +262,7 @@ async fn test_alter_region_with_reopen() { let schema = tester.schema(); check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]); + // Put data after schema altered. 
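+    // Rows written before the ALTER carry no k0/v1 values, so the scans below
+    // expect `None` for those columns on the old rows.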
let data = vec![ DataRow::new(Some(10000), 1003, Some(103), Some(201)), DataRow::new(Some(10001), 1004, Some(104), Some(202)), @@ -223,14 +270,26 @@ async fn test_alter_region_with_reopen() { ]; tester.put(&data).await; + // Scan with new schema before reopen. + let mut expect = vec![ + DataRow::new(None, 1000, Some(100), None), + DataRow::new(None, 1001, Some(101), None), + DataRow::new(None, 1002, Some(102), None), + ]; + expect.extend_from_slice(&data); + let scanned = tester.full_scan().await; + assert_eq!(expect, scanned); + + // Reopen and put more data. tester.reopen().await; let data = vec![ DataRow::new(Some(10003), 1006, Some(106), Some(204)), DataRow::new(Some(10004), 1007, Some(107), Some(205)), DataRow::new(Some(10005), 1008, Some(108), Some(206)), ]; - tester.put(&data).await; + // Extend expected result. + expect.extend_from_slice(&data); // add columns,then remove them without writing data. let req = add_column_req(&[ @@ -248,8 +307,12 @@ async fn test_alter_region_with_reopen() { check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]); let data = vec![DataRow::new(Some(10006), 1009, Some(109), Some(207))]; - tester.put(&data).await; + expect.extend_from_slice(&data); + + // Scan with new schema after reopen and write. + let scanned = tester.full_scan().await; + assert_eq!(expect, scanned); } #[tokio::test] @@ -308,11 +371,23 @@ async fn test_put_old_schema_after_alter() { tester.alter(req).await; // Put with old schema. - let data = vec![(1003, Some(103)), (1004, Some(104))]; + let data = vec![(1005, Some(105)), (1006, Some(106))]; tester.put_with_init_schema(&data).await; // Put data with old schema directly to the inner writer, to check that the region // writer could compat the schema of write batch. let data = vec![(1003, Some(103)), (1004, Some(104))]; tester.put_inner_with_init_schema(&data).await; + + let expect = vec![ + DataRow::new(None, 1000, Some(100), None), + DataRow::new(None, 1001, Some(101), None), + DataRow::new(None, 1002, Some(102), None), + DataRow::new(None, 1003, Some(103), None), + DataRow::new(None, 1004, Some(104), None), + DataRow::new(None, 1005, Some(105), None), + DataRow::new(None, 1006, Some(106), None), + ]; + let scanned = tester.full_scan().await; + assert_eq!(expect, scanned); } diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index fdb3559ec7..c7c240b79e 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -11,32 +11,41 @@ pub use crate::schema::store::{StoreSchema, StoreSchemaRef}; mod tests { use std::sync::Arc; - use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; + use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector, VectorRef}; use super::*; use crate::metadata::RegionMetadata; use crate::read::Batch; use crate::test_util::descriptor_util; + pub const REGION_NAME: &str = "test"; + pub(crate) fn new_batch() -> Batch { + new_batch_with_num_values(1) + } + + pub(crate) fn new_batch_with_num_values(num_value_columns: usize) -> Batch { let k0 = Int64Vector::from_slice(&[1, 2, 3]); let timestamp = Int64Vector::from_slice(&[4, 5, 6]); - let v0 = Int64Vector::from_slice(&[7, 8, 9]); + let mut columns: Vec = vec![Arc::new(k0), Arc::new(timestamp)]; + + for i in 0..num_value_columns { + let vi = Int64Vector::from_slice(&[i as i64, i as i64, i as i64]); + columns.push(Arc::new(vi)); + } + let sequences = UInt64Vector::from_slice(&[100, 100, 100]); let op_types = UInt8Vector::from_slice(&[0, 0, 0]); - Batch::new(vec![ - Arc::new(k0), - Arc::new(timestamp), - Arc::new(v0), - 
Arc::new(sequences), - Arc::new(op_types), - ]) + columns.push(Arc::new(sequences)); + columns.push(Arc::new(op_types)); + + Batch::new(columns) } pub(crate) fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { let metadata: RegionMetadata = - descriptor_util::desc_with_value_columns("test", num_value_columns) + descriptor_util::desc_with_value_columns(REGION_NAME, num_value_columns) .try_into() .unwrap(); diff --git a/src/storage/src/schema/compat.rs b/src/storage/src/schema/compat.rs index 2967a20a54..e77a3bb9e0 100644 --- a/src/storage/src/schema/compat.rs +++ b/src/storage/src/schema/compat.rs @@ -1,8 +1,18 @@ //! Utilities for resolving schema compatibility problems. -use datatypes::schema::SchemaRef; +use std::sync::Arc; -use crate::error::Result; +use datatypes::arrow::array::Array; +use datatypes::arrow::chunk::Chunk; +use datatypes::arrow::datatypes::Field; +use datatypes::schema::SchemaRef; +use datatypes::vectors::{Helper, VectorRef}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{self, Result}; +use crate::metadata::ColumnMetadata; +use crate::read::Batch; +use crate::schema::{ProjectedSchemaRef, StoreSchemaRef}; /// Make schema compatible to write to target with another schema. pub trait CompatWrite { @@ -14,3 +24,631 @@ pub trait CompatWrite { /// If there are columns not in `dest_schema`, an error would be returned. fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()>; } + +/// Checks whether column with `source_column` could be read as a column with `dest_column`. +/// +/// Returns +/// - `Ok(true)` if `source_column` is compatible to read using `dest_column` as schema. +/// - `Ok(false)` if they are considered different columns. +/// - `Err` if there is incompatible issue that could not be resolved. +fn is_source_column_compatible( + source_column: &ColumnMetadata, + dest_column: &ColumnMetadata, +) -> Result { + ensure!( + source_column.name() == dest_column.name(), + error::CompatReadSnafu { + reason: format!( + "try to use column in {} for column {}", + source_column.name(), + dest_column.name() + ), + } + ); + + if source_column.id() != dest_column.id() { + return Ok(false); + } + + ensure!( + source_column.desc.data_type == dest_column.desc.data_type, + error::CompatReadSnafu { + reason: format!( + "could not read column {} from {:?} type as {:?} type", + dest_column.name(), + source_column.desc.data_type, + dest_column.desc.data_type + ), + } + ); + + ensure!( + dest_column.desc.is_nullable() || !source_column.desc.is_nullable(), + error::CompatReadSnafu { + reason: format!( + "unable to read nullable data for non null column {}", + dest_column.name() + ), + } + ); + + Ok(true) +} + +/// Adapter to help reading data with source schema as data with dest schema. +#[derive(Debug)] +pub struct ReadAdapter { + /// Schema of data source. + source_schema: StoreSchemaRef, + /// Schema user expects to read. + dest_schema: ProjectedSchemaRef, + /// For each column in dest schema, stores the index in read result for + /// this column, or None if the column is not in result. + /// + /// This vec would be left empty if `source_version == dest_version`. + indices_in_result: Vec>, + /// For each column in source schema, stores whether we need to read that column. All + /// columns are needed by default. + is_source_needed: Vec, +} + +impl ReadAdapter { + /// Creates a new [ReadAdapter] that could convert data with `source_schema` into data + /// with `dest_schema`. 
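+    ///
+    /// A rough usage sketch (variable names here are illustrative, not part of this
+    /// patch): build the adapter once per data source, then use it to decide which
+    /// source columns to read and to assemble the resulting [Batch].
+    ///
+    /// ```ignore
+    /// let adapter = ReadAdapter::new(store_schema.clone(), projected_schema.clone())?;
+    /// // Bool slices telling which key/value columns of the source are needed.
+    /// let key_needed = adapter.source_key_needed();
+    /// let value_needed = adapter.source_value_needed();
+    /// // Assemble a batch in the schema the user expects to read.
+    /// let batch = adapter.batch_from_parts(key_columns, value_columns, sequences, op_types)?;
+    /// ```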
+ pub fn new( + source_schema: StoreSchemaRef, + dest_schema: ProjectedSchemaRef, + ) -> Result { + if source_schema.version() == dest_schema.schema_to_read().version() { + ReadAdapter::from_same_version(source_schema, dest_schema) + } else { + ReadAdapter::from_different_version(source_schema, dest_schema) + } + } + + fn from_same_version( + source_schema: StoreSchemaRef, + dest_schema: ProjectedSchemaRef, + ) -> Result { + let mut is_source_needed = vec![true; source_schema.num_columns()]; + if source_schema.num_columns() != dest_schema.schema_to_read().num_columns() { + // `dest_schema` might be projected, so we need to find out value columns that not be read + // by the `dest_schema`. + + for (offset, value_column) in source_schema.value_columns().iter().enumerate() { + // Iterate value columns in source and mark those not in destination as unneeded. + if !dest_schema.is_needed(value_column.id()) { + is_source_needed[source_schema.value_column_index_by_offset(offset)] = false; + } + } + } + + Ok(ReadAdapter { + source_schema, + dest_schema, + indices_in_result: Vec::new(), + is_source_needed, + }) + } + + fn from_different_version( + source_schema: StoreSchemaRef, + dest_schema: ProjectedSchemaRef, + ) -> Result { + let schema_to_read = dest_schema.schema_to_read(); + let mut indices_in_result = vec![None; schema_to_read.num_columns()]; + let mut is_source_needed = vec![true; source_schema.num_columns()]; + // Number of columns in result from source data. + let mut num_columns_in_result = 0; + + for (idx, source_column) in source_schema.columns().iter().enumerate() { + // For each column in source schema, check whether we need to read it. + if let Some(dest_idx) = schema_to_read + .schema() + .column_index_by_name(source_column.name()) + { + let dest_column = &schema_to_read.columns()[dest_idx]; + // Check whether we could read this column. + if is_source_column_compatible(source_column, dest_column)? { + // Mark that this column could be read from source data, since some + // columns in source schema would be skipped, we should not use + // the source column's index directly. + indices_in_result[dest_idx] = Some(num_columns_in_result); + num_columns_in_result += 1; + } else { + // This column is not the same column in dest schema, should be fill by default value + // instead of reading from source data. + is_source_needed[idx] = false; + } + } else { + // The column is not in `dest_schema`, we don't need to read it. + is_source_needed[idx] = false; + } + } + + Ok(ReadAdapter { + source_schema, + dest_schema, + indices_in_result, + is_source_needed, + }) + } + + /// Returns a bool slice to denote which key column in source is needed. + #[inline] + pub fn source_key_needed(&self) -> &[bool] { + &self.is_source_needed[..self.source_schema.row_key_end()] + } + + /// Returns a bool slice to denote which value column in source is needed. + #[inline] + pub fn source_value_needed(&self) -> &[bool] { + &self.is_source_needed + [self.source_schema.row_key_end()..self.source_schema.user_column_end()] + } + + /// Construct a new [Batch] from row key, value, sequence and op_type. + /// + /// # Panics + /// Panics if input `VectorRef` is empty. + pub fn batch_from_parts( + &self, + row_key_columns: Vec, + mut value_columns: Vec, + sequences: VectorRef, + op_types: VectorRef, + ) -> Result { + // Each vector should has same length, so here we just use the length of `sequence`. 
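+        // The column order assembled below mirrors the store schema layout:
+        // row key columns, then value columns, then the internal sequence and
+        // op_type columns.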
+ let num_rows = sequences.len(); + + let mut source = row_key_columns; + // Reserve space for value, sequence and op_type + source.reserve(value_columns.len() + 2); + source.append(&mut value_columns); + // Internal columns are push in sequence, op_type order. + source.push(sequences); + source.push(op_types); + + if !self.need_compat() { + return Ok(Batch::new(source)); + } + + self.source_columns_to_batch(source, num_rows) + } + + /// Returns list of fields need to read from the parquet file. + pub fn fields_to_read(&self) -> Vec { + if !self.need_compat() { + return self + .dest_schema + .schema_to_read() + .arrow_schema() + .fields + .clone(); + } + + self.source_schema + .arrow_schema() + .fields + .iter() + .zip(self.is_source_needed.iter()) + .filter_map(|(field, is_needed)| { + if *is_needed { + Some(field.clone()) + } else { + None + } + }) + .collect() + } + + /// Convert chunk read from the parquet file into [Batch]. + /// + /// The chunk should have the same schema as [`ReadAdapter::fields_to_read()`]. + pub fn arrow_chunk_to_batch(&self, chunk: &Chunk>) -> Result { + let names = self + .source_schema + .schema() + .column_schemas() + .iter() + .zip(self.is_source_needed.iter()) + .filter_map(|(column_schema, is_needed)| { + if *is_needed { + Some(&column_schema.name) + } else { + None + } + }); + let source = chunk + .iter() + .zip(names) + .map(|(column, name)| { + Helper::try_into_vector(column.clone()).context(error::ConvertChunkSnafu { name }) + }) + .collect::>()?; + + if !self.need_compat() || chunk.is_empty() { + return Ok(Batch::new(source)); + } + + let num_rows = chunk.len(); + self.source_columns_to_batch(source, num_rows) + } + + #[inline] + fn need_compat(&self) -> bool { + self.source_schema.version() != self.dest_schema.schema_to_read().version() + } + + fn source_columns_to_batch(&self, source: Vec, num_rows: usize) -> Result { + let column_schemas = self.dest_schema.schema_to_read().schema().column_schemas(); + let columns = self + .indices_in_result + .iter() + .zip(column_schemas) + .map(|(index_opt, column_schema)| { + if let Some(idx) = index_opt { + Ok(source[*idx].clone()) + } else { + let vector = column_schema + .create_default_vector(num_rows) + .context(error::CreateDefaultToReadSnafu { + column: &column_schema.name, + })? 
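+                        // `create_default_vector` yields `None` when no default can be
+                        // produced for the column; surface that as a NoDefaultToRead error.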
+ .context(error::NoDefaultToReadSnafu { + column: &column_schema.name, + })?; + Ok(vector) + } + }) + .collect::>>()?; + + Ok(Batch::new(columns)) + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + use store_api::storage::consts; + use store_api::storage::ColumnDescriptorBuilder; + + use super::*; + use crate::error::Error; + use crate::metadata::RegionMetadata; + use crate::schema::tests; + use crate::schema::{ProjectedSchema, RegionSchema}; + use crate::test_util::descriptor_util; + + fn check_fields(fields: &[Field], names: &[&str]) { + for (field, name) in fields.iter().zip(names) { + assert_eq!(&field.name, name); + } + } + + fn call_batch_from_parts( + adapter: &ReadAdapter, + batch: &Batch, + num_value_columns: usize, + ) -> Batch { + let key = batch.columns()[0..2].to_vec(); + let value = batch.columns()[2..2 + num_value_columns].to_vec(); + let sequence = batch.column(2 + num_value_columns).clone(); + let op_type = batch.column(2 + num_value_columns + 1).clone(); + + adapter + .batch_from_parts(key, value, sequence, op_type) + .unwrap() + } + + fn check_batch_from_parts_without_padding( + adapter: &ReadAdapter, + batch: &Batch, + num_value_columns: usize, + ) { + let new_batch = call_batch_from_parts(adapter, batch, num_value_columns); + assert_eq!(*batch, new_batch); + } + + fn call_arrow_chunk_to_batch(adapter: &ReadAdapter, batch: &Batch) -> Batch { + let arrays = batch.columns().iter().map(|v| v.to_arrow_array()).collect(); + let chunk = Chunk::new(arrays); + adapter.arrow_chunk_to_batch(&chunk).unwrap() + } + + fn check_arrow_chunk_to_batch_without_padding(adapter: &ReadAdapter, batch: &Batch) { + let new_batch = call_arrow_chunk_to_batch(adapter, batch); + assert_eq!(*batch, new_batch); + } + + fn check_batch_with_null_padding(batch: &Batch, new_batch: &Batch, null_columns: &[usize]) { + assert_eq!( + batch.num_columns() + null_columns.len(), + new_batch.num_columns() + ); + + let columns_from_source = new_batch + .columns() + .iter() + .enumerate() + .filter_map(|(i, v)| { + if null_columns.contains(&i) { + None + } else { + Some(v.clone()) + } + }) + .collect::>(); + + assert_eq!(batch.columns(), &columns_from_source); + + for idx in null_columns { + assert!(new_batch.column(*idx).only_null()); + } + } + + #[test] + fn test_compat_same_schema() { + // (k0, timestamp, v0, v1) with version 0. + let region_schema = Arc::new(tests::new_region_schema(0, 2)); + let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema.clone())); + + let source_schema = region_schema.store_schema().clone(); + let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap(); + + assert_eq!(&[true, true], adapter.source_key_needed()); + assert_eq!(&[true, true], adapter.source_value_needed()); + + let batch = tests::new_batch_with_num_values(2); + check_batch_from_parts_without_padding(&adapter, &batch, 2); + + check_fields( + &adapter.fields_to_read(), + &[ + "k0", + "timestamp", + "v0", + "v1", + consts::SEQUENCE_COLUMN_NAME, + consts::OP_TYPE_COLUMN_NAME, + ], + ); + + check_arrow_chunk_to_batch_without_padding(&adapter, &batch); + } + + #[test] + fn test_compat_same_version_with_projection() { + // (k0, timestamp, v0, v1) with version 0. + let region_schema = Arc::new(tests::new_region_schema(0, 2)); + // Just read v0, k0. 
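+        // Projection indices are positions in the region schema
+        // (k0 = 0, timestamp = 1, v0 = 2, v1 = 3), so `vec![2, 0]` selects v0 and k0.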
+ let projected_schema = + Arc::new(ProjectedSchema::new(region_schema.clone(), Some(vec![2, 0])).unwrap()); + + let source_schema = region_schema.store_schema().clone(); + let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap(); + + assert_eq!(&[true, true], adapter.source_key_needed()); + assert_eq!(&[true, false], adapter.source_value_needed()); + + // One value column has been filtered out, so the result batch should only contains one value column. + let batch = tests::new_batch_with_num_values(1); + check_batch_from_parts_without_padding(&adapter, &batch, 1); + + check_fields( + &adapter.fields_to_read(), + &[ + "k0", + "timestamp", + "v0", + consts::SEQUENCE_COLUMN_NAME, + consts::OP_TYPE_COLUMN_NAME, + ], + ); + + check_arrow_chunk_to_batch_without_padding(&adapter, &batch); + } + + #[test] + fn test_compat_old_column() { + // (k0, timestamp, v0) with version 0. + let region_schema_old = Arc::new(tests::new_region_schema(0, 1)); + // (k0, timestamp, v0, v1) with version 1. + let region_schema_new = Arc::new(tests::new_region_schema(1, 1)); + + // Just read v0, k0 + let projected_schema = + Arc::new(ProjectedSchema::new(region_schema_new, Some(vec![2, 0])).unwrap()); + + let source_schema = region_schema_old.store_schema().clone(); + let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap(); + + assert_eq!(&[true, true], adapter.source_key_needed()); + assert_eq!(&[true], adapter.source_value_needed()); + + let batch = tests::new_batch_with_num_values(1); + check_batch_from_parts_without_padding(&adapter, &batch, 1); + + check_fields( + &adapter.fields_to_read(), + &[ + "k0", + "timestamp", + "v0", + consts::SEQUENCE_COLUMN_NAME, + consts::OP_TYPE_COLUMN_NAME, + ], + ); + + check_arrow_chunk_to_batch_without_padding(&adapter, &batch); + } + + #[test] + fn test_compat_new_column() { + // (k0, timestamp, v0, v1) with version 0. + let region_schema_old = Arc::new(tests::new_region_schema(0, 2)); + // (k0, timestamp, v0, v1, v2) with version 1. + let region_schema_new = Arc::new(tests::new_region_schema(1, 3)); + + // Just read v2, v0, k0 + let projected_schema = + Arc::new(ProjectedSchema::new(region_schema_new, Some(vec![4, 2, 0])).unwrap()); + + let source_schema = region_schema_old.store_schema().clone(); + let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap(); + + assert_eq!(&[true, true], adapter.source_key_needed()); + assert_eq!(&[true, false], adapter.source_value_needed()); + + // Only read one value column from source. + let batch = tests::new_batch_with_num_values(1); + // New batch should contains k0, timestamp, v0, sequence, op_type. + let new_batch = call_batch_from_parts(&adapter, &batch, 1); + // v2 is filled by null. + check_batch_with_null_padding(&batch, &new_batch, &[3]); + + check_fields( + &adapter.fields_to_read(), + &[ + "k0", + "timestamp", + "v0", + consts::SEQUENCE_COLUMN_NAME, + consts::OP_TYPE_COLUMN_NAME, + ], + ); + + let new_batch = call_arrow_chunk_to_batch(&adapter, &batch); + check_batch_with_null_padding(&batch, &new_batch, &[3]); + } + + #[test] + fn test_compat_different_column() { + // (k0, timestamp, v0, v1) with version 0. + let region_schema_old = Arc::new(tests::new_region_schema(0, 2)); + + let mut descriptor = descriptor_util::desc_with_value_columns(tests::REGION_NAME, 2); + // Assign a much larger column id to v0. 
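+        // Columns are matched by id, not just by name: a source column whose id differs
+        // from the destination's is treated as a different column and is not read.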
+ descriptor.default_cf.columns[0].id = descriptor.default_cf.columns.last().unwrap().id + 10; + let metadata: RegionMetadata = descriptor.try_into().unwrap(); + let columns = metadata.columns; + // (k0, timestamp, v0, v1) with version 2, and v0 has different column id. + let region_schema_new = Arc::new(RegionSchema::new(columns, 2).unwrap()); + + let projected_schema = Arc::new(ProjectedSchema::no_projection(region_schema_new)); + let source_schema = region_schema_old.store_schema().clone(); + let adapter = ReadAdapter::new(source_schema, projected_schema).unwrap(); + + assert_eq!(&[true, true], adapter.source_key_needed()); + // v0 is discarded as it has different column id than new schema's. + assert_eq!(&[false, true], adapter.source_value_needed()); + + // New batch should contains k0, timestamp, v1, sequence, op_type, so we need to remove v0 + // from the created batch. + let batch = tests::new_batch_with_num_values(2); + let mut columns = batch.columns().to_vec(); + // Remove v0. + columns.remove(2); + let batch = Batch::new(columns); + + let new_batch = call_batch_from_parts(&adapter, &batch, 1); + // v0 is filled by null. + check_batch_with_null_padding(&batch, &new_batch, &[2]); + + check_fields( + &adapter.fields_to_read(), + &[ + "k0", + "timestamp", + "v1", + consts::SEQUENCE_COLUMN_NAME, + consts::OP_TYPE_COLUMN_NAME, + ], + ); + + let new_batch = call_arrow_chunk_to_batch(&adapter, &batch); + check_batch_with_null_padding(&batch, &new_batch, &[2]); + } + + #[inline] + fn new_column_desc_builder() -> ColumnDescriptorBuilder { + ColumnDescriptorBuilder::new(10, "test", ConcreteDataType::int32_datatype()) + } + + #[test] + fn test_is_source_column_compatible() { + let desc = new_column_desc_builder().build().unwrap(); + let source = ColumnMetadata { cf_id: 1, desc }; + + // Same column is always compatible, also tests read nullable column + // as a nullable column. + assert!(is_source_column_compatible(&source, &source).unwrap()); + + // Different id. + let desc = new_column_desc_builder() + .id(source.desc.id + 1) + .build() + .unwrap(); + let dest = ColumnMetadata { cf_id: 1, desc }; + assert!(!is_source_column_compatible(&source, &dest).unwrap()); + } + + #[test] + fn test_nullable_column_read_by_not_null() { + let desc = new_column_desc_builder().build().unwrap(); + assert!(desc.is_nullable()); + let source = ColumnMetadata { cf_id: 1, desc }; + + let desc = new_column_desc_builder() + .is_nullable(false) + .build() + .unwrap(); + let dest = ColumnMetadata { cf_id: 1, desc }; + + let err = is_source_column_compatible(&source, &dest).unwrap_err(); + assert!( + matches!(err, Error::CompatRead { .. 
}), + "{:?} is not CompatRead", + err + ); + } + + #[test] + fn test_read_not_null_column() { + let desc = new_column_desc_builder() + .is_nullable(false) + .build() + .unwrap(); + let source = ColumnMetadata { cf_id: 1, desc }; + + let desc = new_column_desc_builder() + .is_nullable(false) + .build() + .unwrap(); + let not_null_dest = ColumnMetadata { cf_id: 1, desc }; + assert!(is_source_column_compatible(&source, ¬_null_dest).unwrap()); + + let desc = new_column_desc_builder().build().unwrap(); + let null_dest = ColumnMetadata { cf_id: 1, desc }; + assert!(is_source_column_compatible(&source, &null_dest).unwrap()); + } + + #[test] + fn test_read_column_with_different_name() { + let desc = new_column_desc_builder().build().unwrap(); + let source = ColumnMetadata { cf_id: 1, desc }; + + let desc = new_column_desc_builder() + .name(format!("{}_other", source.desc.name)) + .build() + .unwrap(); + let dest = ColumnMetadata { cf_id: 1, desc }; + + let err = is_source_column_compatible(&source, &dest).unwrap_err(); + assert!( + matches!(err, Error::CompatRead { .. }), + "{:?} is not CompatRead", + err + ); + } +} diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs index 0bb5f6c19b..81d92070fd 100644 --- a/src/storage/src/schema/projected.rs +++ b/src/storage/src/schema/projected.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use common_error::prelude::*; use datatypes::arrow::bitmap::MutableBitmap; use datatypes::schema::{SchemaBuilder, SchemaRef}; -use datatypes::vectors::{BooleanVector, VectorRef}; +use datatypes::vectors::BooleanVector; use store_api::storage::{Chunk, ColumnId}; use crate::error; @@ -184,36 +184,6 @@ impl ProjectedSchema { .unwrap_or(true) } - /// Construct a new [Batch] from row key, value, sequence and op_type. - /// - /// # Panics - /// Panics if number of columns are not the same as this schema. - pub fn batch_from_parts( - &self, - row_key_columns: Vec, - mut value_columns: Vec, - sequences: VectorRef, - op_types: VectorRef, - ) -> Batch { - // sequence and op_type - let num_internal_columns = 2; - - assert_eq!( - self.schema_to_read.num_columns(), - row_key_columns.len() + value_columns.len() + num_internal_columns - ); - - let mut columns = row_key_columns; - // Reserve space for value, sequence and op_type - columns.reserve(value_columns.len() + num_internal_columns); - columns.append(&mut value_columns); - // Internal columns are push in sequence, op_type order. 
- columns.push(sequences); - columns.push(op_types); - - Batch::new(columns) - } - fn build_schema_to_read( region_schema: &RegionSchema, projection: &Projection, @@ -369,7 +339,7 @@ impl BatchOp for ProjectedSchema { mod tests { use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; - use datatypes::vectors::TimestampVector; + use datatypes::vectors::{TimestampVector, VectorRef}; use store_api::storage::OpType; use super::*; @@ -474,17 +444,6 @@ mod tests { assert_eq!(2, chunk.columns.len()); assert_eq!(&chunk.columns[0], batch.column(2)); assert_eq!(&chunk.columns[1], batch.column(1)); - - // Test batch_from_parts - let keys = batch.columns()[0..2].to_vec(); - let values = batch.columns()[2..3].to_vec(); - let created = projected_schema.batch_from_parts( - keys, - values, - batch.column(3).clone(), - batch.column(4).clone(), - ); - assert_eq!(batch, created); } #[test] diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs index e423888ae8..c65ac6781a 100644 --- a/src/storage/src/schema/store.rs +++ b/src/storage/src/schema/store.rs @@ -5,7 +5,6 @@ use datatypes::arrow::array::Array; use datatypes::arrow::chunk::Chunk as ArrowChunk; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::{Metadata, Schema, SchemaBuilder, SchemaRef}; -use datatypes::vectors::Helper; use store_api::storage::consts; use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, Error, Result}; @@ -50,22 +49,6 @@ impl StoreSchema { ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect()) } - pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk>) -> Result { - assert_eq!(self.schema.num_columns(), chunk.columns().len()); - - let columns = chunk - .iter() - .enumerate() - .map(|(i, column)| { - Helper::try_into_vector(column.clone()).context(metadata::ConvertChunkSnafu { - name: self.column_name(i), - }) - }) - .collect::>()?; - - Ok(Batch::new(columns)) - } - pub(crate) fn contains_column(&self, name: &str) -> bool { self.schema.column_schema_by_name(name).is_some() } @@ -159,6 +142,32 @@ impl StoreSchema { pub(crate) fn num_columns(&self) -> usize { self.schema.num_columns() } + + #[inline] + pub(crate) fn row_key_end(&self) -> usize { + self.row_key_end + } + + #[inline] + pub(crate) fn user_column_end(&self) -> usize { + self.user_column_end + } + + #[inline] + pub(crate) fn value_columns(&self) -> &[ColumnMetadata] { + &self.columns[self.row_key_end..self.user_column_end] + } + + /// Returns the index of the value column according its `offset`. + #[inline] + pub(crate) fn value_column_index_by_offset(&self, offset: usize) -> usize { + self.row_key_end + offset + } + + #[inline] + pub(crate) fn columns(&self) -> &[ColumnMetadata] { + &self.columns + } } impl TryFrom for StoreSchema { @@ -262,9 +271,5 @@ mod tests { // Convert batch to chunk. let chunk = store_schema.batch_to_arrow_chunk(&batch); check_chunk_batch(&chunk, &batch); - - // Convert chunk to batch. 
- let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap(); - check_chunk_batch(&chunk, &converted_batch); } } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index d48c760b62..06bb34588b 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use common_telemetry::debug; use datatypes::arrow::array::Array; use datatypes::arrow::chunk::Chunk; -use datatypes::arrow::datatypes::{DataType, Field, Schema}; +use datatypes::arrow::datatypes::{DataType, Schema}; use datatypes::arrow::io::parquet::read::{ infer_schema, read_columns_many_async, read_metadata_async, RowGroupDeserializer, }; @@ -27,6 +27,7 @@ use table::predicate::Predicate; use crate::error::{self, Result}; use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; +use crate::schema::compat::ReadAdapter; use crate::schema::{ProjectedSchemaRef, StoreSchema}; use crate::sst; @@ -213,17 +214,18 @@ impl<'a> ParquetReader<'a> { let arrow_schema = infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?; + let store_schema = Arc::new( + StoreSchema::try_from(arrow_schema) + .context(error::ConvertStoreSchemaSnafu { file: &file_path })?, + ); - // Now the StoreSchema is only used to validate metadata of the parquet file, but this schema - // would be useful once we support altering schema, as this is the actual schema of the SST. - let store_schema = StoreSchema::try_from(arrow_schema) - .context(error::ConvertStoreSchemaSnafu { file: &file_path })?; + let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?; let pruned_row_groups = self .predicate .prune_row_groups(store_schema.schema().clone(), &metadata.row_groups); - let projected_fields = self.projected_fields().to_vec(); + let projected_fields = adapter.fields_to_read(); let chunk_stream = try_stream!({ for (idx, valid) in pruned_row_groups.iter().enumerate() { if !valid { @@ -250,27 +252,20 @@ impl<'a> ParquetReader<'a> { } }); - ChunkStream::new(self.projected_schema.clone(), Box::pin(chunk_stream)) - } - - fn projected_fields(&self) -> &[Field] { - &self.projected_schema.schema_to_read().arrow_schema().fields + ChunkStream::new(adapter, Box::pin(chunk_stream)) } } pub type SendableChunkStream = Pin>>> + Send>>; pub struct ChunkStream { - projected_schema: ProjectedSchemaRef, + adapter: ReadAdapter, stream: SendableChunkStream, } impl ChunkStream { - pub fn new(projected_schema: ProjectedSchemaRef, stream: SendableChunkStream) -> Result { - Ok(Self { - projected_schema, - stream, - }) + pub fn new(adapter: ReadAdapter, stream: SendableChunkStream) -> Result { + Ok(Self { adapter, stream }) } } @@ -280,12 +275,7 @@ impl BatchReader for ChunkStream { self.stream .try_next() .await? 
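+            // Each arrow chunk is adapted into a Batch in the destination schema;
+            // columns missing from this SST are filled with default values.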
- .map(|chunk| { - self.projected_schema - .schema_to_read() - .arrow_chunk_to_batch(&chunk) - .context(error::InvalidParquetSchemaSnafu) - }) + .map(|chunk| self.adapter.arrow_chunk_to_batch(&chunk)) .transpose() } } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 59a697d1d6..06b002f98e 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -6,7 +6,6 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlanAdapter, Ph use common_query::DfPhysicalPlan; use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; -/// Datafusion table adpaters use datafusion::datasource::{ datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider, TableType as DfTableType,