From c6d91edb83826376e65d42fbd9b422509e94d40d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 18 Oct 2022 18:56:52 +0800 Subject: [PATCH] refactor(storage): Split schema mod into multiple sub-mods (#318) --- src/storage/src/schema.rs | 952 +--------------------------- src/storage/src/schema/projected.rs | 560 ++++++++++++++++ src/storage/src/schema/region.rs | 183 ++++++ src/storage/src/schema/store.rs | 259 ++++++++ 4 files changed, 1015 insertions(+), 939 deletions(-) create mode 100644 src/storage/src/schema/projected.rs create mode 100644 src/storage/src/schema/region.rs create mode 100644 src/storage/src/schema/store.rs diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 549d3ec270..0a0616c0fc 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -1,26 +1,13 @@ pub mod compat; - -use std::cmp::Ordering; -use std::collections::{BTreeSet, HashMap}; -use std::sync::Arc; +mod projected; +mod region; +mod store; use common_error::prelude::*; -use datatypes::arrow::array::Array; -use datatypes::arrow::bitmap::MutableBitmap; -use datatypes::arrow::chunk::Chunk as ArrowChunk; -use datatypes::arrow::datatypes::Schema as ArrowSchema; -use datatypes::schema::Metadata; -use datatypes::vectors::{BooleanVector, Helper, VectorRef}; -use serde::{Deserialize, Serialize}; -use snafu::ensure; -use store_api::storage::{consts, Chunk, ColumnId, ColumnSchema, Schema, SchemaBuilder, SchemaRef}; -use crate::error; -use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; -use crate::read::{Batch, BatchOp}; - -const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end"; -const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end"; +pub use crate::schema::projected::{ProjectedSchema, ProjectedSchemaRef}; +pub use crate::schema::region::{RegionSchema, RegionSchemaRef}; +pub use crate::schema::store::StoreSchema; #[derive(Debug, Snafu)] pub enum Error { @@ -82,658 +69,18 @@ pub enum Error { pub type Result = std::result::Result; -/// Schema of region. -/// -/// The `RegionSchema` has the knowledge of reserved and internal columns. -/// Reserved columns are columns that their names, ids are reserved by the storage -/// engine, and could not be used by the user. Reserved columns usually have -/// special usage. Reserved columns expect the version columns are also -/// called internal columns (though the version could also be thought as a -/// special kind of internal column), are not visible to user, such as our -/// internal sequence, op_type columns. -/// -/// The user schema is the schema that only contains columns that user could visit, -/// as well as what the schema user created. -#[derive(Debug, PartialEq)] -pub struct RegionSchema { - /// Schema that only contains columns that user defined, excluding internal columns - /// that are reserved and used by the storage engine. - /// - /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` - /// conveniently. The fields order in `SchemaRef` **must** be consistent with - /// columns order in [ColumnsMetadata] to ensure the projection index of a field - /// is correct. - user_schema: SchemaRef, - /// store schema contains all columns of the region, including all internal columns. - store_schema: StoreSchema, - /// Metadata of columns. - columns: ColumnsMetadataRef, -} - -impl RegionSchema { - pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result { - let user_schema = Arc::new(build_user_schema(&columns, version)?); - let store_schema = StoreSchema::from_columns_metadata(&columns, version)?; - - debug_assert_eq!(user_schema.version(), store_schema.version()); - debug_assert_eq!(version, user_schema.version()); - - Ok(RegionSchema { - user_schema, - store_schema, - columns, - }) - } - - /// Returns the schema of the region, excluding internal columns that used by - /// the storage engine. - #[inline] - pub fn user_schema(&self) -> &SchemaRef { - &self.user_schema - } - - /// Returns the schema actually stores, which would also contains all internal columns. - #[inline] - pub fn store_schema(&self) -> &StoreSchema { - &self.store_schema - } - - #[inline] - pub fn row_key_columns(&self) -> impl Iterator { - self.columns.iter_row_key_columns() - } - - #[inline] - pub fn value_columns(&self) -> impl Iterator { - self.columns.iter_value_columns() - } - - #[inline] - pub fn num_row_key_columns(&self) -> usize { - self.columns.num_row_key_columns() - } - - #[inline] - pub fn num_value_columns(&self) -> usize { - self.columns.num_value_columns() - } - - #[inline] - pub fn version(&self) -> u32 { - self.user_schema.version() - } - - #[inline] - fn sequence_index(&self) -> usize { - self.store_schema.sequence_index() - } - - #[inline] - fn op_type_index(&self) -> usize { - self.store_schema.op_type_index() - } - - #[inline] - fn row_key_indices(&self) -> impl Iterator { - self.store_schema.row_key_indices() - } - - #[inline] - fn column_metadata(&self, idx: usize) -> &ColumnMetadata { - self.columns.column_metadata(idx) - } - - #[inline] - fn timestamp_key_index(&self) -> usize { - self.columns.timestamp_key_index() - } -} - -pub type RegionSchemaRef = Arc; - -/// Schema for storage engine. -/// -/// Used internally, contains all row key columns, internal columns and parts of value columns. -/// -/// Only contains a reference to schema and some indices, so it should be cheap to clone. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct StoreSchema { - schema: SchemaRef, - row_key_end: usize, - user_column_end: usize, -} - -impl StoreSchema { - #[inline] - pub fn version(&self) -> u32 { - self.schema.version() - } - - #[inline] - pub fn schema(&self) -> &SchemaRef { - &self.schema - } - - #[inline] - pub fn arrow_schema(&self) -> &Arc { - self.schema.arrow_schema() - } - - pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> ArrowChunk> { - assert_eq!(self.schema.num_columns(), batch.num_columns()); - - ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect()) - } - - pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk>) -> Result { - assert_eq!(self.schema.num_columns(), chunk.columns().len()); - - let columns = chunk - .iter() - .enumerate() - .map(|(i, column)| { - Helper::try_into_vector(column.clone()).context(ConvertChunkSnafu { - name: self.column_name(i), - }) - }) - .collect::>()?; - - Ok(Batch::new(columns)) - } - - pub(crate) fn contains_column(&self, name: &str) -> bool { - self.schema.column_schema_by_name(name).is_some() - } - - pub(crate) fn is_key_column(&self, name: &str) -> bool { - self.schema - .column_index_by_name(name) - .map(|idx| idx < self.row_key_end) - .unwrap_or(false) - } - - pub(crate) fn is_user_column(&self, name: &str) -> bool { - self.schema - .column_index_by_name(name) - .map(|idx| idx < self.user_column_end) - .unwrap_or(false) - } - - fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { - let column_schemas: Vec<_> = columns - .iter_all_columns() - .map(|col| ColumnSchema::from(&col.desc)) - .collect(); - - StoreSchema::new( - column_schemas, - version, - columns.timestamp_key_index(), - columns.row_key_end(), - columns.user_column_end(), - ) - } - - fn new( - column_schemas: Vec, - version: u32, - timestamp_key_index: usize, - row_key_end: usize, - user_column_end: usize, - ) -> Result { - let schema = SchemaBuilder::try_from(column_schemas) - .context(ConvertSchemaSnafu)? - .timestamp_index(timestamp_key_index) - .version(version) - .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) - .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string()) - .build() - .context(BuildSchemaSnafu)?; - - assert_eq!( - consts::SEQUENCE_COLUMN_NAME, - schema.column_schemas()[user_column_end].name - ); - assert_eq!( - consts::OP_TYPE_COLUMN_NAME, - schema.column_schemas()[user_column_end + 1].name - ); - - Ok(StoreSchema { - schema: Arc::new(schema), - row_key_end, - user_column_end, - }) - } - - #[inline] - fn sequence_index(&self) -> usize { - self.user_column_end - } - - #[inline] - fn op_type_index(&self) -> usize { - self.user_column_end + 1 - } - - #[inline] - fn row_key_indices(&self) -> impl Iterator { - 0..self.row_key_end - } - - #[inline] - fn column_name(&self, idx: usize) -> &str { - &self.schema.column_schemas()[idx].name - } - - #[inline] - fn num_columns(&self) -> usize { - self.schema.num_columns() - } -} - -impl TryFrom for StoreSchema { - type Error = Error; - - fn try_from(arrow_schema: ArrowSchema) -> Result { - let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?; - // Recover other metadata from schema. - let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?; - let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?; - - // There should be sequence and op_type columns. - ensure!( - consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name, - InvalidIndexSnafu - ); - ensure!( - consts::OP_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name, - InvalidIndexSnafu - ); - - Ok(StoreSchema { - schema: Arc::new(schema), - row_key_end, - user_column_end, - }) - } -} - -/// Metadata about projection. -#[derive(Debug, Default)] -struct Projection { - /// Column indices of projection. - projected_columns: Vec, - /// Sorted and deduplicated indices of columns to read, includes all row key columns - /// and internal columns. - /// - /// We use these indices to read from data sources. - columns_to_read: Vec, - /// Maps column id to its index in `columns_to_read`. - /// - /// Used to ask whether the column with given column id is needed in projection. - id_to_read_idx: HashMap, - /// Maps index of `projected_columns` to index of the column in `columns_to_read`. - /// - /// Invariant: - /// - `projected_idx_to_read_idx.len() == projected_columns.len()` - projected_idx_to_read_idx: Vec, - /// Number of user columns to read. - num_user_columns: usize, -} - -impl Projection { - fn new(region_schema: &RegionSchema, projected_columns: Vec) -> Projection { - // Get a sorted list of column indices to read. - let mut column_indices: BTreeSet<_> = projected_columns.iter().cloned().collect(); - column_indices.extend(region_schema.row_key_indices()); - let num_user_columns = column_indices.len(); - // Now insert internal columns. - column_indices.extend([ - region_schema.sequence_index(), - region_schema.op_type_index(), - ]); - let columns_to_read: Vec<_> = column_indices.into_iter().collect(); - - // The region schema ensure that last two column must be internal columns. - assert_eq!( - region_schema.sequence_index(), - columns_to_read[num_user_columns] - ); - assert_eq!( - region_schema.op_type_index(), - columns_to_read[num_user_columns + 1] - ); - - // Mapping: => - let id_to_read_idx: HashMap<_, _> = columns_to_read - .iter() - .enumerate() - .map(|(idx, col_idx)| (region_schema.column_metadata(*col_idx).id(), idx)) - .collect(); - // Use column id to find index in `columns_to_read` of a column in `projected_columns`. - let projected_idx_to_read_idx = projected_columns - .iter() - .map(|col_idx| { - let column_id = region_schema.column_metadata(*col_idx).id(); - // This unwrap() should be safe since `columns_to_read` must contains all columns in `projected_columns`. - let read_idx = id_to_read_idx.get(&column_id).unwrap(); - *read_idx - }) - .collect(); - - Projection { - projected_columns, - columns_to_read, - id_to_read_idx, - projected_idx_to_read_idx, - num_user_columns, - } - } -} - -/// Schema with projection info. -#[derive(Debug)] -pub struct ProjectedSchema { - /// Projection info, `None` means don't need to do projection. - projection: Option, - /// Schema used to read from data sources. - schema_to_read: StoreSchema, - /// User schema after projection. - projected_user_schema: SchemaRef, -} - -pub type ProjectedSchemaRef = Arc; - -impl ProjectedSchema { - /// Create a new `ProjectedSchema` with given `projected_columns`. - /// - /// If `projected_columns` is None, then all columns would be read. If `projected_columns` is - /// `Some`, then the `Vec` in it contains the indices of columns need to be read. - /// - /// If the `Vec` is empty or contains invalid index, `Err` would be returned. - pub fn new( - region_schema: RegionSchemaRef, - projected_columns: Option>, - ) -> Result { - match projected_columns { - Some(indices) => { - Self::validate_projection(®ion_schema, &indices)?; - - let projection = Projection::new(®ion_schema, indices); - - let schema_to_read = Self::build_schema_to_read(®ion_schema, &projection)?; - let projected_user_schema = - Self::build_projected_user_schema(®ion_schema, &projection)?; - - Ok(ProjectedSchema { - projection: Some(projection), - schema_to_read, - projected_user_schema, - }) - } - None => Ok(ProjectedSchema::no_projection(region_schema)), - } - } - - /// Create a `ProjectedSchema` that read all columns. - pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema { - // We could just reuse the StoreSchema and user schema. - ProjectedSchema { - projection: None, - schema_to_read: region_schema.store_schema().clone(), - projected_user_schema: region_schema.user_schema().clone(), - } - } - - #[inline] - pub fn projected_user_schema(&self) -> &SchemaRef { - &self.projected_user_schema - } - - #[inline] - pub fn schema_to_read(&self) -> &StoreSchema { - &self.schema_to_read - } - - /// Convert [Batch] into [Chunk]. - /// - /// This will remove all internal columns. The input `batch` should has the - /// same schema as `self.schema_to_read()`. - pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk { - let columns = match &self.projection { - Some(projection) => projection - .projected_idx_to_read_idx - .iter() - .map(|col_idx| batch.column(*col_idx)) - .cloned() - .collect(), - None => { - let num_user_columns = self.projected_user_schema.num_columns(); - batch - .columns() - .iter() - .take(num_user_columns) - .cloned() - .collect() - } - }; - - Chunk::new(columns) - } - - /// Returns true if column with given `column_id` is needed (in projection). - pub fn is_needed(&self, column_id: ColumnId) -> bool { - self.projection - .as_ref() - .map(|p| p.id_to_read_idx.contains_key(&column_id)) - .unwrap_or(true) - } - - /// Construct a new [Batch] from row key, value, sequence and op_type. - /// - /// # Panics - /// Panics if number of columns are not the same as this schema. - pub fn batch_from_parts( - &self, - row_key_columns: Vec, - mut value_columns: Vec, - sequences: VectorRef, - op_types: VectorRef, - ) -> Batch { - // sequence and op_type - let num_internal_columns = 2; - - assert_eq!( - self.schema_to_read.num_columns(), - row_key_columns.len() + value_columns.len() + num_internal_columns - ); - - let mut columns = row_key_columns; - // Reserve space for value, sequence and op_type - columns.reserve(value_columns.len() + num_internal_columns); - columns.append(&mut value_columns); - // Internal columns are push in sequence, op_type order. - columns.push(sequences); - columns.push(op_types); - - Batch::new(columns) - } - - fn build_schema_to_read( - region_schema: &RegionSchema, - projection: &Projection, - ) -> Result { - let column_schemas: Vec<_> = projection - .columns_to_read - .iter() - .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) - .collect(); - // All row key columns are reserved in this schema, so we can use the row_key_end - // and timestamp_key_index from region schema. - StoreSchema::new( - column_schemas, - region_schema.version(), - region_schema.timestamp_key_index(), - region_schema.columns.row_key_end(), - projection.num_user_columns, - ) - } - - fn build_projected_user_schema( - region_schema: &RegionSchema, - projection: &Projection, - ) -> Result { - let timestamp_index = - projection - .projected_columns - .iter() - .enumerate() - .find_map(|(idx, col_idx)| { - if *col_idx == region_schema.timestamp_key_index() { - Some(idx) - } else { - None - } - }); - let column_schemas: Vec<_> = projection - .projected_columns - .iter() - .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) - .collect(); - - let mut builder = SchemaBuilder::try_from(column_schemas) - .context(ConvertSchemaSnafu)? - .version(region_schema.version()); - if let Some(timestamp_index) = timestamp_index { - builder = builder.timestamp_index(timestamp_index); - } - - let schema = builder.build().context(BuildSchemaSnafu)?; - - Ok(Arc::new(schema)) - } - - fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> { - // The projection indices should not be empty, at least the timestamp column - // should be always read, and the `StoreSchema` also requires the timestamp column. - ensure!( - !indices.is_empty(), - InvalidProjectionSnafu { - msg: "at least one column should be read", - } - ); - - // Now only allowed to read user columns. - let user_schema = region_schema.user_schema(); - for i in indices { - ensure!( - *i < user_schema.num_columns(), - InvalidProjectionSnafu { - msg: format!( - "index {} out of bound, only contains {} columns", - i, - user_schema.num_columns() - ), - } - ); - } - - Ok(()) - } -} - -impl BatchOp for ProjectedSchema { - fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering { - // Ordered by (row_key asc, sequence desc, op_type desc). - let indices = self.schema_to_read.row_key_indices(); - for idx in indices { - let (left_col, right_col) = (left.column(idx), right.column(idx)); - // Comparision of vector is done by virtual method calls currently. Consider using - // enum dispatch if this becomes bottleneck. - let order = left_col.get_ref(i).cmp(&right_col.get_ref(j)); - if order != Ordering::Equal { - return order; - } - } - let (sequence_index, op_type_index) = ( - self.schema_to_read.sequence_index(), - self.schema_to_read.op_type_index(), - ); - right - .column(sequence_index) - .get_ref(j) - .cmp(&left.column(sequence_index).get_ref(i)) - .then_with(|| { - right - .column(op_type_index) - .get_ref(j) - .cmp(&left.column(op_type_index).get_ref(i)) - }) - } - - fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>) { - if let Some(prev) = prev { - assert_eq!(batch.num_columns(), prev.num_columns()); - } - let indices = self.schema_to_read.row_key_indices(); - for idx in indices { - let (current, prev_col) = ( - batch.column(idx), - prev.map(|prev| prev.column(idx).as_ref()), - ); - current.dedup(selected, prev_col); - } - } - - fn filter(&self, batch: &Batch, filter: &BooleanVector) -> error::Result { - let columns = batch - .columns() - .iter() - .enumerate() - .map(|(i, v)| { - v.filter(filter).context(error::FilterColumnSnafu { - name: self.schema_to_read.column_name(i), - }) - }) - .collect::>>()?; - - Ok(Batch::new(columns)) - } -} - -fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { - let value = metadata.get(key).context(MissingMetaSnafu { key })?; - value.parse().context(ParseIndexSnafu { value }) -} - -// Now user schema don't have extra metadata like store schema. -fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result { - let column_schemas: Vec<_> = columns - .iter_user_columns() - .map(|col| ColumnSchema::from(&col.desc)) - .collect(); - - SchemaBuilder::try_from(column_schemas) - .context(ConvertSchemaSnafu)? - .timestamp_index(columns.timestamp_key_index()) - .version(version) - .build() - .context(BuildSchemaSnafu) -} - #[cfg(test)] mod tests { - use datatypes::prelude::ScalarVector; - use datatypes::type_id::LogicalTypeId; - use datatypes::vectors::{Int64Vector, TimestampVector, UInt64Vector, UInt8Vector}; - use store_api::storage::OpType; + use std::sync::Arc; + + use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector}; use super::*; use crate::metadata::RegionMetadata; - use crate::test_util::{descriptor_util, read_util, schema_util}; + use crate::read::Batch; + use crate::test_util::descriptor_util; - fn new_batch() -> Batch { + pub(crate) fn new_batch() -> Batch { let k0 = Int64Vector::from_slice(&[1, 2, 3]); let timestamp = Int64Vector::from_slice(&[4, 5, 6]); let v0 = Int64Vector::from_slice(&[7, 8, 9]); @@ -749,16 +96,7 @@ mod tests { ]) } - fn check_chunk_batch(chunk: &ArrowChunk>, batch: &Batch) { - assert_eq!(5, chunk.columns().len()); - assert_eq!(3, chunk.len()); - - for i in 0..5 { - assert_eq!(chunk[i], batch.column(i).to_arrow_array()); - } - } - - fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { + pub(crate) fn new_region_schema(version: u32, num_value_columns: usize) -> RegionSchema { let metadata: RegionMetadata = descriptor_util::desc_with_value_columns("test", num_value_columns) .try_into() @@ -767,268 +105,4 @@ mod tests { let columns = metadata.columns; RegionSchema::new(columns, version).unwrap() } - - #[test] - fn test_region_schema() { - let region_schema = Arc::new(new_region_schema(123, 1)); - - let expect_schema = schema_util::new_schema_with_version( - &[ - ("k0", LogicalTypeId::Int64, false), - ("timestamp", LogicalTypeId::Timestamp, false), - ("v0", LogicalTypeId::Int64, true), - ], - Some(1), - 123, - ); - - assert_eq!(expect_schema, **region_schema.user_schema()); - - // Checks row key column. - let mut row_keys = region_schema.row_key_columns(); - assert_eq!("k0", row_keys.next().unwrap().desc.name); - assert_eq!("timestamp", row_keys.next().unwrap().desc.name); - assert_eq!(None, row_keys.next()); - assert_eq!(2, region_schema.num_row_key_columns()); - - // Checks value column. - let mut values = region_schema.value_columns(); - assert_eq!("v0", values.next().unwrap().desc.name); - assert_eq!(None, values.next()); - assert_eq!(1, region_schema.num_value_columns()); - - // Checks version. - assert_eq!(123, region_schema.version()); - assert_eq!(123, region_schema.store_schema().version()); - - // Checks StoreSchema. - let store_schema = region_schema.store_schema(); - let sst_arrow_schema = store_schema.arrow_schema(); - let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap(); - - assert_eq!(*store_schema, converted_store_schema); - - let expect_schema = schema_util::new_schema_with_version( - &[ - ("k0", LogicalTypeId::Int64, false), - ("timestamp", LogicalTypeId::Timestamp, false), - ("v0", LogicalTypeId::Int64, true), - (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false), - (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false), - ], - Some(1), - 123, - ); - assert_eq!( - expect_schema.column_schemas(), - store_schema.schema().column_schemas() - ); - assert_eq!(3, store_schema.sequence_index()); - assert_eq!(4, store_schema.op_type_index()); - let row_key_indices: Vec<_> = store_schema.row_key_indices().collect(); - assert_eq!([0, 1], &row_key_indices[..]); - - // Test batch and chunk conversion. - let batch = new_batch(); - // Convert batch to chunk. - let chunk = store_schema.batch_to_arrow_chunk(&batch); - check_chunk_batch(&chunk, &batch); - - // Convert chunk to batch. - let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap(); - check_chunk_batch(&chunk, &converted_batch); - } - - #[test] - fn test_projection() { - // Build a region schema with 2 value columns. So the final user schema is - // (k0, timestamp, v0, v1) - let region_schema = new_region_schema(0, 2); - - // Projection, but still keep column order. - // After projection: (timestamp, v0) - let projected_columns = vec![1, 2]; - let projection = Projection::new(®ion_schema, projected_columns.clone()); - assert_eq!(projected_columns, projection.projected_columns); - // Need to read (k0, timestamp, v0, sequence, op_type) - assert_eq!(&[0, 1, 2, 4, 5], &projection.columns_to_read[..]); - assert_eq!(5, projection.id_to_read_idx.len()); - // Index of timestamp, v0 in `columns_to_read` - assert_eq!(&[1, 2], &projection.projected_idx_to_read_idx[..]); - // 3 columns: k0, timestamp, v0 - assert_eq!(3, projection.num_user_columns); - - // Projection, unordered. - // After projection: (timestamp, v1, k0) - let projected_columns = vec![1, 3, 0]; - let projection = Projection::new(®ion_schema, projected_columns.clone()); - assert_eq!(projected_columns, projection.projected_columns); - // Need to read (k0, timestamp, v1, sequence, op_type) - assert_eq!(&[0, 1, 3, 4, 5], &projection.columns_to_read[..]); - assert_eq!(5, projection.id_to_read_idx.len()); - // Index of timestamp, v1, k0 in `columns_to_read` - assert_eq!(&[1, 2, 0], &projection.projected_idx_to_read_idx[..]); - // 3 columns: k0, timestamp, v1 - assert_eq!(3, projection.num_user_columns); - - // Empty projection. - let projection = Projection::new(®ion_schema, Vec::new()); - assert!(projection.projected_columns.is_empty()); - // Still need to read row keys. - assert_eq!(&[0, 1, 4, 5], &projection.columns_to_read[..]); - assert_eq!(4, projection.id_to_read_idx.len()); - assert!(projection.projected_idx_to_read_idx.is_empty()); - assert_eq!(2, projection.num_user_columns); - } - - #[test] - fn test_projected_schema_with_projection() { - // (k0, timestamp, v0, v1, v2) - let region_schema = Arc::new(new_region_schema(123, 3)); - - // After projection: (v1, timestamp) - let projected_schema = - ProjectedSchema::new(region_schema.clone(), Some(vec![3, 1])).unwrap(); - let expect_user = schema_util::new_schema_with_version( - &[ - ("v1", LogicalTypeId::Int64, true), - ("timestamp", LogicalTypeId::Timestamp, false), - ], - Some(1), - 123, - ); - assert_eq!(expect_user, **projected_schema.projected_user_schema()); - - // Test is_needed - let needed: Vec<_> = region_schema - .columns - .iter_all_columns() - .enumerate() - .filter_map(|(idx, column_meta)| { - if projected_schema.is_needed(column_meta.id()) { - Some(idx) - } else { - None - } - }) - .collect(); - // (k0, timestamp, v1, sequence, op_type) - assert_eq!(&[0, 1, 3, 5, 6], &needed[..]); - - // Use another projection. - // After projection: (v0, timestamp) - let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap(); - - // The schema to read should be same as region schema with (k0, timestamp, v0). - // We can't use `new_schema_with_version()` because the StoreSchema also store other - // metadata that `new_schema_with_version()` can't store. - let expect_schema = new_region_schema(123, 1); - assert_eq!( - expect_schema.store_schema(), - projected_schema.schema_to_read() - ); - - // (k0, timestamp, v0, sequence, op_type) - let batch = new_batch(); - // Test Batch to our Chunk. - // (v0, timestamp) - let chunk = projected_schema.batch_to_chunk(&batch); - assert_eq!(2, chunk.columns.len()); - assert_eq!(&chunk.columns[0], batch.column(2)); - assert_eq!(&chunk.columns[1], batch.column(1)); - - // Test batch_from_parts - let keys = batch.columns()[0..2].to_vec(); - let values = batch.columns()[2..3].to_vec(); - let created = projected_schema.batch_from_parts( - keys, - values, - batch.column(3).clone(), - batch.column(4).clone(), - ); - assert_eq!(batch, created); - } - - #[test] - fn test_projected_schema_no_projection() { - // (k0, timestamp, v0) - let region_schema = Arc::new(new_region_schema(123, 1)); - - let projected_schema = ProjectedSchema::no_projection(region_schema.clone()); - - assert_eq!( - region_schema.user_schema(), - projected_schema.projected_user_schema() - ); - assert_eq!( - region_schema.store_schema(), - projected_schema.schema_to_read() - ); - - for column in region_schema.columns.iter_all_columns() { - assert!(projected_schema.is_needed(column.id())); - } - - // (k0, timestamp, v0, sequence, op_type) - let batch = new_batch(); - // Test Batch to our Chunk. - // (k0, timestamp, v0) - let chunk = projected_schema.batch_to_chunk(&batch); - assert_eq!(3, chunk.columns.len()); - } - - #[test] - fn test_projected_schema_empty_projection() { - // (k0, timestamp, v0) - let region_schema = Arc::new(new_region_schema(123, 1)); - - let err = ProjectedSchema::new(region_schema, Some(Vec::new())) - .err() - .unwrap(); - assert!(matches!(err, Error::InvalidProjection { .. })); - } - - #[test] - fn test_compare_batch() { - let schema = read_util::new_projected_schema(); - let left = read_util::new_full_kv_batch(&[(1000, 1, 1000, OpType::Put)]); - let right = read_util::new_full_kv_batch(&[ - (999, 1, 1000, OpType::Put), - (1000, 1, 999, OpType::Put), - (1000, 1, 1000, OpType::Put), - ]); - - assert_eq!(Ordering::Greater, schema.compare_row(&left, 0, &right, 0)); - assert_eq!(Ordering::Less, schema.compare_row(&left, 0, &right, 1)); - assert_eq!(Ordering::Equal, schema.compare_row(&left, 0, &right, 2)); - } - - #[test] - fn test_dedup_batch() { - let schema = read_util::new_projected_schema(); - let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (2000, Some(2))]); - let mut selected = MutableBitmap::from_len_zeroed(3); - - schema.dedup(&batch, &mut selected, None); - assert!(selected.get(0)); - assert!(selected.get(1)); - assert!(!selected.get(2)); - - let prev = read_util::new_kv_batch(&[(1000, Some(1))]); - schema.dedup(&batch, &mut selected, Some(&prev)); - assert!(!selected.get(0)); - assert!(selected.get(1)); - assert!(!selected.get(2)); - } - - #[test] - fn test_filter_batch() { - let schema = read_util::new_projected_schema(); - let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (3000, Some(3))]); - let filter = BooleanVector::from_slice(&[true, false, true]); - - let res = schema.filter(&batch, &filter).unwrap(); - let expect: VectorRef = Arc::new(TimestampVector::from_values([1000, 3000])); - assert_eq!(expect, *res.column(0)); - } } diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs new file mode 100644 index 0000000000..ca35fe5d51 --- /dev/null +++ b/src/storage/src/schema/projected.rs @@ -0,0 +1,560 @@ +use std::cmp::Ordering; +use std::collections::{BTreeSet, HashMap}; +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::arrow::bitmap::MutableBitmap; +use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; +use datatypes::vectors::{BooleanVector, VectorRef}; +use store_api::storage::{Chunk, ColumnId}; + +use crate::error; +use crate::read::{Batch, BatchOp}; +use crate::schema::{self, RegionSchema, RegionSchemaRef, Result, StoreSchema}; + +/// Metadata about projection. +#[derive(Debug, Default)] +struct Projection { + /// Column indices of projection. + projected_columns: Vec, + /// Sorted and deduplicated indices of columns to read, includes all row key columns + /// and internal columns. + /// + /// We use these indices to read from data sources. + columns_to_read: Vec, + /// Maps column id to its index in `columns_to_read`. + /// + /// Used to ask whether the column with given column id is needed in projection. + id_to_read_idx: HashMap, + /// Maps index of `projected_columns` to index of the column in `columns_to_read`. + /// + /// Invariant: + /// - `projected_idx_to_read_idx.len() == projected_columns.len()` + projected_idx_to_read_idx: Vec, + /// Number of user columns to read. + num_user_columns: usize, +} + +impl Projection { + fn new(region_schema: &RegionSchema, projected_columns: Vec) -> Projection { + // Get a sorted list of column indices to read. + let mut column_indices: BTreeSet<_> = projected_columns.iter().cloned().collect(); + column_indices.extend(region_schema.row_key_indices()); + let num_user_columns = column_indices.len(); + // Now insert internal columns. + column_indices.extend([ + region_schema.sequence_index(), + region_schema.op_type_index(), + ]); + let columns_to_read: Vec<_> = column_indices.into_iter().collect(); + + // The region schema ensure that last two column must be internal columns. + assert_eq!( + region_schema.sequence_index(), + columns_to_read[num_user_columns] + ); + assert_eq!( + region_schema.op_type_index(), + columns_to_read[num_user_columns + 1] + ); + + // Mapping: => + let id_to_read_idx: HashMap<_, _> = columns_to_read + .iter() + .enumerate() + .map(|(idx, col_idx)| (region_schema.column_metadata(*col_idx).id(), idx)) + .collect(); + // Use column id to find index in `columns_to_read` of a column in `projected_columns`. + let projected_idx_to_read_idx = projected_columns + .iter() + .map(|col_idx| { + let column_id = region_schema.column_metadata(*col_idx).id(); + // This unwrap() should be safe since `columns_to_read` must contains all columns in `projected_columns`. + let read_idx = id_to_read_idx.get(&column_id).unwrap(); + *read_idx + }) + .collect(); + + Projection { + projected_columns, + columns_to_read, + id_to_read_idx, + projected_idx_to_read_idx, + num_user_columns, + } + } +} + +/// Schema with projection info. +#[derive(Debug)] +pub struct ProjectedSchema { + /// Projection info, `None` means don't need to do projection. + projection: Option, + /// Schema used to read from data sources. + schema_to_read: StoreSchema, + /// User schema after projection. + projected_user_schema: SchemaRef, +} + +pub type ProjectedSchemaRef = Arc; + +impl ProjectedSchema { + /// Create a new `ProjectedSchema` with given `projected_columns`. + /// + /// If `projected_columns` is None, then all columns would be read. If `projected_columns` is + /// `Some`, then the `Vec` in it contains the indices of columns need to be read. + /// + /// If the `Vec` is empty or contains invalid index, `Err` would be returned. + pub fn new( + region_schema: RegionSchemaRef, + projected_columns: Option>, + ) -> Result { + match projected_columns { + Some(indices) => { + Self::validate_projection(®ion_schema, &indices)?; + + let projection = Projection::new(®ion_schema, indices); + + let schema_to_read = Self::build_schema_to_read(®ion_schema, &projection)?; + let projected_user_schema = + Self::build_projected_user_schema(®ion_schema, &projection)?; + + Ok(ProjectedSchema { + projection: Some(projection), + schema_to_read, + projected_user_schema, + }) + } + None => Ok(ProjectedSchema::no_projection(region_schema)), + } + } + + /// Create a `ProjectedSchema` that read all columns. + pub fn no_projection(region_schema: RegionSchemaRef) -> ProjectedSchema { + // We could just reuse the StoreSchema and user schema. + ProjectedSchema { + projection: None, + schema_to_read: region_schema.store_schema().clone(), + projected_user_schema: region_schema.user_schema().clone(), + } + } + + #[inline] + pub fn projected_user_schema(&self) -> &SchemaRef { + &self.projected_user_schema + } + + #[inline] + pub fn schema_to_read(&self) -> &StoreSchema { + &self.schema_to_read + } + + /// Convert [Batch] into [Chunk]. + /// + /// This will remove all internal columns. The input `batch` should has the + /// same schema as `self.schema_to_read()`. + pub fn batch_to_chunk(&self, batch: &Batch) -> Chunk { + let columns = match &self.projection { + Some(projection) => projection + .projected_idx_to_read_idx + .iter() + .map(|col_idx| batch.column(*col_idx)) + .cloned() + .collect(), + None => { + let num_user_columns = self.projected_user_schema.num_columns(); + batch + .columns() + .iter() + .take(num_user_columns) + .cloned() + .collect() + } + }; + + Chunk::new(columns) + } + + /// Returns true if column with given `column_id` is needed (in projection). + pub fn is_needed(&self, column_id: ColumnId) -> bool { + self.projection + .as_ref() + .map(|p| p.id_to_read_idx.contains_key(&column_id)) + .unwrap_or(true) + } + + /// Construct a new [Batch] from row key, value, sequence and op_type. + /// + /// # Panics + /// Panics if number of columns are not the same as this schema. + pub fn batch_from_parts( + &self, + row_key_columns: Vec, + mut value_columns: Vec, + sequences: VectorRef, + op_types: VectorRef, + ) -> Batch { + // sequence and op_type + let num_internal_columns = 2; + + assert_eq!( + self.schema_to_read.num_columns(), + row_key_columns.len() + value_columns.len() + num_internal_columns + ); + + let mut columns = row_key_columns; + // Reserve space for value, sequence and op_type + columns.reserve(value_columns.len() + num_internal_columns); + columns.append(&mut value_columns); + // Internal columns are push in sequence, op_type order. + columns.push(sequences); + columns.push(op_types); + + Batch::new(columns) + } + + fn build_schema_to_read( + region_schema: &RegionSchema, + projection: &Projection, + ) -> Result { + let column_schemas: Vec<_> = projection + .columns_to_read + .iter() + .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) + .collect(); + // All row key columns are reserved in this schema, so we can use the row_key_end + // and timestamp_key_index from region schema. + StoreSchema::new( + column_schemas, + region_schema.version(), + region_schema.timestamp_key_index(), + region_schema.row_key_end(), + projection.num_user_columns, + ) + } + + fn build_projected_user_schema( + region_schema: &RegionSchema, + projection: &Projection, + ) -> Result { + let timestamp_index = + projection + .projected_columns + .iter() + .enumerate() + .find_map(|(idx, col_idx)| { + if *col_idx == region_schema.timestamp_key_index() { + Some(idx) + } else { + None + } + }); + let column_schemas: Vec<_> = projection + .projected_columns + .iter() + .map(|col_idx| ColumnSchema::from(®ion_schema.column_metadata(*col_idx).desc)) + .collect(); + + let mut builder = SchemaBuilder::try_from(column_schemas) + .context(schema::ConvertSchemaSnafu)? + .version(region_schema.version()); + if let Some(timestamp_index) = timestamp_index { + builder = builder.timestamp_index(timestamp_index); + } + + let schema = builder.build().context(schema::BuildSchemaSnafu)?; + + Ok(Arc::new(schema)) + } + + fn validate_projection(region_schema: &RegionSchema, indices: &[usize]) -> Result<()> { + // The projection indices should not be empty, at least the timestamp column + // should be always read, and the `StoreSchema` also requires the timestamp column. + ensure!( + !indices.is_empty(), + schema::InvalidProjectionSnafu { + msg: "at least one column should be read", + } + ); + + // Now only allowed to read user columns. + let user_schema = region_schema.user_schema(); + for i in indices { + ensure!( + *i < user_schema.num_columns(), + schema::InvalidProjectionSnafu { + msg: format!( + "index {} out of bound, only contains {} columns", + i, + user_schema.num_columns() + ), + } + ); + } + + Ok(()) + } +} + +impl BatchOp for ProjectedSchema { + fn compare_row(&self, left: &Batch, i: usize, right: &Batch, j: usize) -> Ordering { + // Ordered by (row_key asc, sequence desc, op_type desc). + let indices = self.schema_to_read.row_key_indices(); + for idx in indices { + let (left_col, right_col) = (left.column(idx), right.column(idx)); + // Comparision of vector is done by virtual method calls currently. Consider using + // enum dispatch if this becomes bottleneck. + let order = left_col.get_ref(i).cmp(&right_col.get_ref(j)); + if order != Ordering::Equal { + return order; + } + } + let (sequence_index, op_type_index) = ( + self.schema_to_read.sequence_index(), + self.schema_to_read.op_type_index(), + ); + right + .column(sequence_index) + .get_ref(j) + .cmp(&left.column(sequence_index).get_ref(i)) + .then_with(|| { + right + .column(op_type_index) + .get_ref(j) + .cmp(&left.column(op_type_index).get_ref(i)) + }) + } + + fn dedup(&self, batch: &Batch, selected: &mut MutableBitmap, prev: Option<&Batch>) { + if let Some(prev) = prev { + assert_eq!(batch.num_columns(), prev.num_columns()); + } + let indices = self.schema_to_read.row_key_indices(); + for idx in indices { + let (current, prev_col) = ( + batch.column(idx), + prev.map(|prev| prev.column(idx).as_ref()), + ); + current.dedup(selected, prev_col); + } + } + + fn filter(&self, batch: &Batch, filter: &BooleanVector) -> error::Result { + let columns = batch + .columns() + .iter() + .enumerate() + .map(|(i, v)| { + v.filter(filter).context(error::FilterColumnSnafu { + name: self.schema_to_read.column_name(i), + }) + }) + .collect::>>()?; + + Ok(Batch::new(columns)) + } +} + +#[cfg(test)] +mod tests { + use datatypes::prelude::ScalarVector; + use datatypes::type_id::LogicalTypeId; + use datatypes::vectors::TimestampVector; + use store_api::storage::OpType; + + use super::*; + use crate::schema::{tests, Error}; + use crate::test_util::{read_util, schema_util}; + + #[test] + fn test_projection() { + // Build a region schema with 2 value columns. So the final user schema is + // (k0, timestamp, v0, v1) + let region_schema = tests::new_region_schema(0, 2); + + // Projection, but still keep column order. + // After projection: (timestamp, v0) + let projected_columns = vec![1, 2]; + let projection = Projection::new(®ion_schema, projected_columns.clone()); + assert_eq!(projected_columns, projection.projected_columns); + // Need to read (k0, timestamp, v0, sequence, op_type) + assert_eq!(&[0, 1, 2, 4, 5], &projection.columns_to_read[..]); + assert_eq!(5, projection.id_to_read_idx.len()); + // Index of timestamp, v0 in `columns_to_read` + assert_eq!(&[1, 2], &projection.projected_idx_to_read_idx[..]); + // 3 columns: k0, timestamp, v0 + assert_eq!(3, projection.num_user_columns); + + // Projection, unordered. + // After projection: (timestamp, v1, k0) + let projected_columns = vec![1, 3, 0]; + let projection = Projection::new(®ion_schema, projected_columns.clone()); + assert_eq!(projected_columns, projection.projected_columns); + // Need to read (k0, timestamp, v1, sequence, op_type) + assert_eq!(&[0, 1, 3, 4, 5], &projection.columns_to_read[..]); + assert_eq!(5, projection.id_to_read_idx.len()); + // Index of timestamp, v1, k0 in `columns_to_read` + assert_eq!(&[1, 2, 0], &projection.projected_idx_to_read_idx[..]); + // 3 columns: k0, timestamp, v1 + assert_eq!(3, projection.num_user_columns); + + // Empty projection. + let projection = Projection::new(®ion_schema, Vec::new()); + assert!(projection.projected_columns.is_empty()); + // Still need to read row keys. + assert_eq!(&[0, 1, 4, 5], &projection.columns_to_read[..]); + assert_eq!(4, projection.id_to_read_idx.len()); + assert!(projection.projected_idx_to_read_idx.is_empty()); + assert_eq!(2, projection.num_user_columns); + } + + #[test] + fn test_projected_schema_with_projection() { + // (k0, timestamp, v0, v1, v2) + let region_schema = Arc::new(tests::new_region_schema(123, 3)); + + // After projection: (v1, timestamp) + let projected_schema = + ProjectedSchema::new(region_schema.clone(), Some(vec![3, 1])).unwrap(); + let expect_user = schema_util::new_schema_with_version( + &[ + ("v1", LogicalTypeId::Int64, true), + ("timestamp", LogicalTypeId::Timestamp, false), + ], + Some(1), + 123, + ); + assert_eq!(expect_user, **projected_schema.projected_user_schema()); + + // Test is_needed + let needed: Vec<_> = region_schema + .all_columns() + .enumerate() + .filter_map(|(idx, column_meta)| { + if projected_schema.is_needed(column_meta.id()) { + Some(idx) + } else { + None + } + }) + .collect(); + // (k0, timestamp, v1, sequence, op_type) + assert_eq!(&[0, 1, 3, 5, 6], &needed[..]); + + // Use another projection. + // After projection: (v0, timestamp) + let projected_schema = ProjectedSchema::new(region_schema, Some(vec![2, 1])).unwrap(); + + // The schema to read should be same as region schema with (k0, timestamp, v0). + // We can't use `new_schema_with_version()` because the StoreSchema also store other + // metadata that `new_schema_with_version()` can't store. + let expect_schema = tests::new_region_schema(123, 1); + assert_eq!( + expect_schema.store_schema(), + projected_schema.schema_to_read() + ); + + // (k0, timestamp, v0, sequence, op_type) + let batch = tests::new_batch(); + // Test Batch to our Chunk. + // (v0, timestamp) + let chunk = projected_schema.batch_to_chunk(&batch); + assert_eq!(2, chunk.columns.len()); + assert_eq!(&chunk.columns[0], batch.column(2)); + assert_eq!(&chunk.columns[1], batch.column(1)); + + // Test batch_from_parts + let keys = batch.columns()[0..2].to_vec(); + let values = batch.columns()[2..3].to_vec(); + let created = projected_schema.batch_from_parts( + keys, + values, + batch.column(3).clone(), + batch.column(4).clone(), + ); + assert_eq!(batch, created); + } + + #[test] + fn test_projected_schema_no_projection() { + // (k0, timestamp, v0) + let region_schema = Arc::new(tests::new_region_schema(123, 1)); + + let projected_schema = ProjectedSchema::no_projection(region_schema.clone()); + + assert_eq!( + region_schema.user_schema(), + projected_schema.projected_user_schema() + ); + assert_eq!( + region_schema.store_schema(), + projected_schema.schema_to_read() + ); + + for column in region_schema.all_columns() { + assert!(projected_schema.is_needed(column.id())); + } + + // (k0, timestamp, v0, sequence, op_type) + let batch = tests::new_batch(); + // Test Batch to our Chunk. + // (k0, timestamp, v0) + let chunk = projected_schema.batch_to_chunk(&batch); + assert_eq!(3, chunk.columns.len()); + } + + #[test] + fn test_projected_schema_empty_projection() { + // (k0, timestamp, v0) + let region_schema = Arc::new(tests::new_region_schema(123, 1)); + + let err = ProjectedSchema::new(region_schema, Some(Vec::new())) + .err() + .unwrap(); + assert!(matches!(err, Error::InvalidProjection { .. })); + } + + #[test] + fn test_compare_batch() { + let schema = read_util::new_projected_schema(); + let left = read_util::new_full_kv_batch(&[(1000, 1, 1000, OpType::Put)]); + let right = read_util::new_full_kv_batch(&[ + (999, 1, 1000, OpType::Put), + (1000, 1, 999, OpType::Put), + (1000, 1, 1000, OpType::Put), + ]); + + assert_eq!(Ordering::Greater, schema.compare_row(&left, 0, &right, 0)); + assert_eq!(Ordering::Less, schema.compare_row(&left, 0, &right, 1)); + assert_eq!(Ordering::Equal, schema.compare_row(&left, 0, &right, 2)); + } + + #[test] + fn test_dedup_batch() { + let schema = read_util::new_projected_schema(); + let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (2000, Some(2))]); + let mut selected = MutableBitmap::from_len_zeroed(3); + + schema.dedup(&batch, &mut selected, None); + assert!(selected.get(0)); + assert!(selected.get(1)); + assert!(!selected.get(2)); + + let prev = read_util::new_kv_batch(&[(1000, Some(1))]); + schema.dedup(&batch, &mut selected, Some(&prev)); + assert!(!selected.get(0)); + assert!(selected.get(1)); + assert!(!selected.get(2)); + } + + #[test] + fn test_filter_batch() { + let schema = read_util::new_projected_schema(); + let batch = read_util::new_kv_batch(&[(1000, Some(1)), (2000, Some(2)), (3000, Some(3))]); + let filter = BooleanVector::from_slice(&[true, false, true]); + + let res = schema.filter(&batch, &filter).unwrap(); + let expect: VectorRef = Arc::new(TimestampVector::from_values([1000, 3000])); + assert_eq!(expect, *res.column(0)); + } +} diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs new file mode 100644 index 0000000000..9790c20edc --- /dev/null +++ b/src/storage/src/schema/region.rs @@ -0,0 +1,183 @@ +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef}; + +use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef}; +use crate::schema::{self, Result, StoreSchema}; + +/// Schema of region. +/// +/// The `RegionSchema` has the knowledge of reserved and internal columns. +/// Reserved columns are columns that their names, ids are reserved by the storage +/// engine, and could not be used by the user. Reserved columns usually have +/// special usage. Reserved columns expect the version columns are also +/// called internal columns (though the version could also be thought as a +/// special kind of internal column), are not visible to user, such as our +/// internal sequence, op_type columns. +/// +/// The user schema is the schema that only contains columns that user could visit, +/// as well as what the schema user created. +#[derive(Debug, PartialEq)] +pub struct RegionSchema { + /// Schema that only contains columns that user defined, excluding internal columns + /// that are reserved and used by the storage engine. + /// + /// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef` + /// conveniently. The fields order in `SchemaRef` **must** be consistent with + /// columns order in [ColumnsMetadata] to ensure the projection index of a field + /// is correct. + user_schema: SchemaRef, + /// store schema contains all columns of the region, including all internal columns. + store_schema: StoreSchema, + /// Metadata of columns. + columns: ColumnsMetadataRef, +} + +impl RegionSchema { + pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result { + let user_schema = Arc::new(build_user_schema(&columns, version)?); + let store_schema = StoreSchema::from_columns_metadata(&columns, version)?; + + debug_assert_eq!(user_schema.version(), store_schema.version()); + debug_assert_eq!(version, user_schema.version()); + + Ok(RegionSchema { + user_schema, + store_schema, + columns, + }) + } + + /// Returns the schema of the region, excluding internal columns that used by + /// the storage engine. + #[inline] + pub fn user_schema(&self) -> &SchemaRef { + &self.user_schema + } + + /// Returns the schema actually stores, which would also contains all internal columns. + #[inline] + pub fn store_schema(&self) -> &StoreSchema { + &self.store_schema + } + + #[inline] + pub fn row_key_columns(&self) -> impl Iterator { + self.columns.iter_row_key_columns() + } + + #[inline] + pub fn value_columns(&self) -> impl Iterator { + self.columns.iter_value_columns() + } + + #[inline] + pub fn num_row_key_columns(&self) -> usize { + self.columns.num_row_key_columns() + } + + #[inline] + pub fn num_value_columns(&self) -> usize { + self.columns.num_value_columns() + } + + #[inline] + pub fn version(&self) -> u32 { + self.user_schema.version() + } + + #[inline] + pub(crate) fn row_key_end(&self) -> usize { + self.columns.row_key_end() + } + + #[inline] + pub(crate) fn sequence_index(&self) -> usize { + self.store_schema.sequence_index() + } + + #[inline] + pub(crate) fn op_type_index(&self) -> usize { + self.store_schema.op_type_index() + } + + #[inline] + pub(crate) fn row_key_indices(&self) -> impl Iterator { + self.store_schema.row_key_indices() + } + + #[inline] + pub(crate) fn column_metadata(&self, idx: usize) -> &ColumnMetadata { + self.columns.column_metadata(idx) + } + + #[inline] + pub(crate) fn timestamp_key_index(&self) -> usize { + self.columns.timestamp_key_index() + } + + #[cfg(test)] + pub(crate) fn all_columns(&self) -> impl Iterator { + self.columns.iter_all_columns() + } +} + +pub type RegionSchemaRef = Arc; + +// Now user schema don't have extra metadata like store schema. +fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result { + let column_schemas: Vec<_> = columns + .iter_user_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + SchemaBuilder::try_from(column_schemas) + .context(schema::ConvertSchemaSnafu)? + .timestamp_index(columns.timestamp_key_index()) + .version(version) + .build() + .context(schema::BuildSchemaSnafu) +} + +#[cfg(test)] +mod tests { + use datatypes::type_id::LogicalTypeId; + + use super::*; + use crate::schema::tests; + use crate::test_util::schema_util; + + #[test] + fn test_region_schema() { + let region_schema = Arc::new(tests::new_region_schema(123, 1)); + + let expect_schema = schema_util::new_schema_with_version( + &[ + ("k0", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Timestamp, false), + ("v0", LogicalTypeId::Int64, true), + ], + Some(1), + 123, + ); + + assert_eq!(expect_schema, **region_schema.user_schema()); + + // Checks row key column. + let mut row_keys = region_schema.row_key_columns(); + assert_eq!("k0", row_keys.next().unwrap().desc.name); + assert_eq!("timestamp", row_keys.next().unwrap().desc.name); + assert_eq!(None, row_keys.next()); + assert_eq!(2, region_schema.num_row_key_columns()); + + // Checks value column. + let mut values = region_schema.value_columns(); + assert_eq!("v0", values.next().unwrap().desc.name); + assert_eq!(None, values.next()); + assert_eq!(1, region_schema.num_value_columns()); + + // Checks version. + assert_eq!(123, region_schema.version()); + } +} diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs new file mode 100644 index 0000000000..827772024b --- /dev/null +++ b/src/storage/src/schema/store.rs @@ -0,0 +1,259 @@ +use std::sync::Arc; + +use common_error::prelude::*; +use datatypes::arrow::array::Array; +use datatypes::arrow::chunk::Chunk as ArrowChunk; +use datatypes::arrow::datatypes::Schema as ArrowSchema; +use datatypes::schema::{ColumnSchema, Metadata, Schema, SchemaBuilder, SchemaRef}; +use datatypes::vectors::Helper; +use store_api::storage::consts; + +use crate::metadata::ColumnsMetadata; +use crate::read::Batch; +use crate::schema::{self, Error, Result}; + +const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end"; +const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end"; + +/// Schema for storage engine. +/// +/// Used internally, contains all row key columns, internal columns and parts of value columns. +/// +/// Only contains a reference to schema and some indices, so it should be cheap to clone. +#[derive(Debug, Clone, PartialEq)] +pub struct StoreSchema { + schema: SchemaRef, + row_key_end: usize, + user_column_end: usize, +} + +impl StoreSchema { + #[inline] + pub fn version(&self) -> u32 { + self.schema.version() + } + + #[inline] + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + #[inline] + pub fn arrow_schema(&self) -> &Arc { + self.schema.arrow_schema() + } + + pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> ArrowChunk> { + assert_eq!(self.schema.num_columns(), batch.num_columns()); + + ArrowChunk::new(batch.columns().iter().map(|v| v.to_arrow_array()).collect()) + } + + pub fn arrow_chunk_to_batch(&self, chunk: &ArrowChunk>) -> Result { + assert_eq!(self.schema.num_columns(), chunk.columns().len()); + + let columns = chunk + .iter() + .enumerate() + .map(|(i, column)| { + Helper::try_into_vector(column.clone()).context(schema::ConvertChunkSnafu { + name: self.column_name(i), + }) + }) + .collect::>()?; + + Ok(Batch::new(columns)) + } + + pub(crate) fn contains_column(&self, name: &str) -> bool { + self.schema.column_schema_by_name(name).is_some() + } + + pub(crate) fn is_key_column(&self, name: &str) -> bool { + self.schema + .column_index_by_name(name) + .map(|idx| idx < self.row_key_end) + .unwrap_or(false) + } + + pub(crate) fn is_user_column(&self, name: &str) -> bool { + self.schema + .column_index_by_name(name) + .map(|idx| idx < self.user_column_end) + .unwrap_or(false) + } + + pub(crate) fn from_columns_metadata( + columns: &ColumnsMetadata, + version: u32, + ) -> Result { + let column_schemas: Vec<_> = columns + .iter_all_columns() + .map(|col| ColumnSchema::from(&col.desc)) + .collect(); + + StoreSchema::new( + column_schemas, + version, + columns.timestamp_key_index(), + columns.row_key_end(), + columns.user_column_end(), + ) + } + + pub(crate) fn new( + column_schemas: Vec, + version: u32, + timestamp_key_index: usize, + row_key_end: usize, + user_column_end: usize, + ) -> Result { + let schema = SchemaBuilder::try_from(column_schemas) + .context(schema::ConvertSchemaSnafu)? + .timestamp_index(timestamp_key_index) + .version(version) + .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string()) + .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string()) + .build() + .context(schema::BuildSchemaSnafu)?; + + assert_eq!( + consts::SEQUENCE_COLUMN_NAME, + schema.column_schemas()[user_column_end].name + ); + assert_eq!( + consts::OP_TYPE_COLUMN_NAME, + schema.column_schemas()[user_column_end + 1].name + ); + + Ok(StoreSchema { + schema: Arc::new(schema), + row_key_end, + user_column_end, + }) + } + + #[inline] + pub(crate) fn sequence_index(&self) -> usize { + self.user_column_end + } + + #[inline] + pub(crate) fn op_type_index(&self) -> usize { + self.user_column_end + 1 + } + + #[inline] + pub(crate) fn row_key_indices(&self) -> impl Iterator { + 0..self.row_key_end + } + + #[inline] + pub(crate) fn column_name(&self, idx: usize) -> &str { + &self.schema.column_schemas()[idx].name + } + + #[inline] + pub(crate) fn num_columns(&self) -> usize { + self.schema.num_columns() + } +} + +impl TryFrom for StoreSchema { + type Error = Error; + + fn try_from(arrow_schema: ArrowSchema) -> Result { + let schema = Schema::try_from(arrow_schema).context(schema::ConvertArrowSchemaSnafu)?; + // Recover other metadata from schema. + let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?; + let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?; + + // There should be sequence and op_type columns. + ensure!( + consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name, + schema::InvalidIndexSnafu + ); + ensure!( + consts::OP_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name, + schema::InvalidIndexSnafu + ); + + Ok(StoreSchema { + schema: Arc::new(schema), + row_key_end, + user_column_end, + }) + } +} + +fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result { + let value = metadata + .get(key) + .context(schema::MissingMetaSnafu { key })?; + value.parse().context(schema::ParseIndexSnafu { value }) +} + +#[cfg(test)] +mod tests { + use datatypes::arrow::array::Array; + use datatypes::arrow::chunk::Chunk as ArrowChunk; + use datatypes::type_id::LogicalTypeId; + use store_api::storage::consts; + + use super::*; + use crate::read::Batch; + use crate::schema::tests; + use crate::test_util::schema_util; + + fn check_chunk_batch(chunk: &ArrowChunk>, batch: &Batch) { + assert_eq!(5, chunk.columns().len()); + assert_eq!(3, chunk.len()); + + for i in 0..5 { + assert_eq!(chunk[i], batch.column(i).to_arrow_array()); + } + } + + #[test] + fn test_store_schema() { + let region_schema = Arc::new(tests::new_region_schema(123, 1)); + + // Checks StoreSchema. + let store_schema = region_schema.store_schema(); + assert_eq!(123, store_schema.version()); + let sst_arrow_schema = store_schema.arrow_schema(); + let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap(); + + assert_eq!(*store_schema, converted_store_schema); + + let expect_schema = schema_util::new_schema_with_version( + &[ + ("k0", LogicalTypeId::Int64, false), + ("timestamp", LogicalTypeId::Timestamp, false), + ("v0", LogicalTypeId::Int64, true), + (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false), + (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false), + ], + Some(1), + 123, + ); + assert_eq!( + expect_schema.column_schemas(), + store_schema.schema().column_schemas() + ); + assert_eq!(3, store_schema.sequence_index()); + assert_eq!(4, store_schema.op_type_index()); + let row_key_indices: Vec<_> = store_schema.row_key_indices().collect(); + assert_eq!([0, 1], &row_key_indices[..]); + + // Test batch and chunk conversion. + let batch = tests::new_batch(); + // Convert batch to chunk. + let chunk = store_schema.batch_to_arrow_chunk(&batch); + check_chunk_batch(&chunk, &batch); + + // Convert chunk to batch. + let converted_batch = store_schema.arrow_chunk_to_batch(&chunk).unwrap(); + check_chunk_batch(&chunk, &converted_batch); + } +}