diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index 396d6f5735..673b2075c7 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -158,7 +158,7 @@ impl RaftEngineLogStore {
             .context(StartWalTaskSnafu { name: "sync_task" })
     }
 
-    fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
+    pub fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
         (
             self.engine.first_index(provider.id),
             self.engine.last_index(provider.id),
diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs
index 8a24fecf3a..bb1c949d81 100644
--- a/src/mito2/src/engine/catchup_test.rs
+++ b/src/mito2/src/engine/catchup_test.rs
@@ -22,15 +22,20 @@ use common_recordbatch::RecordBatches;
 use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
 use rstest::rstest;
 use rstest_reuse::{self, apply};
+use store_api::logstore::provider::RaftEngineProvider;
 use store_api::region_engine::{RegionEngine, RegionRole, SetRegionRoleStateResponse};
-use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest};
+use store_api::region_request::{
+    RegionCatchupRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
+};
 use store_api::storage::{RegionId, ScanRequest};
 
 use crate::config::MitoConfig;
+use crate::engine::MitoEngine;
 use crate::error::{self, Error};
 use crate::test_util::{
     build_rows, flush_region, kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows,
-    rows_schema, single_kafka_log_store_factory, CreateRequestBuilder, LogStoreFactory, TestEnv,
+    raft_engine_log_store_factory, rows_schema, single_kafka_log_store_factory,
+    single_raft_engine_log_store_factory, CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
 use crate::wal::EntryId;
 
@@ -496,6 +501,189 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
     assert!(region.is_writable());
 }
 
+async fn close_region(engine: &MitoEngine, region_id: RegionId) {
+    engine
+        .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
+        .await
+        .unwrap();
+}
+
+async fn open_region(
+    engine: &MitoEngine,
+    region_id: RegionId,
+    region_dir: String,
+    skip_wal_replay: bool,
+) {
+    engine
+        .handle_request(
+            region_id,
+            RegionRequest::Open(RegionOpenRequest {
+                engine: String::new(),
+                region_dir: region_dir.clone(),
+                options: HashMap::new(),
+                skip_wal_replay,
+            }),
+        )
+        .await
+        .unwrap();
+}
+
+async fn scan_region(engine: &MitoEngine, region_id: RegionId) -> RecordBatches {
+    let request = ScanRequest::default();
+    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+    RecordBatches::try_collect(stream).await.unwrap()
+}
+
+#[apply(single_raft_engine_log_store_factory)]
+async fn test_local_catchup(factory: Option<LogStoreFactory>) {
+    use store_api::region_engine::SettableRegionRoleState;
+
+    use crate::test_util::LogStoreImpl;
+
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone());
+    let leader_engine = env.create_engine(MitoConfig::default()).await;
+    let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else {
+        unreachable!()
+    };
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+    let region_dir = request.region_dir.clone();
+
+    let column_schemas = rows_schema(&request);
+    leader_engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(0, 3),
+    };
+    put_rows(&leader_engine, region_id, rows).await;
+    flush_region(&leader_engine, region_id, None).await;
+
+    // Ensure the last entry id is 1.
+    let resp = leader_engine
+        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
+        .await
+        .unwrap();
+    let last_entry_id = get_last_entry_id(resp);
+    assert_eq!(last_entry_id.unwrap(), 1);
+
+    // Close the region, and open it again.
+    close_region(&leader_engine, region_id).await;
+    open_region(&leader_engine, region_id, region_dir.clone(), false).await;
+
+    // Set the region to leader.
+    leader_engine
+        .set_region_role(region_id, RegionRole::Leader)
+        .unwrap();
+
+    // Write more rows
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(4, 7),
+    };
+    put_rows(&leader_engine, region_id, rows).await;
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(8, 9),
+    };
+    put_rows(&leader_engine, region_id, rows).await;
+    let resp = leader_engine
+        .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
+        .await
+        .unwrap();
+    let last_entry_id = get_last_entry_id(resp);
+    assert_eq!(last_entry_id.unwrap(), 3);
+
+    close_region(&leader_engine, region_id).await;
+    // Reopen the region, and skip the WAL replay.
+    leader_engine
+        .handle_request(
+            region_id,
+            RegionRequest::Open(RegionOpenRequest {
+                engine: String::new(),
+                region_dir: region_dir.clone(),
+                options: HashMap::new(),
+                skip_wal_replay: true,
+            }),
+        )
+        .await
+        .unwrap();
+
+    // The last entry id should be 1.
+    let region = leader_engine.get_region(region_id).unwrap();
+    assert_eq!(region.version_control.current().last_entry_id, 1);
+
+    // There are 2 entries in the log store.
+    let (start, end) = log_store.span(&RaftEngineProvider::new(region_id.into()));
+    assert_eq!(start.unwrap(), 2);
+    assert_eq!(end.unwrap(), 3);
+
+    // Try to catch up the region.
+    let resp = leader_engine
+        .handle_request(
+            region_id,
+            RegionRequest::Catchup(RegionCatchupRequest {
+                set_writable: true,
+                entry_id: None,
+                metadata_entry_id: None,
+                location_id: None,
+            }),
+        )
+        .await;
+    assert!(resp.is_ok());
+    // After catchup, the last entry id should be 3.
+    let region = leader_engine.get_region(region_id).unwrap();
+    assert_eq!(region.version_control.current().last_entry_id, 3);
+
+    // The log store has obsoleted these 2 entries.
+    let (start, end) = log_store.span(&RaftEngineProvider::new(region_id.into()));
+    assert_eq!(start, None);
+    assert_eq!(end, None);
+
+    // For local WAL, entries are not replayed during catchup.
+    // Therefore, any rows that were not flushed before closing the region will not be visible.
+    let batches = scan_region(&leader_engine, region_id).await;
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+
+    // Write more rows
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows(4, 7),
+    };
+    put_rows(&leader_engine, region_id, rows).await;
+
+    let batches = scan_region(&leader_engine, region_id).await;
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 0     | 0.0     | 1970-01-01T00:00:00 |
+| 1     | 1.0     | 1970-01-01T00:00:01 |
+| 2     | 2.0     | 1970-01-01T00:00:02 |
+| 4     | 4.0     | 1970-01-01T00:00:04 |
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
+
 #[tokio::test]
 async fn test_catchup_not_exist() {
     let mut env = TestEnv::new();
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index f32292d17e..50e4537550 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -434,8 +434,8 @@ impl RegionOpener {
             .await?;
         } else {
             info!(
-                "Skip the WAL replay for region: {}, manifest version: {}",
-                region_id, manifest.manifest_version
+                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
+                region_id, manifest.manifest_version, flushed_entry_id
             );
         }
         let now = self.time_provider.current_time_millis();
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index b40c11ee5b..c1198e4d16 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -78,6 +78,12 @@ impl VersionControl {
         data.last_entry_id = entry_id;
     }
 
+    /// Updates last entry id.
+    pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
+        let mut data = self.data.write().unwrap();
+        data.last_entry_id = entry_id;
+    }
+
     /// Sequence number of last committed data.
     pub(crate) fn committed_sequence(&self) -> SequenceNumber {
         self.data.read().unwrap().committed_sequence
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 6833e807ed..7a8327c8de 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -122,6 +122,12 @@ pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFactory>) {}
 
+#[template]
+#[rstest]
+#[case::with_raft_engine(raft_engine_log_store_factory())]
+#[tokio::test]
+pub(crate) fn single_raft_engine_log_store_factory(#[case] factory: Option<LogStoreFactory>) {}
+
 #[derive(Clone)]
 pub(crate) struct RaftEngineLogStoreFactory;
@@ -692,6 +698,10 @@ impl TestEnv {
     pub fn get_kv_backend(&self) -> KvBackendRef {
         self.kv_backend.clone()
     }
+
+    pub(crate) fn get_log_store(&self) -> Option<LogStoreImpl> {
+        self.log_store.as_ref().cloned()
+    }
 }
 
 /// Builder to mock a [RegionCreateRequest].
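
The `single_raft_engine_log_store_factory` template above is what drives `test_local_catchup` through `#[apply(...)]`. As a minimal, self-contained sketch of the `rstest_reuse` template/apply pattern in play (the `my_factory` and `test_uses_factory` names are illustrative, not part of this change):

```rust
use rstest::rstest;
use rstest_reuse::{self, apply, template};

// Hypothetical factory, standing in for raft_engine_log_store_factory().
fn my_factory() -> Option<u32> {
    Some(42)
}

// The template declares the cases once...
#[template]
#[rstest]
#[case::with_my_factory(my_factory())]
fn single_my_factory(#[case] factory: Option<u32>) {}

// ...and each test annotated with #[apply(...)] expands once per case,
// receiving the factory just like test_local_catchup does.
#[apply(single_my_factory)]
fn test_uses_factory(factory: Option<u32>) {
    let Some(value) = factory else { return };
    assert_eq!(value, 42);
}
```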
diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs
index cc4c010456..095c769452 100644
--- a/src/mito2/src/worker/handle_catchup.rs
+++ b/src/mito2/src/worker/handle_catchup.rs
@@ -98,8 +98,27 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 )
             }
         } else {
-            warn!("Skips to replay memtable for region: {}", region.region_id);
-            let flushed_entry_id = region.version_control.current().last_entry_id;
+            let version = region.version_control.current();
+            let mut flushed_entry_id = version.last_entry_id;
+
+            let high_watermark = self
+                .wal
+                .store()
+                .high_watermark(&region.provider)
+                .unwrap_or_default();
+            warn!(
+                "Skips replaying memtable for region: {}, flushed entry id: {}, high watermark: {}",
+                region.region_id, flushed_entry_id, high_watermark
+            );
+
+            if high_watermark > flushed_entry_id {
+                warn!(
+                    "High watermark is greater than flushed entry id; using high watermark as flushed entry id, region: {}, high watermark: {}, flushed entry id: {}",
+                    region_id, high_watermark, flushed_entry_id
+                );
+                flushed_entry_id = high_watermark;
+                region.version_control.set_entry_id(flushed_entry_id);
+            }
             let on_region_opened = self.wal.on_region_opened();
             on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
         }
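
In short, the handle_catchup.rs change says: when catchup skips memtable replay and the log store's high watermark is ahead of the region's flushed entry id, adopt the high watermark as the last entry id before `on_region_opened` obsoletes the stale WAL entries. A minimal sketch of that decision, with `EntryId` and `VersionControl` as simplified stand-ins for the mito2 types rather than the actual implementation:

```rust
type EntryId = u64;

// Simplified stand-in for mito2's VersionControl.
struct VersionControl {
    last_entry_id: EntryId,
}

impl VersionControl {
    fn set_entry_id(&mut self, entry_id: EntryId) {
        self.last_entry_id = entry_id;
    }
}

/// Aligns the region's last entry id with the WAL high watermark when
/// replay is skipped; returns the entry id handed to on_region_opened.
fn align_with_high_watermark(version: &mut VersionControl, high_watermark: EntryId) -> EntryId {
    let mut flushed_entry_id = version.last_entry_id;
    if high_watermark > flushed_entry_id {
        // The WAL holds entries newer than the last flush. Without replaying
        // them, treat the high watermark as the new last entry id so the WAL
        // can safely obsolete everything up to it.
        flushed_entry_id = high_watermark;
        version.set_entry_id(flushed_entry_id);
    }
    flushed_entry_id
}

fn main() {
    // Mirrors test_local_catchup: flushed entry id 1, high watermark 3.
    let mut version = VersionControl { last_entry_id: 1 };
    assert_eq!(align_with_high_watermark(&mut version, 3), 3);
    assert_eq!(version.last_entry_id, 3);
}
```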