diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index a11a3c4762..1569136b80 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -97,6 +97,7 @@ pub struct StoreConfig { pub flush_strategy: FlushStrategyRef, } +pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); pub type RecoveredMetadataMap = BTreeMap; impl RegionImpl { @@ -345,6 +346,10 @@ impl RegionImpl { self.inner.version_control().committed_sequence() } + fn current_manifest_version(&self) -> ManifestVersion { + self.inner.version_control().current_manifest_version() + } + async fn wait_flush_done(&self) -> Result<()> { self.inner.writer.wait_flush_done().await } @@ -353,6 +358,22 @@ impl RegionImpl { async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result { self.inner.write(ctx, request).await } + + // Replay metadata to inner. + async fn replay_inner(&self, recovered_metadata: RecoveredMetadataMap) -> Result<()> { + let inner = &self.inner; + let writer_ctx = WriterContext { + shared: &inner.shared, + flush_strategy: &inner.flush_strategy, + flush_scheduler: &inner.flush_scheduler, + sst_layer: &inner.sst_layer, + wal: &inner.wal, + writer: &inner.writer, + manifest: &inner.manifest, + }; + + inner.writer.replay(recovered_metadata, writer_ctx).await + } } /// Shared data of region. diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index d7d753faae..67aec5fa32 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -79,6 +79,10 @@ impl TesterBase { .unwrap() } + pub async fn replay_inner(&self, recovered_metadata: RecoveredMetadataMap) { + self.region.replay_inner(recovered_metadata).await.unwrap() + } + /// Scan all data. pub async fn full_scan(&self) -> Vec<(i64, Option)> { logging::info!("Full scan with ctx {:?}", self.read_ctx); diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs index 6f44f6ac96..72d213ec43 100644 --- a/src/storage/src/region/tests/alter.rs +++ b/src/storage/src/region/tests/alter.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::Arc; use common_time::Timestamp; @@ -16,8 +17,10 @@ use tempdir::TempDir; use crate::region::tests::{self, FileTesterBase}; use crate::region::OpenOptions; use crate::region::RegionImpl; +use crate::region::{RawRegionMetadata, RegionMetadata}; use crate::test_util; use crate::test_util::config_util; +use crate::test_util::descriptor_util::RegionDescBuilder; use crate::write_batch::PutData; const REGION_NAME: &str = "region-alter-0"; @@ -235,6 +238,7 @@ fn drop_column_req(names: &[&str]) -> AlterRequest { fn check_schema_names(schema: &SchemaRef, names: &[&str]) { assert_eq!(names.len(), schema.num_columns()); + for (idx, name) in names.iter().enumerate() { assert_eq!(*name, schema.column_name_by_index(idx)); assert!(schema.column_schema_by_name(name).is_some()); @@ -391,3 +395,34 @@ async fn test_put_old_schema_after_alter() { let scanned = tester.full_scan().await; assert_eq!(expect, scanned); } + +#[tokio::test] +async fn test_replay_metadata_after_open() { + let dir = TempDir::new("replay-metadata-after-open").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + let mut tester = AlterTester::new(store_dir).await; + + let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))]; + + tester.put_with_init_schema(&data).await; + + tester.reopen().await; + + let committed_sequence = tester.base().committed_sequence(); + let manifest_version = tester.base().region.current_manifest_version(); + let version = tester.version(); + + let mut recovered_metadata = BTreeMap::new(); + + let desc = RegionDescBuilder::new(REGION_NAME) + .push_key_column(("k1", LogicalTypeId::Int32, false)) + .push_value_column(("v0", LogicalTypeId::Float32, true)) + .build(); + let metadata: &RegionMetadata = &desc.try_into().unwrap(); + let mut raw_metadata: RawRegionMetadata = metadata.into(); + raw_metadata.version = version + 1; + recovered_metadata.insert(committed_sequence, (manifest_version + 1, raw_metadata)); + tester.base().replay_inner(recovered_metadata).await; + let schema = tester.schema(); + check_schema_names(&schema, &["k1", "timestamp", "v0"]); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 6dd39a1c9f..8b462e788b 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -5,6 +5,7 @@ use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::manifest::{Manifest, ManifestVersion, MetaAction}; +use store_api::storage::SequenceNumber; use store_api::storage::{AlterRequest, WriteContext, WriteResponse}; use tokio::sync::Mutex; @@ -17,9 +18,10 @@ use crate::manifest::action::{ use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef}; use crate::metadata::RegionMetadataRef; use crate::proto::wal::WalHeader; -use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef}; +use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef}; use crate::schema::compat::CompatWrite; use crate::sst::AccessLayerRef; +use crate::version::VersionControl; use crate::version::{VersionControlRef, VersionEdit}; use crate::wal::{Payload, Wal}; use crate::write_batch::WriteBatch; @@ -341,28 +343,14 @@ impl WriterInner { // There might be multiple metadata changes to be applied, so a loop is necessary. if req_sequence > sequence_before_alter { // This is the first request that use the new metadata. - // It's safe to unwrap here. It's checked above. Move out metadata to avoid cloning it. - let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap(); - let region_metadata: RegionMetadataRef = Arc::new( - metadata.try_into().context(error::InvalidRawRegionSnafu { - region: &writer_ctx.shared.name, - })?, - ); - let new_mutable = self - .memtable_builder - .build(region_metadata.schema().clone()); - version_control.freeze_mutable_and_apply_metadata( - region_metadata, - manifest_version, - new_mutable, - ); - num_recovered_metadata += 1; - logging::debug!( - "Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ", - writer_ctx.shared.name, + self.apply_metadata( + &writer_ctx, sequence_before_alter, - manifest_version - ); + next_apply_metadata, + version_control, + )?; + + num_recovered_metadata += 1; next_apply_metadata = recovered_metadata.pop_first(); } else { // Keep the next_apply_metadata until req_sequence > sequence_before_alter @@ -380,7 +368,7 @@ impl WriterInner { } else { logging::error!( "Sequence should not decrease during replay, found {} <= {}, \ - region_id: {}, region_name: {}, flushed_sequence: {}, num_requests: {}", + region_id: {}, region_name: {}, flushed_sequence: {}, num_requests: {}", req_sequence, last_sequence, writer_ctx.shared.id, @@ -402,6 +390,29 @@ impl WriterInner { } } + // Apply metadata after last WAL entry + while let Some((sequence_before_alter, _)) = next_apply_metadata { + assert!( + sequence_before_alter >= last_sequence, + "The sequence in metadata after last WAL entry is less than last sequence, \ + metadata sequence: {}, last_sequence: {}, region_id: {}, region_name: {}", + sequence_before_alter, + last_sequence, + writer_ctx.shared.id, + writer_ctx.shared.name + ); + + self.apply_metadata( + &writer_ctx, + sequence_before_alter, + next_apply_metadata, + version_control, + )?; + + num_recovered_metadata += 1; + next_apply_metadata = recovered_metadata.pop_first(); + } + version_control.set_committed_sequence(last_sequence); } @@ -418,6 +429,39 @@ impl WriterInner { Ok(()) } + fn apply_metadata( + &self, + writer_ctx: &WriterContext<'_, S>, + sequence: SequenceNumber, + mut metadata: Option, + version_control: &VersionControl, + ) -> Result<()> { + // It's safe to unwrap here, it's checked outside. + // Move out metadata to avoid cloning it. + + let (_, (manifest_version, metadata)) = metadata.take().unwrap(); + let region_metadata: RegionMetadataRef = + Arc::new(metadata.try_into().context(error::InvalidRawRegionSnafu { + region: &writer_ctx.shared.name, + })?); + let new_mutable = self + .memtable_builder + .build(region_metadata.schema().clone()); + version_control.freeze_mutable_and_apply_metadata( + region_metadata, + manifest_version, + new_mutable, + ); + logging::debug!( + "Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ", + writer_ctx.shared.name, + sequence, + manifest_version + ); + + Ok(()) + } + /// Preprocess before write. /// /// Creates needed mutable memtables, ensures there is enough capacity in memtable and trigger