fix: wal replay ignore manifest entries (#1612)

* fix: wal replay ignore manifest entries

* test: add ut
This commit is contained in:
Lei, HUANG
2023-05-19 18:12:44 +08:00
committed by GitHub
parent 8764ce7845
commit 82f2b34f4d
5 changed files with 129 additions and 22 deletions

View File

@@ -67,6 +67,19 @@ pub enum Error {
source: raft_engine::Error,
location: Location,
},
#[snafu(display(
"Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}",
namespace,
first_index,
attempt_index
))]
OverrideCompactedEntry {
namespace: u64,
first_index: u64,
attempt_index: u64,
location: Location,
},
}
impl ErrorExt for Error {

View File

@@ -26,6 +26,7 @@ use store_api::logstore::namespace::Namespace as NamespaceTrait;
use store_api::logstore::{AppendResponse, LogStore};
use crate::config::LogConfig;
use crate::error;
use crate::error::{
AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu,
RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu,
@@ -107,6 +108,13 @@ impl RaftEngineLogStore {
.start(common_runtime::bg_runtime())
.context(StartGcTaskSnafu)
}
fn span(&self, namespace: &<Self as LogStore>::Namespace) -> (Option<u64>, Option<u64>) {
(
self.engine.first_index(namespace.id()),
self.engine.last_index(namespace.id()),
)
}
}
impl Debug for RaftEngineLogStore {
@@ -132,11 +140,23 @@ impl LogStore for RaftEngineLogStore {
async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let entry_id = e.id;
let namespace_id = e.namespace_id;
let mut batch = LogBatch::with_capacity(1);
batch
.add_entries::<MessageType>(e.namespace_id, &[e])
.add_entries::<MessageType>(namespace_id, &[e])
.context(AddEntryLogBatchSnafu)?;
if let Some(first_index) = self.engine.first_index(namespace_id) {
ensure!(
entry_id >= first_index,
error::OverrideCompactedEntrySnafu {
namespace: namespace_id,
first_index,
attempt_index: entry_id,
}
);
}
self.engine
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
@@ -151,11 +171,38 @@ impl LogStore for RaftEngineLogStore {
entries: Vec<Self::Entry>,
) -> Result<Vec<Id>, Self::Error> {
ensure!(self.started(), IllegalStateSnafu);
let entry_ids = entries.iter().map(Entry::get_id).collect::<Vec<_>>();
if entries.is_empty() {
return Ok(vec![]);
}
let mut min_entry_id = u64::MAX;
let entry_ids = entries
.iter()
.map(|e| {
let id = e.get_id();
if id < min_entry_id {
min_entry_id = id;
}
id
})
.collect::<Vec<_>>();
let mut batch = LogBatch::with_capacity(entries.len());
batch
.add_entries::<MessageType>(ns.id, &entries)
.context(AddEntryLogBatchSnafu)?;
if let Some(first_index) = self.engine.first_index(ns.id) {
ensure!(
min_entry_id >= first_index,
error::OverrideCompactedEntrySnafu {
namespace: ns.id,
first_index,
attempt_index: min_entry_id,
}
);
}
self.engine
.write(&mut batch, self.config.sync_write)
.context(RaftEngineSnafu)?;
@@ -175,6 +222,12 @@ impl LogStore for RaftEngineLogStore {
let last_index = engine.last_index(ns.id).unwrap_or(0);
let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1));
info!(
"Read logstore, namespace: {}, start: {}, span: {:?}",
ns.id(),
id,
self.span(ns)
);
let max_batch_size = self.config.read_batch_size;
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
let ns = ns.clone();
@@ -290,9 +343,11 @@ impl LogStore for RaftEngineLogStore {
ensure!(self.started(), IllegalStateSnafu);
let obsoleted = self.engine.compact_to(namespace.id(), id + 1);
info!(
"Namespace {} obsoleted {} entries",
"Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}",
namespace.id(),
obsoleted
obsoleted,
id,
self.span(&namespace)
);
Ok(())
}

View File

@@ -23,7 +23,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::logging;
use common_telemetry::{info, logging};
use common_time::util;
use metrics::{decrement_gauge, increment_gauge};
use snafu::ResultExt;
@@ -319,6 +319,12 @@ impl<S: LogStore> RegionImpl<S> {
let wal = Wal::new(metadata.id(), store_config.log_store);
wal.obsolete(flushed_sequence).await?;
info!(
"Obsolete WAL entries on startup, region: {}, flushed sequence: {}",
metadata.id(),
flushed_sequence
);
let shared = Arc::new(SharedData {
id: metadata.id(),
name,

View File

@@ -152,6 +152,30 @@ async fn test_manual_flush() {
assert!(has_parquet_file(&sst_dir));
}
#[tokio::test]
async fn test_flush_and_reopen() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("manual_flush");
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let mut tester = FlushTester::new(store_dir, flush_switch.clone()).await;
tester.put(&[(1000, Some(100))]).await;
tester.flush(Some(true)).await;
tester.reopen().await;
let i = tester
.base()
.region
.inner
.shared
.version_control
.committed_sequence();
// we wrote a request and flushed the region (involving writing a manifest), thus
// committed_sequence should be 2.
assert_eq!(2, i);
}
#[tokio::test]
async fn test_flush_empty() {
let dir = create_temp_dir("flush-empty");

View File

@@ -308,9 +308,9 @@ impl RegionWriter {
let prev_version = version_control.current_manifest_version();
action_list.set_prev_version(prev_version);
logging::debug!(
logging::info!(
"Try to remove region {}, action_list: {:?}",
drop_ctx.shared.name(),
drop_ctx.shared.id(),
action_list
);
@@ -319,10 +319,19 @@ impl RegionWriter {
// Mark all data obsolete and delete the namespace in the WAL
drop_ctx.wal.obsolete(committed_sequence).await?;
drop_ctx.wal.delete_namespace().await?;
logging::info!(
"Remove WAL entries in region: {}, committed sequence: {}",
drop_ctx.shared.id(),
committed_sequence
);
// Mark all SSTs deleted
let files = current_version.ssts().mark_all_files_deleted();
logging::debug!("Try to remove all SSTs {:?}", files);
logging::info!(
"Try to remove all SSTs, region: {}, files: {:?}",
drop_ctx.shared.id(),
files
);
Ok(())
}
@@ -539,15 +548,10 @@ impl WriterInner {
}
}
if let Some(payload) = payload {
num_requests += 1;
// Note that memtables of `Version` may be updated during replay.
let version = version_control.current();
if req_sequence > last_sequence {
last_sequence = req_sequence;
} else {
logging::error!(
if req_sequence > last_sequence {
last_sequence = req_sequence;
} else {
logging::error!(
"Sequence should not decrease during replay, found {} <= {}, \
region_id: {}, region_name: {}, flushed_sequence: {}, num_requests: {}",
req_sequence,
@@ -558,12 +562,17 @@ impl WriterInner {
num_requests,
);
error::SequenceNotMonotonicSnafu {
prev: last_sequence,
given: req_sequence,
}
.fail()?;
error::SequenceNotMonotonicSnafu {
prev: last_sequence,
given: req_sequence,
}
.fail()?;
}
if let Some(payload) = payload {
num_requests += 1;
// Note that memtables of `Version` may be updated during replay.
let version = version_control.current();
// TODO(yingwen): Trigger flush if the size of memtables reach the flush threshold to avoid
// out of memory during replay, but we need to do it carefully to avoid dead lock.
let mut inserter = Inserter::new(last_sequence);