diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 3a358b43af..61403d1481 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -31,16 +31,8 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Failed to parse index in schema meta, value: {}, source: {}", - value, - source - ))] - ParseSchemaIndex { - value: String, - source: std::num::ParseIntError, - backtrace: Backtrace, - }, + #[snafu(display("Timestamp column {} not found", name,))] + TimestampNotFound { name: String, backtrace: Backtrace }, #[snafu(display( "Failed to parse version in schema meta, value: {}, source: {}", diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 1f5729464a..4af64ec29f 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -4,12 +4,19 @@ use std::sync::Arc; pub use arrow::datatypes::Metadata; use arrow::datatypes::{Field, Schema as ArrowSchema}; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, Result}; -const TIMESTAMP_INDEX_KEY: &str = "greptime:timestamp_index"; +/// Key used to store column name of the timestamp column in metadata. +/// +/// Instead of storing the column index, we store the column name as the +/// query engine may modify the column order of the arrow schema, then +/// we would fail to recover the correct timestamp column when converting +/// the arrow schema back to our schema. +const TIMESTAMP_COLUMN_KEY: &str = "greptime:timestamp_column"; +/// Key used to store version number of the schema in metadata. const VERSION_KEY: &str = "greptime:version"; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -80,6 +87,11 @@ impl Schema { self.column_schemas.len() } + #[inline] + pub fn is_empty(&self) -> bool { + self.column_schemas.is_empty() + } + /// Returns index of the timestamp key column. #[inline] pub fn timestamp_index(&self) -> Option { @@ -154,8 +166,9 @@ impl SchemaBuilder { pub fn build(mut self) -> Result { if let Some(timestamp_index) = self.timestamp_index { validate_timestamp_index(&self.column_schemas, timestamp_index)?; + let timestamp_name = self.column_schemas[timestamp_index].name.clone(); self.metadata - .insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string()); + .insert(TIMESTAMP_COLUMN_KEY.to_string(), timestamp_name); } self.metadata .insert(VERSION_KEY.to_string(), self.version.to_string()); @@ -241,9 +254,14 @@ impl TryFrom> for Schema { column_schemas.push(column_schema); } - let timestamp_index = try_parse_index(&arrow_schema.metadata, TIMESTAMP_INDEX_KEY)?; - if let Some(index) = timestamp_index { - validate_timestamp_index(&column_schemas, index)?; + let timestamp_name = arrow_schema.metadata.get(TIMESTAMP_COLUMN_KEY); + let mut timestamp_index = None; + if let Some(name) = timestamp_name { + let index = name_to_index + .get(name) + .context(error::TimestampNotFoundSnafu { name })?; + validate_timestamp_index(&column_schemas, *index)?; + timestamp_index = Some(*index); } let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?; @@ -267,18 +285,6 @@ impl TryFrom for Schema { } } -fn try_parse_index(metadata: &Metadata, key: &str) -> Result> { - if let Some(value) = metadata.get(key) { - let index = value - .parse() - .context(error::ParseSchemaIndexSnafu { value })?; - - Ok(Some(index)) - } else { - Ok(None) - } -} - fn try_parse_version(metadata: &Metadata, key: &str) -> Result { if let Some(value) = metadata.get(key) { let version = value @@ -313,6 +319,7 @@ mod tests { fn test_build_empty_schema() { let schema = SchemaBuilder::default().build().unwrap(); assert_eq!(0, schema.num_columns()); + assert!(schema.is_empty()); assert!(SchemaBuilder::default().timestamp_index(0).build().is_err()); } @@ -326,6 +333,7 @@ mod tests { let schema = Schema::new(column_schemas.clone()); assert_eq!(2, schema.num_columns()); + assert!(!schema.is_empty()); assert!(schema.timestamp_index().is_none()); assert!(schema.timestamp_column().is_none()); assert_eq!(Schema::INITIAL_VERSION, schema.version()); diff --git a/src/storage/benches/memtable/util/bench_context.rs b/src/storage/benches/memtable/util/bench_context.rs index 2aeb55320a..2d682ec1b3 100644 --- a/src/storage/benches/memtable/util/bench_context.rs +++ b/src/storage/benches/memtable/util/bench_context.rs @@ -1,5 +1,4 @@ use storage::memtable::{IterContext, KeyValues, MemtableRef}; -use store_api::storage::SequenceNumber; use crate::memtable::util::new_memtable; @@ -26,10 +25,9 @@ impl BenchContext { let mut read_count = 0; let iter_ctx = IterContext { batch_size, - visible_sequence: SequenceNumber::MAX, - for_flush: false, + ..Default::default() }; - let iter = self.memtable.iter(iter_ctx).unwrap(); + let iter = self.memtable.iter(&iter_ctx).unwrap(); for batch in iter { batch.unwrap(); read_count += batch_size; diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index ef5bba399d..4fad69c3f9 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -1,9 +1,13 @@ -use async_trait::async_trait; -use store_api::storage::{Chunk, ChunkReader, SchemaRef}; +use std::sync::Arc; -use crate::error::{Error, Result}; -use crate::memtable::{BoxedBatchIterator, IterContext, MemtableSet}; +use async_trait::async_trait; +use snafu::ResultExt; +use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber}; + +use crate::error::{self, Error, Result}; +use crate::memtable::{IterContext, MemtableRef, MemtableSet}; use crate::read::{Batch, BatchReader, ConcatReader}; +use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; use crate::sst::{AccessLayerRef, FileHandle, LevelMetas, ReadOptions, Visitor}; type BoxedIterator = Box> + Send>; @@ -13,7 +17,7 @@ type BoxedIterator = Box> + Send>; // using `Stream`, maybe change to `Stream` if we find out it is more efficient and have // necessary to do so. pub struct ChunkReaderImpl { - schema: SchemaRef, + schema: ProjectedSchemaRef, iter: Option, sst_reader: ConcatReader, } @@ -23,7 +27,7 @@ impl ChunkReader for ChunkReaderImpl { type Error = Error; fn schema(&self) -> &SchemaRef { - &self.schema + self.schema.projected_user_schema() } async fn next_chunk(&mut self) -> Result> { @@ -32,8 +36,7 @@ impl ChunkReader for ChunkReaderImpl { None => return Ok(None), }; - // TODO(yingwen): Check schema. - let chunk = batch_to_chunk(batch); + let chunk = self.schema.batch_to_chunk(&batch); Ok(Some(chunk)) } @@ -41,7 +44,7 @@ impl ChunkReader for ChunkReaderImpl { impl ChunkReaderImpl { pub fn new( - schema: SchemaRef, + schema: ProjectedSchemaRef, iter: BoxedIterator, sst_reader: ConcatReader, ) -> ChunkReaderImpl { @@ -64,54 +67,55 @@ impl ChunkReaderImpl { } } -// Assumes the schema is the same as key columns combine with value columns. -fn batch_to_chunk(mut batch: Batch) -> Chunk { - let mut columns = Vec::with_capacity(batch.keys.len() + batch.values.len()); - columns.append(&mut batch.keys); - columns.append(&mut batch.values); - - Chunk::new(columns) -} - /// Builder to create a new [ChunkReaderImpl] from scan request. pub struct ChunkReaderBuilder { - schema: SchemaRef, + schema: RegionSchemaRef, + projection: Option>, sst_layer: AccessLayerRef, iter_ctx: IterContext, - iters: Vec, + memtables: Vec, files_to_read: Vec, } impl ChunkReaderBuilder { - pub fn new(schema: SchemaRef, sst_layer: AccessLayerRef) -> Self { + pub fn new(schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Self { ChunkReaderBuilder { schema, - iter_ctx: IterContext::default(), - iters: Vec::new(), + projection: None, sst_layer, + iter_ctx: IterContext::default(), + memtables: Vec::new(), files_to_read: Vec::new(), } } /// Reserve space for iterating `num` memtables. pub fn reserve_num_memtables(mut self, num: usize) -> Self { - self.iters.reserve(num); + self.memtables.reserve(num); self } - pub fn iter_ctx(mut self, iter_ctx: IterContext) -> Self { - self.iter_ctx = iter_ctx; + pub fn projection(mut self, projection: Option>) -> Self { + self.projection = projection; self } - pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Result { + pub fn batch_size(mut self, batch_size: usize) -> Self { + self.iter_ctx.batch_size = batch_size; + self + } + + pub fn visible_sequence(mut self, sequence: SequenceNumber) -> Self { + self.iter_ctx.visible_sequence = sequence; + self + } + + pub fn pick_memtables(mut self, memtables: &MemtableSet) -> Self { for (_range, mem) in memtables.iter() { - let iter = mem.iter(self.iter_ctx.clone())?; - - self.iters.push(iter); + self.memtables.push(mem.clone()); } - Ok(self) + self } pub fn pick_ssts(mut self, ssts: &LevelMetas) -> Result { @@ -120,12 +124,24 @@ impl ChunkReaderBuilder { Ok(self) } - pub async fn build(self) -> Result { + pub async fn build(mut self) -> Result { + let schema = Arc::new( + ProjectedSchema::new(self.schema, self.projection) + .context(error::InvalidProjectionSnafu)?, + ); + + self.iter_ctx.projected_schema = Some(schema.clone()); + let mut iters = Vec::with_capacity(self.memtables.len()); + for mem in self.memtables { + let iter = mem.iter(&self.iter_ctx)?; + iters.push(iter); + } // Now we just simply chain all iterators together, ignore duplications/ordering. - let iter = Box::new(self.iters.into_iter().flatten()); + let iter = Box::new(iters.into_iter().flatten()); let read_opts = ReadOptions { batch_size: self.iter_ctx.batch_size, + projected_schema: schema.clone(), }; let mut sst_readers = Vec::with_capacity(self.files_to_read.len()); for file in &self.files_to_read { @@ -138,7 +154,7 @@ impl ChunkReaderBuilder { } let reader = ConcatReader::new(sst_readers); - Ok(ChunkReaderImpl::new(self.schema, iter, reader)) + Ok(ChunkReaderImpl::new(schema, iter, reader)) } } diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 772e2a4cc0..9dacad49fd 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -231,6 +231,12 @@ pub enum Error { #[snafu(backtrace)] source: MetadataError, }, + + #[snafu(display("Invalid projection, source: {}", source))] + InvalidProjection { + #[snafu(backtrace)] + source: crate::schema::Error, + }, } pub type Result = std::result::Result; @@ -245,7 +251,8 @@ impl ErrorExt for Error { | InvalidInputSchema { .. } | BatchMissingColumn { .. } | BatchMissingTimestamp { .. } - | InvalidTimestamp { .. } => StatusCode::InvalidArguments, + | InvalidTimestamp { .. } + | InvalidProjection { .. } => StatusCode::InvalidArguments, Utf8 { .. } | EncodeJson { .. } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 059f21ce66..34f31508cd 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -164,16 +164,14 @@ impl FlushJob { } let mut futures = Vec::with_capacity(self.memtables.len()); + let iter_ctx = IterContext { + for_flush: true, + ..Default::default() + }; for m in &self.memtables { let file_name = Self::generate_sst_file_name(); // TODO(hl): Check if random file name already exists in meta. - - let iter_ctx = IterContext { - for_flush: true, - ..Default::default() - }; - - let iter = m.memtable.iter(iter_ctx)?; + let iter = m.memtable.iter(&iter_ctx)?; futures.push(async move { self.sst_layer .write_sst(&file_name, iter, &WriteOptions::default()) diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index b7e362d130..2bc20eff37 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -14,15 +14,17 @@ use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; pub use crate::memtable::version::{MemtableSet, MemtableVersion}; use crate::read::Batch; -use crate::schema::RegionSchemaRef; +use crate::schema::{ProjectedSchemaRef, RegionSchemaRef}; /// Unique id for memtables under same region. pub type MemtableId = u32; /// In memory storage. pub trait Memtable: Send + Sync + std::fmt::Debug { + /// Returns id of this memtable. fn id(&self) -> MemtableId; + /// Returns schema of the memtable. fn schema(&self) -> RegionSchemaRef; /// Write key/values to the memtable. @@ -32,8 +34,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { fn write(&self, kvs: &KeyValues) -> Result<()>; /// Iterates the memtable. - // TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection). - fn iter(&self, ctx: IterContext) -> Result; + fn iter(&self, ctx: &IterContext) -> Result; /// Returns the estimated bytes allocated by this memtable from heap. fn bytes_allocated(&self) -> usize; @@ -42,6 +43,8 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { pub type MemtableRef = Arc; /// Context for iterating memtable. +/// +/// Should be cheap to clone. #[derive(Debug, Clone)] pub struct IterContext { /// The suggested batch size of the iterator. @@ -53,6 +56,11 @@ pub struct IterContext { // in memtable. /// Returns all rows, ignores sequence visibility and key duplication. pub for_flush: bool, + + /// Schema the reader expect to read. + /// + /// Set to `None` to read all columns. + pub projected_schema: Option, } impl Default for IterContext { @@ -62,6 +70,7 @@ impl Default for IterContext { // All data in memory is visible by default. visible_sequence: SequenceNumber::MAX, for_flush: false, + projected_schema: None, } } } @@ -82,7 +91,7 @@ pub enum RowOrdering { /// as an async trait. pub trait BatchIterator: Iterator> + Send + Sync { /// Returns the schema of this iterator. - fn schema(&self) -> RegionSchemaRef; + fn schema(&self) -> ProjectedSchemaRef; /// Returns the ordering of the output rows from this iterator. fn ordering(&self) -> RowOrdering; diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 676a8e73bf..945d4e2df2 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -18,7 +18,7 @@ use crate::memtable::{ BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering, }; use crate::read::Batch; -use crate::schema::RegionSchemaRef; +use crate::schema::{ProjectedSchema, ProjectedSchemaRef, RegionSchemaRef}; type RwLockMap = RwLock>; @@ -66,10 +66,10 @@ impl Memtable for BTreeMemtable { Ok(()) } - fn iter(&self, ctx: IterContext) -> Result { + fn iter(&self, ctx: &IterContext) -> Result { assert!(ctx.batch_size > 0); - let iter = BTreeIterator::new(ctx, self.schema.clone(), self.map.clone()); + let iter = BTreeIterator::new(ctx.clone(), self.schema.clone(), self.map.clone()); Ok(Box::new(iter)) } @@ -81,14 +81,17 @@ impl Memtable for BTreeMemtable { struct BTreeIterator { ctx: IterContext, + /// Schema of this memtable. schema: RegionSchemaRef, + /// Projected schema that user expect to read. + projected_schema: ProjectedSchemaRef, map: Arc, last_key: Option, } impl BatchIterator for BTreeIterator { - fn schema(&self) -> RegionSchemaRef { - self.schema.clone() + fn schema(&self) -> ProjectedSchemaRef { + self.projected_schema.clone() } fn ordering(&self) -> RowOrdering { @@ -106,9 +109,15 @@ impl Iterator for BTreeIterator { impl BTreeIterator { fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc) -> BTreeIterator { + let projected_schema = ctx + .projected_schema + .clone() + .unwrap_or_else(|| Arc::new(ProjectedSchema::no_projection(schema.clone()))); + BTreeIterator { ctx, schema, + projected_schema, map, last_key: None, } @@ -142,17 +151,27 @@ impl BTreeIterator { .schema .row_key_columns() .map(|column_meta| column_meta.desc.data_type.clone()); + let key_needed = vec![true; self.schema.num_row_key_columns()]; let value_data_types = self .schema .value_columns() .map(|column_meta| column_meta.desc.data_type.clone()); + let value_needed: Vec<_> = self + .schema + .value_columns() + .map(|column_meta| self.projected_schema.is_needed(column_meta.id())) + .collect(); - Some(Batch { - keys: rows_to_vectors(key_data_types, keys.as_slice()), - sequences, - op_types, - values: rows_to_vectors(value_data_types, values.as_slice()), - }) + let key_columns = rows_to_vectors(key_data_types, &key_needed, keys.as_slice()); + let value_columns = rows_to_vectors(value_data_types, &value_needed, values.as_slice()); + let batch = self.projected_schema.batch_from_parts( + key_columns, + value_columns, + Arc::new(sequences), + Arc::new(op_types), + ); + + Some(batch) } } @@ -384,6 +403,7 @@ impl<'a> RowsProvider for &'a [&RowValue] { fn rows_to_vectors, T: RowsProvider>( data_types: I, + column_needed: &[bool], provider: T, ) -> Vec { if provider.is_empty() { @@ -399,6 +419,10 @@ fn rows_to_vectors, T: RowsProvider>( let mut vectors = Vec::with_capacity(column_num); for (col_idx, builder) in builders.iter_mut().enumerate() { + if !column_needed[col_idx] { + continue; + } + for row_idx in 0..row_num { let row = provider.row_by_index(row_idx); let value = &row[col_idx]; diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 4137bbeb08..f98596a00d 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -593,18 +593,18 @@ mod tests { sequence: SequenceNumber, data: &[(i64, Option)], ) { - let iter = mem.iter(IterContext::default()).unwrap(); + let iter = mem.iter(&IterContext::default()).unwrap(); let mut index = 0; for batch in iter { let batch = batch.unwrap(); - let row_num = batch.keys[0].len(); + let row_num = batch.column(0).len(); for i in 0..row_num { - let ts = batch.keys[0].get(i); - let v = batch.values[0].get(i); + let ts = batch.column(0).get(i); + let v = batch.column(1).get(i); assert_eq!(Value::from(data[index].0), ts); assert_eq!(Value::from(data[index].1), v); - assert_eq!(sequence, batch.sequences.get_data(i).unwrap()); + assert_eq!(Value::from(sequence), batch.column(2).get(i)); index += 1; } diff --git a/src/storage/src/memtable/tests.rs b/src/storage/src/memtable/tests.rs index 3decc35475..cf98bd7d0d 100644 --- a/src/storage/src/memtable/tests.rs +++ b/src/storage/src/memtable/tests.rs @@ -1,10 +1,11 @@ +use datatypes::arrow::array::{Int64Array, UInt64Array, UInt8Array}; use datatypes::prelude::*; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder}; use super::*; use crate::metadata::RegionMetadata; -use crate::schema::RegionSchemaRef; +use crate::schema::{ProjectedSchema, RegionSchemaRef}; use crate::test_util::descriptor_util::RegionDescBuilder; // For simplicity, all memtables in test share same memtable id. @@ -12,11 +13,12 @@ const MEMTABLE_ID: MemtableId = 1; // Schema for testing memtable: // - key: Int64(timestamp), UInt64(version), -// - value: UInt64 +// - value: UInt64, UInt64 pub fn schema_for_test() -> RegionSchemaRef { // Just build a region desc and use its columns metadata. let desc = RegionDescBuilder::new("test") .enable_version_column(true) + .push_value_column(("v0", LogicalTypeId::UInt64, true)) .push_value_column(("v1", LogicalTypeId::UInt64, true)) .build(); let metadata: RegionMetadata = desc.try_into().unwrap(); @@ -29,7 +31,7 @@ fn kvs_for_test_with_index( op_type: OpType, start_index_in_batch: usize, keys: &[(i64, u64)], - values: &[Option], + values: &[(Option, Option)], ) -> KeyValues { assert_eq!(keys.len(), values.len()); @@ -46,11 +48,18 @@ fn kvs_for_test_with_index( Arc::new(key_builders.1.finish()) as _, ]; - let mut value_builder = UInt64VectorBuilder::with_capacity(values.len()); + let mut value_builders = ( + UInt64VectorBuilder::with_capacity(values.len()), + UInt64VectorBuilder::with_capacity(values.len()), + ); for value in values { - value_builder.push(*value); + value_builders.0.push(value.0); + value_builders.1.push(value.1); } - let row_values = vec![Arc::new(value_builder.finish()) as _]; + let row_values = vec![ + Arc::new(value_builders.0.finish()) as _, + Arc::new(value_builders.1.finish()) as _, + ]; let kvs = KeyValues { sequence, @@ -70,7 +79,7 @@ fn kvs_for_test( sequence: SequenceNumber, op_type: OpType, keys: &[(i64, u64)], - values: &[Option], + values: &[(Option, Option)], ) -> KeyValues { kvs_for_test_with_index(sequence, op_type, 0, keys, values) } @@ -80,7 +89,7 @@ pub fn write_kvs( sequence: SequenceNumber, op_type: OpType, keys: &[(i64, u64)], - values: &[Option], + values: &[(Option, Option)], ) { let kvs = kvs_for_test(sequence, op_type, keys, values); @@ -88,13 +97,11 @@ pub fn write_kvs( } fn check_batch_valid(batch: &Batch) { - assert_eq!(2, batch.keys.len()); - assert_eq!(1, batch.values.len()); - let row_num = batch.keys[0].len(); - assert_eq!(row_num, batch.keys[1].len()); - assert_eq!(row_num, batch.sequences.len()); - assert_eq!(row_num, batch.op_types.len()); - assert_eq!(row_num, batch.values[0].len()); + assert_eq!(6, batch.num_columns()); + let row_num = batch.column(0).len(); + for i in 1..6 { + assert_eq!(row_num, batch.column(i).len()); + } } fn check_iter_content( @@ -102,25 +109,26 @@ fn check_iter_content( keys: &[(i64, u64)], sequences: &[u64], op_types: &[OpType], - values: &[Option], + values: &[(Option, Option)], ) { let mut index = 0; for batch in iter { let batch = batch.unwrap(); check_batch_valid(&batch); - let row_num = batch.keys[0].len(); + let row_num = batch.column(0).len(); for i in 0..row_num { - let (k0, k1) = (batch.keys[0].get(i), batch.keys[1].get(i)); - let sequence = batch.sequences.get_data(i).unwrap(); - let op_type = batch.op_types.get_data(i).unwrap(); - let v = batch.values[0].get(i); + let (k0, k1) = (batch.column(0).get(i), batch.column(1).get(i)); + let (v0, v1) = (batch.column(2).get(i), batch.column(3).get(i)); + let sequence = batch.column(4).get(i); + let op_type = batch.column(5).get(i); assert_eq!(Value::from(keys[index].0), k0); assert_eq!(Value::from(keys[index].1), k1); - assert_eq!(sequences[index], sequence); - assert_eq!(op_types[index].as_u8(), op_type); - assert_eq!(Value::from(values[index]), v); + assert_eq!(Value::from(values[index].0), v0); + assert_eq!(Value::from(values[index].1), v1); + assert_eq!(Value::from(sequences[index]), sequence); + assert_eq!(Value::from(op_types[index].as_u8()), op_type); index += 1; } @@ -177,7 +185,7 @@ struct TestContext { fn write_iter_memtable_case(ctx: &TestContext) { // Test iterating an empty memtable. - let mut iter = ctx.memtable.iter(IterContext::default()).unwrap(); + let mut iter = ctx.memtable.iter(&IterContext::default()).unwrap(); assert!(iter.next().is_none()); // Poll the empty iterator again. assert!(iter.next().is_none()); @@ -196,17 +204,25 @@ fn write_iter_memtable_case(ctx: &TestContext) { (2003, 5), (1001, 1), ], // keys - &[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values + &[ + (Some(1), None), + (Some(2), None), + (Some(7), None), + (Some(8), None), + (Some(9), None), + (Some(3), None), + ], // values ); write_kvs( &*ctx.memtable, 11, // sequence OpType::Put, - &[(1002, 1), (1003, 1), (1004, 1)], // keys - &[None, Some(5), None], // values + &[(1002, 1), (1003, 1), (1004, 1)], // keys + &[(None, None), (Some(5), None), (None, None)], // values ); - assert_eq!(216, ctx.memtable.bytes_allocated()); + // 9 key value pairs (6 + 3). + assert_eq!(288, ctx.memtable.bytes_allocated()); let batch_sizes = [1, 4, 8, consts::READ_BATCH_SIZE]; for batch_size in batch_sizes { @@ -214,8 +230,11 @@ fn write_iter_memtable_case(ctx: &TestContext) { batch_size, ..Default::default() }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); - assert_eq!(ctx.schema, iter.schema()); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + assert_eq!( + ctx.schema.user_schema(), + iter.schema().projected_user_schema() + ); assert_eq!(RowOrdering::Key, iter.ordering()); check_iter_content( @@ -244,15 +263,15 @@ fn write_iter_memtable_case(ctx: &TestContext) { OpType::Put, ], // op_types &[ - Some(1), - Some(2), - Some(3), - None, - Some(5), - None, - Some(7), - Some(8), - Some(9), + (Some(1), None), + (Some(2), None), + (Some(3), None), + (None, None), + (Some(5), None), + (None, None), + (Some(7), None), + (Some(8), None), + (Some(9), None), ], // values ); } @@ -272,7 +291,7 @@ fn check_iter_batch_size(iter: &mut dyn BatchIterator, total: usize, batch_size: let batch = batch.unwrap(); check_batch_valid(&batch); - let row_num = batch.keys[0].len(); + let row_num = batch.column(0).len(); if remains >= batch_size { assert_eq!(batch_size, row_num); remains -= batch_size; @@ -301,7 +320,14 @@ fn test_iter_batch_size() { (2003, 1), (2003, 5), ], // keys - &[Some(1), Some(2), Some(3), Some(4), None, None], // values + &[ + (Some(1), None), + (Some(2), None), + (Some(3), None), + (Some(4), None), + (None, None), + (None, None), + ], // values ); let total = 6; @@ -313,7 +339,7 @@ fn test_iter_batch_size() { ..Default::default() }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_batch_size(&mut *iter, total, batch_size); } }); @@ -328,15 +354,15 @@ fn test_duplicate_key_across_batch() { 10, // sequence OpType::Put, &[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys - &[Some(1), None, None, None], // values + &[(Some(1), None), (None, None), (None, None), (None, None)], // values ); write_kvs( &*ctx.memtable, 11, // sequence OpType::Put, - &[(1000, 1), (2001, 2)], // keys - &[Some(1231), Some(1232)], // values + &[(1000, 1), (2001, 2)], // keys + &[(Some(1231), None), (Some(1232), None)], // values ); let batch_sizes = [1, 2, 3, 4, 5]; @@ -346,13 +372,18 @@ fn test_duplicate_key_across_batch() { ..Default::default() }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_content( &mut *iter, &[(1000, 1), (1000, 2), (2000, 1), (2001, 2)], // keys &[11, 10, 10, 11], // sequences &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], // op_types - &[Some(1231), None, None, Some(1232)], // values + &[ + (Some(1231), None), + (None, None), + (None, None), + (Some(1232), None), + ], // values ); } }); @@ -367,7 +398,7 @@ fn test_duplicate_key_in_batch() { 10, // sequence OpType::Put, &[(1000, 1), (1000, 2), (1000, 1), (2001, 2)], // keys - &[None, None, Some(1234), None], // values + &[(None, None), (None, None), (Some(1234), None), (None, None)], // values ); let batch_sizes = [1, 2, 3, 4, 5]; @@ -377,13 +408,13 @@ fn test_duplicate_key_in_batch() { ..Default::default() }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_content( &mut *iter, &[(1000, 1), (1000, 2), (2001, 2)], // keys &[10, 10, 10], // sequences &[OpType::Put, OpType::Put, OpType::Put], // op_types - &[Some(1234), None, None, None], // values + &[(Some(1234), None), (None, None), (None, None), (None, None)], // values ); } }); @@ -397,24 +428,24 @@ fn test_sequence_visibility() { &*ctx.memtable, 10, // sequence OpType::Put, - &[(1000, 1), (1000, 2)], // keys - &[Some(1), Some(2)], // values + &[(1000, 1), (1000, 2)], // keys + &[(Some(1), None), (Some(2), None)], // values ); write_kvs( &*ctx.memtable, 11, // sequence OpType::Put, - &[(1000, 1), (1000, 2)], // keys - &[Some(11), Some(12)], // values + &[(1000, 1), (1000, 2)], // keys + &[(Some(11), None), (Some(12), None)], // values ); write_kvs( &*ctx.memtable, 12, // sequence OpType::Put, - &[(1000, 1), (1000, 2)], // keys - &[Some(21), Some(22)], // values + &[(1000, 1), (1000, 2)], // keys + &[(Some(21), None), (Some(22), None)], // values ); { @@ -422,9 +453,10 @@ fn test_sequence_visibility() { batch_size: 1, visible_sequence: 9, for_flush: false, + projected_schema: None, }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_content( &mut *iter, &[], // keys @@ -439,15 +471,16 @@ fn test_sequence_visibility() { batch_size: 1, visible_sequence: 10, for_flush: false, + projected_schema: None, }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_content( &mut *iter, - &[(1000, 1), (1000, 2)], // keys - &[10, 10], // sequences - &[OpType::Put, OpType::Put], // op_types - &[Some(1), Some(2)], // values + &[(1000, 1), (1000, 2)], // keys + &[10, 10], // sequences + &[OpType::Put, OpType::Put], // op_types + &[(Some(1), None), (Some(2), None)], // values ); } @@ -456,15 +489,16 @@ fn test_sequence_visibility() { batch_size: 1, visible_sequence: 11, for_flush: false, + projected_schema: None, }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); check_iter_content( &mut *iter, - &[(1000, 1), (1000, 2)], // keys - &[11, 11], // sequences - &[OpType::Put, OpType::Put], // op_types - &[Some(11), Some(12)], // values + &[(1000, 1), (1000, 2)], // keys + &[11, 11], // sequences + &[OpType::Put, OpType::Put], // op_types + &[(Some(11), None), (Some(12), None)], // values ); } }); @@ -479,7 +513,7 @@ fn test_iter_after_none() { 10, // sequence OpType::Put, &[(1000, 0), (1001, 1), (1002, 2)], // keys - &[Some(0), Some(1), Some(2)], // values + &[(Some(0), None), (Some(1), None), (Some(2), None)], // values ); let iter_ctx = IterContext { @@ -487,9 +521,54 @@ fn test_iter_after_none() { ..Default::default() }; - let mut iter = ctx.memtable.iter(iter_ctx).unwrap(); + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); assert!(iter.next().is_some()); assert!(iter.next().is_none()); assert!(iter.next().is_none()); }); } + +#[test] +fn test_memtable_projection() { + let tester = MemtableTester::default(); + // Only need v0, but row key columns and internal columns would also be read. + let projected_schema = + Arc::new(ProjectedSchema::new(tester.schema.clone(), Some(vec![2])).unwrap()); + + tester.run_testcase(|ctx| { + write_kvs( + &*ctx.memtable, + 9, // sequence + OpType::Put, + &[(1000, 0), (1001, 1), (1002, 2)], // keys + &[ + (Some(10), Some(20)), + (Some(11), Some(21)), + (Some(12), Some(22)), + ], // values + ); + + let iter_ctx = IterContext { + batch_size: 4, + projected_schema: Some(projected_schema.clone()), + ..Default::default() + }; + + let mut iter = ctx.memtable.iter(&iter_ctx).unwrap(); + let batch = iter.next().unwrap().unwrap(); + assert!(iter.next().is_none()); + + assert_eq!(5, batch.num_columns()); + let k0 = Int64Array::from_slice(&[1000, 1001, 1002]); + let k1 = UInt64Array::from_slice(&[0, 1, 2]); + let v0 = UInt64Array::from_slice(&[10, 11, 12]); + let sequences = UInt64Array::from_slice(&[9, 9, 9]); + let op_types = UInt8Array::from_slice(&[0, 0, 0]); + + assert_eq!(k0, &*batch.column(0).to_arrow_array()); + assert_eq!(k1, &*batch.column(1).to_arrow_array()); + assert_eq!(v0, &*batch.column(2).to_arrow_array()); + assert_eq!(sequences, &*batch.column(3).to_arrow_array()); + assert_eq!(op_types, &*batch.column(4).to_arrow_array()); + }); +} diff --git a/src/storage/src/memtable/version.rs b/src/storage/src/memtable/version.rs index 72dc4706fe..cd4411f869 100644 --- a/src/storage/src/memtable/version.rs +++ b/src/storage/src/memtable/version.rs @@ -267,7 +267,14 @@ mod tests { (2003, 5), (1001, 1), ], // keys - &[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values + &[ + (Some(1), None), + (Some(2), None), + (Some(7), None), + (Some(8), None), + (Some(9), None), + (Some(3), None), + ], // values ); set.insert(RangeMillis::new(20, 30).unwrap(), memtable.clone()); diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index f998812dbc..b90b365821 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -149,6 +149,18 @@ pub struct ColumnMetadata { pub desc: ColumnDescriptor, } +impl ColumnMetadata { + #[inline] + pub fn id(&self) -> ColumnId { + self.desc.id + } + + #[inline] + pub fn name(&self) -> &str { + &self.desc.name + } +} + #[derive(Clone, Debug, PartialEq)] pub struct ColumnsMetadata { /// All columns. @@ -221,6 +233,11 @@ impl ColumnsMetadata { pub fn user_column_end(&self) -> usize { self.user_column_end } + + #[inline] + pub fn column_metadata(&self, idx: usize) -> &ColumnMetadata { + &self.columns[idx] + } } pub type ColumnsMetadataRef = Arc; diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index 150e917174..0f5255a549 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -1,18 +1,40 @@ //! Common structs and utilities for read. use async_trait::async_trait; -use datatypes::vectors::{UInt64Vector, UInt8Vector, VectorRef}; +use datatypes::vectors::VectorRef; use crate::error::Result; -// TODO(yingwen): Maybe pack op_type with sequence (reserve 8bits in u64 for op_type) like RocksDB. /// Storage internal representation of a batch of rows. +// Now the structure of `Batch` is still unstable, all pub fields may be changed. +#[derive(Debug, Default)] pub struct Batch { - // Now the structure of `Batch` is still unstable, all pub fields may be changed. - pub keys: Vec, - pub sequences: UInt64Vector, - pub op_types: UInt8Vector, - pub values: Vec, + /// Rows organized in columnar format. + /// + /// Columns follow the same order convention of region schema: + /// key, value, internal columns. + columns: Vec, +} + +impl Batch { + pub fn new(columns: Vec) -> Batch { + Batch { columns } + } + + #[inline] + pub fn num_columns(&self) -> usize { + self.columns.len() + } + + #[inline] + pub fn columns(&self) -> &[VectorRef] { + &self.columns + } + + #[inline] + pub fn column(&self, idx: usize) -> &VectorRef { + &self.columns[idx] + } } /// Async batch reader. diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 93fc267260..4377185dc2 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -2,6 +2,7 @@ mod basic; mod flush; +mod projection; use datatypes::prelude::ScalarVector; use datatypes::type_id::LogicalTypeId; diff --git a/src/storage/src/region/tests/projection.rs b/src/storage/src/region/tests/projection.rs new file mode 100644 index 0000000000..b6b5486ff5 --- /dev/null +++ b/src/storage/src/region/tests/projection.rs @@ -0,0 +1,161 @@ +use std::sync::Arc; + +use datatypes::prelude::ScalarVector; +use datatypes::type_id::LogicalTypeId; +use datatypes::vectors::Int64Vector; +use log_store::fs::log::LocalFileLogStore; +use store_api::logstore::LogStore; +use store_api::storage::{ + Chunk, ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext, + WriteRequest, +}; +use tempdir::TempDir; + +use crate::region::RegionImpl; +use crate::region::RegionMetadata; +use crate::test_util::{self, config_util, descriptor_util, write_batch_util}; +use crate::write_batch::{PutData, WriteBatch}; + +/// Create metadata with schema (k0, timestamp, v0, v1) +fn new_metadata(region_name: &str) -> RegionMetadata { + let desc = descriptor_util::desc_with_value_columns(region_name, 2); + desc.try_into().unwrap() +} + +fn new_write_batch_for_test() -> WriteBatch { + write_batch_util::new_write_batch( + &[ + ("k0", LogicalTypeId::Int64, false), + (test_util::TIMESTAMP_NAME, LogicalTypeId::Int64, false), + ("v0", LogicalTypeId::Int64, true), + ("v1", LogicalTypeId::Int64, true), + ], + Some(1), + ) +} + +/// Build put data +/// +/// ```text +/// k0: [key_start, key_start + 1, ... key_start + len - 1] +/// timestamp: [ts_start, ts_start + 1, ... ts_start + len - 1] +/// v0: [initial_value, ...., initial_value] +/// v1: [initial_value, ..., initial_value + len - 1] +/// ``` +fn new_put_data(len: usize, key_start: i64, ts_start: i64, initial_value: i64) -> PutData { + let mut put_data = PutData::with_num_columns(4); + + let k0 = Int64Vector::from_values((0..len).map(|v| key_start + v as i64)); + let ts = Int64Vector::from_values((0..len).map(|v| ts_start + v as i64)); + let v0 = Int64Vector::from_values(std::iter::repeat(initial_value).take(len)); + let v1 = Int64Vector::from_values((0..len).map(|v| initial_value + v as i64)); + + put_data.add_key_column("k0", Arc::new(k0)).unwrap(); + put_data + .add_key_column(test_util::TIMESTAMP_NAME, Arc::new(ts)) + .unwrap(); + put_data.add_value_column("v0", Arc::new(v0)).unwrap(); + put_data.add_value_column("v1", Arc::new(v1)).unwrap(); + + put_data +} + +fn append_chunk_to(chunk: &Chunk, dst: &mut Vec>) { + if chunk.columns.is_empty() { + return; + } + let num_rows = chunk.columns[0].len(); + dst.resize(num_rows, Vec::new()); + for (i, row) in dst.iter_mut().enumerate() { + for col in &chunk.columns { + let val = col + .as_any() + .downcast_ref::() + .unwrap() + .get_data(i) + .unwrap(); + row.push(val); + } + } +} + +struct ProjectionTester { + region: RegionImpl, + write_ctx: WriteContext, + read_ctx: ReadContext, +} + +impl ProjectionTester { + fn with_region(region: RegionImpl) -> ProjectionTester { + ProjectionTester { + region, + write_ctx: WriteContext::default(), + read_ctx: ReadContext::default(), + } + } + + async fn put(&self, len: usize, key_start: i64, ts_start: i64, initial_value: i64) { + let mut batch = new_write_batch_for_test(); + let put_data = new_put_data(len, key_start, ts_start, initial_value); + batch.put(put_data).unwrap(); + + self.region.write(&self.write_ctx, batch).await.unwrap(); + } + + async fn scan(&self, projection: Option>) -> Vec> { + let snapshot = self.region.snapshot(&self.read_ctx).unwrap(); + + let request = ScanRequest { + projection, + ..Default::default() + }; + let resp = snapshot.scan(&self.read_ctx, request).await.unwrap(); + let mut reader = resp.reader; + + let mut dst = Vec::new(); + while let Some(chunk) = reader.next_chunk().await.unwrap() { + append_chunk_to(&chunk, &mut dst); + } + + dst + } +} + +const REGION_NAME: &str = "region-projection-0"; + +async fn new_tester(store_dir: &str) -> ProjectionTester { + let metadata = new_metadata(REGION_NAME); + + let store_config = config_util::new_store_config(REGION_NAME, store_dir).await; + let region = RegionImpl::create(metadata, store_config).await.unwrap(); + + ProjectionTester::with_region(region) +} + +#[tokio::test] +async fn test_projection_ordered() { + let dir = TempDir::new("projection-ordered").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let tester = new_tester(store_dir).await; + tester.put(4, 1, 10, 100).await; + + // timestamp, v1 + let output = tester.scan(Some(vec![1, 3])).await; + let expect = vec![vec![10, 100], vec![11, 101], vec![12, 102], vec![13, 103]]; + assert_eq!(expect, output); +} + +#[tokio::test] +async fn test_projection_unordered() { + let dir = TempDir::new("projection-unordered").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let tester = new_tester(store_dir).await; + tester.put(4, 1, 10, 100).await; + + // v1, k0 + let output = tester.scan(Some(vec![3, 0])).await; + let expect = vec![vec![100, 1], vec![101, 2], vec![102, 3], vec![103, 4]]; + assert_eq!(expect, output); +} diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 57b8b25640..e7d720784f 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -1,15 +1,15 @@ +use std::collections::{BTreeSet, HashMap}; use std::sync::Arc; use common_error::prelude::*; use datatypes::arrow::array::Array; -use datatypes::arrow::chunk::Chunk; +use datatypes::arrow::chunk::Chunk as ArrowChunk; use datatypes::arrow::datatypes::Schema as ArrowSchema; -use datatypes::prelude::Vector; use datatypes::schema::Metadata; -use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector}; +use datatypes::vectors::{Helper, VectorRef}; use serde::{Deserialize, Serialize}; use snafu::ensure; -use store_api::storage::{consts, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; +use store_api::storage::{consts, Chunk, ColumnId, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; use crate::read::Batch; @@ -64,6 +64,9 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display("Invalid projection, {}", msg))] + InvalidProjection { msg: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -99,7 +102,7 @@ pub struct RegionSchema { impl RegionSchema { pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result { let user_schema = Arc::new(build_user_schema(&columns, version)?); - let sst_schema = SstSchema::new(&columns, version)?; + let sst_schema = SstSchema::from_columns_metadata(&columns, version)?; debug_assert_eq!(user_schema.version(), sst_schema.version()); debug_assert_eq!(version, user_schema.version()); @@ -148,10 +151,37 @@ impl RegionSchema { pub fn version(&self) -> u32 { self.user_schema.version() } + + #[inline] + fn sequence_index(&self) -> usize { + self.sst_schema.sequence_index() + } + + #[inline] + fn op_type_index(&self) -> usize { + self.sst_schema.op_type_index() + } + + #[inline] + fn row_key_indices(&self) -> impl Iterator { + self.sst_schema.row_key_indices() + } + + #[inline] + fn column_metadata(&self, idx: usize) -> &ColumnMetadata { + self.columns.column_metadata(idx) + } + + #[inline] + fn timestamp_key_index(&self) -> usize { + self.columns.timestamp_key_index() + } } pub type RegionSchemaRef = Arc; +// TODO(yingwen): Now this schema in not only used by SST, maybe rename it to InternalSchema +// or something else. /// Schema of SST. /// /// Only contains a reference to schema and some indices, so it should be cheap to clone. @@ -163,37 +193,6 @@ pub struct SstSchema { } impl SstSchema { - fn new(columns: &ColumnsMetadata, version: u32) -> Result { - let column_schemas: Vec<_> = columns - .iter_all_columns() - .map(|col| ColumnSchema::from(&col.desc)) - .collect(); - - let schema = SchemaBuilder::from(column_schemas) - .timestamp_index(columns.timestamp_key_index()) - .version(version) - .add_metadata(ROW_KEY_END_KEY, columns.row_key_end().to_string()) - .add_metadata(USER_COLUMN_END_KEY, columns.user_column_end().to_string()) - .build() - .context(BuildSchemaSnafu)?; - - let user_column_end = columns.user_column_end(); - assert_eq!( - consts::SEQUENCE_COLUMN_NAME, - schema.column_schemas()[user_column_end].name - ); - assert_eq!( - consts::OP_TYPE_COLUMN_NAME, - schema.column_schemas()[user_column_end + 1].name - ); - - Ok(SstSchema { - schema: Arc::new(schema), - row_key_end: columns.row_key_end(), - user_column_end, - }) - } - #[inline] pub fn version(&self) -> u32 { self.schema.version() @@ -209,56 +208,71 @@ impl SstSchema { self.schema.arrow_schema() } - pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk> { - assert_eq!( - self.schema.num_columns(), - // key columns + value columns + sequence + op_type - batch.keys.len() + batch.values.len() + 2 - ); + pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> ArrowChunk> { + assert_eq!(self.schema.num_columns(), batch.num_columns()); - Chunk::new( - batch - .keys - .iter() - .map(|v| v.to_arrow_array()) - .chain(batch.values.iter().map(|v| v.to_arrow_array())) - .chain(std::iter::once(batch.sequences.to_arrow_array())) - .chain(std::iter::once(batch.op_types.to_arrow_array())) - .collect(), + ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect()) + } + + pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk>) -> Result { + assert_eq!(self.schema.num_columns(), chunk.columns().len()); + + let columns = chunk + .iter() + .enumerate() + .map(|(i, column)| { + Helper::try_into_vector(column.clone()).context(ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + + Ok(Batch::new(columns)) + } + + fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_all_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + SstSchema::new( + column_schemas, + version, + columns.timestamp_key_index(), + columns.row_key_end(), + columns.user_column_end(), ) } - pub fn arrow_chunk_to_batch(&self, chunk: &Chunk>) -> Result { - let keys = self - .row_key_indices() - .map(|i| { - Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { - name: self.column_name(i), - }) - }) - .collect::>()?; - let sequences = UInt64Vector::try_from_arrow_array(&chunk[self.sequence_index()].clone()) - .context(ConvertChunkSnafu { - name: consts::SEQUENCE_COLUMN_NAME, - })?; - let op_types = UInt8Vector::try_from_arrow_array(&chunk[self.op_type_index()].clone()) - .context(ConvertChunkSnafu { - name: consts::OP_TYPE_COLUMN_NAME, - })?; - let values = self - .value_indices() - .map(|i| { - Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu { - name: self.column_name(i), - }) - }) - .collect::>()?; + fn new( + column_schemas: Vec, + version: u32, + timestamp_key_index: usize, + row_key_end: usize, + user_column_end: usize, + ) -> Result { + let schema = SchemaBuilder::from(column_schemas) + .timestamp_index(timestamp_key_index) + .version(version) + .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) + .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string()) + .build() + .context(BuildSchemaSnafu)?; - Ok(Batch { - keys, - sequences, - op_types, - values, + assert_eq!( + consts::SEQUENCE_COLUMN_NAME, + schema.column_schemas()[user_column_end].name + ); + assert_eq!( + consts::OP_TYPE_COLUMN_NAME, + schema.column_schemas()[user_column_end + 1].name + ); + + Ok(SstSchema { + schema: Arc::new(schema), + row_key_end, + user_column_end, }) } @@ -278,13 +292,13 @@ impl SstSchema { } #[inline] - fn value_indices(&self) -> impl Iterator { - self.row_key_end..self.user_column_end + fn column_name(&self, idx: usize) -> &str { + &self.schema.column_schemas()[idx].name } #[inline] - fn column_name(&self, idx: usize) -> &str { - &self.schema.column_schemas()[idx].name + fn num_columns(&self) -> usize { + self.schema.num_columns() } } @@ -315,6 +329,288 @@ impl TryFrom for SstSchema { } } +/// Metadata about projection. +#[derive(Debug, Default)] +struct Projection { + /// Column indices of projection. + projected_columns: Vec, + /// Sorted and deduplicated indices of columns to read, includes all row key columns + /// and internal columns. + /// + /// We use these indices to read from data sources. + columns_to_read: Vec, + /// Maps column id to its index in `columns_to_read`. + /// + /// Used to ask whether the column with given column id is needed in projection. + id_to_read_idx: HashMap, + /// Maps index of `projected_columns` to index of the column in `columns_to_read`. + /// + /// Invariant: + /// - `projected_idx_to_read_idx.len() == projected_columns.len()` + projected_idx_to_read_idx: Vec, + /// Number of user columns to read. + num_user_columns: usize, +} + +impl Projection { + fn new(region_schema: &RegionSchema, projected_columns: Vec) -> Projection { + // Get a sorted list of column indices to read. + let mut column_indices: BTreeSet<_> = projected_columns.iter().cloned().collect(); + column_indices.extend(region_schema.row_key_indices()); + let num_user_columns = column_indices.len(); + // Now insert internal columns. + column_indices.extend([ + region_schema.sequence_index(), + region_schema.op_type_index(), + ]); + let columns_to_read: Vec<_> = column_indices.into_iter().collect(); + + // The region schema ensure that last two column must be internal columns. + assert_eq!( + region_schema.sequence_index(), + columns_to_read[num_user_columns] + ); + assert_eq!( + region_schema.op_type_index(), + columns_to_read[num_user_columns + 1] + ); + + // Mapping: => + let id_to_read_idx: HashMap<_, _> = columns_to_read + .iter() + .enumerate() + .map(|(idx, col_idx)| (region_schema.column_metadata(*col_idx).id(), idx)) + .collect(); + // Use column id to find index in `columns_to_read` of a column in `projected_columns`. + let projected_idx_to_read_idx = projected_columns + .iter() + .map(|col_idx| { + let column_id = region_schema.column_metadata(*col_idx).id(); + // This unwrap() should be safe since `columns_to_read` must contains all columns in `projected_columns`. + let read_idx = id_to_read_idx.get(&column_id).unwrap(); + *read_idx + }) + .collect(); + + Projection { + projected_columns, + columns_to_read, + id_to_read_idx, + projected_idx_to_read_idx, + num_user_columns, + } + } +} + +/// Schema with projection info. +#[derive(Debug)] +pub struct ProjectedSchema { + /// Projection info, `None` means don't need to do projection. + projection: Option, + /// Schema used to read from data sources. + schema_to_read: SstSchema, + /// User schema after projection. + projected_user_schema: SchemaRef, +} + +pub type ProjectedSchemaRef = Arc; + +impl ProjectedSchema { + /// Create a new `ProjectedSchema` with given `projected_columns`. + /// + /// If `projected_columns` is None, then all columns would be read. If `projected_columns` is + /// `Some`, then the `Vec` in it contains the indices of columns need to be read. + /// + /// If the `Vec` is empty or contains invalid index, `Err` would be returned. + pub fn new( + region_schema: RegionSchemaRef, + projected_columns: Option>, + ) -> Result { + match projected_columns { + Some(indices) => { + Self::validate_projection(®ion_schema, &indices)?; + + let projection = Projection::new(®ion_schema, indices); + + let schema_to_read = Self::build_schema_to_read(®ion_schema, &projection)?; + let projected_user_schema = + Self::build_projected_user_schema(®ion_schema, &projection)?; + + Ok(ProjectedSchema { + projection: Some(projection), + schema_to_read, + projected_user_schema, + }) + } + None => Ok(ProjectedSchema::no_projection(region_schema)), + } + } + + /// Create a `ProjectedSchema` that read all columns. + pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema { + // We could just reuse the SstSchema and user schema. + ProjectedSchema { + projection: None, + schema_to_read: region_schema.sst_schema().clone(), + projected_user_schema: region_schema.user_schema().clone(), + } + } + + #[inline] + pub fn projected_user_schema(&self) -> &SchemaRef { + &self.projected_user_schema + } + + #[inline] + pub fn schema_to_read(&self) -> &SstSchema { + &self.schema_to_read + } + + /// Convert [Batch] into [Chunk]. + /// + /// This will remove all internal columns. The input `batch` should has the + /// same schema as `self.schema_to_read()`. + pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk { + let columns = match &self.projection { + Some(projection) => projection + .projected_idx_to_read_idx + .iter() + .map(|col_idx| batch.column(*col_idx)) + .cloned() + .collect(), + None => { + let num_user_columns = self.projected_user_schema.num_columns(); + batch + .columns() + .iter() + .take(num_user_columns) + .cloned() + .collect() + } + }; + + Chunk::new(columns) + } + + /// Returns true if column with given `column_id` is needed (in projection). + pub fn is_needed(&self, column_id: ColumnId) -> bool { + self.projection + .as_ref() + .map(|p| p.id_to_read_idx.contains_key(&column_id)) + .unwrap_or(true) + } + + /// Construct a new [Batch] from row key, value, sequence and op_type. + /// + /// # Panics + /// Panics if number of columns are not the same as this schema. + pub fn batch_from_parts( + &self, + row_key_columns: Vec, + mut value_columns: Vec, + sequences: VectorRef, + op_types: VectorRef, + ) -> Batch { + // sequence and op_type + let num_internal_columns = 2; + + assert_eq!( + self.schema_to_read.num_columns(), + row_key_columns.len() + value_columns.len() + num_internal_columns + ); + + let mut columns = row_key_columns; + // Reserve space for value, sequence and op_type + columns.reserve(value_columns.len() + num_internal_columns); + columns.append(&mut value_columns); + // Internal columns are push in sequence, op_type order. + columns.push(sequences); + columns.push(op_types); + + Batch::new(columns) + } + + fn build_schema_to_read( + region_schema: &RegionSchema, + projection: &Projection, + ) -> Result { + let column_schemas: Vec<_> = projection + .columns_to_read + .iter() + .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) + .collect(); + // All row key columns are reserved in this schema, so we can use the row_key_end + // and timestamp_key_index from region schema. + SstSchema::new( + column_schemas, + region_schema.version(), + region_schema.timestamp_key_index(), + region_schema.columns.row_key_end(), + projection.num_user_columns, + ) + } + + fn build_projected_user_schema( + region_schema: &RegionSchema, + projection: &Projection, + ) -> Result { + let timestamp_index = + projection + .projected_columns + .iter() + .enumerate() + .find_map(|(idx, col_idx)| { + if *col_idx == region_schema.timestamp_key_index() { + Some(idx) + } else { + None + } + }); + let column_schemas: Vec<_> = projection + .projected_columns + .iter() + .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) + .collect(); + + let mut builder = SchemaBuilder::from(column_schemas).version(region_schema.version()); + if let Some(timestamp_index) = timestamp_index { + builder = builder.timestamp_index(timestamp_index); + } + + let schema = builder.build().context(BuildSchemaSnafu)?; + + Ok(Arc::new(schema)) + } + + fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> { + // The projection indices should not be empty, at least the timestamp column + // should be always read, and the `SstSchema` also requires the timestamp column. + ensure!( + !indices.is_empty(), + InvalidProjectionSnafu { + msg: "at least one column should be read", + } + ); + + // Now only allowed to read user columns. + let user_schema = region_schema.user_schema(); + for i in indices { + ensure!( + *i < user_schema.num_columns(), + InvalidProjectionSnafu { + msg: format!( + "index {} out of bound, only contains {} columns", + i, + user_schema.num_columns() + ), + } + ); + } + + Ok(()) + } +} + fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { let value = metadata.get(key).context(MissingMetaSnafu { key })?; value.parse().context(ParseIndexSnafu { value }) @@ -341,88 +637,93 @@ mod tests { use super::*; use crate::metadata::RegionMetadata; - use crate::test_util::{descriptor_util::RegionDescBuilder, schema_util}; + use crate::test_util::{descriptor_util, schema_util}; fn new_batch() -> Batch { - let k1 = Int64Vector::from_slice(&[1, 2, 3]); + let k0 = Int64Vector::from_slice(&[1, 2, 3]); let timestamp = Int64Vector::from_slice(&[4, 5, 6]); - let v1 = Int64Vector::from_slice(&[7, 8, 9]); + let v0 = Int64Vector::from_slice(&[7, 8, 9]); + let sequences = UInt64Vector::from_slice(&[100, 100, 100]); + let op_types = UInt8Vector::from_slice(&[0, 0, 0]); - Batch { - keys: vec![Arc::new(k1), Arc::new(timestamp)], - values: vec![Arc::new(v1)], - sequences: UInt64Vector::from_slice(&[100, 100, 100]), - op_types: UInt8Vector::from_slice(&[0, 0, 0]), - } + Batch::new(vec![ + Arc::new(k0), + Arc::new(timestamp), + Arc::new(v0), + Arc::new(sequences), + Arc::new(op_types), + ]) } - fn check_chunk_batch(chunk: &Chunk>, batch: &Batch) { + fn check_chunk_batch(chunk: &ArrowChunk>, batch: &Batch) { assert_eq!(5, chunk.columns().len()); assert_eq!(3, chunk.len()); - for i in 0..2 { - assert_eq!(chunk[i], batch.keys[i].to_arrow_array()); + for i in 0..5 { + assert_eq!(chunk[i], batch.column(i).to_arrow_array()); } - assert_eq!(chunk[2], batch.values[0].to_arrow_array()); - assert_eq!(chunk[3], batch.sequences.to_arrow_array()); - assert_eq!(chunk[4], batch.op_types.to_arrow_array()); + } + + fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { + let metadata: RegionMetadata = + descriptor_util::desc_with_value_columns("test", num_value_columns) + .try_into() + .unwrap(); + + let columns = metadata.columns; + RegionSchema::new(columns, version).unwrap() } #[test] fn test_region_schema() { - let desc = RegionDescBuilder::new("test") - .push_key_column(("k1", LogicalTypeId::Int64, false)) - .push_value_column(("v1", LogicalTypeId::Int64, true)) - .build(); - let metadata: RegionMetadata = desc.try_into().unwrap(); + let region_schema = Arc::new(new_region_schema(123, 1)); - let columns = metadata.columns; - let region_schema = RegionSchema::new(columns.clone(), 0).unwrap(); - - let expect_schema = schema_util::new_schema( + let expect_schema = schema_util::new_schema_with_version( &[ - ("k1", LogicalTypeId::Int64, false), + ("k0", LogicalTypeId::Int64, false), ("timestamp", LogicalTypeId::Int64, false), - ("v1", LogicalTypeId::Int64, true), + ("v0", LogicalTypeId::Int64, true), ], Some(1), + 123, ); assert_eq!(expect_schema, **region_schema.user_schema()); + // Checks row key column. let mut row_keys = region_schema.row_key_columns(); - assert_eq!("k1", row_keys.next().unwrap().desc.name); + assert_eq!("k0", row_keys.next().unwrap().desc.name); assert_eq!("timestamp", row_keys.next().unwrap().desc.name); assert_eq!(None, row_keys.next()); assert_eq!(2, region_schema.num_row_key_columns()); + // Checks value column. let mut values = region_schema.value_columns(); - assert_eq!("v1", values.next().unwrap().desc.name); + assert_eq!("v0", values.next().unwrap().desc.name); assert_eq!(None, values.next()); assert_eq!(1, region_schema.num_value_columns()); - assert_eq!(0, region_schema.version()); - { - let region_schema = RegionSchema::new(columns, 1234).unwrap(); - assert_eq!(1234, region_schema.version()); - assert_eq!(1234, region_schema.sst_schema().version()); - } + // Checks version. + assert_eq!(123, region_schema.version()); + assert_eq!(123, region_schema.sst_schema().version()); + // Checks SstSchema. let sst_schema = region_schema.sst_schema(); let sst_arrow_schema = sst_schema.arrow_schema(); let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap(); assert_eq!(*sst_schema, converted_sst_schema); - let expect_schema = schema_util::new_schema( + let expect_schema = schema_util::new_schema_with_version( &[ - ("k1", LogicalTypeId::Int64, false), + ("k0", LogicalTypeId::Int64, false), ("timestamp", LogicalTypeId::Int64, false), - ("v1", LogicalTypeId::Int64, true), + ("v0", LogicalTypeId::Int64, true), (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false), (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false), ], Some(1), + 123, ); assert_eq!( expect_schema.column_schemas(), @@ -432,8 +733,6 @@ mod tests { assert_eq!(4, sst_schema.op_type_index()); let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect(); assert_eq!([0, 1], &row_key_indices[..]); - let value_indices: Vec<_> = sst_schema.value_indices().collect(); - assert_eq!([2], &value_indices[..]); // Test batch and chunk conversion. let batch = new_batch(); @@ -445,4 +744,165 @@ mod tests { let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap(); check_chunk_batch(&chunk, &converted_batch); } + + #[test] + fn test_projection() { + // Build a region schema with 2 value columns. So the final user schema is + // (k0, timestamp, v0, v1) + let region_schema = new_region_schema(0, 2); + + // Projection, but still keep column order. + // After projection: (timestamp, v0) + let projected_columns = vec![1, 2]; + let projection = Projection::new(®ion_schema, projected_columns.clone()); + assert_eq!(projected_columns, projection.projected_columns); + // Need to read (k0, timestamp, v0, sequence, op_type) + assert_eq!(&[0, 1, 2, 4, 5], &projection.columns_to_read[..]); + assert_eq!(5, projection.id_to_read_idx.len()); + // Index of timestamp, v0 in `columns_to_read` + assert_eq!(&[1, 2], &projection.projected_idx_to_read_idx[..]); + // 3 columns: k0, timestamp, v0 + assert_eq!(3, projection.num_user_columns); + + // Projection, unordered. + // After projection: (timestamp, v1, k0) + let projected_columns = vec![1, 3, 0]; + let projection = Projection::new(®ion_schema, projected_columns.clone()); + assert_eq!(projected_columns, projection.projected_columns); + // Need to read (k0, timestamp, v1, sequence, op_type) + assert_eq!(&[0, 1, 3, 4, 5], &projection.columns_to_read[..]); + assert_eq!(5, projection.id_to_read_idx.len()); + // Index of timestamp, v1, k0 in `columns_to_read` + assert_eq!(&[1, 2, 0], &projection.projected_idx_to_read_idx[..]); + // 3 columns: k0, timestamp, v1 + assert_eq!(3, projection.num_user_columns); + + // Empty projection. + let projection = Projection::new(®ion_schema, Vec::new()); + assert!(projection.projected_columns.is_empty()); + // Still need to read row keys. + assert_eq!(&[0, 1, 4, 5], &projection.columns_to_read[..]); + assert_eq!(4, projection.id_to_read_idx.len()); + assert!(projection.projected_idx_to_read_idx.is_empty()); + assert_eq!(2, projection.num_user_columns); + } + + #[test] + fn test_projected_schema_with_projection() { + // (k0, timestamp, v0, v1, v2) + let region_schema = Arc::new(new_region_schema(123, 3)); + + // After projection: (v1, timestamp) + let projected_schema = + ProjectedSchema::new(region_schema.clone(), Some(vec![3, 1])).unwrap(); + let expect_user = schema_util::new_schema_with_version( + &[ + ("v1", LogicalTypeId::Int64, true), + ("timestamp", LogicalTypeId::Int64, false), + ], + Some(1), + 123, + ); + assert_eq!(expect_user, **projected_schema.projected_user_schema()); + + // Test is_needed + let needed: Vec<_> = region_schema + .columns + .iter_all_columns() + .enumerate() + .filter_map(|(idx, column_meta)| { + if projected_schema.is_needed(column_meta.id()) { + Some(idx) + } else { + None + } + }) + .collect(); + // (k0, timestamp, v1, sequence, op_type) + assert_eq!(&[0, 1, 3, 5, 6], &needed[..]); + + // Use another projection. + // After projection: (v0, timestamp) + let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap(); + + // The schema to read should be same as region schema with (k0, timestamp, v0). + // We can't use `new_schema_with_version()` because the SstSchema also store other + // metadata that `new_schema_with_version()` can't store. + let expect_schema = new_region_schema(123, 1); + assert_eq!( + expect_schema.sst_schema(), + projected_schema.schema_to_read() + ); + + // (k0, timestamp, v0, sequence, op_type) + let batch = new_batch(); + // Test Batch to our Chunk. + // (v0, timestamp) + let chunk = projected_schema.batch_to_chunk(&batch); + assert_eq!(2, chunk.columns.len()); + assert_eq!( + chunk.columns[0].to_arrow_array(), + batch.column(2).to_arrow_array() + ); + assert_eq!( + chunk.columns[1].to_arrow_array(), + batch.column(1).to_arrow_array() + ); + + // Test batch_from_parts + let keys = batch.columns()[0..2].to_vec(); + let values = batch.columns()[2..3].to_vec(); + let created = projected_schema.batch_from_parts( + keys, + values, + batch.column(3).clone(), + batch.column(4).clone(), + ); + assert_eq!(5, created.num_columns()); + for i in 0..5 { + assert_eq!( + batch.column(i).to_arrow_array(), + created.column(i).to_arrow_array() + ); + } + } + + #[test] + fn test_projected_schema_no_projection() { + // (k0, timestamp, v0) + let region_schema = Arc::new(new_region_schema(123, 1)); + + let projected_schema = ProjectedSchema::no_projection(region_schema.clone()); + + assert_eq!( + region_schema.user_schema(), + projected_schema.projected_user_schema() + ); + assert_eq!( + region_schema.sst_schema(), + projected_schema.schema_to_read() + ); + + for column in region_schema.columns.iter_all_columns() { + assert!(projected_schema.is_needed(column.id())); + } + + // (k0, timestamp, v0, sequence, op_type) + let batch = new_batch(); + // Test Batch to our Chunk. + // (k0, timestamp, v0) + let chunk = projected_schema.batch_to_chunk(&batch); + assert_eq!(3, chunk.columns.len()); + } + + #[test] + fn test_projected_schema_empty_projection() { + // (k0, timestamp, v0) + let region_schema = Arc::new(new_region_schema(123, 1)); + + let err = ProjectedSchema::new(region_schema, Some(Vec::new())) + .err() + .unwrap(); + assert!(matches!(err, Error::InvalidProjection { .. })); + } } diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 50adc5e338..739783de32 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -8,7 +8,6 @@ use store_api::storage::{ use crate::chunk::{ChunkReaderBuilder, ChunkReaderImpl}; use crate::error::{Error, Result}; -use crate::memtable::IterContext; use crate::sst::AccessLayerRef; use crate::version::VersionRef; @@ -41,17 +40,15 @@ impl Snapshot for SnapshotImpl { let immutables = memtable_version.immutable_memtables(); let mut builder = - ChunkReaderBuilder::new(self.version.user_schema().clone(), self.sst_layer.clone()) + ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone()) .reserve_num_memtables(memtable_version.num_memtables()) - .iter_ctx(IterContext { - batch_size: ctx.batch_size, - visible_sequence, - ..Default::default() - }) - .pick_memtables(mutables)?; + .projection(request.projection) + .batch_size(ctx.batch_size) + .visible_sequence(visible_sequence) + .pick_memtables(mutables); for mem_set in immutables { - builder = builder.pick_memtables(mem_set)?; + builder = builder.pick_memtables(mem_set); } let reader = builder.pick_ssts(&**self.version.ssts())?.build().await?; diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index dc306b2d2c..2fcbfa3e5d 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::error::Result; use crate::memtable::BoxedBatchIterator; use crate::read::BoxedBatchReader; +use crate::schema::ProjectedSchemaRef; use crate::sst::parquet::{ParquetReader, ParquetWriter}; /// Maximum level of SSTs. @@ -173,6 +174,9 @@ pub struct WriteOptions { pub struct ReadOptions { /// Suggested size of each batch. pub batch_size: usize, + /// The schema that user expected to read, might not the same as the + /// schema of the SST file. + pub projected_schema: ProjectedSchemaRef, } /// SST access layer. @@ -186,8 +190,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug { opts: &WriteOptions, ) -> Result<()>; - /// Read SST file with given `file_name`. - // TODO(yingwen): Read SST according to scan request and returns a chunk stream. + /// Read SST file with given `file_name` and schema. async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result; } @@ -233,9 +236,13 @@ impl AccessLayer for FsAccessLayer { async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result { let file_path = self.sst_file_path(file_name); - let reader = ParquetReader::new(&file_path, self.object_store.clone()); + let reader = ParquetReader::new( + &file_path, + self.object_store.clone(), + opts.projected_schema.clone(), + ); - let stream = reader.chunk_stream(None, opts.batch_size).await?; + let stream = reader.chunk_stream(opts.batch_size).await?; Ok(Box::new(stream)) } } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index e8279b4fc6..2afd7e0050 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -23,7 +23,7 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::memtable::BoxedBatchIterator; use crate::read::{Batch, BatchReader}; -use crate::schema::SstSchema; +use crate::schema::{ProjectedSchemaRef, SstSchema}; use crate::sst; /// Parquet sst writer. @@ -54,8 +54,8 @@ impl<'a> ParquetWriter<'a> { /// A chunk of records yielded from each iteration with a size given /// in config will be written to a single row group. async fn write_rows(self, extra_meta: Option>) -> Result<()> { - let region_schema = self.iter.schema(); - let sst_schema = region_schema.sst_schema(); + let projected_schema = self.iter.schema(); + let sst_schema = projected_schema.schema_to_read(); let schema = sst_schema.arrow_schema(); let object = self.object_store.object(self.file_path); @@ -156,28 +156,26 @@ fn transverse_recursive T + Clone>( pub struct ParquetReader<'a> { file_path: &'a str, object_store: ObjectStore, + projected_schema: ProjectedSchemaRef, } type ReaderFactoryFuture<'a, R> = Pin> + Send + 'a>>; -pub type FieldProjection = Box Vec + Send + Sync>; - impl<'a> ParquetReader<'a> { - pub fn new(file_path: &str, object_store: ObjectStore) -> ParquetReader { + pub fn new( + file_path: &str, + object_store: ObjectStore, + projected_schema: ProjectedSchemaRef, + ) -> ParquetReader { ParquetReader { file_path, object_store, + projected_schema, } } - // TODO(yingwen): Projection is not supported now, since field index would change after projection. - // To support projection, we may need to implement some helper methods in schema. - pub async fn chunk_stream( - &self, - _projection: Option, - chunk_size: usize, - ) -> Result { + pub async fn chunk_stream(&self, chunk_size: usize) -> Result { let file_path = self.file_path.to_string(); let operator = self.object_store.clone(); let reader_factory = move || -> ReaderFactoryFuture { @@ -195,12 +193,12 @@ impl<'a> ParquetReader<'a> { .context(error::ReadParquetSnafu { file: &file_path })?; let arrow_schema = infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?; - // Just read all fields. - let projected_fields = arrow_schema.fields.clone(); - - let sst_schema = SstSchema::try_from(arrow_schema) + // Now the SstSchema is only used to validate metadata of the parquet file, but this schema + // would be useful once we support altering schema, as this is the actual schema of the SST. + let _sst_schema = SstSchema::try_from(arrow_schema) .context(error::ConvertSstSchemaSnafu { file: &file_path })?; + let projected_fields = self.projected_fields().to_vec(); let chunk_stream = try_stream!({ for rg in metadata.row_groups { let column_chunks = read_columns_many_async( @@ -221,20 +219,27 @@ impl<'a> ParquetReader<'a> { } }); - ChunkStream::new(sst_schema, Box::pin(chunk_stream)) + ChunkStream::new(self.projected_schema.clone(), Box::pin(chunk_stream)) + } + + fn projected_fields(&self) -> &[Field] { + &self.projected_schema.schema_to_read().arrow_schema().fields } } pub type SendableChunkStream = Pin>>> + Send>>; pub struct ChunkStream { - schema: SstSchema, + projected_schema: ProjectedSchemaRef, stream: SendableChunkStream, } impl ChunkStream { - pub fn new(schema: SstSchema, stream: SendableChunkStream) -> Result { - Ok(Self { schema, stream }) + pub fn new(projected_schema: ProjectedSchemaRef, stream: SendableChunkStream) -> Result { + Ok(Self { + projected_schema, + stream, + }) } } @@ -245,7 +250,8 @@ impl BatchReader for ChunkStream { .try_next() .await? .map(|chunk| { - self.schema + self.projected_schema + .schema_to_read() .arrow_chunk_to_batch(&chunk) .context(error::InvalidParquetSchemaSnafu) }) @@ -284,7 +290,14 @@ mod tests { (2003, 5), (1001, 1), ], // keys - &[Some(1), Some(2), Some(7), Some(8), Some(9), Some(3)], // values + &[ + (Some(1), Some(1234)), + (Some(2), Some(1234)), + (Some(7), Some(1234)), + (Some(8), Some(1234)), + (Some(9), Some(1234)), + (Some(3), Some(1234)), + ], // values ); let dir = TempDir::new("write_parquet").unwrap(); @@ -292,7 +305,7 @@ mod tests { let backend = Backend::build().root(path).finish().await.unwrap(); let object_store = ObjectStore::new(backend); let sst_file_name = "test-flush.parquet"; - let iter = memtable.iter(IterContext::default()).unwrap(); + let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, iter, object_store); writer @@ -307,7 +320,7 @@ mod tests { // chunk schema: timestamp, __version, v1, __sequence, __op_type let chunk = file_reader.next().unwrap().unwrap(); - assert_eq!(5, chunk.arrays().len()); + assert_eq!(6, chunk.arrays().len()); // timestamp assert_eq!( @@ -323,22 +336,30 @@ mod tests { chunk.arrays()[1] ); - // v1 + // v0 assert_eq!( Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc, chunk.arrays()[2] ); + // v1 + assert_eq!( + Arc::new(UInt64Array::from_slice(&[ + 1234, 1234, 1234, 1234, 1234, 1234 + ])) as Arc, + chunk.arrays()[3] + ); + // sequence assert_eq!( Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc, - chunk.arrays()[3] + chunk.arrays()[4] ); // op_type assert_eq!( Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc, - chunk.arrays()[4] + chunk.arrays()[5] ); } } diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index 0e374279de..99139e2cd8 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -1,4 +1,5 @@ use datatypes::prelude::ConcreteDataType; +use datatypes::type_id::LogicalTypeId; use store_api::storage::{ ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId, RegionDescriptor, RegionId, RowKeyDescriptorBuilder, @@ -99,3 +100,14 @@ impl RegionDescBuilder { .unwrap() } } + +/// Create desc with schema (k0, timestamp, v0, ... vn-1) +pub fn desc_with_value_columns(region_name: &str, num_value_columns: usize) -> RegionDescriptor { + let mut builder = + RegionDescBuilder::new(region_name).push_key_column(("k0", LogicalTypeId::Int64, false)); + for i in 0..num_value_columns { + let name = format!("v{}", i); + builder = builder.push_value_column((&name, LogicalTypeId::Int64, true)); + } + builder.build() +} diff --git a/src/storage/src/test_util/read_util.rs b/src/storage/src/test_util/read_util.rs index 97de4ea54b..d47cfa0706 100644 --- a/src/storage/src/test_util/read_util.rs +++ b/src/storage/src/test_util/read_util.rs @@ -11,24 +11,21 @@ use crate::read::{Batch, BatchReader, BoxedBatchReader}; fn new_kv_batch(key_values: &[(i64, Option)]) -> Batch { let key = Arc::new(Int64Vector::from_values(key_values.iter().map(|v| v.0))); let value = Arc::new(Int64Vector::from_iter(key_values.iter().map(|v| v.1))); - let sequences = UInt64Vector::from_vec(vec![0; key_values.len()]); - let op_types = UInt8Vector::from_vec(vec![0; key_values.len()]); + let sequences = Arc::new(UInt64Vector::from_vec(vec![0; key_values.len()])); + let op_types = Arc::new(UInt8Vector::from_vec(vec![0; key_values.len()])); - Batch { - keys: vec![key], - sequences, - op_types, - values: vec![value], - } + Batch::new(vec![key, value, sequences, op_types]) } fn check_kv_batch(batches: &[Batch], expect: &[&[(i64, Option)]]) { for (batch, key_values) in batches.iter().zip(expect.iter()) { - let key = batch.keys[0] + let key = batch + .column(0) .as_any() .downcast_ref::() .unwrap(); - let value = batch.values[0] + let value = batch + .column(1) .as_any() .downcast_ref::() .unwrap(); diff --git a/src/storage/src/test_util/schema_util.rs b/src/storage/src/test_util/schema_util.rs index ac0602359d..df68559dde 100644 --- a/src/storage/src/test_util/schema_util.rs +++ b/src/storage/src/test_util/schema_util.rs @@ -7,7 +7,15 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool); pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> Schema { - let column_schemas = column_defs + new_schema_with_version(column_defs, timestamp_index, 0) +} + +pub fn new_schema_with_version( + column_defs: &[ColumnDef], + timestamp_index: Option, + version: u32, +) -> Schema { + let column_schemas: Vec<_> = column_defs .iter() .map(|column_def| { let datatype = column_def.1.data_type(); @@ -15,14 +23,11 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option) -> }) .collect(); + let mut builder = SchemaBuilder::from(column_schemas).version(version); if let Some(index) = timestamp_index { - SchemaBuilder::from(column_schemas) - .timestamp_index(index) - .build() - .unwrap() - } else { - Schema::new(column_schemas) + builder = builder.timestamp_index(index); } + builder.build().unwrap() } pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option) -> SchemaRef { diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 6e9325cf6f..e8f727b31b 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -41,6 +41,8 @@ pub struct ScanRequest { /// Default is None. Only returns data whose sequence number is less than or /// equal to the `sequence`. pub sequence: Option, + /// Indices of columns to read, `None` to read all columns. + pub projection: Option>, } #[derive(Debug)] diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 3a803c87b7..de7b649dfd 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -105,15 +105,19 @@ impl Table for MitoTable { async fn scan( &self, - _projection: &Option>, + projection: &Option>, _filters: &[Expr], _limit: Option, ) -> TableResult { let read_ctx = ReadContext::default(); let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?; + let scan_request = ScanRequest { + projection: projection.clone(), + ..Default::default() + }; let mut reader = snapshot - .scan(&read_ctx, ScanRequest::default()) + .scan(&read_ctx, scan_request) .await .map_err(TableError::new)? .reader;