fix: apply recovered metadata after last WAL entry (#461)

* fix: apply recovered metadata after last WAL entry

* fix: condition error
dennis zhuang
2022-11-14 20:43:47 +08:00
committed by GitHub
parent c90832ea6c
commit 68b299e04a
4 changed files with 127 additions and 23 deletions


@@ -97,6 +97,7 @@ pub struct StoreConfig<S> {
pub flush_strategy: FlushStrategyRef,
}
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
impl<S: LogStore> RegionImpl<S> {
@@ -345,6 +346,10 @@ impl<S: LogStore> RegionImpl<S> {
self.inner.version_control().committed_sequence()
}
fn current_manifest_version(&self) -> ManifestVersion {
self.inner.version_control().current_manifest_version()
}
async fn wait_flush_done(&self) -> Result<()> {
self.inner.writer.wait_flush_done().await
}
@@ -353,6 +358,22 @@ impl<S: LogStore> RegionImpl<S> {
async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
self.inner.write(ctx, request).await
}
// Replay the WAL and apply the recovered metadata via the inner writer.
async fn replay_inner(&self, recovered_metadata: RecoveredMetadataMap) -> Result<()> {
let inner = &self.inner;
let writer_ctx = WriterContext {
shared: &inner.shared,
flush_strategy: &inner.flush_strategy,
flush_scheduler: &inner.flush_scheduler,
sst_layer: &inner.sst_layer,
wal: &inner.wal,
writer: &inner.writer,
manifest: &inner.manifest,
};
inner.writer.replay(recovered_metadata, writer_ctx).await
}
}
/// Shared data of region.

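The RecoverdMetadata and RecoveredMetadataMap aliases above key each pending metadata change by the sequence number it was recorded at, so the BTreeMap hands entries back in replay order. A minimal, self-contained sketch of that shape, using u64 stand-ins for the store_api types and a unit struct for RawRegionMetadata (assumptions for illustration, not the crate's real definitions):

use std::collections::BTreeMap;

// Stand-ins for store_api::storage::SequenceNumber,
// store_api::manifest::ManifestVersion and crate::region::RawRegionMetadata.
type SequenceNumber = u64;
type ManifestVersion = u64;
struct RawRegionMetadata;

type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;

fn main() {
    let mut recovered: RecoveredMetadataMap = BTreeMap::new();
    recovered.insert(42, (7, RawRegionMetadata));
    recovered.insert(10, (6, RawRegionMetadata));
    // pop_first (stable since Rust 1.66) drains entries in ascending
    // sequence order, which is exactly the order replay needs.
    let first: Option<RecoverdMetadata> = recovered.pop_first();
    assert_eq!(first.map(|(seq, _)| seq), Some(10));
}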

@@ -79,6 +79,10 @@ impl<S: LogStore> TesterBase<S> {
.unwrap()
}
pub async fn replay_inner(&self, recovered_metadata: RecoveredMetadataMap) {
self.region.replay_inner(recovered_metadata).await.unwrap()
}
/// Scan all data.
pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
logging::info!("Full scan with ctx {:?}", self.read_ctx);


@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use common_time::Timestamp;
@@ -16,8 +17,10 @@ use tempdir::TempDir;
use crate::region::tests::{self, FileTesterBase};
use crate::region::OpenOptions;
use crate::region::RegionImpl;
use crate::region::{RawRegionMetadata, RegionMetadata};
use crate::test_util;
use crate::test_util::config_util;
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::write_batch::PutData;
const REGION_NAME: &str = "region-alter-0";
@@ -235,6 +238,7 @@ fn drop_column_req(names: &[&str]) -> AlterRequest {
fn check_schema_names(schema: &SchemaRef, names: &[&str]) {
assert_eq!(names.len(), schema.num_columns());
for (idx, name) in names.iter().enumerate() {
assert_eq!(*name, schema.column_name_by_index(idx));
assert!(schema.column_schema_by_name(name).is_some());
@@ -391,3 +395,34 @@ async fn test_put_old_schema_after_alter() {
let scanned = tester.full_scan().await;
assert_eq!(expect, scanned);
}
#[tokio::test]
async fn test_replay_metadata_after_open() {
let dir = TempDir::new("replay-metadata-after-open").unwrap();
let store_dir = dir.path().to_str().unwrap();
let mut tester = AlterTester::new(store_dir).await;
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
tester.put_with_init_schema(&data).await;
tester.reopen().await;
let committed_sequence = tester.base().committed_sequence();
let manifest_version = tester.base().region.current_manifest_version();
let version = tester.version();
let mut recovered_metadata = BTreeMap::new();
let desc = RegionDescBuilder::new(REGION_NAME)
.push_key_column(("k1", LogicalTypeId::Int32, false))
.push_value_column(("v0", LogicalTypeId::Float32, true))
.build();
let metadata: &RegionMetadata = &desc.try_into().unwrap();
let mut raw_metadata: RawRegionMetadata = metadata.into();
raw_metadata.version = version + 1;
recovered_metadata.insert(committed_sequence, (manifest_version + 1, raw_metadata));
tester.base().replay_inner(recovered_metadata).await;
let schema = tester.schema();
check_schema_names(&schema, &["k1", "timestamp", "v0"]);
}
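This test drives exactly the boundary the commit fixes: the recovered metadata is keyed at committed_sequence, and the strict comparison used inside the replay loop can never fire for it, so the alter must be applied after the last WAL entry. A tiny illustration of the off-by-one, with the comparison lifted out into a free function (names are illustrative):

// Illustrative only: the strict inequality used inside the replay loop.
fn applies_during_replay(req_sequence: u64, sequence_before_alter: u64) -> bool {
    req_sequence > sequence_before_alter
}

fn main() {
    // Three puts were replayed; the alter was recorded at the committed sequence.
    let committed_sequence = 3;
    // No replayed request satisfies the strict comparison...
    assert!(!applies_during_replay(committed_sequence, committed_sequence));
    // ...so without a post-loop pass the alter would never be applied on reopen.
}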


@@ -5,6 +5,7 @@ use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::SequenceNumber;
use store_api::storage::{AlterRequest, WriteContext, WriteResponse};
use tokio::sync::Mutex;
@@ -17,9 +18,10 @@ use crate::manifest::action::{
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::proto::wal::WalHeader;
use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::version::VersionControl;
use crate::version::{VersionControlRef, VersionEdit};
use crate::wal::{Payload, Wal};
use crate::write_batch::WriteBatch;
@@ -341,28 +343,14 @@ impl WriterInner {
// There might be multiple metadata changes to be applied, so a loop is necessary.
if req_sequence > sequence_before_alter {
// This is the first request that uses the new metadata.
self.apply_metadata(
&writer_ctx,
sequence_before_alter,
next_apply_metadata,
version_control,
)?;
num_recovered_metadata += 1;
next_apply_metadata = recovered_metadata.pop_first();
} else {
// Keep the next_apply_metadata until req_sequence > sequence_before_alter
@@ -380,7 +368,7 @@ impl WriterInner {
} else {
logging::error!(
"Sequence should not decrease during replay, found {} <= {}, \
region_id: {}, region_name: {}, flushed_sequence: {}, num_requests: {}",
req_sequence,
last_sequence,
writer_ctx.shared.id,
@@ -402,6 +390,29 @@ impl WriterInner {
}
}
// Apply metadata after last WAL entry
while let Some((sequence_before_alter, _)) = next_apply_metadata {
assert!(
sequence_before_alter >= last_sequence,
"The sequence in metadata after last WAL entry is less than last sequence, \
metadata sequence: {}, last_sequence: {}, region_id: {}, region_name: {}",
sequence_before_alter,
last_sequence,
writer_ctx.shared.id,
writer_ctx.shared.name
);
self.apply_metadata(
&writer_ctx,
sequence_before_alter,
next_apply_metadata,
version_control,
)?;
num_recovered_metadata += 1;
next_apply_metadata = recovered_metadata.pop_first();
}
version_control.set_committed_sequence(last_sequence);
}
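Taken together, replay now runs two passes: the in-loop pass applies a pending change once a replayed request moves past it, and the new trailing pass drains whatever was recorded at or after the last WAL entry. A simplified, runnable sketch of that control flow (strings stand in for metadata; the crate's real loop also tracks manifest versions and errors):

use std::collections::BTreeMap;

// Simplified shape of replay after this fix; &str stands in for metadata.
fn replay(mut pending: BTreeMap<u64, &str>, wal_sequences: &[u64]) -> Vec<String> {
    let mut applied = Vec::new();
    let mut next = pending.pop_first();
    let mut last_sequence = 0;
    for &req_sequence in wal_sequences {
        // Apply every change that this request has moved past.
        while let Some((sequence_before_alter, meta)) = next {
            if req_sequence <= sequence_before_alter {
                break; // keep it until a later request passes the alter point
            }
            applied.push(format!("{meta} before seq {req_sequence}"));
            next = pending.pop_first();
        }
        last_sequence = req_sequence;
    }
    // The fix: drain changes recorded at or after the last WAL entry.
    while let Some((sequence_before_alter, meta)) = next {
        assert!(sequence_before_alter >= last_sequence);
        applied.push(format!("{meta} after last entry"));
        next = pending.pop_first();
    }
    applied
}

fn main() {
    let pending = BTreeMap::from([(2, "alter-a"), (3, "alter-b")]);
    let applied = replay(pending, &[1, 2, 3]);
    // alter-a is applied inside the loop (3 > 2); alter-b only afterwards.
    assert_eq!(applied.len(), 2);
}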
@@ -418,6 +429,39 @@ impl WriterInner {
Ok(())
}
fn apply_metadata<S: LogStore>(
&self,
writer_ctx: &WriterContext<'_, S>,
sequence: SequenceNumber,
mut metadata: Option<RecoverdMetadata>,
version_control: &VersionControl,
) -> Result<()> {
// It's safe to unwrap here, it's checked outside.
// Move out metadata to avoid cloning it.
let (_, (manifest_version, metadata)) = metadata.take().unwrap();
let region_metadata: RegionMetadataRef =
Arc::new(metadata.try_into().context(error::InvalidRawRegionSnafu {
region: &writer_ctx.shared.name,
})?);
let new_mutable = self
.memtable_builder
.build(region_metadata.schema().clone());
version_control.freeze_mutable_and_apply_metadata(
region_metadata,
manifest_version,
new_mutable,
);
logging::debug!(
"Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ",
writer_ctx.shared.name,
sequence,
manifest_version
);
Ok(())
}
/// Preprocess before write.
///
/// Creates needed mutable memtables, ensures there is enough capacity in memtable and trigger
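The pivotal call in apply_metadata above is freeze_mutable_and_apply_metadata: rows written under the old schema stay readable in a frozen memtable while a fresh mutable memtable, built from the altered schema, takes over new writes. A rough sketch of that swap using hypothetical types (the crate's real VersionControl does considerably more):

use std::mem;

// Hypothetical stand-ins; the real types live in crate::version and crate::memtable.
struct Memtable {
    schema_version: u32,
}

struct VersionControl {
    frozen: Vec<Memtable>,
    mutable: Memtable,
    manifest_version: u64,
}

impl VersionControl {
    fn freeze_mutable_and_apply_metadata(&mut self, manifest_version: u64, new_mutable: Memtable) {
        // Rows written under the old schema stay in the frozen memtable;
        // new writes go to the memtable built from the altered schema.
        let old = mem::replace(&mut self.mutable, new_mutable);
        self.frozen.push(old);
        self.manifest_version = manifest_version;
    }
}

fn main() {
    let mut vc = VersionControl {
        frozen: Vec::new(),
        mutable: Memtable { schema_version: 1 },
        manifest_version: 7,
    };
    vc.freeze_mutable_and_apply_metadata(8, Memtable { schema_version: 2 });
    assert_eq!(vc.frozen.len(), 1);
    assert_eq!(vc.mutable.schema_version, 2);
    assert_eq!(vc.manifest_version, 8);
}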