diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 37a4f0bcf0..9c731cc62d 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -1,4 +1,5 @@ //! Storage engine implementation. +#![feature(map_first_last)] mod arrow_stream; mod background; mod chunk; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index ef01774207..639c000919 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -45,6 +45,7 @@ pub struct RawColumnFamiliesMetadata { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionChange { + pub committed_sequence: SequenceNumber, pub metadata: RawRegionMetadata, } @@ -94,6 +95,11 @@ impl RegionMetaActionList { impl MetaAction for RegionMetaActionList { type Error = error::Error; + fn set_protocol(&mut self, action: ProtocolAction) { + // The protocol action should be the first action in action list by convention. + self.actions.insert(0, RegionMetaAction::Protocol(action)); + } + fn set_prev_version(&mut self, version: ManifestVersion) { self.prev_version = version; } diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs index a185da44e9..101fd2d911 100644 --- a/src/storage/src/manifest/impl_.rs +++ b/src/storage/src/manifest/impl_.rs @@ -138,7 +138,7 @@ impl> ManifestImplInner { self.version.load(Ordering::Relaxed) } - async fn save(&self, action_list: M) -> Result { + async fn save(&self, mut action_list: M) -> Result { let protocol = self.protocol.load(); ensure!( @@ -151,6 +151,16 @@ impl> ManifestImplInner { let version = self.inc_version(); + if version == 0 || protocol.min_writer_version < self.supported_writer_version { + let new_protocol = ProtocolAction { + min_reader_version: self.supported_reader_version, + min_writer_version: self.supported_writer_version, + }; + action_list.set_protocol(new_protocol.clone()); + + self.protocol.store(Arc::new(new_protocol)); + } + logging::debug!( "Save region metadata action: {:?}, version: {}", action_list, diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 2ec340367a..9496285caf 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -9,6 +9,7 @@ mod tests { use std::sync::Arc; use object_store::{backend::fs, ObjectStore}; + use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; use tempdir::TempDir; @@ -45,6 +46,7 @@ mod tests { .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { metadata: region_meta.as_ref().into(), + committed_sequence: 99, }, ))) .await @@ -54,8 +56,14 @@ mod tests { let (v, action_list) = iter.next_action().await.unwrap().unwrap(); assert_eq!(0, v); - assert_eq!(1, action_list.actions.len()); - let action = &action_list.actions[0]; + assert_eq!(2, action_list.actions.len()); + let protocol = &action_list.actions[0]; + assert!(matches!( + protocol, + RegionMetaAction::Protocol(ProtocolAction { .. }) + )); + + let action = &action_list.actions[1]; match action { RegionMetaAction::Change(c) => { @@ -63,6 +71,7 @@ mod tests { RegionMetadata::try_from(c.metadata.clone()).unwrap(), *region_meta ); + assert_eq!(c.committed_sequence, 99); } _ => unreachable!(), } @@ -79,14 +88,21 @@ mod tests { let mut iter = manifest.scan(0, MAX_VERSION).await.unwrap(); let (v, action_list) = iter.next_action().await.unwrap().unwrap(); assert_eq!(0, v); - assert_eq!(1, action_list.actions.len()); - let action = &action_list.actions[0]; + assert_eq!(2, action_list.actions.len()); + let protocol = &action_list.actions[0]; + assert!(matches!( + protocol, + RegionMetaAction::Protocol(ProtocolAction { .. }) + )); + + let action = &action_list.actions[1]; match action { RegionMetaAction::Change(c) => { assert_eq!( RegionMetadata::try_from(c.metadata.clone()).unwrap(), *region_meta ); + assert_eq!(c.committed_sequence, 99); } _ => unreachable!(), } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index b30be5af28..923bcbb9cc 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -1,25 +1,23 @@ #[cfg(test)] mod tests; mod writer; - +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use common_telemetry::logging; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; -use store_api::manifest::{ - self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator, -}; +use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ - AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, WriteContext, - WriteResponse, + AlterRequest, OpenOptions, ReadContext, Region, RegionId, RegionMeta, SequenceNumber, + WriteContext, WriteResponse, }; use crate::error::{self, Error, Result}; use crate::flush::{FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::{ - action::{RegionChange, RegionMetaAction, RegionMetaActionList}, + action::{RawRegionMetadata, RegionChange, RegionMetaAction, RegionMetaActionList}, region::RegionManifest, }; use crate::memtable::MemtableBuilderRef; @@ -28,7 +26,7 @@ pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, Wri use crate::snapshot::SnapshotImpl; use crate::sst::AccessLayerRef; use crate::version::VersionEdit; -use crate::version::{Version, VersionControl, VersionControlRef}; +use crate::version::{Version, VersionControl, VersionControlRef, INIT_COMMITTED_SEQUENCE}; use crate::wal::Wal; use crate::write_batch::WriteBatch; @@ -95,6 +93,8 @@ pub struct StoreConfig { pub flush_strategy: FlushStrategyRef, } +pub type RecoveredMetadataMap = BTreeMap; + impl RegionImpl { /// Create a new region and also persist the region metadata to manifest. /// @@ -108,12 +108,12 @@ impl RegionImpl { // the manifest. let manifest_version = store_config .manifest - .update(RegionMetaActionList::new(vec![ - RegionMetaAction::Protocol(ProtocolAction::new()), - RegionMetaAction::Change(RegionChange { + .update(RegionMetaActionList::with_action(RegionMetaAction::Change( + RegionChange { metadata: metadata.as_ref().into(), - }), - ])) + committed_sequence: INIT_COMMITTED_SEQUENCE, + }, + ))) .await?; let version = Version::with_manifest_version(metadata, manifest_version); @@ -156,10 +156,11 @@ impl RegionImpl { _opts: &OpenOptions, ) -> Result>> { // Load version meta data from manifest. - let version = match Self::recover_from_manifest(&store_config.manifest).await? { - None => return Ok(None), - Some(version) => version, - }; + let (version, mut recovered_metadata) = + match Self::recover_from_manifest(&store_config.manifest).await? { + (None, _) => return Ok(None), + (Some(v), m) => (v, m), + }; logging::debug!( "Region recovered version from manifest, version: {:?}", @@ -167,7 +168,28 @@ impl RegionImpl { ); let metadata = version.metadata().clone(); + let flushed_sequence = version.flushed_sequence(); let version_control = Arc::new(VersionControl::with_version(version)); + + let recovered_metadata_after_flushed = + recovered_metadata.split_off(&(flushed_sequence + 1)); + // apply the last flushed metadata + if let Some((sequence, (manifest_version, metadata))) = recovered_metadata.pop_last() { + let metadata = Arc::new( + metadata + .try_into() + .context(error::InvalidRawRegionSnafu { region: &name })?, + ); + version_control.freeze_mutable_and_apply_metadata(metadata, manifest_version); + + logging::debug!( + "Applied the last flushed metadata to region: {}, sequence: {}, manifest: {}", + name, + sequence, + manifest_version, + ); + } + let wal = Wal::new(metadata.id(), store_config.log_store); let shared = Arc::new(SharedData { id: metadata.id(), @@ -186,7 +208,9 @@ impl RegionImpl { manifest: &store_config.manifest, }; // Replay all unflushed data. - writer.replay(writer_ctx).await?; + writer + .replay(recovered_metadata_after_flushed, writer_ctx) + .await?; let inner = Arc::new(RegionInner { shared, @@ -201,13 +225,17 @@ impl RegionImpl { Ok(Some(RegionImpl { inner })) } - async fn recover_from_manifest(manifest: &RegionManifest) -> Result> { + async fn recover_from_manifest( + manifest: &RegionManifest, + ) -> Result<(Option, RecoveredMetadataMap)> { let (start, end) = Self::manifest_scan_range(); let mut iter = manifest.scan(start, end).await?; let mut version = None; let mut actions = Vec::new(); let mut last_manifest_version = manifest::MIN_VERSION; + let mut recovered_metadata = BTreeMap::new(); + while let Some((manifest_version, action_list)) = iter.next_action().await? { last_manifest_version = manifest_version; @@ -227,8 +255,10 @@ impl RegionImpl { version = Self::replay_edit(manifest_version, action, version); } } - (RegionMetaAction::Change(_), Some(_)) => { - unimplemented!("alter schema is not implemented") + (RegionMetaAction::Change(c), Some(v)) => { + recovered_metadata + .insert(c.committed_sequence, (manifest_version, c.metadata)); + version = Some(v); } (action, None) => { actions.push((manifest_version, action)); @@ -249,7 +279,7 @@ impl RegionImpl { manifest.update_state(last_manifest_version + 1, protocol.clone()); } - Ok(version) + Ok((version, recovered_metadata)) } fn manifest_scan_range() -> (ManifestVersion, ManifestVersion) { @@ -351,6 +381,7 @@ impl RegionInner { // FIXME(yingwen): [alter] The schema may be outdated. let metadata = self.in_memory_metadata(); let schema = metadata.schema(); + // Only compare column schemas. ensure!( schema.column_schemas() == request.schema().column_schemas(), diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 27ff8b6df7..9fb476262d 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -37,7 +37,7 @@ pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMet /// Test region with schema (timestamp, v0). pub struct TesterBase { pub region: RegionImpl, - write_ctx: WriteContext, + pub write_ctx: WriteContext, pub read_ctx: ReadContext, } @@ -197,6 +197,7 @@ async fn test_recover_region_manifets() { assert!(RegionImpl::::recover_from_manifest(&manifest) .await .unwrap() + .0 .is_none()); { @@ -205,6 +206,7 @@ async fn test_recover_region_manifets() { .update(RegionMetaActionList::with_action(RegionMetaAction::Change( RegionChange { metadata: region_meta.as_ref().into(), + committed_sequence: 40, }, ))) .await @@ -217,13 +219,26 @@ async fn test_recover_region_manifets() { ])) .await .unwrap(); + + manifest + .update(RegionMetaActionList::with_action(RegionMetaAction::Change( + RegionChange { + metadata: region_meta.as_ref().into(), + committed_sequence: 42, + }, + ))) + .await + .unwrap(); } // try to recover - let version = RegionImpl::::recover_from_manifest(&manifest) - .await - .unwrap() - .unwrap(); + let (version, recovered_metadata) = + RegionImpl::::recover_from_manifest(&manifest) + .await + .unwrap(); + + assert_eq!(42, *recovered_metadata.first_key_value().unwrap().0); + let version = version.unwrap(); assert_eq!(*version.metadata(), region_meta); assert_eq!(version.flushed_sequence(), 2); assert_eq!(version.manifest_version(), 1); @@ -236,5 +251,5 @@ async fn test_recover_region_manifets() { assert!(version.mutable_memtables().is_empty()); // check manifest state - assert_eq!(2, manifest.last_version()); + assert_eq!(3, manifest.last_version()); } diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs index 00026230ce..fefef0c430 100644 --- a/src/storage/src/region/tests/alter.rs +++ b/src/storage/src/region/tests/alter.rs @@ -1,14 +1,27 @@ +use std::sync::Arc; + +use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; +use datatypes::prelude::ScalarVector; +use datatypes::type_id::LogicalTypeId; +use datatypes::vectors::Int64Vector; +use datatypes::vectors::TimestampVector; use log_store::fs::log::LocalFileLogStore; +use store_api::storage::PutOperation; +use store_api::storage::WriteRequest; use store_api::storage::{ AddColumn, AlterOperation, AlterRequest, ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, - Region, RegionMeta, SchemaRef, + Region, RegionMeta, SchemaRef, WriteResponse, }; use tempdir::TempDir; use crate::region::tests::{self, FileTesterBase}; +use crate::region::OpenOptions; use crate::region::RegionImpl; use crate::test_util::config_util; +use crate::test_util::{self, write_batch_util}; +use crate::write_batch::PutData; +use crate::write_batch::WriteBatch; const REGION_NAME: &str = "region-alter-0"; @@ -23,18 +36,82 @@ async fn create_region_for_alter(store_dir: &str) -> RegionImpl, } +struct DataRow { + key: Option, + ts: Timestamp, + v0: Option, + v1: Option, +} + +impl DataRow { + fn new(key: Option, ts: i64, v0: Option, v1: Option) -> Self { + DataRow { + key, + ts: ts.into(), + v0, + v1, + } + } +} + +fn new_write_batch_for_test() -> WriteBatch { + write_batch_util::new_write_batch( + &[ + ("k0", LogicalTypeId::Int64, true), + (test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false), + ("v0", LogicalTypeId::Int64, true), + ("v1", LogicalTypeId::Int64, true), + ], + Some(1), + ) +} + +fn new_put_data(data: &[DataRow]) -> PutData { + let mut put_data = PutData::with_num_columns(4); + + let keys = Int64Vector::from_iter(data.iter().map(|v| v.key)); + let timestamps = TimestampVector::from_vec(data.iter().map(|v| v.ts).collect()); + let values1 = Int64Vector::from_iter(data.iter().map(|kv| kv.v0)); + let values2 = Int64Vector::from_iter(data.iter().map(|kv| kv.v1)); + + put_data.add_key_column("k0", Arc::new(keys)).unwrap(); + put_data + .add_key_column(test_util::TIMESTAMP_NAME, Arc::new(timestamps)) + .unwrap(); + + put_data.add_value_column("v0", Arc::new(values1)).unwrap(); + put_data.add_value_column("v1", Arc::new(values2)).unwrap(); + + put_data +} + impl AlterTester { async fn new(store_dir: &str) -> AlterTester { let region = create_region_for_alter(store_dir).await; AlterTester { base: Some(FileTesterBase::with_region(region)), + store_dir: store_dir.to_string(), } } + async fn reopen(&mut self) { + // Close the old region. + self.base = None; + // Reopen the region. + let store_config = config_util::new_store_config(REGION_NAME, &self.store_dir).await; + let opts = OpenOptions::default(); + let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts) + .await + .unwrap() + .unwrap(); + self.base = Some(FileTesterBase::with_region(region)); + } + #[inline] fn base(&self) -> &FileTesterBase { self.base.as_ref().unwrap() @@ -45,6 +122,18 @@ impl AlterTester { metadata.schema().clone() } + async fn put(&self, data: &[DataRow]) -> WriteResponse { + let mut batch = new_write_batch_for_test(); + let put_data = new_put_data(data); + batch.put(put_data).unwrap(); + + self.base() + .region + .write(&self.base().write_ctx, batch) + .await + .unwrap() + } + /// Put data with initial schema. async fn put_before_alter(&self, data: &[(i64, Option)]) { self.base().put(data).await; @@ -61,6 +150,10 @@ impl AlterTester { let metadata = self.base().region.in_memory_metadata(); metadata.version() } + + async fn full_scan(&self) -> Vec<(i64, Option)> { + self.base().full_scan().await + } } fn new_column_desc(id: ColumnId, name: &str) -> ColumnDescriptor { @@ -104,6 +197,66 @@ fn check_schema_names(schema: &SchemaRef, names: &[&str]) { } } +#[tokio::test] +async fn test_alter_region_with_reopen() { + common_telemetry::init_default_ut_logging(); + let dir = TempDir::new("alter-region").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = AlterTester::new(store_dir).await; + + let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; + + tester.put_before_alter(&data).await; + assert_eq!(3, tester.full_scan().await.len()); + + let schema = tester.schema(); + check_schema_names(&schema, &["timestamp", "v0"]); + + let req = add_column_req(&[ + (new_column_desc(4, "k0"), true), // key column k0 + (new_column_desc(5, "v1"), false), // value column v1 + ]); + tester.alter(req).await; + + let schema = tester.schema(); + check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]); + + let data = vec![ + DataRow::new(Some(10000), 1003, Some(103), Some(201)), + DataRow::new(Some(10001), 1004, Some(104), Some(202)), + DataRow::new(Some(10002), 1005, Some(105), Some(203)), + ]; + tester.put(&data).await; + + tester.reopen().await; + let data = vec![ + DataRow::new(Some(10003), 1006, Some(106), Some(204)), + DataRow::new(Some(10004), 1007, Some(107), Some(205)), + DataRow::new(Some(10005), 1008, Some(108), Some(206)), + ]; + + tester.put(&data).await; + + // add columns,then remove them without writing data. + let req = add_column_req(&[ + (new_column_desc(6, "v2"), false), // key column k0 + (new_column_desc(7, "v3"), false), // value column v1 + ]); + tester.alter(req).await; + + let req = drop_column_req(&["v2", "v3"]); + tester.alter(req).await; + + // reopen and write again + tester.reopen().await; + let schema = tester.schema(); + check_schema_names(&schema, &["k0", "timestamp", "v0", "v1"]); + + let data = vec![DataRow::new(Some(10006), 1009, Some(109), Some(207))]; + + tester.put(&data).await; +} + #[tokio::test] async fn test_alter_region() { let dir = TempDir::new("alter-region").unwrap(); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index d3918989fb..a872b979d4 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -17,8 +17,7 @@ use crate::manifest::action::{ }; use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet}; use crate::proto::wal::WalHeader; -use crate::region::RegionManifest; -use crate::region::SharedDataRef; +use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef}; use crate::sst::AccessLayerRef; use crate::version::{VersionControlRef, VersionEdit}; use crate::wal::{Payload, Wal}; @@ -63,9 +62,15 @@ impl RegionWriter { } /// Replay data to memtables. - pub async fn replay(&self, writer_ctx: WriterContext<'_, S>) -> Result<()> { + pub async fn replay( + &self, + recovered_metadata: RecoveredMetadataMap, + writer_ctx: WriterContext<'_, S>, + ) -> Result<()> { let mut inner = self.inner.lock().await; - inner.replay(&self.version_mutex, writer_ctx).await + inner + .replay(&self.version_mutex, recovered_metadata, writer_ctx) + .await } /// Write and apply the region edit. @@ -144,15 +149,17 @@ impl RegionWriter { .context(error::AlterMetadataSnafu)?; let raw = RawRegionMetadata::from(&new_metadata); - let mut action_list = - RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { - metadata: raw, - })); - let new_metadata = Arc::new(new_metadata); // Acquire the version lock before altering the metadata. let _lock = self.version_mutex.lock().await; + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: raw, + committed_sequence: version_control.committed_sequence(), + })); + let new_metadata = Arc::new(new_metadata); + // Persist the meta action. let prev_version = version_control.current_manifest_version(); action_list.set_prev_version(prev_version); @@ -175,6 +182,7 @@ impl RegionWriter { manifest_version: ManifestVersion, ) -> Result<()> { let next_sequence = version_control.committed_sequence() + 1; + version_control.set_committed_sequence(next_sequence); let header = WalHeader::with_last_manifest_version(manifest_version); wal.write_to_wal(next_sequence, header, Payload::None) @@ -291,12 +299,15 @@ impl WriterInner { async fn replay( &mut self, version_mutex: &Mutex<()>, + mut recovered_metadata: RecoveredMetadataMap, writer_ctx: WriterContext<'_, S>, ) -> Result<()> { let version_control = writer_ctx.version_control(); let (flushed_sequence, mut last_sequence); let mut num_requests = 0; + let mut num_recovered_metadata = 0; + let mut next_apply_metadata = recovered_metadata.pop_first(); { let _lock = version_mutex.lock().await; @@ -307,6 +318,31 @@ impl WriterInner { // should be flushed_sequence + 1. let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence + 1).await?; while let Some((req_sequence, _header, request)) = stream.try_next().await? { + while let Some((next_apply_sequence, _)) = next_apply_metadata { + if req_sequence >= next_apply_sequence { + // It's safe to unwrap here. It's checked above. + // Move out metadata to avoid cloning it. + let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap(); + version_control.freeze_mutable_and_apply_metadata( + Arc::new(metadata.try_into().context( + error::InvalidRawRegionSnafu { + region: &writer_ctx.shared.name, + }, + )?), + manifest_version, + ); + num_recovered_metadata += 1; + logging::debug!("Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ", + writer_ctx.shared.name, + next_apply_sequence, + manifest_version); + next_apply_metadata = recovered_metadata.pop_first(); + } else { + // Keep the next_apply_metadata until req_sequence >= next_apply_sequence + break; + } + } + if let Some(request) = request { num_requests += 1; let time_ranges = self.prepare_memtables(&request, version_control)?; @@ -345,12 +381,13 @@ impl WriterInner { } logging::info!( - "Region replay finished, region_id: {}, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}", + "Region replay finished, region_id: {}, region_name: {}, flushed_sequence: {}, last_sequence: {}, num_requests: {}, num_recovered_metadata: {}", writer_ctx.shared.id, writer_ctx.shared.name, flushed_sequence, last_sequence, num_requests, + num_recovered_metadata, ); Ok(()) diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index c3587f9c92..b9330d63f9 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -24,6 +24,8 @@ use crate::sync::CowCell; /// Default bucket duration: 2 Hours. const DEFAULT_BUCKET_DURATION: Duration = Duration::from_secs(3600 * 2); +pub const INIT_COMMITTED_SEQUENCE: u64 = 0; + /// Controls version of in memory state for a region. #[derive(Debug)] pub struct VersionControl { @@ -41,7 +43,7 @@ impl VersionControl { pub fn with_version(version: Version) -> VersionControl { VersionControl { version: CowCell::new(version), - committed_sequence: AtomicU64::new(0), + committed_sequence: AtomicU64::new(INIT_COMMITTED_SEQUENCE), } } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index f75f9718ae..3eaadc17b0 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -17,6 +17,9 @@ pub const MAX_VERSION: u64 = u64::MAX; pub trait MetaAction: Serialize + DeserializeOwned + Send + Sync + Clone + std::fmt::Debug { type Error: ErrorExt + Send + Sync; + /// Set a protocol action into meta action + fn set_protocol(&mut self, action: ProtocolAction); + /// Set previous valid manifest version. fn set_prev_version(&mut self, version: ManifestVersion); diff --git a/src/table-engine/src/manifest.rs b/src/table-engine/src/manifest.rs index f8ca01d1c0..c254a238be 100644 --- a/src/table-engine/src/manifest.rs +++ b/src/table-engine/src/manifest.rs @@ -50,12 +50,10 @@ mod tests { let protocol = ProtocolAction::new(); let table_info = test_util::build_test_table_info(); - let action_list = TableMetaActionList::new(vec![ - TableMetaAction::Protocol(protocol.clone()), - TableMetaAction::Change(Box::new(TableChange { + let action_list = + TableMetaActionList::new(vec![TableMetaAction::Change(Box::new(TableChange { table_info: table_info.clone(), - })), - ]); + }))]); assert_eq!(0, manifest.update(action_list).await.unwrap()); diff --git a/src/table-engine/src/manifest/action.rs b/src/table-engine/src/manifest/action.rs index 590ab8a37d..87e074f497 100644 --- a/src/table-engine/src/manifest/action.rs +++ b/src/table-engine/src/manifest/action.rs @@ -39,6 +39,13 @@ pub struct TableMetaActionList { } impl TableMetaActionList { + pub fn with_action(action: TableMetaAction) -> Self { + Self { + actions: vec![action], + prev_version: 0, + } + } + pub fn new(actions: Vec) -> Self { Self { actions, @@ -50,6 +57,11 @@ impl TableMetaActionList { impl MetaAction for TableMetaActionList { type Error = StorageError; + fn set_protocol(&mut self, action: ProtocolAction) { + // The protocol action should be the first action in action list by convention. + self.actions.insert(0, TableMetaAction::Protocol(action)); + } + fn set_prev_version(&mut self, version: ManifestVersion) { self.prev_version = version; } diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index fd5f6613a9..f68b4ec8b9 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -21,7 +21,6 @@ use futures::task::{Context, Poll}; use futures::Stream; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt}; -use store_api::manifest::action::ProtocolAction; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; use store_api::storage::{ AddColumn, AlterOperation, AlterRequest, ChunkReader, ColumnDescriptorBuilder, PutOperation, @@ -249,12 +248,11 @@ impl Table for MitoTable { new_info ); self.manifest - .update(TableMetaActionList::new(vec![ - TableMetaAction::Protocol(ProtocolAction::new()), - TableMetaAction::Change(Box::new(TableChange { + .update(TableMetaActionList::with_action(TableMetaAction::Change( + Box::new(TableChange { table_info: new_info.clone(), - })), - ])) + }), + ))) .await .context(UpdateTableManifestSnafu { table_name: &self.table_info().name, @@ -414,12 +412,11 @@ impl MitoTable { // TODO(dennis): save manifest version into catalog? let _manifest_version = manifest - .update(TableMetaActionList::new(vec![ - TableMetaAction::Protocol(ProtocolAction::new()), - TableMetaAction::Change(Box::new(TableChange { + .update(TableMetaActionList::with_action(TableMetaAction::Change( + Box::new(TableChange { table_info: table_info.clone(), - })), - ])) + }), + ))) .await .context(UpdateTableManifestSnafu { table_name })?;