diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 646798c4b0..b234d8ce0e 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -67,6 +67,19 @@ pub enum Error { source: raft_engine::Error, location: Location, }, + + #[snafu(display( + "Cannot override compacted entry, namespace: {}, first index: {}, attempt index: {}", + namespace, + first_index, + attempt_index + ))] + OverrideCompactedEntry { + namespace: u64, + first_index: u64, + attempt_index: u64, + location: Location, + }, } impl ErrorExt for Error { diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 0ce32d6eb2..1c952680df 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -26,6 +26,7 @@ use store_api::logstore::namespace::Namespace as NamespaceTrait; use store_api::logstore::{AppendResponse, LogStore}; use crate::config::LogConfig; +use crate::error; use crate::error::{ AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, RaftEngineSnafu, StartGcTaskSnafu, StopGcTaskSnafu, @@ -107,6 +108,13 @@ impl RaftEngineLogStore { .start(common_runtime::bg_runtime()) .context(StartGcTaskSnafu) } + + fn span(&self, namespace: &<Self as LogStore>::Namespace) -> (Option<u64>, Option<u64>) { + ( + self.engine.first_index(namespace.id()), + self.engine.last_index(namespace.id()), + ) + } } impl Debug for RaftEngineLogStore { @@ -132,11 +140,23 @@ impl LogStore for RaftEngineLogStore { async fn append(&self, e: Self::Entry) -> Result<AppendResponse, Self::Error> { ensure!(self.started(), IllegalStateSnafu); let entry_id = e.id; + let namespace_id = e.namespace_id; let mut batch = LogBatch::with_capacity(1); batch - .add_entries::<MessageType>(e.namespace_id, &[e]) + .add_entries::<MessageType>(namespace_id, &[e]) .context(AddEntryLogBatchSnafu)?; + if let Some(first_index) = self.engine.first_index(namespace_id) { + ensure!( + entry_id >= first_index, + error::OverrideCompactedEntrySnafu { + namespace: 
namespace_id, + first_index, + attempt_index: entry_id, + } + ); + } + self.engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; @@ -151,11 +171,38 @@ impl LogStore for RaftEngineLogStore { entries: Vec<Self::Entry>, ) -> Result<Vec<AppendResponse>, Self::Error> { ensure!(self.started(), IllegalStateSnafu); - let entry_ids = entries.iter().map(Entry::get_id).collect::<Vec<_>>(); + if entries.is_empty() { + return Ok(vec![]); + } + + let mut min_entry_id = u64::MAX; + let entry_ids = entries + .iter() + .map(|e| { + let id = e.get_id(); + if id < min_entry_id { + min_entry_id = id; + } + id + }) + .collect::<Vec<_>>(); + + let mut batch = LogBatch::with_capacity(entries.len()); batch .add_entries::<MessageType>(ns.id, &entries) .context(AddEntryLogBatchSnafu)?; + + if let Some(first_index) = self.engine.first_index(ns.id) { + ensure!( + min_entry_id >= first_index, + error::OverrideCompactedEntrySnafu { + namespace: ns.id, + first_index, + attempt_index: min_entry_id, + } + ); + } + self.engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; @@ -175,6 +222,12 @@ impl LogStore for RaftEngineLogStore { let last_index = engine.last_index(ns.id).unwrap_or(0); let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1)); + info!( + "Read logstore, namespace: {}, start: {}, span: {:?}", + ns.id(), + id, + self.span(ns) + ); let max_batch_size = self.config.read_batch_size; let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size); let ns = ns.clone(); @@ -290,9 +343,11 @@ impl LogStore for RaftEngineLogStore { ensure!(self.started(), IllegalStateSnafu); let obsoleted = self.engine.compact_to(namespace.id(), id + 1); info!( - "Namespace {} obsoleted {} entries", + "Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}", namespace.id(), - obsoleted + obsoleted, + id, + self.span(&namespace) ); Ok(()) } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 5914e86644..c96e71c528 100644 --- a/src/storage/src/region.rs +++ 
b/src/storage/src/region.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use common_telemetry::logging; +use common_telemetry::{info, logging}; use common_time::util; use metrics::{decrement_gauge, increment_gauge}; use snafu::ResultExt; @@ -319,6 +319,12 @@ impl RegionImpl { let wal = Wal::new(metadata.id(), store_config.log_store); wal.obsolete(flushed_sequence).await?; + info!( + "Obsolete WAL entries on startup, region: {}, flushed sequence: {}", + metadata.id(), + flushed_sequence + ); + let shared = Arc::new(SharedData { id: metadata.id(), name, diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 4033b3d538..860b81f278 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -152,6 +152,30 @@ async fn test_manual_flush() { assert!(has_parquet_file(&sst_dir)); } +#[tokio::test] +async fn test_flush_and_reopen() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("manual_flush"); + let store_dir = dir.path().to_str().unwrap(); + let flush_switch = Arc::new(FlushSwitch::default()); + let mut tester = FlushTester::new(store_dir, flush_switch.clone()).await; + + tester.put(&[(1000, Some(100))]).await; + tester.flush(Some(true)).await; + tester.reopen().await; + let i = tester + .base() + .region + .inner + .shared + .version_control + .committed_sequence(); + + // we wrote a request and flushed the region (involving writing a manifest), thus + // committed_sequence should be 2. 
+ assert_eq!(2, i); +} + #[tokio::test] async fn test_flush_empty() { let dir = create_temp_dir("flush-empty"); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 52eb84cbba..52acbe3ce0 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -308,9 +308,9 @@ impl RegionWriter { let prev_version = version_control.current_manifest_version(); action_list.set_prev_version(prev_version); - logging::debug!( + logging::info!( "Try to remove region {}, action_list: {:?}", - drop_ctx.shared.name(), + drop_ctx.shared.id(), action_list ); @@ -319,10 +319,19 @@ impl RegionWriter { // Mark all data obsolete and delete the namespace in the WAL drop_ctx.wal.obsolete(committed_sequence).await?; drop_ctx.wal.delete_namespace().await?; + logging::info!( + "Remove WAL entries in region: {}, committed sequence: {}", + drop_ctx.shared.id(), + committed_sequence + ); // Mark all SSTs deleted let files = current_version.ssts().mark_all_files_deleted(); - logging::debug!("Try to remove all SSTs {:?}", files); + logging::info!( + "Try to remove all SSTs, region: {}, files: {:?}", + drop_ctx.shared.id(), + files + ); Ok(()) } @@ -539,15 +548,10 @@ impl WriterInner { } } - if let Some(payload) = payload { - num_requests += 1; - // Note that memtables of `Version` may be updated during replay. 
- let version = version_control.current(); - - if req_sequence > last_sequence { - last_sequence = req_sequence; - } else { - logging::error!( + if req_sequence > last_sequence { + last_sequence = req_sequence; + } else { + logging::error!( "Sequence should not decrease during replay, found {} <= {}, \ region_id: {}, region_name: {}, flushed_sequence: {}, num_requests: {}", req_sequence, @@ -558,12 +562,17 @@ impl WriterInner { num_requests, ); - error::SequenceNotMonotonicSnafu { - prev: last_sequence, - given: req_sequence, - } - .fail()?; + error::SequenceNotMonotonicSnafu { + prev: last_sequence, + given: req_sequence, } + .fail()?; + } + + if let Some(payload) = payload { + num_requests += 1; + // Note that memtables of `Version` may be updated during replay. + let version = version_control.current(); // TODO(yingwen): Trigger flush if the size of memtables reach the flush threshold to avoid // out of memory during replay, but we need to do it carefully to avoid dead lock. let mut inserter = Inserter::new(last_sequence);