From df31f0b9ecc138fccebe386b9e51a698a4c605f7 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 7 May 2025 08:54:35 +0800 Subject: [PATCH] fix: improve region migration error handling and optimize leader downgrade with lease check (#6026) * fix(meta): improve region migration error handling and lease management * chore: refine comments * chore: apply suggestions from CR * chore: apply suggestions from CR * feat: consume opening_region_guard --- src/meta-srv/src/error.rs | 11 ++- .../src/handler/keep_lease_handler.rs | 85 ++++++++++++++++ src/meta-srv/src/lease.rs | 26 ++++- src/meta-srv/src/metasrv/builder.rs | 1 + .../src/procedure/region_migration.rs | 77 ++++++++++----- .../downgrade_leader_region.rs | 96 +++++++++++++++++-- .../region_migration/migration_start.rs | 14 ++- .../procedure/region_migration/test_util.rs | 1 + src/meta-srv/src/region/supervisor.rs | 3 +- src/mito2/src/worker/handle_catchup.rs | 7 ++ src/mito2/src/worker/handle_manifest.rs | 5 +- 11 files changed, 284 insertions(+), 42 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index d6c4ef4208..050274f24c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -336,6 +336,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to downgrade region leader, region: {}", region_id))] + DowngradeLeader { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: BoxedError, + }, + #[snafu(display("Region's leader peer changed: {}", msg))] LeaderPeerChanged { msg: String, @@ -956,7 +965,7 @@ impl ErrorExt for Error { Error::StartTelemetryTask { source, .. } => source.status_code(), Error::NextSequence { source, .. } => source.status_code(), - + Error::DowngradeLeader { source, .. } => source.status_code(), Error::RegisterProcedureLoader { source, .. } => source.status_code(), Error::SubmitDdlTask { source, .. } => source.status_code(), Error::ConvertProtoData { source, .. 
} diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 553963aaa8..e57e0307f0 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -113,3 +113,88 @@ async fn put_into_memory_store(ctx: &mut Context, key: Vec, value: Vec, warn!(err; "Failed to update lease KV, peer: {peer:?}"); } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::meta::RequestHeader; + use common_meta::cache_invalidator::DummyCacheInvalidator; + use common_meta::datanode::Stat; + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::region_registry::LeaderRegionRegistry; + use common_meta::sequence::SequenceBuilder; + + use super::*; + use crate::cluster::MetaPeerClientBuilder; + use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::lease::find_datanode_lease_value; + use crate::service::store::cached_kv::LeaderCachedKvBackend; + + #[tokio::test] + async fn test_put_into_memory_store() { + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); + let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); + let ctx = Context { + server_addr: "127.0.0.1:0000".to_string(), + in_memory, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, + meta_peer_client, + mailbox, + election: None, + is_infancy: false, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), + leader_region_registry: Arc::new(LeaderRegionRegistry::new()), + }; + + let handler = DatanodeKeepLeaseHandler; + handle_request_many_times(ctx.clone(), &handler, 1).await; + + let lease_value = find_datanode_lease_value(1, &ctx.in_memory) + .await + .unwrap() + .unwrap(); + assert_eq!(lease_value.node_addr, "127.0.0.1:1"); + assert!(lease_value.timestamp_millis != 0); + } + + async fn handle_request_many_times( + mut ctx: Context, + handler: &DatanodeKeepLeaseHandler, + loop_times: i32, + ) { + let req = HeartbeatRequest { + header: Some(RequestHeader::new(1, Role::Datanode, HashMap::new())), + peer: Some(Peer::new(1, "127.0.0.1:1")), + ..Default::default() + }; + + for i in 1..=loop_times { + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + id: 101, + region_num: i as _, + ..Default::default() + }), + ..Default::default() + }; + handler.handle(&req, &mut ctx, &mut acc).await.unwrap(); + } + } +} diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 063c5233c7..53f1ecd8ed 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -16,14 +16,14 @@ use std::collections::HashMap; use std::hash::Hash; use common_error::ext::BoxedError; -use common_meta::kv_backend::KvBackend; +use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef}; use common_meta::peer::{Peer, PeerLookupService}; use common_meta::{util, DatanodeId, FlownodeId}; use common_time::util as time_util; use snafu::ResultExt; use crate::cluster::MetaPeerClientRef; -use 
crate::error::{Error, Result}; +use crate::error::{Error, KvBackendSnafu, Result}; use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool { @@ -33,6 +33,28 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool { } } +/// Returns the lease value of the given datanode id, if the datanode is not found, returns None. +pub async fn find_datanode_lease_value( + datanode_id: DatanodeId, + in_memory_key: &ResettableKvBackendRef, +) -> Result> { + let lease_key = DatanodeLeaseKey { + node_id: datanode_id, + }; + let lease_key_bytes: Vec = lease_key.try_into()?; + let Some(kv) = in_memory_key + .get(&lease_key_bytes) + .await + .context(KvBackendSnafu)? + else { + return Ok(None); + }; + + let lease_value: LeaseValue = kv.value.try_into()?; + + Ok(Some(lease_value)) +} + /// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], will only return if it's alive under given `lease_secs` pub async fn lookup_datanode_peer( datanode_id: DatanodeId, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index dbc22ca2f7..85cccaff11 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -311,6 +311,7 @@ impl MetasrvBuilder { let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( + in_memory.clone(), table_metadata_manager.clone(), memory_region_keeper.clone(), region_failure_detector_controller.clone(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index dd82a34589..ac87353818 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -37,6 +37,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; @@ -266,6 +267,7 @@ pub trait ContextFactory { #[derive(Clone)] pub struct DefaultContextFactory { volatile_ctx: VolatileContext, + in_memory_key: ResettableKvBackendRef, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, region_failure_detector_controller: RegionFailureDetectorControllerRef, @@ -277,6 +279,7 @@ pub struct DefaultContextFactory { impl DefaultContextFactory { /// Returns an [`DefaultContextFactory`]. 
pub fn new( + in_memory_key: ResettableKvBackendRef, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, region_failure_detector_controller: RegionFailureDetectorControllerRef, @@ -286,6 +289,7 @@ impl DefaultContextFactory { ) -> Self { Self { volatile_ctx: VolatileContext::default(), + in_memory_key, table_metadata_manager, opening_region_keeper, region_failure_detector_controller, @@ -301,6 +305,7 @@ impl ContextFactory for DefaultContextFactory { Context { persistent_ctx, volatile_ctx: self.volatile_ctx, + in_memory: self.in_memory_key, table_metadata_manager: self.table_metadata_manager, opening_region_keeper: self.opening_region_keeper, region_failure_detector_controller: self.region_failure_detector_controller, @@ -315,6 +320,7 @@ pub struct Context { persistent_ctx: PersistentContext, volatile_ctx: VolatileContext, + in_memory: ResettableKvBackendRef, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, region_failure_detector_controller: RegionFailureDetectorControllerRef, @@ -417,8 +423,8 @@ impl Context { /// Notifies the RegionSupervisor to deregister failure detectors. /// - /// The original failure detectors was removed once the procedure was triggered. - /// However, the `from_peer` may still send the heartbeats contains the failed region. + /// The original failure detectors are not removed when the procedure is triggered, + /// so we must deregister the failure detectors for the original region once the procedure finishes. pub async fn deregister_failure_detectors(&self) { let datanode_id = self.persistent_ctx.from_peer.id; let region_id = self.persistent_ctx.region_id; @@ -428,6 +434,19 @@ impl Context { .await; } + /// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer. + /// + /// The candidate region may be created on the destination peer, + /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted. + pub async fn deregister_failure_detectors_for_candidate_region(&self) { + let to_peer_id = self.persistent_ctx.to_peer.id; + let region_id = self.persistent_ctx.region_id; + + self.region_failure_detector_controller + .deregister_failure_detectors(vec![(to_peer_id, region_id)]) + .await; + } + /// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool { let value = self.volatile_ctx.table_route.take(); @@ -674,30 +693,38 @@ impl Procedure for RegionMigrationProcedure { let _timer = METRIC_META_REGION_MIGRATION_EXECUTE .with_label_values(&[name]) .start_timer(); - let (next, status) = state.next(&mut self.context, ctx).await.map_err(|e| { - if e.is_retryable() { - METRIC_META_REGION_MIGRATION_ERROR - .with_label_values(&[name, "retryable"]) - .inc(); - ProcedureError::retry_later(e) - } else { - error!( - e; - "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}", - self.context.region_id(), - self.context.persistent_ctx.from_peer, - self.context.persistent_ctx.to_peer, - self.context.volatile_ctx.metrics, - ); - METRIC_META_REGION_MIGRATION_ERROR - .with_label_values(&[name, "external"]) - .inc(); - ProcedureError::external(e) + match state.next(&mut self.context, ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) } - })?; - - *state = next; - Ok(status) + Err(e) => { + if e.is_retryable() { + METRIC_META_REGION_MIGRATION_ERROR + .with_label_values(&[name, "retryable"]) + .inc(); + Err(ProcedureError::retry_later(e)) + } else { + // Consumes the opening region guard before deregistering the failure detectors. + self.context.volatile_ctx.opening_region_guard.take(); + self.context + .deregister_failure_detectors_for_candidate_region() + .await; + error!( + e; + "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}", + self.context.region_id(), + self.context.persistent_ctx.from_peer, + self.context.persistent_ctx.to_peer, + self.context.volatile_ctx.metrics, + ); + METRIC_META_REGION_MIGRATION_ERROR + .with_label_values(&[name, "external"]) + .inc(); + Err(ProcedureError::external(e)) + } + } + } } fn dump(&self) -> ProcedureResult { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 56a28c7fb8..9ca3800456 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -16,18 +16,21 @@ use std::any::Any; use std::time::Duration; use api::v1::meta::MailboxMessage; +use common_error::ext::BoxedError; use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{error, info, warn}; +use common_time::util::current_time_millis; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use tokio::time::{sleep, Instant}; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; +use crate::lease::find_datanode_lease_value; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion; use crate::procedure::region_migration::{Context, State}; @@ -66,19 +69,32 @@ impl State for DowngradeLeaderRegion { match self.downgrade_region_with_retry(ctx).await { Ok(_) => { // Do nothing + info!( + "Successfully downgraded the region leader, region: {}", + ctx.persistent_ctx.region_id + ); } Err(error::Error::ExceededDeadline { ..
}) => { + info!( + "Downgrading the region leader exceeded the deadline, region: {}", + ctx.persistent_ctx.region_id + ); // Rollbacks the metadata if procedure is timeout return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false))); } Err(err) => { - error!(err; "Occurs non-retryable error"); + error!(err; "Encountered a non-retryable error, region: {}", ctx.persistent_ctx.region_id); if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() { info!( - "Running into the downgrade leader slow path, sleep until {:?}", - deadline + "Running into the downgrade region leader slow path, region: {}, sleeping until {:?}", + ctx.persistent_ctx.region_id, deadline ); tokio::time::sleep_until(*deadline).await; + } else { + warn!( + "Leader region lease deadline is not set, region: {}", + ctx.persistent_ctx.region_id + ); } } } @@ -220,6 +236,61 @@ impl DowngradeLeaderRegion { } } + async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) { + let leader = &ctx.persistent_ctx.from_peer; + + let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await { + Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis), + Err(err) => { + error!(err; "Failed to find datanode lease value for datanode {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id); + return; + } + }; + + if let Some(last_connection_at) = last_connection_at { + let now = current_time_millis(); + let elapsed = now - last_connection_at; + let region_lease = Duration::from_secs(REGION_LEASE_SECS); + + // It's safe to update the region leader lease deadline here because: + // 1. The old region leader has already been marked as downgraded in metadata, + // which means any attempts to renew its lease will be rejected. + // 2. The pusher disconnect time record only gets removed when the datanode (from_peer) + // establishes a new heartbeat connection stream. + if elapsed >= (REGION_LEASE_SECS * 1000) as i64 { + ctx.volatile_ctx.reset_leader_region_lease_deadline(); + info!( + "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}", + leader, + last_connection_at, + region_lease, + ctx.persistent_ctx.region_id + ); + } else if elapsed > 0 { + // `now - last_connection_at` < REGION_LEASE_SECS * 1000 + let lease_timeout = + region_lease - Duration::from_millis((now - last_connection_at) as u64); + ctx.volatile_ctx.reset_leader_region_lease_deadline(); + ctx.volatile_ctx + .set_leader_region_lease_deadline(lease_timeout); + info!( + "Datanode {}({}) last connected {} ms ago, updated leader region lease deadline to {:?}, region: {}", + leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id + ); + } else { + warn!( + "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}", + leader, last_connection_at, now, ctx.persistent_ctx.region_id + ) + } + } else { + warn!( + "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}", + leader, ctx.persistent_ctx.region_id + ) + } + } + /// Downgrades a leader region. /// /// Fast path: @@ -240,13 +311,20 @@ impl DowngradeLeaderRegion { retry += 1; // Throws the error immediately if the procedure exceeded the deadline. if matches!(err, error::Error::ExceededDeadline { ..
}) { + error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id); + return Err(err); + } else if matches!(err, error::Error::PusherNotFound { .. }) { + // Throws the error immediately if the datanode is unreachable. + error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id); + self.update_leader_region_lease_deadline(ctx).await; return Err(err); } else if err.is_retryable() && retry < self.optimistic_retry { - error!("Failed to downgrade region, error: {err:?}, retry later"); + error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id); sleep(self.retry_initial_interval).await; } else { - error!("Failed to downgrade region, error: {err:?}"); - break; + return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu { + region_id: ctx.persistent_ctx.region_id, + })?; } } else { ctx.update_operations_elapsed(timer); @@ -544,7 +622,11 @@ mod tests { ctx.volatile_ctx .set_leader_region_lease_deadline(Duration::from_secs(5)); let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap(); - state.downgrade_region_with_retry(&mut ctx).await.unwrap(); + let err = state + .downgrade_region_with_retry(&mut ctx) + .await + .unwrap_err(); + assert_matches!(err, error::Error::DowngradeLeader { .. }); assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None); // Should remain no change. assert_eq!( diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 0cf2e79061..07297f70aa 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -59,8 +60,16 @@ impl State for RegionMigrationStart { let from_peer = &ctx.persistent_ctx.from_peer; if self.has_migrated(®ion_route, to_peer)? { + info!( + "Region has been migrated, region: {:?}, to_peer: {:?}", + region_route.region.id, to_peer + ); Ok((Box::new(RegionMigrationEnd), Status::done())) } else if self.invalid_leader_peer(®ion_route, from_peer)? { + info!( + "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}", + region_route.region.id, region_route.leader_peer, from_peer, + ); Ok(( Box::new(RegionMigrationAbort::new(&format!( "Invalid region leader peer: {from_peer:?}, expected: {:?}", @@ -152,7 +161,7 @@ impl RegionMigrationStart { fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result { let region_id = region_route.region.id; - let region_opened = region_route + let region_migrated = region_route .leader_peer .as_ref() .context(error::UnexpectedSnafu { @@ -160,8 +169,7 @@ impl RegionMigrationStart { })? 
.id == to_peer.id; - - Ok(region_opened) + Ok(region_migrated) } } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 54c07dc4f3..ae5b6736e4 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -121,6 +121,7 @@ impl TestingEnv { table_metadata_manager: self.table_metadata_manager.clone(), opening_region_keeper: self.opening_region_keeper.clone(), volatile_ctx: Default::default(), + in_memory_key: Arc::new(MemoryKvBackend::default()), mailbox: self.mailbox_ctx.mailbox().clone(), server_addr: self.server_addr.to_string(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index fc4d606383..29867207cc 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -360,10 +360,9 @@ impl RegionSupervisor { Ok(false) => {} Ok(true) => { warn!( - "Skipping failover since maintenance mode is enabled. Removing failure detectors for regions: {:?}", + "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}", regions ); - self.deregister_failure_detectors(regions).await; return; } Err(err) => { diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index cc7be0f9a3..cc4c010456 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -51,6 +51,13 @@ impl RegionWorkerLoop { // Utilizes the short circuit evaluation. let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? { + if !is_empty_memtable { + warn!("Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}", + region.region_id, + region.manifest_ctx.manifest_version().await, + region.version_control.current().last_entry_id + ); + } self.reopen_region(®ion).await? } else { region diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 4fd0de0d7b..3edf3c326d 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -150,9 +150,10 @@ impl RegionWorkerLoop { }; let version = region.version(); if !version.memtables.is_empty() { + let current = region.version_control.current(); warn!( - "Region {} memtables is not empty, which should not happen, manifest version: {}", - region.region_id, manifest.manifest_version + "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}", + region.region_id, manifest.manifest_version, current.last_entry_id ); } let region_options = version.options.clone();
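The heart of this change is the lease arithmetic in `update_leader_region_lease_deadline`: decide, from the old leader's last heartbeat timestamp, whether the procedure can skip waiting, must wait out only the remainder of the lease, or should leave the deadline untouched. A minimal sketch of that decision as a pure function follows; the names `LeaseDecision` and `decide_leader_lease_wait` and the hard-coded `REGION_LEASE_SECS = 120` are illustrative assumptions, not meta-srv API — the real code reads the constant from `common_meta::distributed_time_constants` and mutates `VolatileContext` in place.

use std::time::Duration;

/// Assumed lease length; the real value is `common_meta::distributed_time_constants::REGION_LEASE_SECS`.
const REGION_LEASE_SECS: u64 = 120;

/// Outcome of re-evaluating the old leader's lease from its last heartbeat (milliseconds).
enum LeaseDecision {
    /// Disconnected for at least a full lease period: clear the deadline and skip waiting.
    SkipWait,
    /// Disconnected for part of a lease period: wait only for the remaining portion.
    WaitFor(Duration),
    /// Last connection timestamp lies in the future (clock skew): keep the existing deadline.
    Unchanged,
}

fn decide_leader_lease_wait(now_millis: i64, last_connection_at_millis: i64) -> LeaseDecision {
    let elapsed = now_millis - last_connection_at_millis;
    if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
        LeaseDecision::SkipWait
    } else if elapsed > 0 {
        // elapsed < REGION_LEASE_SECS * 1000 here, so the subtraction cannot underflow.
        LeaseDecision::WaitFor(
            Duration::from_secs(REGION_LEASE_SECS) - Duration::from_millis(elapsed as u64),
        )
    } else {
        LeaseDecision::Unchanged
    }
}

Under these assumptions, a datanode last seen 30 seconds ago yields `WaitFor(90s)`, while one silent for three minutes yields `SkipWait`; that is why the slow path in `DowngradeLeaderRegion::next` only sleeps when a deadline is still set.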