diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs
index 185db77c2c..a3053a5293 100644
--- a/src/datatypes/src/error.rs
+++ b/src/datatypes/src/error.rs
@@ -75,6 +75,9 @@ pub enum Error {
         reason: String,
         backtrace: Backtrace,
     },
+
+    #[snafu(display("Duplicated metadata for {}", key))]
+    DuplicateMeta { key: String, backtrace: Backtrace },
 }
 
 impl ErrorExt for Error {
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs
index efe07ae95f..c0d5cd4c60 100644
--- a/src/datatypes/src/schema.rs
+++ b/src/datatypes/src/schema.rs
@@ -1,6 +1,6 @@
 mod constraint;
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 pub use arrow::datatypes::Metadata;
@@ -25,12 +25,14 @@ const VERSION_KEY: &str = "greptime:version";
 /// Key used to store default constraint in arrow field's metadata.
 const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
 
+/// Schema of a column, used as an immutable struct.
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct ColumnSchema {
     pub name: String,
     pub data_type: ConcreteDataType,
     is_nullable: bool,
     default_constraint: Option<ColumnDefaultConstraint>,
+    metadata: Metadata,
 }
 
 impl ColumnSchema {
@@ -44,6 +46,7 @@ impl ColumnSchema {
             data_type,
             is_nullable,
             default_constraint: None,
+            metadata: Metadata::new(),
         }
     }
 
@@ -57,6 +60,11 @@ impl ColumnSchema {
         self.default_constraint.as_ref()
     }
 
+    #[inline]
+    pub fn metadata(&self) -> &Metadata {
+        &self.metadata
+    }
+
     pub fn with_default_constraint(
         mut self,
         default_constraint: Option<ColumnDefaultConstraint>,
@@ -69,6 +77,12 @@ impl ColumnSchema {
         Ok(self)
     }
 
+    /// Creates a new [`ColumnSchema`] with given metadata.
+    pub fn with_metadata(mut self, metadata: Metadata) -> Self {
+        self.metadata = metadata;
+        self
+    }
+
     pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
         match &self.default_constraint {
             Some(c) => c
@@ -303,8 +317,9 @@ impl TryFrom<&Field> for ColumnSchema {
     fn try_from(field: &Field) -> Result<ColumnSchema> {
         let data_type = ConcreteDataType::try_from(&field.data_type)?;
-        let default_constraint = match field.metadata.get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) {
-            Some(json) => Some(serde_json::from_str(json).context(DeserializeSnafu { json })?),
+        let mut metadata = field.metadata.clone();
+        let default_constraint = match metadata.remove(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY) {
+            Some(json) => Some(serde_json::from_str(&json).context(DeserializeSnafu { json })?),
             None => None,
         };
 
@@ -313,6 +328,7 @@ impl TryFrom<&Field> for ColumnSchema {
             data_type,
             is_nullable: field.is_nullable,
             default_constraint,
+            metadata,
         })
     }
 }
@@ -321,16 +337,21 @@ impl TryFrom<&ColumnSchema> for Field {
     type Error = Error;
 
     fn try_from(column_schema: &ColumnSchema) -> Result<Field> {
-        let metadata = if let Some(value) = &column_schema.default_constraint {
-            let mut m = BTreeMap::new();
-            m.insert(
+        let mut metadata = column_schema.metadata.clone();
+        if let Some(value) = &column_schema.default_constraint {
+            // Adds an additional metadata entry to store the default constraint.
+            let old = metadata.insert(
                 ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(),
                 serde_json::to_string(&value).context(SerializeSnafu)?,
             );
-            m
-        } else {
-            BTreeMap::default()
-        };
+
+            ensure!(
+                old.is_none(),
+                error::DuplicateMetaSnafu {
+                    key: ARROW_FIELD_DEFAULT_CONSTRAINT_KEY,
+                }
+            );
+        }
 
         Ok(Field::new(
             column_schema.name.clone(),
@@ -420,6 +441,11 @@ mod tests {
         let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
             .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
             .unwrap();
+        assert!(column_schema
+            .metadata()
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_none());
+
         let field = Field::try_from(&column_schema).unwrap();
         assert_eq!("test", field.name);
         assert_eq!(ArrowDataType::Int32, field.data_type);
@@ -436,6 +462,45 @@ mod tests {
         assert_eq!(column_schema, new_column_schema);
     }
 
+    #[test]
+    fn test_column_schema_with_metadata() {
+        let mut metadata = Metadata::new();
+        metadata.insert("k1".to_string(), "v1".to_string());
+        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
+            .with_metadata(metadata)
+            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
+            .unwrap();
+        assert_eq!("v1", column_schema.metadata().get("k1").unwrap());
+        assert!(column_schema
+            .metadata()
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_none());
+
+        let field = Field::try_from(&column_schema).unwrap();
+        assert_eq!("v1", field.metadata.get("k1").unwrap());
+        assert!(field
+            .metadata
+            .get(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY)
+            .is_some());
+
+        let new_column_schema = ColumnSchema::try_from(&field).unwrap();
+        assert_eq!(column_schema, new_column_schema);
+    }
+
+    #[test]
+    fn test_column_schema_with_duplicate_metadata() {
+        let mut metadata = Metadata::new();
+        metadata.insert(
+            ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(),
+            "v1".to_string(),
+        );
+        let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
+            .with_metadata(metadata)
+            .with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
+            .unwrap();
+        Field::try_from(&column_schema).unwrap_err();
+    }
+
     #[test]
     fn test_column_schema_invalid_default_constraint() {
         ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index 368a179184..50947601c0 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -179,7 +179,7 @@ pub enum Error {
     #[snafu(display("Parquet file schema is invalid, source: {}", source))]
     InvalidParquetSchema {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },
 
     #[snafu(display("Region is under {} state, cannot proceed operation", state))]
@@ -223,7 +223,7 @@ pub enum Error {
     ConvertStoreSchema {
         file: String,
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },
 
     #[snafu(display("Invalid raw region metadata, region: {}, source: {}", region, source))]
@@ -236,7 +236,7 @@ pub enum Error {
     #[snafu(display("Invalid projection, source: {}", source))]
     InvalidProjection {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: MetadataError,
     },
 
     #[snafu(display("Failed to push data to batch builder, source: {}", source))]
@@ -295,6 +295,15 @@ pub enum Error {
         version: u32,
         backtrace: Backtrace,
     },
+
+    #[snafu(display(
+        "Failed to convert between ColumnSchema and ColumnMetadata, source: {}",
+        source
+    ))]
+    ConvertColumnSchema {
+        #[snafu(backtrace)]
+        source: MetadataError,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -345,9 +354,9 @@ impl ErrorExt for Error {
             | InvalidRegionState { .. }
             | ReadWal { .. } => StatusCode::StorageUnavailable,
 
-            InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => {
-                source.status_code()
-            }
+            InvalidAlterRequest { source, .. }
+            | InvalidRegionDesc { source, .. }
+            | ConvertColumnSchema { source, .. } => source.status_code(),
             PushBatch { source, .. } => source.status_code(),
             AddDefault { source, .. } => source.status_code(),
         }
diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs
index a967c0aa0c..0a733ec79a 100644
--- a/src/storage/src/metadata.rs
+++ b/src/storage/src/metadata.rs
@@ -1,16 +1,19 @@
 use std::collections::{HashMap, HashSet};
+use std::num::ParseIntError;
+use std::str::FromStr;
 use std::sync::Arc;
 
 use common_error::prelude::*;
 use datatypes::data_type::ConcreteDataType;
+use datatypes::schema::{ColumnSchema, Metadata};
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt};
 use store_api::storage::{
     consts::{self, ReservedColumnId},
     AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder,
-    ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId,
-    RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor,
-    RowKeyDescriptorBuilder, Schema, SchemaRef,
+    ColumnDescriptorBuilderError, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder,
+    ColumnFamilyId, ColumnId, RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta,
+    RowKeyDescriptor, RowKeyDescriptorBuilder, Schema, SchemaRef,
 };
 
 use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata};
@@ -18,6 +21,7 @@ use crate::schema::{RegionSchema, RegionSchemaRef};
 
 /// Error for handling metadata.
 #[derive(Debug, Snafu)]
+#[snafu(visibility(pub(crate)))]
 pub enum Error {
     #[snafu(display("Column name {} already exists", name))]
     ColNameExists { name: String, backtrace: Backtrace },
@@ -34,7 +38,7 @@ pub enum Error {
     #[snafu(display("Failed to build schema, source: {}", source))]
     InvalidSchema {
         #[snafu(backtrace)]
-        source: crate::schema::Error,
+        source: datatypes::error::Error,
     },
 
     #[snafu(display("Column name {} is reserved by the system", name))]
@@ -64,7 +68,63 @@ pub enum Error {
 
     #[snafu(display("Failed to drop column {} as it is an internal column", name))]
     DropInternalColumn { name: String },
+    // End of variants for validating `AlterRequest`.
 
+    #[snafu(display("Failed to convert to column schema, source: {}", source))]
+    ToColumnSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display(
+        "Failed to parse metadata to int, key_value: {}, source: {}",
+        key_value,
+        source
+    ))]
+    ParseMetaInt {
+        // Store key and value in one string to reduce the enum size.
+        key_value: String,
+        source: std::num::ParseIntError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Metadata of {} not found", key))]
+    MetaNotFound { key: String, backtrace: Backtrace },
+
+    #[snafu(display("Failed to build column descriptor, source: {}", source))]
+    BuildColumnDescriptor {
+        source: ColumnDescriptorBuilderError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert from arrow schema, source: {}", source))]
+    ConvertArrowSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Invalid internal column index in arrow schema"))]
+    InvalidIndex { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Failed to convert arrow chunk to batch, name: {}, source: {}",
+        name,
+        source
+    ))]
+    ConvertChunk {
+        name: String,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Failed to convert schema, source: {}", source))]
+    ConvertSchema {
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
+    #[snafu(display("Invalid projection, {}", msg))]
+    InvalidProjection { msg: String, backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -285,8 +345,7 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
     fn try_from(raw: RawRegionMetadata) -> Result<RegionMetadata> {
         let columns = Arc::new(ColumnsMetadata::from(raw.columns));
-        let schema =
-            Arc::new(RegionSchema::new(columns.clone(), raw.version).context(InvalidSchemaSnafu)?);
+        let schema = Arc::new(RegionSchema::new(columns.clone(), raw.version)?);
 
         Ok(RegionMetadata {
             id: raw.id,
@@ -299,6 +358,10 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
     }
 }
 
+const METADATA_CF_ID_KEY: &str = "greptime:storage:cf_id";
+const METADATA_COLUMN_ID_KEY: &str = "greptime:storage:column_id";
+const METADATA_COMMENT_KEY: &str = "greptime:storage:comment";
+
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct ColumnMetadata {
     pub cf_id: ColumnFamilyId,
@@ -315,6 +378,67 @@ impl ColumnMetadata {
     pub fn name(&self) -> &str {
         &self.desc.name
     }
+
+    /// Convert `self` to [`ColumnSchema`] for building a [`StoreSchema`](crate::schema::StoreSchema).
+    /// This also stores additional metadata in the ColumnSchema.
+    pub fn to_column_schema(&self) -> Result<ColumnSchema> {
+        let desc = &self.desc;
+        ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable())
+            .with_metadata(self.to_metadata())
+            .with_default_constraint(desc.default_constraint().cloned())
+            .context(ToColumnSchemaSnafu)
+    }
+
+    /// Convert [`ColumnSchema`] in [`StoreSchema`](crate::schema::StoreSchema) to [`ColumnMetadata`].
+    pub fn from_column_schema(column_schema: &ColumnSchema) -> Result<ColumnMetadata> {
+        let metadata = column_schema.metadata();
+        let cf_id = try_parse_int(metadata, METADATA_CF_ID_KEY, Some(consts::DEFAULT_CF_ID))?;
+        let column_id = try_parse_int(metadata, METADATA_COLUMN_ID_KEY, None)?;
+        let comment = metadata
+            .get(METADATA_COMMENT_KEY)
+            .cloned()
+            .unwrap_or_default();
+
+        let desc = ColumnDescriptorBuilder::new(
+            column_id,
+            &column_schema.name,
+            column_schema.data_type.clone(),
+        )
+        .is_nullable(column_schema.is_nullable())
+        .default_constraint(column_schema.default_constraint().cloned())
+        .comment(comment)
+        .build()
+        .context(BuildColumnDescriptorSnafu)?;
+
+        Ok(ColumnMetadata { cf_id, desc })
+    }
+
+    fn to_metadata(&self) -> Metadata {
+        let mut metadata = Metadata::new();
+        if self.cf_id != consts::DEFAULT_CF_ID {
+            metadata.insert(METADATA_CF_ID_KEY.to_string(), self.cf_id.to_string());
+        }
+        metadata.insert(METADATA_COLUMN_ID_KEY.to_string(), self.desc.id.to_string());
+        if !self.desc.comment.is_empty() {
+            metadata.insert(METADATA_COMMENT_KEY.to_string(), self.desc.comment.clone());
+        }
+
+        metadata
+    }
+}
+
+fn try_parse_int<T>(metadata: &Metadata, key: &str, default_value: Option<T>) -> Result<T>
+where
+    T: FromStr<Err = ParseIntError>,
+{
+    if let Some(value) = metadata.get(key) {
+        return value.parse().with_context(|_| ParseMetaIntSnafu {
+            key_value: format!("{}={}", key, value),
+        });
+    }
+    // No such key in metadata.
+
+    default_value.context(MetaNotFoundSnafu { key })
+}
 
 #[derive(Clone, Debug, PartialEq)]
@@ -361,8 +485,9 @@ impl ColumnsMetadata {
         self.columns.iter().take(self.user_column_end)
     }
 
-    pub fn iter_all_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
-        self.columns.iter()
+    #[inline]
+    pub fn columns(&self) -> &[ColumnMetadata] {
+        &self.columns
     }
 
     #[inline]
@@ -710,8 +835,7 @@ impl RegionMetadataBuilder {
     fn build(self) -> Result<RegionMetadata> {
         let columns = Arc::new(self.columns_meta_builder.build()?);
-        let schema =
-            Arc::new(RegionSchema::new(columns.clone(), self.version).context(InvalidSchemaSnafu)?);
+        let schema = Arc::new(RegionSchema::new(columns.clone(), self.version)?);
 
         Ok(RegionMetadata {
             id: self.id,
@@ -765,10 +889,11 @@ fn is_internal_value_column(column_name: &str) -> bool {
     )
 }
 
-// TODO(yingwen): Add tests for using invalid row_key/cf to build metadata.
 #[cfg(test)]
 mod tests {
+    use datatypes::schema::ColumnDefaultConstraint;
     use datatypes::type_id::LogicalTypeId;
+    use datatypes::value::Value;
     use store_api::storage::{
         AddColumn, AlterOperation, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder,
         RowKeyDescriptorBuilder,
@@ -1231,4 +1356,27 @@ mod tests {
         };
         metadata.validate_alter(&req).unwrap();
     }
+
+    #[test]
+    fn test_column_metadata_conversion() {
+        let desc = ColumnDescriptorBuilder::new(123, "test", ConcreteDataType::int32_datatype())
+            .is_nullable(false)
+            .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(321))))
+            .comment("hello")
+            .build()
+            .unwrap();
+
+        let meta = ColumnMetadata {
+            cf_id: consts::DEFAULT_CF_ID,
+            desc: desc.clone(),
+        };
+        let column_schema = meta.to_column_schema().unwrap();
+        let new_meta = ColumnMetadata::from_column_schema(&column_schema).unwrap();
+        assert_eq!(meta, new_meta);
+
+        let meta = ColumnMetadata { cf_id: 567, desc };
+        let column_schema = meta.to_column_schema().unwrap();
+        let new_meta = ColumnMetadata::from_column_schema(&column_schema).unwrap();
+        assert_eq!(meta, new_meta);
+    }
 }
diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs
index 0a0616c0fc..fdb3559ec7 100644
--- a/src/storage/src/schema.rs
+++ b/src/storage/src/schema.rs
@@ -3,71 +3,9 @@ mod projected;
 mod region;
 mod store;
 
-use common_error::prelude::*;
-
 pub use crate::schema::projected::{ProjectedSchema, ProjectedSchemaRef};
 pub use crate::schema::region::{RegionSchema, RegionSchemaRef};
-pub use crate::schema::store::StoreSchema;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Failed to build schema, source: {}", source))]
-    BuildSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Failed to convert from arrow schema, source: {}", source))]
-    ConvertArrowSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Invalid internal column index in arrow schema"))]
-    InvalidIndex { backtrace: Backtrace },
-
-    #[snafu(display("Missing metadata {} in arrow schema", key))]
-    MissingMeta { key: String, backtrace: Backtrace },
-
-    #[snafu(display("Missing column {} in arrow schema", column))]
-    MissingColumn {
-        column: String,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to parse index in schema meta, value: {}, source: {}",
-        value,
-        source
-    ))]
-    ParseIndex {
-        value: String,
-        source: std::num::ParseIntError,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to convert arrow chunk to batch, name: {}, source: {}",
-        name,
-        source
-    ))]
-    ConvertChunk {
-        name: String,
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Failed to convert schema, source: {}", source))]
-    ConvertSchema {
-        #[snafu(backtrace)]
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Invalid projection, {}", msg))]
-    InvalidProjection { msg: String, backtrace: Backtrace },
-}
-
-pub type Result<T> = std::result::Result<T, Error>;
+pub use crate::schema::store::{StoreSchema, StoreSchemaRef};
 
 #[cfg(test)]
 mod tests {
diff --git a/src/storage/src/schema/projected.rs b/src/storage/src/schema/projected.rs
index ca35fe5d51..0bb5f6c19b 100644
--- a/src/storage/src/schema/projected.rs
+++ b/src/storage/src/schema/projected.rs
@@ -4,13 +4,14 @@ use std::sync::Arc;
 
 use common_error::prelude::*;
 use datatypes::arrow::bitmap::MutableBitmap;
-use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{SchemaBuilder, SchemaRef};
 use datatypes::vectors::{BooleanVector, VectorRef};
 use store_api::storage::{Chunk, ColumnId};
 
 use crate::error;
+use crate::metadata::{self, Result};
 use crate::read::{Batch, BatchOp};
-use crate::schema::{self, RegionSchema, RegionSchemaRef, Result, StoreSchema};
+use crate::schema::{RegionSchema, RegionSchemaRef, StoreSchema, StoreSchemaRef};
 
 /// Metadata about projection.
 #[derive(Debug, Default)]
@@ -91,7 +92,7 @@ pub struct ProjectedSchema {
     /// Projection info, `None` means don't need to do projection.
     projection: Option<Projection>,
     /// Schema used to read from data sources.
-    schema_to_read: StoreSchema,
+    schema_to_read: StoreSchemaRef,
     /// User schema after projection.
     projected_user_schema: SchemaRef,
 }
@@ -145,7 +146,7 @@ impl ProjectedSchema {
     }
 
     #[inline]
-    pub fn schema_to_read(&self) -> &StoreSchema {
+    pub fn schema_to_read(&self) -> &StoreSchemaRef {
         &self.schema_to_read
     }
 
@@ -216,21 +217,25 @@ impl ProjectedSchema {
     fn build_schema_to_read(
         region_schema: &RegionSchema,
         projection: &Projection,
-    ) -> Result<StoreSchema> {
-        let column_schemas: Vec<_> = projection
+    ) -> Result<StoreSchemaRef> {
+        // Reorder columns according to the projection.
+        let columns: Vec<_> = projection
             .columns_to_read
             .iter()
-            .map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
+            .map(|col_idx| region_schema.column_metadata(*col_idx))
+            .cloned()
             .collect();
         // All row key columns are reserved in this schema, so we can use the row_key_end
         // and timestamp_key_index from region schema.
-        StoreSchema::new(
-            column_schemas,
+        let store_schema = StoreSchema::new(
+            columns,
             region_schema.version(),
             region_schema.timestamp_key_index(),
             region_schema.row_key_end(),
             projection.num_user_columns,
-        )
+        )?;
+
+        Ok(Arc::new(store_schema))
     }
 
     fn build_projected_user_schema(
@@ -252,17 +257,22 @@ impl ProjectedSchema {
         let column_schemas: Vec<_> = projection
             .projected_columns
             .iter()
-            .map(|col_idx| ColumnSchema::from(&region_schema.column_metadata(*col_idx).desc))
+            .map(|col_idx| {
+                region_schema
+                    .column_metadata(*col_idx)
+                    .desc
+                    .to_column_schema()
+            })
             .collect();
 
         let mut builder = SchemaBuilder::try_from(column_schemas)
-            .context(schema::ConvertSchemaSnafu)?
+            .context(metadata::ConvertSchemaSnafu)?
             .version(region_schema.version());
         if let Some(timestamp_index) = timestamp_index {
             builder = builder.timestamp_index(timestamp_index);
         }
 
-        let schema = builder.build().context(schema::BuildSchemaSnafu)?;
+        let schema = builder.build().context(metadata::InvalidSchemaSnafu)?;
 
         Ok(Arc::new(schema))
     }
@@ -272,7 +282,7 @@ impl ProjectedSchema {
         // should be always read, and the `StoreSchema` also requires the timestamp column.
         ensure!(
             !indices.is_empty(),
-            schema::InvalidProjectionSnafu {
+            metadata::InvalidProjectionSnafu {
                 msg: "at least one column should be read",
             }
         );
@@ -282,7 +292,7 @@ impl ProjectedSchema {
         for i in indices {
             ensure!(
                 *i < user_schema.num_columns(),
-                schema::InvalidProjectionSnafu {
+                metadata::InvalidProjectionSnafu {
                     msg: format!(
                         "index {} out of bound, only contains {} columns",
                         i,
@@ -363,7 +373,8 @@ mod tests {
     use store_api::storage::OpType;
 
     use super::*;
-    use crate::schema::{tests, Error};
+    use crate::metadata::Error;
+    use crate::schema::tests;
     use crate::test_util::{read_util, schema_util};
 
     #[test]
@@ -428,7 +439,8 @@ mod tests {
         // Test is_needed
         let needed: Vec<_> = region_schema
-            .all_columns()
+            .columns()
+            .iter()
             .enumerate()
             .filter_map(|(idx, column_meta)| {
                 if projected_schema.is_needed(column_meta.id()) {
@@ -491,7 +503,7 @@ mod tests {
             projected_schema.schema_to_read()
         );
 
-        for column in region_schema.all_columns() {
+        for column in region_schema.columns() {
             assert!(projected_schema.is_needed(column.id()));
         }
diff --git a/src/storage/src/schema/region.rs b/src/storage/src/schema/region.rs
index 9790c20edc..29bb84cb84 100644
--- a/src/storage/src/schema/region.rs
+++ b/src/storage/src/schema/region.rs
@@ -1,10 +1,10 @@
 use std::sync::Arc;
 
 use common_error::prelude::*;
-use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{Schema, SchemaBuilder, SchemaRef};
 
-use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef};
-use crate::schema::{self, Result, StoreSchema};
+use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef, Result};
+use crate::schema::{StoreSchema, StoreSchemaRef};
 
 /// Schema of region.
 ///
@@ -29,7 +29,7 @@ pub struct RegionSchema {
     /// is correct.
     user_schema: SchemaRef,
     /// store schema contains all columns of the region, including all internal columns.
-    store_schema: StoreSchema,
+    store_schema: StoreSchemaRef,
     /// Metadata of columns.
     columns: ColumnsMetadataRef,
 }
@@ -37,7 +37,7 @@ impl RegionSchema {
     pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
         let user_schema = Arc::new(build_user_schema(&columns, version)?);
-        let store_schema = StoreSchema::from_columns_metadata(&columns, version)?;
+        let store_schema = Arc::new(StoreSchema::from_columns_metadata(&columns, version)?);
 
         debug_assert_eq!(user_schema.version(), store_schema.version());
         debug_assert_eq!(version, user_schema.version());
@@ -58,7 +58,7 @@ impl RegionSchema {
     /// Returns the schema actually stores, which would also contains all internal columns.
     #[inline]
-    pub fn store_schema(&self) -> &StoreSchema {
+    pub fn store_schema(&self) -> &StoreSchemaRef {
         &self.store_schema
     }
@@ -118,8 +118,8 @@ impl RegionSchema {
     }
 
     #[cfg(test)]
-    pub(crate) fn all_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
-        self.columns.iter_all_columns()
+    pub(crate) fn columns(&self) -> &[ColumnMetadata] {
+        self.columns.columns()
     }
 }
 
@@ -129,15 +129,15 @@ pub type RegionSchemaRef = Arc<RegionSchema>;
 fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema> {
     let column_schemas: Vec<_> = columns
         .iter_user_columns()
-        .map(|col| ColumnSchema::from(&col.desc))
+        .map(|col| col.desc.to_column_schema())
         .collect();
 
     SchemaBuilder::try_from(column_schemas)
-        .context(schema::ConvertSchemaSnafu)?
+        .context(metadata::ConvertSchemaSnafu)?
         .timestamp_index(columns.timestamp_key_index())
         .version(version)
         .build()
-        .context(schema::BuildSchemaSnafu)
+        .context(metadata::InvalidSchemaSnafu)
 }
 
 #[cfg(test)]
diff --git a/src/storage/src/schema/store.rs b/src/storage/src/schema/store.rs
index 827772024b..e423888ae8 100644
--- a/src/storage/src/schema/store.rs
+++ b/src/storage/src/schema/store.rs
@@ -4,29 +4,30 @@ use common_error::prelude::*;
 use datatypes::arrow::array::Array;
 use datatypes::arrow::chunk::Chunk as ArrowChunk;
 use datatypes::arrow::datatypes::Schema as ArrowSchema;
-use datatypes::schema::{ColumnSchema, Metadata, Schema, SchemaBuilder, SchemaRef};
+use datatypes::schema::{Metadata, Schema, SchemaBuilder, SchemaRef};
 use datatypes::vectors::Helper;
 use store_api::storage::consts;
 
-use crate::metadata::ColumnsMetadata;
+use crate::metadata::{self, ColumnMetadata, ColumnsMetadata, Error, Result};
 use crate::read::Batch;
-use crate::schema::{self, Error, Result};
 
 const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end";
 const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end";
 
-/// Schema for storage engine.
+/// Schema that contains storage engine specific metadata, such as internal columns.
 ///
-/// Used internally, contains all row key columns, internal columns and parts of value columns.
-///
-/// Only contains a reference to schema and some indices, so it should be cheap to clone.
-#[derive(Debug, Clone, PartialEq)]
+/// Used internally, contains all row key columns, internal columns and a subset of
+/// value columns in a region. The columns are organized in `key, value, internal` order.
+#[derive(Debug, PartialEq)]
 pub struct StoreSchema {
+    columns: Vec<ColumnMetadata>,
     schema: SchemaRef,
     row_key_end: usize,
     user_column_end: usize,
 }
 
+pub type StoreSchemaRef = Arc<StoreSchema>;
+
 impl StoreSchema {
     #[inline]
     pub fn version(&self) -> u32 {
@@ -56,7 +57,7 @@ impl StoreSchema {
             .iter()
             .enumerate()
             .map(|(i, column)| {
-                Helper::try_into_vector(column.clone()).context(schema::ConvertChunkSnafu {
+                Helper::try_into_vector(column.clone()).context(metadata::ConvertChunkSnafu {
                     name: self.column_name(i),
                 })
             })
@@ -87,13 +88,8 @@ impl StoreSchema {
         columns: &ColumnsMetadata,
         version: u32,
     ) -> Result<StoreSchema> {
-        let column_schemas: Vec<_> = columns
-            .iter_all_columns()
-            .map(|col| ColumnSchema::from(&col.desc))
-            .collect();
-
         StoreSchema::new(
-            column_schemas,
+            columns.columns().to_vec(),
             version,
             columns.timestamp_key_index(),
             columns.row_key_end(),
@@ -102,20 +98,25 @@ impl StoreSchema {
     }
 
     pub(crate) fn new(
-        column_schemas: Vec<ColumnSchema>,
+        columns: Vec<ColumnMetadata>,
         version: u32,
         timestamp_key_index: usize,
         row_key_end: usize,
         user_column_end: usize,
     ) -> Result<StoreSchema> {
+        let column_schemas = columns
+            .iter()
+            .map(|meta| meta.to_column_schema())
+            .collect::<Result<Vec<_>>>()?;
+
         let schema = SchemaBuilder::try_from(column_schemas)
-            .context(schema::ConvertSchemaSnafu)?
+            .context(metadata::ConvertSchemaSnafu)?
             .timestamp_index(timestamp_key_index)
             .version(version)
             .add_metadata(ROW_KEY_END_KEY, row_key_end.to_string())
             .add_metadata(USER_COLUMN_END_KEY, user_column_end.to_string())
             .build()
-            .context(schema::BuildSchemaSnafu)?;
+            .context(metadata::InvalidSchemaSnafu)?;
 
         assert_eq!(
             consts::SEQUENCE_COLUMN_NAME,
@@ -127,6 +128,7 @@ impl StoreSchema {
         );
 
         Ok(StoreSchema {
+            columns,
            schema: Arc::new(schema),
             row_key_end,
             user_column_end,
@@ -163,7 +165,7 @@ impl TryFrom<ArrowSchema> for StoreSchema {
     type Error = Error;
 
     fn try_from(arrow_schema: ArrowSchema) -> Result<StoreSchema> {
-        let schema = Schema::try_from(arrow_schema).context(schema::ConvertArrowSchemaSnafu)?;
+        let schema = Schema::try_from(arrow_schema).context(metadata::ConvertArrowSchemaSnafu)?;
         // Recover other metadata from schema.
         let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?;
         let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?;
@@ -171,14 +173,22 @@ impl TryFrom<ArrowSchema> for StoreSchema {
         // There should be sequence and op_type columns.
         ensure!(
             consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name,
-            schema::InvalidIndexSnafu
+            metadata::InvalidIndexSnafu
         );
 
         ensure!(
             consts::OP_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name,
-            schema::InvalidIndexSnafu
+            metadata::InvalidIndexSnafu
         );
 
+        // Recover ColumnMetadata from schema.
+        let columns = schema
+            .column_schemas()
+            .iter()
+            .map(ColumnMetadata::from_column_schema)
+            .collect::<Result<Vec<_>>>()?;
+
         Ok(StoreSchema {
+            columns,
             schema: Arc::new(schema),
             row_key_end,
             user_column_end,
@@ -189,21 +199,20 @@ impl TryFrom<ArrowSchema> for StoreSchema {
 fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
     let value = metadata
         .get(key)
-        .context(schema::MissingMetaSnafu { key })?;
-    value.parse().context(schema::ParseIndexSnafu { value })
+        .context(metadata::MetaNotFoundSnafu { key })?;
+    value.parse().with_context(|_| metadata::ParseMetaIntSnafu {
+        key_value: format!("{}={}", key, value),
+    })
 }
 
 #[cfg(test)]
 mod tests {
     use datatypes::arrow::array::Array;
     use datatypes::arrow::chunk::Chunk as ArrowChunk;
-    use datatypes::type_id::LogicalTypeId;
-    use store_api::storage::consts;
 
     use super::*;
     use crate::read::Batch;
     use crate::schema::tests;
-    use crate::test_util::schema_util;
 
     fn check_chunk_batch(chunk: &ArrowChunk<Arc<dyn Array>>, batch: &Batch) {
         assert_eq!(5, chunk.columns().len());
@@ -224,22 +233,24 @@ mod tests {
         let sst_arrow_schema = store_schema.arrow_schema();
         let converted_store_schema = StoreSchema::try_from((**sst_arrow_schema).clone()).unwrap();
 
-        assert_eq!(*store_schema, converted_store_schema);
+        assert_eq!(**store_schema, converted_store_schema);
 
-        let expect_schema = schema_util::new_schema_with_version(
-            &[
-                ("k0", LogicalTypeId::Int64, false),
-                ("timestamp", LogicalTypeId::Timestamp, false),
-                ("v0", LogicalTypeId::Int64, true),
-                (consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
-                (consts::OP_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
-            ],
-            Some(1),
-            123,
-        );
+        let column_schemas: Vec<_> = region_schema
+            .columns()
+            .iter()
+            .map(|meta| meta.to_column_schema().unwrap())
+            .collect();
+        let expect_schema = SchemaBuilder::try_from(column_schemas)
+            .unwrap()
+            .version(123)
+            .timestamp_index(1)
+            .build()
+            .unwrap();
+
+        // Only compare column schemas, since the SchemaRef in StoreSchema also contains
+        // other metadata that is only used by StoreSchema.
         assert_eq!(
             expect_schema.column_schemas(),
-            store_schema.schema().column_schemas()
+            store_schema.schema().column_schemas(),
         );
         assert_eq!(3, store_schema.sequence_index());
         assert_eq!(4, store_schema.op_type_index());
diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs
index 40d646c66d..b10b46dbd3 100644
--- a/src/store-api/src/storage/descriptors.rs
+++ b/src/store-api/src/storage/descriptors.rs
@@ -39,6 +39,14 @@ impl ColumnDescriptor {
     pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
         self.default_constraint.as_ref()
     }
+
+    /// Convert [ColumnDescriptor] to [ColumnSchema]. Fields not in ColumnSchema **will not**
+    /// be stored as metadata.
+    pub fn to_column_schema(&self) -> ColumnSchema {
+        ColumnSchema::new(&self.name, self.data_type.clone(), self.is_nullable)
+            .with_default_constraint(self.default_constraint.clone())
+            .expect("ColumnDescriptor should validate default constraint")
+    }
 }
 
 impl ColumnDescriptorBuilder {
@@ -74,14 +82,6 @@ impl ColumnDescriptorBuilder {
     }
 }
 
-impl From<&ColumnDescriptor> for ColumnSchema {
-    fn from(desc: &ColumnDescriptor) -> ColumnSchema {
-        ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable)
-            .with_default_constraint(desc.default_constraint.clone())
-            .expect("ColumnDescriptor should validate default constraint")
-    }
-}
-
 /// A [RowKeyDescriptor] contains information about row key.
 #[derive(Debug, Clone, PartialEq, Builder)]
 #[builder(pattern = "owned")]
@@ -206,6 +206,22 @@ mod tests {
             .unwrap_err();
     }
 
+    #[test]
+    fn test_descriptor_to_column_schema() {
+        let constraint = ColumnDefaultConstraint::Value(Value::Int32(123));
+        let desc = new_column_desc_builder()
+            .default_constraint(Some(constraint.clone()))
+            .is_nullable(false)
+            .build()
+            .unwrap();
+        let column_schema = desc.to_column_schema();
+        let expected = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
+            .with_default_constraint(Some(constraint))
+            .unwrap();
+
+        assert_eq!(expected, column_schema);
+    }
+
     fn new_timestamp_desc() -> ColumnDescriptor {
         ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype())
             .build()
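
Two patterns in this diff are worth a standalone sketch. First, the `DuplicateMeta` guard in `Field::try_from`: the serialized default constraint is inserted into the user-supplied metadata map, and a pre-existing entry under the reserved key is rejected rather than silently overwritten. A minimal sketch of that guard, using a plain `BTreeMap` and a `String` error in place of the snafu machinery (both stand-ins, not the crate's own types):

use std::collections::BTreeMap;

const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";

fn merge_default_constraint(
    mut metadata: BTreeMap<String, String>,
    serialized_constraint: Option<String>,
) -> Result<BTreeMap<String, String>, String> {
    if let Some(json) = serialized_constraint {
        // `insert` returns the previous value if the key already existed.
        let old = metadata.insert(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(), json);
        if old.is_some() {
            // Mirrors the DuplicateMeta error instead of silently dropping user data.
            return Err(format!(
                "duplicated metadata for {}",
                ARROW_FIELD_DEFAULT_CONSTRAINT_KEY
            ));
        }
    }
    Ok(metadata)
}

fn main() {
    let mut user = BTreeMap::new();
    user.insert("k1".to_string(), "v1".to_string());
    // Normal case: the reserved key is free, so the merge succeeds.
    assert!(merge_default_constraint(user.clone(), Some("{\"Value\":99}".to_string())).is_ok());

    // A user entry under the reserved key is rejected, as in
    // test_column_schema_with_duplicate_metadata above.
    user.insert(ARROW_FIELD_DEFAULT_CONSTRAINT_KEY.to_string(), "v2".to_string());
    assert!(merge_default_constraint(user, Some("{\"Value\":99}".to_string())).is_err());
}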
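
Second, the `ColumnMetadata` round-trip through `ColumnSchema`: column attributes are encoded as string key-values under `greptime:storage:*` keys (skipping values equal to the defaults) and recovered by `try_parse_int`, which falls back to a default when a key is absent. A minimal self-contained sketch of that encode/parse pattern; the map type, the `String` error, and the `DEFAULT_CF_ID` value are stand-ins, not the crate's own definitions:

use std::collections::BTreeMap;
use std::num::ParseIntError;
use std::str::FromStr;

// Stand-ins: the crate's `Metadata` is also a string map, and these keys
// mirror the `greptime:storage:*` keys added in metadata.rs.
type Metadata = BTreeMap<String, String>;
const METADATA_CF_ID_KEY: &str = "greptime:storage:cf_id";
const METADATA_COLUMN_ID_KEY: &str = "greptime:storage:column_id";
const DEFAULT_CF_ID: u32 = 1; // assumed value, stands in for consts::DEFAULT_CF_ID

/// Parse an integer stored in the metadata map, falling back to a default
/// when the key is absent (the shape of `try_parse_int` in the diff).
fn try_parse_int<T>(metadata: &Metadata, key: &str, default_value: Option<T>) -> Result<T, String>
where
    T: FromStr<Err = ParseIntError>,
{
    if let Some(value) = metadata.get(key) {
        return value
            .parse()
            .map_err(|e| format!("failed to parse {}={}: {}", key, value, e));
    }
    // No such key in metadata: fall back, or report the key as missing.
    default_value.ok_or_else(|| format!("metadata of {} not found", key))
}

fn main() {
    // Encode: only write keys whose values differ from the defaults,
    // as `ColumnMetadata::to_metadata` does for `cf_id`.
    let mut metadata = Metadata::new();
    let cf_id: u32 = 567;
    if cf_id != DEFAULT_CF_ID {
        metadata.insert(METADATA_CF_ID_KEY.to_string(), cf_id.to_string());
    }
    metadata.insert(METADATA_COLUMN_ID_KEY.to_string(), 123u64.to_string());

    // Decode: cf_id may fall back to the default; column_id is required.
    let cf_id: u32 = try_parse_int(&metadata, METADATA_CF_ID_KEY, Some(DEFAULT_CF_ID)).unwrap();
    let column_id: u64 = try_parse_int(&metadata, METADATA_COLUMN_ID_KEY, None).unwrap();
    assert_eq!(567, cf_id);
    assert_eq!(123, column_id);
}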