From ed89cc3e212251a86999730c3453950fa3e6b0de Mon Sep 17 00:00:00 2001 From: evenyag Date: Wed, 28 Sep 2022 13:56:25 +0800 Subject: [PATCH] feat: Change signature of the Region::alter method (#287) * feat: Change signature of the Region::alter method * refactor: Add builders for ColumnsMetadata and ColumnFamiliesMetadata * feat: Support altering the region metadata Altering the region metadata is done in a copy-write fashion: 1. Convert the `RegionMetadata` into `RegionDescriptor` which is more convenient to mutate 2. Apply the `AlterOperation` to the `RegionDescriptor`. This would mutate the descriptor in-place 3. Create a `RegionMetadataBuilder` from the descriptor, bump the version and then build the new metadata * feat: Implement altering table using the new Region::alter api * refactor: Replaced wal name by region id Region id is cheaper to clone than name * chore: Remove pub(crate) of build_xxxx in engine mod * style: fix clippy * test: Add tests for AlterOperation and RegionMetadata::alter * chore: ColumnsMetadataBuilder methods return &mut Self --- src/storage/src/error.rs | 18 +- src/storage/src/metadata.rs | 433 +++++++++++++----- src/storage/src/region.rs | 22 +- src/storage/src/region/writer.rs | 24 +- src/storage/src/test_util/descriptor_util.rs | 28 +- src/storage/src/wal.rs | 43 +- src/store-api/src/storage.rs | 4 +- src/store-api/src/storage/metadata.rs | 3 + src/store-api/src/storage/region.rs | 8 +- src/store-api/src/storage/requests.rs | 157 ++++++- src/table-engine/src/engine.rs | 4 +- src/table-engine/src/table.rs | 78 ++-- .../src/table/test_util/mock_engine.rs | 20 +- 13 files changed, 644 insertions(+), 198 deletions(-) diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index af6cc3e1ad..ce415e60c6 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -8,7 +8,7 @@ use datatypes::arrow::error::ArrowError; use serde_json::error::Error as JsonError; use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; use crate::metadata::Error as MetadataError; @@ -102,9 +102,13 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to write WAL, WAL name: {}, source: {}", name, source))] + #[snafu(display( + "Failed to write WAL, WAL region_id: {}, source: {}", + region_id, + source + ))] WriteWal { - name: String, + region_id: RegionId, #[snafu(backtrace)] source: BoxedError, }, @@ -187,16 +191,16 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to read WAL, name: {}, source: {}", name, source))] + #[snafu(display("Failed to read WAL, region_id: {}, source: {}", region_id, source))] ReadWal { - name: String, + region_id: RegionId, #[snafu(backtrace)] source: BoxedError, }, - #[snafu(display("WAL data corrupted, name: {}, message: {}", name, message))] + #[snafu(display("WAL data corrupted, region_id: {}, message: {}", region_id, message))] WalDataCorrupted { - name: String, + region_id: RegionId, message: String, backtrace: Backtrace, }, diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index 29571a9051..b3f142d735 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -7,8 +7,10 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; use store_api::storage::{ consts::{self, ReservedColumnId}, - ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, ColumnId, - ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, SchemaRef, + AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, + ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, + RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor, RowKeyDescriptorBuilder, + Schema, SchemaRef, }; use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata}; @@ -40,6 +42,13 @@ pub enum Error { #[snafu(display("Missing timestamp key column"))] MissingTimestamp { backtrace: Backtrace }, + + #[snafu(display("Expect altering metadata with version {}, given {}", expect, given))] + InvalidVersion { + expect: VersionNumber, + given: VersionNumber, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -62,6 +71,10 @@ impl RegionMeta for RegionMetaImpl { fn schema(&self) -> &SchemaRef { self.metadata.user_schema() } + + fn version(&self) -> u32 { + self.metadata.version + } } pub type VersionNumber = u32; @@ -108,6 +121,52 @@ impl RegionMetadata { pub fn version(&self) -> u32 { self.schema.version() } + + /// Returns a new [RegionMetadata] after alteration, leave `self` unchanged. + pub fn alter(&self, req: &AlterRequest) -> Result { + ensure!( + req.version == self.version, + InvalidVersionSnafu { + expect: req.version, + given: self.version, + } + ); + + let mut desc = self.to_descriptor(); + // Apply the alter operation to the descriptor. + req.operation.apply(&mut desc); + + RegionMetadataBuilder::try_from(desc)? + .version(self.version + 1) // Bump the metadata version. + .build() + } + + fn to_descriptor(&self) -> RegionDescriptor { + let row_key = self.columns.to_row_key_descriptor(); + let mut builder = RegionDescriptorBuilder::default() + .id(self.id) + .name(&self.name) + .row_key(row_key); + + for (cf_id, cf) in &self.column_families.id_to_cfs { + let mut cf_builder = ColumnFamilyDescriptorBuilder::default() + .cf_id(*cf_id) + .name(&cf.name); + for column in &self.columns.columns[cf.column_index_start..cf.column_index_end] { + cf_builder = cf_builder.push_column(column.desc.clone()); + } + // It should always be able to build the descriptor back. + let desc = cf_builder.build().unwrap(); + if *cf_id == consts::DEFAULT_CF_ID { + builder = builder.default_cf(desc); + } else { + builder = builder.push_extra_column_family(desc); + } + } + + // We could ensure all fields are set here. + builder.build().unwrap() + } } pub type RegionMetadataRef = Arc; @@ -238,6 +297,20 @@ impl ColumnsMetadata { pub fn column_metadata(&self, idx: usize) -> &ColumnMetadata { &self.columns[idx] } + + fn to_row_key_descriptor(&self) -> RowKeyDescriptor { + let mut builder = + RowKeyDescriptorBuilder::default().enable_version_column(self.enable_version_column); + for (idx, column) in self.iter_row_key_columns().enumerate() { + // Not a timestamp column. + if idx != self.timestamp_key_index { + builder = builder.push_column(column.desc.clone()); + } + } + builder = builder.timestamp(self.column_metadata(self.timestamp_key_index).desc.clone()); + // Since the metadata is built from descriptor, so it should always be able to build the descriptor back. + builder.build().unwrap() + } } pub type ColumnsMetadataRef = Arc; @@ -315,12 +388,10 @@ pub struct ColumnFamilyMetadata { pub column_index_end: usize, } -impl TryFrom for RegionMetadata { +impl TryFrom for RegionMetadataBuilder { type Error = Error; - fn try_from(desc: RegionDescriptor) -> Result { - // Doesn't set version explicitly here, because this is a new region meta - // created from descriptor, using initial version is reasonable. + fn try_from(desc: RegionDescriptor) -> Result { let mut builder = RegionMetadataBuilder::new() .name(desc.name) .id(desc.id) @@ -330,17 +401,25 @@ impl TryFrom for RegionMetadata { builder = builder.add_column_family(cf)?; } + Ok(builder) + } +} + +impl TryFrom for RegionMetadata { + type Error = Error; + + fn try_from(desc: RegionDescriptor) -> Result { + // Doesn't set version explicitly here, because this is a new region meta + // created from descriptor, using initial version is reasonable. + let builder = RegionMetadataBuilder::try_from(desc)?; + builder.build() } } -// TODO(yingwen): Add a builder to build ColumnMetadata and refactor this builder. #[derive(Default)] -struct RegionMetadataBuilder { - id: RegionId, - name: String, +struct ColumnsMetadataBuilder { columns: Vec, - column_schemas: Vec, name_to_col_index: HashMap, /// Column id set, used to validate column id uniqueness. column_ids: HashSet, @@ -349,27 +428,10 @@ struct RegionMetadataBuilder { row_key_end: usize, timestamp_key_index: Option, enable_version_column: bool, - - id_to_cfs: HashMap, - cf_names: HashSet, } -impl RegionMetadataBuilder { - fn new() -> RegionMetadataBuilder { - RegionMetadataBuilder::default() - } - - fn name(mut self, name: impl Into) -> Self { - self.name = name.into(); - self - } - - fn id(mut self, id: RegionId) -> Self { - self.id = id; - self - } - - fn row_key(mut self, key: RowKeyDescriptor) -> Result { +impl ColumnsMetadataBuilder { + fn row_key(&mut self, key: RowKeyDescriptor) -> Result<&mut Self> { for col in key.columns { self.push_row_key_column(col)?; } @@ -390,7 +452,77 @@ impl RegionMetadataBuilder { Ok(self) } - fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result { + fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<&mut Self> { + self.push_value_column(consts::KEY_CF_ID, desc) + } + + fn push_value_column( + &mut self, + cf_id: ColumnFamilyId, + desc: ColumnDescriptor, + ) -> Result<&mut Self> { + ensure!( + !is_internal_value_column(&desc.name), + ReservedColumnSnafu { name: &desc.name } + ); + + self.push_new_column(cf_id, desc) + } + + fn push_new_column( + &mut self, + cf_id: ColumnFamilyId, + desc: ColumnDescriptor, + ) -> Result<&mut Self> { + ensure!( + !self.name_to_col_index.contains_key(&desc.name), + ColNameExistsSnafu { name: &desc.name } + ); + ensure!( + !self.column_ids.contains(&desc.id), + ColIdExistsSnafu { id: desc.id } + ); + + let column_name = desc.name.clone(); + let column_id = desc.id; + let meta = ColumnMetadata { cf_id, desc }; + + let column_index = self.columns.len(); + self.columns.push(meta); + self.name_to_col_index.insert(column_name, column_index); + self.column_ids.insert(column_id); + + Ok(self) + } + + fn build(mut self) -> Result { + let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?; + + let user_column_end = self.columns.len(); + // Setup internal columns. + for internal_desc in internal_column_descs() { + self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?; + } + + Ok(ColumnsMetadata { + columns: self.columns, + name_to_col_index: self.name_to_col_index, + row_key_end: self.row_key_end, + timestamp_key_index, + enable_version_column: self.enable_version_column, + user_column_end, + }) + } +} + +#[derive(Default)] +struct ColumnFamiliesMetadataBuilder { + id_to_cfs: HashMap, + cf_names: HashSet, +} + +impl ColumnFamiliesMetadataBuilder { + fn add_column_family(&mut self, cf: ColumnFamilyMetadata) -> Result<&mut Self> { ensure!( !self.id_to_cfs.contains_key(&cf.cf_id), CfIdExistsSnafu { id: cf.cf_id } @@ -401,12 +533,68 @@ impl RegionMetadataBuilder { CfNameExistsSnafu { name: &cf.name } ); - let column_index_start = self.columns.len(); - let column_index_end = column_index_start + cf.columns.len(); - for col in cf.columns { - self.push_value_column(cf.cf_id, col)?; - } + self.cf_names.insert(cf.name.clone()); + self.id_to_cfs.insert(cf.cf_id, cf); + Ok(self) + } + + fn build(self) -> ColumnFamiliesMetadata { + ColumnFamiliesMetadata { + id_to_cfs: self.id_to_cfs, + } + } +} + +struct RegionMetadataBuilder { + id: RegionId, + name: String, + columns_meta_builder: ColumnsMetadataBuilder, + cfs_meta_builder: ColumnFamiliesMetadataBuilder, + version: VersionNumber, +} + +impl Default for RegionMetadataBuilder { + fn default() -> RegionMetadataBuilder { + RegionMetadataBuilder::new() + } +} + +impl RegionMetadataBuilder { + fn new() -> RegionMetadataBuilder { + RegionMetadataBuilder { + id: 0, + name: String::new(), + columns_meta_builder: ColumnsMetadataBuilder::default(), + cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(), + version: Schema::INITIAL_VERSION, + } + } + + fn name(mut self, name: impl Into) -> Self { + self.name = name.into(); + self + } + + fn id(mut self, id: RegionId) -> Self { + self.id = id; + self + } + + fn version(mut self, version: VersionNumber) -> Self { + self.version = version; + self + } + + fn row_key(mut self, key: RowKeyDescriptor) -> Result { + self.columns_meta_builder.row_key(key)?; + + Ok(self) + } + + fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result { + let column_index_start = self.columns_meta_builder.columns.len(); + let column_index_end = column_index_start + cf.columns.len(); let cf_meta = ColumnFamilyMetadata { name: cf.name.clone(), cf_id: cf.cf_id, @@ -414,85 +602,29 @@ impl RegionMetadataBuilder { column_index_end, }; - self.id_to_cfs.insert(cf.cf_id, cf_meta); - self.cf_names.insert(cf.name); + self.cfs_meta_builder.add_column_family(cf_meta)?; + + for col in cf.columns { + self.columns_meta_builder.push_value_column(cf.cf_id, col)?; + } Ok(self) } - fn build(mut self) -> Result { - let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?; - - let user_column_end = self.columns.len(); - // Setup internal columns. - for internal_desc in internal_column_descs() { - self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?; - } - - let columns = Arc::new(ColumnsMetadata { - columns: self.columns, - name_to_col_index: self.name_to_col_index, - row_key_end: self.row_key_end, - timestamp_key_index, - enable_version_column: self.enable_version_column, - user_column_end, - }); - let schema = Arc::new( - RegionSchema::new(columns.clone(), Schema::INITIAL_VERSION) - .context(InvalidSchemaSnafu)?, - ); + fn build(self) -> Result { + let columns = Arc::new(self.columns_meta_builder.build()?); + let schema = + Arc::new(RegionSchema::new(columns.clone(), self.version).context(InvalidSchemaSnafu)?); Ok(RegionMetadata { id: self.id, name: self.name, schema, columns, - column_families: ColumnFamiliesMetadata { - id_to_cfs: self.id_to_cfs, - }, - version: 0, + column_families: self.cfs_meta_builder.build(), + version: self.version, }) } - - // Helper methods: - - fn push_row_key_column(&mut self, desc: ColumnDescriptor) -> Result<()> { - self.push_value_column(consts::KEY_CF_ID, desc) - } - - fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { - ensure!( - !is_internal_value_column(&desc.name), - ReservedColumnSnafu { name: &desc.name } - ); - - self.push_new_column(cf_id, desc) - } - - fn push_new_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> { - ensure!( - !self.name_to_col_index.contains_key(&desc.name), - ColNameExistsSnafu { name: &desc.name } - ); - ensure!( - !self.column_ids.contains(&desc.id), - ColIdExistsSnafu { id: desc.id } - ); - - let column_schema = ColumnSchema::from(&desc); - - let column_name = desc.name.clone(); - let column_id = desc.id; - let meta = ColumnMetadata { cf_id, desc }; - - let column_index = self.columns.len(); - self.columns.push(meta); - self.column_schemas.push(column_schema); - self.name_to_col_index.insert(column_name, column_index); - self.column_ids.insert(column_id); - - Ok(()) - } } fn version_column_desc() -> ColumnDescriptor { @@ -541,7 +673,8 @@ fn is_internal_value_column(column_name: &str) -> bool { mod tests { use datatypes::type_id::LogicalTypeId; use store_api::storage::{ - ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, RowKeyDescriptorBuilder, + AddColumn, AlterOperation, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, + RowKeyDescriptorBuilder, }; use super::*; @@ -795,4 +928,96 @@ mod tests { let converted = RegionMetadata::try_from(raw).unwrap(); assert_eq!(metadata, converted); } + + #[test] + fn test_alter_metadata_add_columns() { + let region_name = "region-0"; + let builder = RegionDescBuilder::new(region_name) + .enable_version_column(false) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)); + let last_column_id = builder.last_column_id(); + let metadata: RegionMetadata = builder.build().try_into().unwrap(); + + let req = AlterRequest { + operation: AlterOperation::AddColumns { + columns: vec![ + AddColumn { + desc: ColumnDescriptorBuilder::new( + last_column_id + 1, + "k2", + ConcreteDataType::int32_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + is_key: true, + }, + AddColumn { + desc: ColumnDescriptorBuilder::new( + last_column_id + 2, + "v2", + ConcreteDataType::float32_datatype(), + ) + .build() + .unwrap(), + is_key: false, + }, + ], + }, + version: 0, + }; + let metadata = metadata.alter(&req).unwrap(); + + let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name) + .enable_version_column(false) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .push_key_column(("k2", LogicalTypeId::Int32, false)) + .push_value_column(("v2", LogicalTypeId::Float32, true)) + .build() + .try_into() + .unwrap(); + let expect = builder.version(1).build().unwrap(); + assert_eq!(expect, metadata); + } + + #[test] + fn test_alter_metadata_drop_columns() { + let region_name = "region-0"; + let metadata: RegionMetadata = RegionDescBuilder::new(region_name) + .enable_version_column(false) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_key_column(("k2", LogicalTypeId::Int32, false)) + .push_value_column(("v1", LogicalTypeId::Float32, true)) + .push_value_column(("v2", LogicalTypeId::Float32, true)) + .build() + .try_into() + .unwrap(); + + let req = AlterRequest { + operation: AlterOperation::DropColumns { + names: vec![ + String::from("k1"), // k1 would be ignored. + String::from("v1"), + ], + }, + version: 0, + }; + let metadata = metadata.alter(&req).unwrap(); + + let builder = RegionDescBuilder::new(region_name) + .enable_version_column(false) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_key_column(("k2", LogicalTypeId::Int32, false)); + let last_column_id = builder.last_column_id() + 1; + let builder: RegionMetadataBuilder = builder + .set_last_column_id(last_column_id) // This id is reserved for v1 + .push_value_column(("v2", LogicalTypeId::Float32, true)) + .build() + .try_into() + .unwrap(); + let expect = builder.version(1).build().unwrap(); + assert_eq!(expect, metadata); + } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index c693d821ba..b30be5af28 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -12,7 +12,8 @@ use store_api::manifest::{ self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator, }; use store_api::storage::{ - OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse, + AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, + WriteResponse, }; use crate::error::{self, Error, Result}; @@ -23,7 +24,7 @@ use crate::manifest::{ }; use crate::memtable::MemtableBuilderRef; use crate::metadata::{RegionMetaImpl, RegionMetadata}; -pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext}; +pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext}; use crate::snapshot::SnapshotImpl; use crate::sst::AccessLayerRef; use crate::version::VersionEdit; @@ -75,6 +76,10 @@ impl Region for RegionImpl { fn write_request(&self) -> Self::WriteRequest { WriteBatch::new(self.in_memory_metadata().schema().clone()) } + + async fn alter(&self, request: AlterRequest) -> Result<()> { + self.inner.alter(request).await + } } /// Storage related config for region. @@ -343,6 +348,7 @@ impl RegionInner { } async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result { + // FIXME(yingwen): [alter] The schema may be outdated. let metadata = self.in_memory_metadata(); let schema = metadata.schema(); // Only compare column schemas. @@ -365,4 +371,16 @@ impl RegionInner { // Now altering schema is not allowed, so it is safe to validate schema outside of the lock. self.writer.write(ctx, request, writer_ctx).await } + + async fn alter(&self, request: AlterRequest) -> Result<()> { + // TODO(yingwen): [alter] Log the request. + + let alter_ctx = AlterContext { + shared: &self.shared, + wal: &self.wal, + manifest: &self.manifest, + }; + + self.writer.alter(alter_ctx, request).await + } } diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 3a6b3a6222..e9d7551a43 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -5,7 +5,7 @@ use common_time::RangeMillis; use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; -use store_api::storage::{SequenceNumber, WriteContext, WriteRequest, WriteResponse}; +use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRequest, WriteResponse}; use tokio::sync::Mutex; use crate::background::JobHandle; @@ -92,6 +92,22 @@ impl RegionWriter { inner.replay(&self.version_mutex, writer_ctx).await } + /// Alter schema of the region. + pub async fn alter( + &self, + _alter_ctx: AlterContext<'_, S>, + _request: AlterRequest, + ) -> Result<()> { + // TODO(yingwen): [alter] implements alter: + // 1. acquire version lock + // 2. validate request + // 3. build schema based on new request + // 4. persist it into the region manifest + // 5. update version in VersionControl + + unimplemented!() + } + async fn persist_manifest_version( &self, wal: &Wal, @@ -136,6 +152,12 @@ impl<'a, S: LogStore> WriterContext<'a, S> { } } +pub struct AlterContext<'a, S: LogStore> { + pub shared: &'a SharedDataRef, + pub wal: &'a Wal, + pub manifest: &'a RegionManifest, +} + #[derive(Debug)] struct WriterInner { memtable_builder: MemtableBuilderRef, diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs index 8fc1a31a3c..9d51a0b8b8 100644 --- a/src/storage/src/test_util/descriptor_util.rs +++ b/src/storage/src/test_util/descriptor_util.rs @@ -20,7 +20,7 @@ impl RegionDescBuilder { pub fn new>(name: T) -> Self { let key_builder = RowKeyDescriptorBuilder::new( ColumnDescriptorBuilder::new( - 2, + 1, test_util::TIMESTAMP_NAME, ConcreteDataType::timestamp_millis_datatype(), ) @@ -32,7 +32,7 @@ impl RegionDescBuilder { Self { id: 0, name: name.into(), - last_column_id: 2, + last_column_id: 1, key_builder, default_cf_builder: ColumnFamilyDescriptorBuilder::default(), } @@ -43,20 +43,9 @@ impl RegionDescBuilder { self } - // This will reset the row key builder, so should be called before `push_key_column()` - // and `enable_version_column()`, or just call after `new()`. - // - // NOTE(ning): it's now possible to change this function to: - // - // ``` - // self.key_builder.timestamp(self.new_column(column_def)) - // ``` - // to resolve the constraint above - pub fn timestamp(mut self, column_def: ColumnDef) -> Self { - let builder = RowKeyDescriptorBuilder::new(self.new_column(column_def)); - - self.key_builder = builder; + let column = self.new_column(column_def); + self.key_builder = self.key_builder.timestamp(column); self } @@ -77,6 +66,11 @@ impl RegionDescBuilder { self } + pub fn set_last_column_id(mut self, column_id: ColumnId) -> Self { + self.last_column_id = column_id; + self + } + pub fn build(self) -> RegionDescriptor { RegionDescriptor { id: self.id, @@ -87,6 +81,10 @@ impl RegionDescBuilder { } } + pub fn last_column_id(&self) -> ColumnId { + self.last_column_id + } + fn alloc_column_id(&mut self) -> ColumnId { self.last_column_id += 1; self.last_column_id diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 637ad4d79b..4f2dae1dab 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -26,7 +26,7 @@ use crate::{ #[derive(Debug)] pub struct Wal { - name: String, + region_id: RegionId, namespace: S::Namespace, store: Arc, } @@ -35,11 +35,11 @@ pub type WriteBatchStream<'a> = Pin< Box)>> + Send + 'a>, >; -// wal should be cheap to clone +// Wal should be cheap to clone, so avoid holding things like String, Vec. impl Clone for Wal { fn clone(&self) -> Self { Self { - name: self.name.clone(), + region_id: self.region_id, namespace: self.namespace.clone(), store: self.store.clone(), } @@ -50,15 +50,15 @@ impl Wal { pub fn new(region_id: RegionId, store: Arc) -> Self { let namespace = store.namespace(region_id); Self { - name: region_id.to_string(), + region_id, namespace, store, } } #[inline] - pub fn name(&self) -> &str { - &self.name + pub fn region_id(&self) -> RegionId { + self.region_id } } @@ -102,7 +102,9 @@ impl Wal { encoder .encode(batch, &mut buf) .map_err(BoxedError::new) - .context(error::WriteWalSnafu { name: self.name() })?; + .context(error::WriteWalSnafu { + region_id: self.region_id(), + })?; } else if let Payload::WriteBatchProto(batch) = payload { // entry let encoder = WriteBatchProtobufEncoder {}; @@ -110,7 +112,9 @@ impl Wal { encoder .encode(batch, &mut buf) .map_err(BoxedError::new) - .context(error::WriteWalSnafu { name: self.name() })?; + .context(error::WriteWalSnafu { + region_id: self.region_id(), + })?; } // write bytes to wal @@ -123,9 +127,12 @@ impl Wal { .read(&self.namespace, start_seq) .await .map_err(BoxedError::new) - .context(error::ReadWalSnafu { name: self.name() })? + .context(error::ReadWalSnafu { + region_id: self.region_id(), + })? + // Handle the error when reading from the stream. .map_err(|e| Error::ReadWal { - name: self.name().to_string(), + region_id: self.region_id(), source: BoxedError::new(e), }) .and_then(|entries| async { @@ -146,7 +153,9 @@ impl Wal { .append(e) .await .map_err(BoxedError::new) - .context(error::WriteWalSnafu { name: self.name() })?; + .context(error::WriteWalSnafu { + region_id: self.region_id(), + })?; Ok((res.entry_id(), res.offset())) } @@ -164,7 +173,7 @@ impl Wal { ensure!( data_pos <= input.len(), error::WalDataCorruptedSnafu { - name: self.name(), + region_id: self.region_id(), message: format!( "Not enough input buffer, expected data position={}, actual buffer length={}", data_pos, @@ -181,7 +190,9 @@ impl Wal { let write_batch = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) - .context(error::ReadWalSnafu { name: self.name() })?; + .context(error::ReadWalSnafu { + region_id: self.region_id(), + })?; Ok((seq_num, header, Some(write_batch))) } @@ -191,12 +202,14 @@ impl Wal { let write_batch = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) - .context(error::ReadWalSnafu { name: self.name() })?; + .context(error::ReadWalSnafu { + region_id: self.region_id(), + })?; Ok((seq_num, header, Some(write_batch))) } _ => error::WalDataCorruptedSnafu { - name: self.name(), + region_id: self.region_id(), message: format!("invalid payload type={}", header.payload_type), } .fail(), diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index acc50deb2c..ff7169a01a 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -21,7 +21,9 @@ pub use self::descriptors::*; pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine}; pub use self::metadata::RegionMeta; pub use self::region::{Region, WriteContext}; -pub use self::requests::{GetRequest, PutOperation, ScanRequest, WriteRequest}; +pub use self::requests::{ + AddColumn, AlterOperation, AlterRequest, GetRequest, PutOperation, ScanRequest, WriteRequest, +}; pub use self::responses::{GetResponse, ScanResponse, WriteResponse}; pub use self::snapshot::{ReadContext, Snapshot}; pub use self::types::{OpType, SequenceNumber}; diff --git a/src/store-api/src/storage/metadata.rs b/src/store-api/src/storage/metadata.rs index a2b434bf19..bf2ef6537e 100644 --- a/src/store-api/src/storage/metadata.rs +++ b/src/store-api/src/storage/metadata.rs @@ -4,4 +4,7 @@ use crate::storage::SchemaRef; pub trait RegionMeta: Send + Sync { /// Returns the schema of the region. fn schema(&self) -> &SchemaRef; + + /// Returns the version of the region metadata. + fn version(&self) -> u32; } diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index b7373a5603..c223f927e5 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -23,10 +23,10 @@ use common_error::ext::ErrorExt; use crate::storage::engine::OpenOptions; use crate::storage::metadata::RegionMeta; -use crate::storage::requests::WriteRequest; +use crate::storage::requests::{AlterRequest, WriteRequest}; use crate::storage::responses::WriteResponse; use crate::storage::snapshot::{ReadContext, Snapshot}; -use crate::storage::{RegionDescriptor, RegionId}; +use crate::storage::RegionId; /// Chunks of rows in storage engine. #[async_trait] @@ -57,9 +57,7 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static { /// Create write request fn write_request(&self) -> Self::WriteRequest; - fn alter(&self, _descriptor: RegionDescriptor) -> Result<(), Self::Error> { - unimplemented!() - } + async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>; } /// Context for write operations. diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 533c19858b..cc17ebccbe 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::time::Duration; use common_error::ext::ErrorExt; @@ -5,7 +6,7 @@ use common_query::logical_plan::Expr; use common_time::RangeMillis; use datatypes::vectors::VectorRef; -use crate::storage::SequenceNumber; +use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; /// Write request holds a collection of updates to apply to a region. pub trait WriteRequest: Send { @@ -50,3 +51,157 @@ pub struct ScanRequest { #[derive(Debug)] pub struct GetRequest {} + +/// Operation to add a column. +#[derive(Debug)] +pub struct AddColumn { + /// Descriptor of the column to add. + pub desc: ColumnDescriptor, + /// Is the column a key column. + pub is_key: bool, +} + +/// Operation to alter a region. +#[derive(Debug)] +pub enum AlterOperation { + /// Add columns to the region. + AddColumns { + /// Columns to add. + columns: Vec, + }, + /// Drop columns from the region, only value columns are allowed to drop. + DropColumns { + /// Name of columns to drop. + names: Vec, + }, +} + +impl AlterOperation { + /// Apply the operation to the [RegionDescriptor]. + pub fn apply(&self, descriptor: &mut RegionDescriptor) { + match self { + AlterOperation::AddColumns { columns } => { + Self::apply_add(columns, descriptor); + } + AlterOperation::DropColumns { names } => { + Self::apply_drop(names, descriptor); + } + } + } + + /// Add `columns` to the [RegionDescriptor]. + /// + /// Value columns would be added to the default column family. + fn apply_add(columns: &[AddColumn], descriptor: &mut RegionDescriptor) { + for col in columns { + if col.is_key { + descriptor.row_key.columns.push(col.desc.clone()); + } else { + descriptor.default_cf.columns.push(col.desc.clone()); + } + } + } + + /// Drop columns from the [RegionDescriptor] by their `names`. + /// + /// Only value columns would be removed, non-value columns in `names` would be ignored. + fn apply_drop(names: &[String], descriptor: &mut RegionDescriptor) { + let name_set: HashSet<_> = names.iter().collect(); + // Remove columns in the default cf. + descriptor + .default_cf + .columns + .retain(|col| !name_set.contains(&col.name)); + // Remove columns in other cfs. + for cf in &mut descriptor.extra_cfs { + cf.columns.retain(|col| !name_set.contains(&col.name)); + } + } +} + +/// Alter region request. +#[derive(Debug)] +pub struct AlterRequest { + /// Operation to do. + pub operation: AlterOperation, + /// The version of the schema before applying the alteration. + pub version: u32, +} + +#[cfg(test)] +mod tests { + use datatypes::prelude::*; + + use super::*; + use crate::storage::{ + ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId, RegionDescriptorBuilder, + RowKeyDescriptorBuilder, + }; + + fn new_column_desc(id: ColumnId) -> ColumnDescriptor { + ColumnDescriptorBuilder::new(id, id.to_string(), ConcreteDataType::int64_datatype()) + .is_nullable(false) + .build() + .unwrap() + } + + fn new_region_descriptor() -> RegionDescriptor { + let row_key = RowKeyDescriptorBuilder::default() + .timestamp(new_column_desc(1)) + .build() + .unwrap(); + let default_cf = ColumnFamilyDescriptorBuilder::default() + .push_column(new_column_desc(2)) + .build() + .unwrap(); + + RegionDescriptorBuilder::default() + .id(1) + .name("test") + .row_key(row_key) + .default_cf(default_cf) + .build() + .unwrap() + } + + #[test] + fn test_alter_operation() { + let mut desc = new_region_descriptor(); + + let op = AlterOperation::AddColumns { + columns: vec![ + AddColumn { + desc: new_column_desc(3), + is_key: true, + }, + AddColumn { + desc: new_column_desc(4), + is_key: false, + }, + ], + }; + op.apply(&mut desc); + + assert_eq!(1, desc.row_key.columns.len()); + assert_eq!("3", desc.row_key.columns[0].name); + assert_eq!(2, desc.default_cf.columns.len()); + assert_eq!("2", desc.default_cf.columns[0].name); + assert_eq!("4", desc.default_cf.columns[1].name); + + let op = AlterOperation::DropColumns { + names: vec![String::from("2")], + }; + op.apply(&mut desc); + assert_eq!(1, desc.row_key.columns.len()); + assert_eq!(1, desc.default_cf.columns.len()); + assert_eq!("4", desc.default_cf.columns[0].name); + + // Key columns are ignored. + let op = AlterOperation::DropColumns { + names: vec![String::from("1"), String::from("3")], + }; + op.apply(&mut desc); + assert_eq!(1, desc.row_key.columns.len()); + assert_eq!(1, desc.default_cf.columns.len()); + } +} diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index cd985920c8..1097750e23 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -126,7 +126,7 @@ struct MitoEngineInner { table_mutex: Mutex<()>, } -pub(crate) fn build_row_key_desc( +fn build_row_key_desc( mut column_id: ColumnId, table_name: &str, table_schema: &SchemaRef, @@ -189,7 +189,7 @@ pub(crate) fn build_row_key_desc( )) } -pub(crate) fn build_column_family( +fn build_column_family( mut column_id: ColumnId, table_name: &str, table_schema: &SchemaRef, diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 2074ba1096..fd5f6613a9 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -23,10 +23,9 @@ use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; -use store_api::storage::RegionDescriptorBuilder; use store_api::storage::{ - ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, - WriteContext, WriteRequest, + AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation, + ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, }; use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult}; use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder}; @@ -37,7 +36,6 @@ use table::{ }; use tokio::sync::Mutex; -use crate::engine::{build_column_family, build_row_key_desc, INIT_COLUMN_ID}; use crate::error::{ self, ProjectedColumnNotFoundSnafu, Result, ScanTableManifestSnafu, SchemaBuildSnafu, TableInfoNotFoundSnafu, UnsupportedDefaultConstraintSnafu, UpdateTableManifestSnafu, @@ -184,31 +182,43 @@ impl Table for MitoTable { let table_info = self.table_info(); let table_name = &table_info.name; let table_meta = &table_info.meta; - let table_schema = match &req.alter_kind { + let (alter_op, table_schema) = match &req.alter_kind { AlterKind::AddColumn { new_column } => { - build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)? + let desc = ColumnDescriptorBuilder::new( + table_meta.next_column_id, + &new_column.name, + new_column.data_type.clone(), + ) + .is_nullable(new_column.is_nullable) + .default_constraint(new_column.default_constraint.clone()) + .build() + .context(error::BuildColumnDescriptorSnafu { + table_name, + column_name: &new_column.name, + })?; + let alter_op = AlterOperation::AddColumns { + columns: vec![AddColumn { + desc, + // TODO(yingwen): [alter] AlterTableRequest should be able to add a key column. + is_key: false, + }], + }; + + // TODO(yingwen): [alter] Better way to alter the schema struct. In fact the column order + // in table schema could differ from the region schema, so we could just push this column + // to the back of the schema (as last column). + let table_schema = + build_table_schema_with_new_column(table_name, &table_meta.schema, new_column)?; + + (alter_op, table_schema) } }; - let primary_key_indices = &table_meta.primary_key_indices; - let (next_column_id, default_cf) = build_column_family( - INIT_COLUMN_ID, - table_name, - &table_schema, - primary_key_indices, - )?; - let (next_column_id, row_key) = build_row_key_desc( - next_column_id, - table_name, - &table_schema, - primary_key_indices, - )?; - let new_meta = TableMetaBuilder::default() .schema(table_schema.clone()) .engine(&table_meta.engine) - .next_column_id(next_column_id) - .primary_key_indices(primary_key_indices.clone()) + .next_column_id(table_meta.next_column_id + 1) // Bump next column id. + .primary_key_indices(table_meta.primary_key_indices.clone()) .build() .context(error::BuildTableMetaSnafu { table_name })?; @@ -216,25 +226,21 @@ impl Table for MitoTable { new_info.ident.version = table_info.ident.version + 1; new_info.meta = new_meta; - // first alter region + // FIXME(yingwen): [alter] Alter the region, now this is a temporary implementation. let region = self.region(); - let region_descriptor = RegionDescriptorBuilder::default() - .id(region.id()) - .name(region.name()) - .row_key(row_key) - .default_cf(default_cf) - .build() - .context(error::BuildRegionDescriptorSnafu { - table_name, - region_name: region.name(), - })?; + let region_meta = region.in_memory_metadata(); + let alter_req = AlterRequest { + operation: alter_op, + version: region_meta.version(), + }; + logging::debug!( - "start altering region {} of table {}, with new region descriptor {:?}", + "start altering region {} of table {}, with request {:?}", region.name(), table_name, - region_descriptor + alter_req, ); - region.alter(region_descriptor).map_err(TableError::new)?; + region.alter(alter_req).await.map_err(TableError::new)?; // then alter table info logging::debug!( diff --git a/src/table-engine/src/table/test_util/mock_engine.rs b/src/table-engine/src/table/test_util/mock_engine.rs index 4512e74e85..0a3a34bb38 100644 --- a/src/table-engine/src/table/test_util/mock_engine.rs +++ b/src/table-engine/src/table/test_util/mock_engine.rs @@ -12,9 +12,9 @@ use datatypes::schema::{ColumnSchema, Schema}; use storage::metadata::{RegionMetaImpl, RegionMetadata}; use storage::write_batch::{Mutation, WriteBatch}; use store_api::storage::{ - Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, OpenOptions, - ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, ScanResponse, - SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, + AlterRequest, Chunk, ChunkReader, CreateOptions, EngineContext, GetRequest, GetResponse, + OpenOptions, ReadContext, Region, RegionDescriptor, RegionId, RegionMeta, ScanRequest, + ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse, }; pub type Result = std::result::Result; @@ -115,13 +115,12 @@ impl Snapshot for MockSnapshot { // purpose the cost should be acceptable. #[derive(Debug, Clone)] pub struct MockRegion { - // FIXME(yingwen): Remove this once name is provided by metadata. - name: String, pub inner: Arc, } #[derive(Debug)] pub struct MockRegionInner { + name: String, pub metadata: ArcSwap, memtable: Arc>, } @@ -140,7 +139,7 @@ impl Region for MockRegion { } fn name(&self) -> &str { - &self.name + &self.inner.name } fn in_memory_metadata(&self) -> RegionMetaImpl { @@ -163,9 +162,12 @@ impl Region for MockRegion { WriteBatch::new(self.in_memory_metadata().schema().clone()) } - fn alter(&self, descriptor: RegionDescriptor) -> Result<()> { - let metadata = descriptor.try_into().unwrap(); + async fn alter(&self, request: AlterRequest) -> Result<()> { + let current = self.inner.metadata.load(); + // Mock engine just panic if failed to create a new metadata. + let metadata = current.alter(&request).unwrap(); self.inner.update_metadata(metadata); + Ok(()) } } @@ -177,6 +179,7 @@ impl MockRegionInner { memtable.insert(column.name.clone(), vec![]); } Self { + name: metadata.name().to_string(), metadata: ArcSwap::new(Arc::new(metadata)), memtable: Arc::new(RwLock::new(memtable)), } @@ -277,7 +280,6 @@ impl StorageEngine for MockEngine { let name = descriptor.name.clone(); let metadata = descriptor.try_into().unwrap(); let region = MockRegion { - name: name.clone(), inner: Arc::new(MockRegionInner::new(metadata)), }; regions.opened_regions.insert(name, region.clone());