diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index ce415e60c6..a4d625c3c5 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -257,6 +257,18 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display("Invalid alter request, source: {}", source))] + InvalidAlterRequest { + #[snafu(backtrace)] + source: MetadataError, + }, + + #[snafu(display("Failed to alter metadata, source: {}", source))] + AlterMetadata { + #[snafu(backtrace)] + source: MetadataError, + }, } pub type Result = std::result::Result; @@ -267,7 +279,6 @@ impl ErrorExt for Error { match self { InvalidScanIndex { .. } - | InvalidRegionDesc { .. } | InvalidInputSchema { .. } | BatchMissingColumn { .. } | BatchMissingTimestamp { .. } @@ -288,7 +299,8 @@ impl ErrorExt for Error { | SequenceNotMonotonic { .. } | ConvertStoreSchema { .. } | InvalidRawRegion { .. } - | FilterColumn { .. } => StatusCode::Unexpected, + | FilterColumn { .. } + | AlterMetadata { .. } => StatusCode::Unexpected, FlushIo { .. } | WriteParquet { .. } @@ -306,6 +318,9 @@ impl ErrorExt for Error { | InvalidRegionState { .. } | ReadWal { .. } => StatusCode::StorageUnavailable, + InvalidAlterRequest { source, .. } | InvalidRegionDesc { source, .. } => { + source.status_code() + } PushBatch { source, .. 
} => source.status_code(), } } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 34f31508cd..db9714cb2f 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -4,7 +4,6 @@ use async_trait::async_trait; use common_telemetry::logging; use common_time::RangeMillis; use store_api::logstore::LogStore; -use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; use store_api::storage::SequenceNumber; use uuid::Uuid; @@ -16,7 +15,6 @@ use crate::memtable::{IterContext, MemtableId, MemtableRef}; use crate::region::RegionWriterRef; use crate::region::SharedDataRef; use crate::sst::{AccessLayerRef, FileMeta, WriteOptions}; -use crate::version::VersionEdit; use crate::wal::Wal; /// Default write buffer size (32M). @@ -195,25 +193,23 @@ impl FlushJob { Ok(metas) } - async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result { + async fn write_manifest_and_apply(&self, file_metas: &[FileMeta]) -> Result<()> { let edit = RegionEdit { region_version: self.shared.version_control.metadata().version(), flushed_sequence: self.flush_sequence, files_to_add: file_metas.to_vec(), files_to_remove: Vec::default(), }; - let prev_version = self.shared.version_control.current_manifest_version(); - logging::debug!( - "Write region edit: {:?} to manifest, prev_version: {}.", - edit, - prev_version, - ); - - let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); - action_list.set_prev_version(prev_version); - - self.manifest.update(action_list).await + self.writer + .write_edit_and_apply( + &self.wal, + &self.shared, + &self.manifest, + edit, + self.max_memtable_id, + ) + .await } /// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$` @@ -228,18 +224,7 @@ impl Job for FlushJob { async fn run(&mut self, ctx: &Context) -> Result<()> { let file_metas = self.write_memtables_to_layer(ctx).await?; - let manifest_version = self.write_to_manifest(&file_metas).await?; - - let 
edit = VersionEdit { - files_to_add: file_metas, - flushed_sequence: Some(self.flush_sequence), - manifest_version, - max_memtable_id: Some(self.max_memtable_id), - }; - - self.writer - .apply_version_edit(&self.wal, edit, &self.shared) - .await?; + self.write_manifest_and_apply(&file_metas).await?; Ok(()) } diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs index b3f142d735..3085becc75 100644 --- a/src/storage/src/metadata.rs +++ b/src/storage/src/metadata.rs @@ -7,10 +7,10 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; use store_api::storage::{ consts::{self, ReservedColumnId}, - AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, - ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, RegionDescriptor, - RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor, RowKeyDescriptorBuilder, - Schema, SchemaRef, + AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, + ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnFamilyId, ColumnId, + RegionDescriptor, RegionDescriptorBuilder, RegionId, RegionMeta, RowKeyDescriptor, + RowKeyDescriptorBuilder, Schema, SchemaRef, }; use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata}; @@ -43,16 +43,46 @@ pub enum Error { #[snafu(display("Missing timestamp key column"))] MissingTimestamp { backtrace: Backtrace }, + // Variants for validating `AlterRequest`, which won't have a backtrace. 
#[snafu(display("Expect altering metadata with version {}, given {}", expect, given))] - InvalidVersion { + InvalidAlterVersion { expect: VersionNumber, given: VersionNumber, - backtrace: Backtrace, }, + + #[snafu(display("Failed to add column as there is already a column named {}", name))] + AddExistColumn { name: String }, + + #[snafu(display("Failed to add a non null column {}", name))] + AddNonNullColumn { name: String }, + + #[snafu(display("Failed to drop column as there is no column named {}", name))] + DropAbsentColumn { name: String }, + + #[snafu(display("Failed to drop column {} as it is part of key", name))] + DropKeyColumn { name: String }, + + #[snafu(display("Failed to drop column {} as it is an internal column", name))] + DropInternalColumn { name: String }, + // End of variants for validating `AlterRequest`. } pub type Result = std::result::Result; +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + StatusCode::InvalidArguments + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + /// Implementation of [RegionMeta]. /// /// Holds a snapshot of region metadata. @@ -122,16 +152,43 @@ impl RegionMetadata { self.schema.version() } - /// Returns a new [RegionMetadata] after alteration, leave `self` unchanged. - pub fn alter(&self, req: &AlterRequest) -> Result { + /// Checks whether the `req` is valid, returns `Err` if it is invalid. 
+ pub fn validate_alter(&self, req: &AlterRequest) -> Result<()> { ensure!( req.version == self.version, - InvalidVersionSnafu { + InvalidAlterVersionSnafu { expect: req.version, given: self.version, } ); + match &req.operation { + AlterOperation::AddColumns { columns } => { + for col in columns { + self.validate_add_column(col)?; + } + } + AlterOperation::DropColumns { names } => { + for name in names { + self.validate_drop_column(name)?; + } + } + } + + Ok(()) + } + + /// Returns a new [RegionMetadata] after alteration, leave `self` unchanged. + /// + /// Caller should use [RegionMetadata::validate_alter] to validate the `req` and + /// ensure the version of the `req` is equal to the version of the metadata. + /// + /// # Panics + /// Panics if `req.version != self.version`. + pub fn alter(&self, req: &AlterRequest) -> Result { + // The req should have been validated before. + assert_eq!(req.version, self.version); + let mut desc = self.to_descriptor(); // Apply the alter operation to the descriptor. req.operation.apply(&mut desc); @@ -141,6 +198,46 @@ impl RegionMetadata { .build() } + fn validate_add_column(&self, add_column: &AddColumn) -> Result<()> { + // We don't check the case that the column is not nullable but default constraint is null. The + // caller should guarantee this. + ensure!( + add_column.desc.is_nullable || add_column.desc.default_constraint.is_some(), + AddNonNullColumnSnafu { + name: &add_column.desc.name, + } + ); + + // Use the store schema to check the column as it contains all internal columns. 
+ let store_schema = self.schema.store_schema(); + ensure!( + !store_schema.contains_column(&add_column.desc.name), + AddExistColumnSnafu { + name: &add_column.desc.name, + } + ); + + Ok(()) + } + + fn validate_drop_column(&self, name: &str) -> Result<()> { + let store_schema = self.schema.store_schema(); + ensure!( + store_schema.contains_column(name), + DropAbsentColumnSnafu { name } + ); + ensure!( + !store_schema.is_key_column(name), + DropKeyColumnSnafu { name } + ); + ensure!( + store_schema.is_user_column(name), + DropInternalColumnSnafu { name } + ); + + Ok(()) + } + fn to_descriptor(&self) -> RegionDescriptor { let row_key = self.columns.to_row_key_descriptor(); let mut builder = RegionDescriptorBuilder::default() @@ -948,7 +1045,6 @@ mod tests { "k2", ConcreteDataType::int32_datatype(), ) - .is_nullable(false) .build() .unwrap(), is_key: true, @@ -967,13 +1063,14 @@ mod tests { }, version: 0, }; + metadata.validate_alter(&req).unwrap(); let metadata = metadata.alter(&req).unwrap(); let builder: RegionMetadataBuilder = RegionDescBuilder::new(region_name) .enable_version_column(false) .push_key_column(("k1", LogicalTypeId::Int32, false)) .push_value_column(("v1", LogicalTypeId::Float32, true)) - .push_key_column(("k2", LogicalTypeId::Int32, false)) + .push_key_column(("k2", LogicalTypeId::Int32, true)) .push_value_column(("v2", LogicalTypeId::Float32, true)) .build() .try_into() @@ -1020,4 +1117,118 @@ mod tests { let expect = builder.version(1).build().unwrap(); assert_eq!(expect, metadata); } + + #[test] + fn test_validate_alter_request() { + let builder = RegionDescBuilder::new("region-alter") + .enable_version_column(false) + .timestamp(("ts", LogicalTypeId::Timestamp, false)) + .push_key_column(("k0", LogicalTypeId::Int32, false)) + .push_value_column(("v0", LogicalTypeId::Float32, true)) + .push_value_column(("v1", LogicalTypeId::Float32, true)); + let last_column_id = builder.last_column_id(); + let metadata: RegionMetadata = 
builder.build().try_into().unwrap(); + + // Test request with different version. + let mut req = AlterRequest { + operation: AlterOperation::AddColumns { + columns: vec![AddColumn { + desc: ColumnDescriptorBuilder::new( + last_column_id + 1, + "k2", + ConcreteDataType::int32_datatype(), + ) + .build() + .unwrap(), + is_key: true, + }], + }, + version: 1, + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::InvalidAlterVersion { .. } + )); + req.version = 0; + + // Add existing column. + req.operation = AlterOperation::AddColumns { + columns: vec![AddColumn { + desc: ColumnDescriptorBuilder::new( + last_column_id + 1, + "ts", + ConcreteDataType::int32_datatype(), + ) + .build() + .unwrap(), + is_key: false, + }], + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::AddExistColumn { .. } + )); + + // Add non null column. + req.operation = AlterOperation::AddColumns { + columns: vec![AddColumn { + desc: ColumnDescriptorBuilder::new( + last_column_id + 1, + "v2", + ConcreteDataType::int32_datatype(), + ) + .is_nullable(false) + .build() + .unwrap(), + is_key: false, + }], + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::AddNonNullColumn { .. } + )); + + // Drop absent column. + let mut req = AlterRequest { + operation: AlterOperation::DropColumns { + names: vec![String::from("v2")], + }, + version: 0, + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::DropAbsentColumn { .. } + )); + + // Drop key column. + req.operation = AlterOperation::DropColumns { + names: vec![String::from("ts")], + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::DropKeyColumn { .. } + )); + req.operation = AlterOperation::DropColumns { + names: vec![String::from("k0")], + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::DropKeyColumn { .. } + )); + + // Drop internal column. 
+ req.operation = AlterOperation::DropColumns { + names: vec![String::from(consts::SEQUENCE_COLUMN_NAME)], + }; + assert!(matches!( + metadata.validate_alter(&req).err().unwrap(), + Error::DropInternalColumn { .. } + )); + + // Valid request + req.operation = AlterOperation::DropColumns { + names: vec![String::from("v0")], + }; + metadata.validate_alter(&req).unwrap(); + } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 4676b27abe..27ff8b6df7 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -1,5 +1,6 @@ //! Region tests. +mod alter; mod basic; mod flush; mod projection; @@ -24,16 +25,16 @@ use crate::test_util::{ }; use crate::write_batch::PutData; -/// Create metadata of a region with schema: (timestamp, v1). +/// Create metadata of a region with schema: (timestamp, v0). pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata { let desc = RegionDescBuilder::new(region_name) .enable_version_column(enable_version_column) - .push_value_column(("v1", LogicalTypeId::Int64, true)) + .push_value_column(("v0", LogicalTypeId::Int64, true)) .build(); desc.try_into().unwrap() } -/// Test region with schema (timestamp, v1). +/// Test region with schema (timestamp, v0). pub struct TesterBase { pub region: RegionImpl, write_ctx: WriteContext, @@ -51,7 +52,7 @@ impl TesterBase { /// Put without version specified. /// - /// Format of data: (timestamp, v1), timestamp is key, v1 is value. + /// Format of data: (timestamp, v0), timestamp is key, v0 is value. 
pub async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { let data: Vec<(Timestamp, Option)> = data.iter().map(|(l, r)| ((*l).into(), *r)).collect(); @@ -98,7 +99,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { &[ (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false), (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), - ("v1", LogicalTypeId::Int64, true), + ("v0", LogicalTypeId::Int64, true), ], Some(0), ) @@ -106,7 +107,7 @@ fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { write_batch_util::new_write_batch( &[ (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false), - ("v1", LogicalTypeId::Int64, true), + ("v0", LogicalTypeId::Int64, true), ], Some(0), ) @@ -122,7 +123,7 @@ fn new_put_data(data: &[(Timestamp, Option)]) -> PutData { put_data .add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps)) .unwrap(); - put_data.add_value_column("v1", Arc::new(values)).unwrap(); + put_data.add_value_column("v0", Arc::new(values)).unwrap(); put_data } @@ -149,7 +150,7 @@ async fn test_new_region() { let desc = RegionDescBuilder::new(region_name) .enable_version_column(true) .push_key_column(("k1", LogicalTypeId::Int32, false)) - .push_value_column(("v1", LogicalTypeId::Float32, true)) + .push_value_column(("v0", LogicalTypeId::Float32, true)) .build(); let metadata = desc.try_into().unwrap(); @@ -168,7 +169,7 @@ async fn test_new_region() { ("k1", LogicalTypeId::Int32, false), (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false), (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false), - ("v1", LogicalTypeId::Float32, true), + ("v0", LogicalTypeId::Float32, true), ], Some(1), ); diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs new file mode 100644 index 0000000000..00026230ce --- /dev/null +++ b/src/storage/src/region/tests/alter.rs @@ -0,0 +1,144 @@ +use datatypes::prelude::ConcreteDataType; +use 
log_store::fs::log::LocalFileLogStore; +use store_api::storage::{ + AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, + Region, RegionMeta, SchemaRef, +}; +use tempdir::TempDir; + +use crate::region::tests::{self, FileTesterBase}; +use crate::region::RegionImpl; +use crate::test_util::config_util; + +const REGION_NAME: &str = "region-alter-0"; + +async fn create_region_for_alter(store_dir: &str) -> RegionImpl { + // Always disable version column in this test. + let metadata = tests::new_metadata(REGION_NAME, false); + + let store_config = config_util::new_store_config(REGION_NAME, store_dir).await; + + RegionImpl::create(metadata, store_config).await.unwrap() +} + +/// Tester for region alter. +struct AlterTester { + base: Option, +} + +impl AlterTester { + async fn new(store_dir: &str) -> AlterTester { + let region = create_region_for_alter(store_dir).await; + + AlterTester { + base: Some(FileTesterBase::with_region(region)), + } + } + + #[inline] + fn base(&self) -> &FileTesterBase { + self.base.as_ref().unwrap() + } + + fn schema(&self) -> SchemaRef { + let metadata = self.base().region.in_memory_metadata(); + metadata.schema().clone() + } + + /// Put data with initial schema. 
+ async fn put_before_alter(&self, data: &[(i64, Option)]) { + self.base().put(data).await; + } + + async fn alter(&self, mut req: AlterRequest) { + let version = self.version(); + req.version = version; + + self.base().region.alter(req).await.unwrap(); + } + + fn version(&self) -> u32 { + let metadata = self.base().region.in_memory_metadata(); + metadata.version() + } +} + +fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor { + ColumnDescriptorBuilder::new(id, name, ConcreteDataType::int64_datatype()) + .is_nullable(true) + .build() + .unwrap() +} + +fn add_column_req(desc_and_is_key: &[(ColumnDescriptor, bool)]) -> AlterRequest { + let columns = desc_and_is_key + .iter() + .map(|(desc, is_key)| AddColumn { + desc: desc.clone(), + is_key: *is_key, + }) + .collect(); + let operation = AlterOperation::AddColumns { columns }; + + AlterRequest { + operation, + version: 0, + } +} + +fn drop_column_req(names: &[&str]) -> AlterRequest { + let names = names.iter().map(|s| s.to_string()).collect(); + let operation = AlterOperation::DropColumns { names }; + + AlterRequest { + operation, + version: 0, + } +} + +fn check_schema_names(schema: &SchemaRef, names: &[&str]) { + assert_eq!(names.len(), schema.num_columns()); + for (idx, name) in names.iter().enumerate() { + assert_eq!(*name, schema.column_name_by_index(idx)); + assert!(schema.column_schema_by_name(name).is_some()); + } +} + +#[tokio::test] +async fn test_alter_region() { + let dir = TempDir::new("alter-region").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let tester = AlterTester::new(store_dir).await; + + let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; + + tester.put_before_alter(&data).await; + + let schema = tester.schema(); + check_schema_names(&schema, &["timestamp", "v0"]); + + let req = add_column_req(&[ + (new_column_desc(4, "k0"), true), // key column k0 + (new_column_desc(5, "v1"), false), // value column v1 + ]); + tester.alter(req).await; + + let 
schema = tester.schema(); + check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]); + + let req = add_column_req(&[ + (new_column_desc(6, "v2"), false), + (new_column_desc(7, "v3"), false), + ]); + tester.alter(req).await; + + let schema = tester.schema(); + check_schema_names(&schema, &["k0", "timestamp", "v0", "v1", "v2", "v3"]); + + // Remove v0, v1 + let req = drop_column_req(&["v0", "v1"]); + tester.alter(req).await; + + let schema = tester.schema(); + check_schema_names(&schema, &["k0", "timestamp", "v2", "v3"]); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index e9d7551a43..d3918989fb 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -5,12 +5,16 @@ use common_time::RangeMillis; use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; -use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRequest, WriteResponse}; +use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; +use store_api::storage::{AlterRequest, WriteContext, WriteRequest, WriteResponse}; use tokio::sync::Mutex; use crate::background::JobHandle; use crate::error::{self, Result}; use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef}; +use crate::manifest::action::{ + RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, +}; use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet}; use crate::proto::wal::WalHeader; use crate::region::RegionManifest; @@ -27,6 +31,7 @@ pub type RegionWriterRef = Arc; /// Region writer manages all write operations to the region. #[derive(Debug)] pub struct RegionWriter { + // To avoid deadlock, we need to ensure the lock order is: inner -> version_mutex. /// Inner writer guarded by write lock, the write lock is used to ensure /// all write operations are serialized. inner: Mutex, @@ -57,66 +62,125 @@ impl RegionWriter { .await } - /// Apply version edit. 
- pub async fn apply_version_edit( - &self, - wal: &Wal, - edit: VersionEdit, - shared: &SharedDataRef, - ) -> Result<()> { - // HACK: We won't acquire the write lock here because write stall would hold - // write lock thus we have no chance to get the lock and apply the version edit. - // So we add a version lock to ensure modification to `VersionControl` is - // serialized. - let version_control = &shared.version_control; - - let _lock = self.version_mutex.lock().await; - let next_sequence = version_control.committed_sequence() + 1; - - self.persist_manifest_version(wal, next_sequence, &edit) - .await?; - - version_control.apply_edit(edit); - - version_control.set_committed_sequence(next_sequence); - - // TODO(yingwen): We should set the flush handle to `None`, but we can't acquire - // write lock here. - - Ok(()) - } - /// Replay data to memtables. pub async fn replay(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> { let mut inner = self.inner.lock().await; inner.replay(&self.version_mutex, writer_ctx).await } + /// Write and apply the region edit. + pub(crate) async fn write_edit_and_apply( + &self, + wal: &Wal, + shared: &SharedDataRef, + manifest: &RegionManifest, + edit: RegionEdit, + max_memtable_id: MemtableId, + ) -> Result<()> { + let _lock = self.version_mutex.lock().await; + // HACK: We won't acquire the write lock here because write stall would hold + // write lock thus we have no chance to get the lock and apply the version edit. + // So we add a version lock to ensure modification to `VersionControl` is + // serialized. + let version_control = &shared.version_control; + let prev_version = version_control.current_manifest_version(); + + logging::debug!( + "Write region edit: {:?} to manifest, prev_version: {}.", + edit, + prev_version, + ); + + let files_to_add = edit.files_to_add.clone(); + let flushed_sequence = edit.flushed_sequence; + + // Persist the meta action. 
+ let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); + action_list.set_prev_version(prev_version); + let manifest_version = manifest.update(action_list).await?; + + let version_edit = VersionEdit { + files_to_add, + flushed_sequence: Some(flushed_sequence), + manifest_version, + max_memtable_id: Some(max_memtable_id), + }; + + // We could tolerate failure during persisting manifest version to the WAL, since it won't + // affect how we apply the edit to the version. + version_control.apply_edit(version_edit); + // TODO(yingwen): We should set the flush handle to `None`, but we can't acquire + // write lock here. + + // Persist the manifest version to notify subscribers of the wal that the manifest has been + // updated. This should be done at the end of the method. + self.persist_manifest_version(wal, version_control, manifest_version) + .await + } + /// Alter schema of the region. pub async fn alter( &self, - _alter_ctx: AlterContext<'_, S>, - _request: AlterRequest, + alter_ctx: AlterContext<'_, S>, + request: AlterRequest, ) -> Result<()> { - // TODO(yingwen): [alter] implements alter: - // 1. acquire version lock - // 2. validate request - // 3. build schema based on new request - // 4. persist it into the region manifest - // 5. update version in VersionControl + // To alter the schema, we need to acquire the write lock first, so we can + // prevent other writers from writing to the region and switch the memtable safely. + // Another potential benefit is that the write lock also protects against concurrent + // alter requests to the region. + let _inner = self.inner.lock().await; - unimplemented!() + let version_control = alter_ctx.version_control(); + + let old_metadata = version_control.metadata(); + old_metadata + .validate_alter(&request) + .context(error::InvalidAlterRequestSnafu)?; + + // The write lock protects us against other alter requests, so we can build the new + // metadata struct outside of the version mutex. 
+ let new_metadata = old_metadata + .alter(&request) + .context(error::AlterMetadataSnafu)?; + + let raw = RawRegionMetadata::from(&new_metadata); + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: raw, + })); + let new_metadata = Arc::new(new_metadata); + + // Acquire the version lock before altering the metadata. + let _lock = self.version_mutex.lock().await; + + // Persist the meta action. + let prev_version = version_control.current_manifest_version(); + action_list.set_prev_version(prev_version); + let manifest_version = alter_ctx.manifest.update(action_list).await?; + + // Now we could switch memtables and apply the new metadata to the version. + version_control.freeze_mutable_and_apply_metadata(new_metadata, manifest_version); + + self.persist_manifest_version(alter_ctx.wal, version_control, manifest_version) + .await } + /// Allocate a sequence and persist the manifest version using that sequence to the wal. + /// + /// This method should be protected by the `version_mutex`. 
async fn persist_manifest_version( &self, wal: &Wal, - seq: SequenceNumber, - edit: &VersionEdit, + version_control: &VersionControlRef, + manifest_version: ManifestVersion, ) -> Result<()> { - let header = WalHeader::with_last_manifest_version(edit.manifest_version); + let next_sequence = version_control.committed_sequence() + 1; - wal.write_to_wal(seq, header, Payload::None).await?; + let header = WalHeader::with_last_manifest_version(manifest_version); + wal.write_to_wal(next_sequence, header, Payload::None) + .await?; + + version_control.set_committed_sequence(next_sequence); Ok(()) } @@ -158,6 +222,13 @@ pub struct AlterContext<'a, S: LogStore> { pub manifest: &'a RegionManifest, } +impl<'a, S: LogStore> AlterContext<'a, S> { + #[inline] + fn version_control(&self) -> &VersionControlRef { + &self.shared.version_control + } +} + #[derive(Debug)] struct WriterInner { memtable_builder: MemtableBuilderRef, diff --git a/src/storage/src/schema.rs b/src/storage/src/schema.rs index 4a3a7dff3a..0796020b6f 100644 --- a/src/storage/src/schema.rs +++ b/src/storage/src/schema.rs @@ -239,6 +239,24 @@ impl StoreSchema { Ok(Batch::new(columns)) } + pub(crate) fn contains_column(&self, name: &str) -> bool { + self.schema.column_schema_by_name(name).is_some() + } + + pub(crate) fn is_key_column(&self, name: &str) -> bool { + self.schema + .column_index_by_name(name) + .map(|idx| idx < self.row_key_end) + .unwrap_or(false) + } + + pub(crate) fn is_user_column(&self, name: &str) -> bool { + self.schema + .column_index_by_name(name) + .map(|idx| idx < self.user_column_end) + .unwrap_or(false) + } + fn from_columns_metadata(columns: &ColumnsMetadata, version: u32) -> Result { let column_schemas: Vec<_> = columns .iter_all_columns() diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 93f81cd09f..cbb36ed2d5 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -102,11 +102,32 @@ impl VersionControl { version_to_update.commit(); } + /// 
Apply [VersionEdit] to the version. pub fn apply_edit(&self, edit: VersionEdit) { let mut version_to_update = self.version.lock(); version_to_update.apply_edit(edit); version_to_update.commit(); } + + /// Freeze all mutable memtables and then apply the new metadata to the version. + pub fn freeze_mutable_and_apply_metadata( + &self, + metadata: RegionMetadataRef, + manifest_version: ManifestVersion, + ) { + let mut version_to_update = self.version.lock(); + + let memtable_version = version_to_update.memtables(); + // When applying metadata, mutable memtable set might be empty and there is no + // need to freeze it. + if !memtable_version.mutable_memtables().is_empty() { + let freezed = memtable_version.freeze_mutable(); + version_to_update.memtables = Arc::new(freezed); + } + + version_to_update.apply_metadata(metadata, manifest_version); + version_to_update.commit(); + } } #[derive(Debug)] @@ -227,6 +248,29 @@ impl Version { self.ssts = Arc::new(merged_ssts); } + /// Updates metadata of the version. + /// + /// # Panics + /// Panics if `metadata.version() <= self.metadata.version()`. + pub fn apply_metadata( + &mut self, + metadata: RegionMetadataRef, + manifest_version: ManifestVersion, + ) { + assert!( + metadata.version() > self.metadata.version(), + "Updating metadata from version {} to {} is not allowed", + self.metadata.version(), + metadata.version() + ); + + if self.manifest_version < manifest_version { + self.manifest_version = manifest_version; + } + + self.metadata = metadata; + } + #[inline] pub fn manifest_version(&self) -> ManifestVersion { self.manifest_version