fix: handle corner case in catchup where compacted entry id exceeds region last entry id (#6312)

* fix(mito2): handle corner case in catchup where compacted entry id exceeds region last entry id

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-06-16 14:36:31 +08:00
committed by GitHub
parent f4f8d65a39
commit 10bf9b11f6
6 changed files with 230 additions and 7 deletions

View File

@@ -158,7 +158,7 @@ impl RaftEngineLogStore {
.context(StartWalTaskSnafu { name: "sync_task" })
}
fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
pub fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
(
self.engine.first_index(provider.id),
self.engine.last_index(provider.id),

View File

@@ -22,15 +22,20 @@ use common_recordbatch::RecordBatches;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::logstore::provider::RaftEngineProvider;
use store_api::region_engine::{RegionEngine, RegionRole, SetRegionRoleStateResponse};
use store_api::region_request::{RegionCatchupRequest, RegionOpenRequest, RegionRequest};
use store_api::region_request::{
RegionCatchupRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::{self, Error};
use crate::test_util::{
build_rows, flush_region, kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows,
rows_schema, single_kafka_log_store_factory, CreateRequestBuilder, LogStoreFactory, TestEnv,
raft_engine_log_store_factory, rows_schema, single_kafka_log_store_factory,
single_raft_engine_log_store_factory, CreateRequestBuilder, LogStoreFactory, TestEnv,
};
use crate::wal::EntryId;
@@ -496,6 +501,189 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
assert!(region.is_writable());
}
/// Sends a close request for `region_id` to `engine`.
///
/// Test helper: panics if the engine returns an error.
async fn close_region(engine: &MitoEngine, region_id: RegionId) {
    let close = RegionRequest::Close(RegionCloseRequest {});
    engine.handle_request(region_id, close).await.unwrap();
}
/// Sends an open request for `region_id`, located at `region_dir`, to `engine`.
///
/// The request uses an empty engine name and no options. `skip_wal_replay` is
/// forwarded unchanged to the [`RegionOpenRequest`].
///
/// Test helper: panics if the engine returns an error.
async fn open_region(
    engine: &MitoEngine,
    region_id: RegionId,
    region_dir: String,
    skip_wal_replay: bool,
) {
    let request = RegionRequest::Open(RegionOpenRequest {
        engine: String::new(),
        // `region_dir` is owned by this function, so it is moved into the
        // request rather than cloned.
        region_dir,
        options: HashMap::new(),
        skip_wal_replay,
    });
    engine.handle_request(region_id, request).await.unwrap();
}
/// Scans `region_id` with a default [`ScanRequest`] and collects the whole
/// stream into [`RecordBatches`].
///
/// Test helper: panics if the scan or the collection fails.
async fn scan_region(engine: &MitoEngine, region_id: RegionId) -> RecordBatches {
    let stream = engine
        .scan_to_stream(region_id, ScanRequest::default())
        .await
        .unwrap();
    RecordBatches::try_collect(stream).await.unwrap()
}
#[apply(single_raft_engine_log_store_factory)]
async fn test_local_catchup(factory: Option<LogStoreFactory>) {
use store_api::region_engine::SettableRegionRoleState;
use crate::test_util::LogStoreImpl;
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone());
let leader_engine = env.create_engine(MitoConfig::default()).await;
let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else {
unreachable!()
};
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
leader_engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&leader_engine, region_id, rows).await;
flush_region(&leader_engine, region_id, None).await;
// Ensure the last entry id is 1.
let resp = leader_engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
let last_entry_id = get_last_entry_id(resp);
assert_eq!(last_entry_id.unwrap(), 1);
// Close the region, and open it again.
close_region(&leader_engine, region_id).await;
open_region(&leader_engine, region_id, region_dir.clone(), false).await;
// Set the region to leader.
leader_engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
// Write more rows
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(4, 7),
};
put_rows(&leader_engine, region_id, rows).await;
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(8, 9),
};
put_rows(&leader_engine, region_id, rows).await;
let resp = leader_engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
let last_entry_id = get_last_entry_id(resp);
assert_eq!(last_entry_id.unwrap(), 3);
close_region(&leader_engine, region_id).await;
// Reopen the region, and skip the wal replay.
leader_engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: region_dir.clone(),
options: HashMap::new(),
skip_wal_replay: true,
}),
)
.await
.unwrap();
// The last entry id should be 1.
let region = leader_engine.get_region(region_id).unwrap();
assert_eq!(region.version_control.current().last_entry_id, 1);
// There are 2 entries in the log store.
let (start, end) = log_store.span(&RaftEngineProvider::new(region_id.into()));
assert_eq!(start.unwrap(), 2);
assert_eq!(end.unwrap(), 3);
// Try to catchup the region.
let resp = leader_engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
}),
)
.await;
assert!(resp.is_ok());
// After catchup, the last entry id should be 3.
let region = leader_engine.get_region(region_id).unwrap();
assert_eq!(region.version_control.current().last_entry_id, 3);
// The log store has been obsoleted these 2 entries.
let (start, end) = log_store.span(&RaftEngineProvider::new(region_id.into()));
assert_eq!(start, None);
assert_eq!(end, None);
// For local WAL, entries are not replayed during catchup.
// Therefore, any rows that were not flushed before closing the region will not be visible.
let batches = scan_region(&leader_engine, region_id).await;
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
// Write more rows
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(4, 7),
};
put_rows(&leader_engine, region_id, rows).await;
let batches = scan_region(&leader_engine, region_id).await;
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_catchup_not_exist() {
let mut env = TestEnv::new();

View File

@@ -434,8 +434,8 @@ impl RegionOpener {
.await?;
} else {
info!(
"Skip the WAL replay for region: {}, manifest version: {}",
region_id, manifest.manifest_version
"Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
region_id, manifest.manifest_version, flushed_entry_id
);
}
let now = self.time_provider.current_time_millis();

View File

@@ -78,6 +78,12 @@ impl VersionControl {
data.last_entry_id = entry_id;
}
/// Overwrites the recorded last entry id with `entry_id`.
///
/// Takes the write lock on the version control data for the duration of the
/// assignment. No other field is touched.
pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
    self.data.write().unwrap().last_entry_id = entry_id;
}
/// Sequence number of last committed data.
pub(crate) fn committed_sequence(&self) -> SequenceNumber {
self.data.read().unwrap().committed_sequence

View File

@@ -122,6 +122,12 @@ pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFacto
#[tokio::test]
pub(crate) fn single_kafka_log_store_factory(#[case] factory: Option<LogStoreFactory>) {}
// Test template providing exactly one case: the factory returned by
// `raft_engine_log_store_factory()`. Tests opt in with
// `#[apply(single_raft_engine_log_store_factory)]` and receive the case as
// their `factory` argument. The function body is intentionally empty; only
// the attributes matter.
#[template]
#[rstest]
#[case::with_raft_engine(raft_engine_log_store_factory())]
#[tokio::test]
pub(crate) fn single_raft_engine_log_store_factory(#[case] factory: Option<LogStoreFactory>) {}
#[derive(Clone)]
pub(crate) struct RaftEngineLogStoreFactory;
@@ -692,6 +698,10 @@ impl TestEnv {
pub fn get_kv_backend(&self) -> KvBackendRef {
self.kv_backend.clone()
}
/// Returns a clone of the log store held by this environment, or `None`
/// if no log store is set.
pub(crate) fn get_log_store(&self) -> Option<LogStoreImpl> {
    self.log_store.clone()
}
}
/// Builder to mock a [RegionCreateRequest].

View File

@@ -98,8 +98,27 @@ impl<S: LogStore> RegionWorkerLoop<S> {
)
}
} else {
warn!("Skips to replay memtable for region: {}", region.region_id);
let flushed_entry_id = region.version_control.current().last_entry_id;
let version = region.version_control.current();
let mut flushed_entry_id = version.last_entry_id;
let high_watermark = self
.wal
.store()
.high_watermark(&region.provider)
.unwrap_or_default();
warn!(
"Skips to replay memtable for region: {}, flushed entry id: {}, high watermark: {}",
region.region_id, flushed_entry_id, high_watermark
);
if high_watermark > flushed_entry_id {
warn!(
"Found high watermark is greater than flushed entry id, using high watermark as flushed entry id, region: {}, high watermark: {}, flushed entry id: {}",
region_id, high_watermark, flushed_entry_id
);
flushed_entry_id = high_watermark;
region.version_control.set_entry_id(flushed_entry_id);
}
let on_region_opened = self.wal.on_region_opened();
on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
}