diff --git a/Cargo.lock b/Cargo.lock index 130eaaeb61..198f009fea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7504,6 +7504,7 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "common-wal", "datafusion", "datatypes", "futures-util", diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index e25b0e350d..b9e34d9230 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -250,7 +250,7 @@ pub struct UpgradeRegion { /// `None` stands for no wait, /// it's helpful to verify whether the leader region is ready. #[serde(with = "humantime_serde")] - pub replay_timeout: Option<Duration>, + pub replay_timeout: Duration, /// The hint for replaying memtable. #[serde(default)] pub location_id: Option<u64>, @@ -797,7 +797,7 @@ mod tests { region_id: RegionId::new(1024, 1), last_entry_id: None, metadata_last_entry_id: None, - replay_timeout: Some(Duration::from_millis(1000)), + replay_timeout: Duration::from_millis(1000), location_id: None, replay_entry_id: None, metadata_replay_entry_id: None, @@ -892,7 +892,7 @@ mod tests { region_id: RegionId::new(1024, 1), last_entry_id: None, metadata_last_entry_id: None, - replay_timeout: Some(Duration::from_millis(1000)), + replay_timeout: Duration::from_millis(1000), location_id: None, replay_entry_id: None, metadata_replay_entry_id: None, diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index bb81fc1c6b..8954513653 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -44,7 +44,6 @@ use crate::region_server::RegionServer; #[derive(Clone)] pub struct RegionHeartbeatResponseHandler { region_server: RegionServer, - catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, open_region_parallelism: usize, @@ -64,7 +63,6 @@ pub trait InstructionHandler: Send + Sync { #[derive(Clone)] pub struct HandlerContext { region_server: RegionServer, - catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, gc_tasks: TaskTracker, @@ -75,7 +73,6 @@ impl HandlerContext { pub fn new_for_test(region_server: RegionServer) -> Self { Self { region_server, - catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), gc_tasks: TaskTracker::new(), @@ -88,7 +85,6 @@ impl RegionHeartbeatResponseHandler { pub fn new(region_server: RegionServer) -> Self { Self { region_server, - catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), // Default to half of the number of CPUs.
@@ -114,7 +110,12 @@ impl RegionHeartbeatResponseHandler { )), Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())), Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())), - Instruction::UpgradeRegions(_) => Ok(Box::new(UpgradeRegionsHandler.into())), + Instruction::UpgradeRegions(_) => Ok(Box::new( + UpgradeRegionsHandler { + upgrade_region_parallelism: self.open_region_parallelism, + } + .into(), + )), Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())), Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), @@ -216,7 +217,6 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let mailbox = ctx.mailbox.clone(); let region_server = self.region_server.clone(); - let catchup_tasks = self.catchup_tasks.clone(); let downgrade_tasks = self.downgrade_tasks.clone(); let flush_tasks = self.flush_tasks.clone(); let gc_tasks = self.gc_tasks.clone(); @@ -226,7 +226,6 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { .handle( &HandlerContext { region_server, - catchup_tasks, downgrade_tasks, flush_tasks, gc_tasks, diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index 4271a7ed30..d89d0d08b2 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -12,18 +12,78 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; use common_meta::instruction::{ InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply, }; -use common_telemetry::{info, warn}; -use futures::future::join_all; -use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint}; +use common_telemetry::{debug, info, warn}; +use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint}; +use store_api::storage::RegionId; +use crate::error::Result; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; -use crate::heartbeat::task_tracker::WaitResult; #[derive(Debug, Clone, Copy, Default)] -pub struct UpgradeRegionsHandler; +pub struct UpgradeRegionsHandler { + pub upgrade_region_parallelism: usize, +} + +#[cfg(test)] +impl UpgradeRegionsHandler { + fn new_test() -> UpgradeRegionsHandler { + UpgradeRegionsHandler { + upgrade_region_parallelism: 8, + } + } +} + +impl UpgradeRegionsHandler { + fn convert_responses_to_replies( + responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>, + catchup_regions: &[RegionId], + ) -> Vec<UpgradeRegionReply> { + match responses { + Ok(responses) => responses + .into_iter() + .map(|(region_id, result)| match result { + Ok(()) => UpgradeRegionReply { + region_id, + ready: true, + exists: true, + error: None, + }, + Err(err) => { + if err.status_code() == StatusCode::RegionNotFound { + UpgradeRegionReply { + region_id, + ready: false, + exists: false, + error: Some(format!("{err:?}")), + } + } else { + UpgradeRegionReply { + region_id, + ready: false, + exists: true, + error: Some(format!("{err:?}")), + } + } + } + }) + .collect::<Vec<_>>(), + Err(err) => catchup_regions + .iter() + .map(|region_id| UpgradeRegionReply { + region_id: *region_id, + ready: false, + exists: true, + error: Some(format!("{err:?}")), + }) + .collect::<Vec<_>>(), + } + } +} impl UpgradeRegionsHandler { // Handles upgrade regions instruction.
@@ -34,12 +94,17 @@ impl UpgradeRegionsHandler { ctx: &HandlerContext, upgrade_regions: Vec<UpgradeRegion>, ) -> Vec<UpgradeRegionReply> { - let mut replies = Vec::with_capacity(upgrade_regions.len()); - let mut catchup_request = Vec::with_capacity(upgrade_regions.len()); + let num_upgrade_regions = upgrade_regions.len(); + let mut replies = Vec::with_capacity(num_upgrade_regions); + let mut catchup_requests = Vec::with_capacity(num_upgrade_regions); + let mut catchup_regions = Vec::with_capacity(num_upgrade_regions); + let mut timeout = None; for upgrade_region in upgrade_regions { let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id) else { + // Region is not found. + debug!("Region {} is not found", upgrade_region.region_id); replies.push(UpgradeRegionReply { region_id: upgrade_region.region_id, ready: false, @@ -49,7 +114,12 @@ impl UpgradeRegionsHandler { continue; }; + // Ignore catchup requests for writable regions. if writable { + warn!( + "Region {} is writable, ignoring the catchup request", + upgrade_region.region_id + ); replies.push(UpgradeRegionReply { region_id: upgrade_region.region_id, ready: true, @@ -66,6 +136,15 @@ impl UpgradeRegionsHandler { replay_timeout, .. } = upgrade_region; + match timeout { + Some(timeout) => { + debug_assert_eq!(timeout, replay_timeout); + } + None => { + // TODO(weny): require the replay_timeout. + timeout = Some(replay_timeout); + } + } let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint { @@ -75,9 +154,9 @@ impl UpgradeRegionsHandler { _ => None, }; - catchup_request.push(( + catchup_regions.push(upgrade_region.region_id); + catchup_requests.push(( upgrade_region.region_id, - replay_timeout.unwrap_or_default(), RegionCatchupRequest { set_writable: true, entry_id: last_entry_id, @@ -89,69 +168,35 @@ impl UpgradeRegionsHandler { } } - let mut wait_results = Vec::with_capacity(catchup_request.len()); + let Some(timeout) = timeout else { + // No replay timeout, so we don't need to catch up the regions. + info!("All regions are writable, no need to catch up"); + debug_assert_eq!(replies.len(), num_upgrade_regions); + return replies; + }; - for (region_id, replay_timeout, catchup_request) in catchup_request { - let region_server_moved = ctx.region_server.clone(); - // TODO(weny): parallelize the catchup tasks. - let result = ctx - .catchup_tasks - .try_register( - region_id, - Box::pin(async move { - info!( - "Executing region: {region_id} catchup to: last entry id {:?}", - catchup_request.entry_id - ); - region_server_moved - .handle_request(region_id, RegionRequest::Catchup(catchup_request)) - .await?; - Ok(()) - }), - ) - .await; - - if result.is_busy() { - warn!("Another catchup task is running for the region: {region_id}"); + match tokio::time::timeout( + timeout, + ctx.region_server + .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests), + ) + .await + { + Ok(responses) => { + replies.extend( + Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(), + ); + } + Err(_) => { + replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply { + region_id: *region_id, + ready: false, + exists: true, + error: None, + })); } - - // We don't care that it returns a newly registered or running task.
- let mut watcher = result.into_watcher(); - wait_results.push(( - region_id, - ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await, - )); } - let results = join_all( - wait_results - .into_iter() - .map(|(region_id, result)| async move { - match result { - WaitResult::Timeout => UpgradeRegionReply { - region_id, - ready: false, - exists: true, - error: None, - }, - WaitResult::Finish(Ok(_)) => UpgradeRegionReply { - region_id, - ready: true, - exists: true, - error: None, - }, - WaitResult::Finish(Err(err)) => UpgradeRegionReply { - region_id, - ready: false, - exists: true, - error: Some(format!("{err:?}")), - }, - } - }), - ) - .await; - - replies.extend(results.into_iter()); replies } } @@ -181,7 +226,6 @@ mod tests { use mito2::engine::MITO_ENGINE_NAME; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; - use tokio::time::Instant; use crate::error; use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler; @@ -197,21 +241,30 @@ mod tests { let handler_context = HandlerContext::new_for_test(mock_region_server); let region_id = RegionId::new(1024, 1); - let waits = vec![None, Some(Duration::from_millis(100u64))]; - - for replay_timeout in waits { - let reply = UpgradeRegionsHandler - .handle( - &handler_context, - vec![UpgradeRegion { + let region_id2 = RegionId::new(1024, 2); + let replay_timeout = Duration::from_millis(100u64); + let reply = UpgradeRegionsHandler::new_test() + .handle( + &handler_context, + vec![ + UpgradeRegion { region_id, replay_timeout, ..Default::default() - }], - ) - .await; + }, + UpgradeRegion { + region_id: region_id2, + replay_timeout, + ..Default::default() + }, + ], + ) + .await; - let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; + let replies = &reply.unwrap().expect_upgrade_regions_reply(); + assert_eq!(replies[0].region_id, region_id); + assert_eq!(replies[1].region_id, region_id2); + for reply in replies { assert!(!reply.exists); assert!(reply.error.is_none()); } @@ -221,6 +274,7 @@ mod tests { async fn test_region_writable() { let mock_region_server = mock_region_server(); let region_id = RegionId::new(1024, 1); + let region_id2 = RegionId::new(1024, 2); let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { @@ -230,25 +284,32 @@ mod tests { unreachable!(); })); }); - mock_region_server.register_test_region(region_id, mock_engine); - + mock_region_server.register_test_region(region_id, mock_engine.clone()); + mock_region_server.register_test_region(region_id2, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); - - let waits = vec![None, Some(Duration::from_millis(100u64))]; - - for replay_timeout in waits { - let reply = UpgradeRegionsHandler - .handle( - &handler_context, - vec![UpgradeRegion { + let replay_timeout = Duration::from_millis(100u64); + let reply = UpgradeRegionsHandler::new_test() + .handle( + &handler_context, + vec![ + UpgradeRegion { region_id, replay_timeout, ..Default::default() - }], - ) - .await; + }, + UpgradeRegion { + region_id: region_id2, + replay_timeout, + ..Default::default() + }, + ], + ) + .await; - let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; + let replies = &reply.unwrap().expect_upgrade_regions_reply(); + assert_eq!(replies[0].region_id, region_id); + assert_eq!(replies[1].region_id, region_id2); + for reply in replies { assert!(reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -271,30 +332,27 @@ mod tests { 
mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); + let replay_timeout = Duration::from_millis(100u64); + let reply = UpgradeRegionsHandler::new_test() + .handle( + &handler_context, + vec![UpgradeRegion { + region_id, + replay_timeout, + ..Default::default() + }], + ) + .await; - let waits = vec![None, Some(Duration::from_millis(100u64))]; - - for replay_timeout in waits { - let reply = UpgradeRegionsHandler - .handle( - &handler_context, - vec![UpgradeRegion { - region_id, - replay_timeout, - ..Default::default() - }], - ) - .await; - - let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; - assert!(!reply.ready); - assert!(reply.exists); - assert!(reply.error.is_none()); - } + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; + assert!(!reply.ready); + assert!(reply.exists); + assert!(reply.error.is_none(), "error: {:?}", reply.error); } #[tokio::test] async fn test_region_not_ready_with_retry() { + common_telemetry::init_default_ut_logging(); let mock_region_server = mock_region_server(); let region_id = RegionId::new(1024, 1); @@ -303,20 +361,13 @@ mod tests { // Region is not ready. region_engine.mock_role = Some(Some(RegionRole::Follower)); region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); - // Note: Don't change. region_engine.handle_request_delay = Some(Duration::from_millis(300)); }); mock_region_server.register_test_region(region_id, mock_engine); - - let waits = vec![ - Some(Duration::from_millis(100u64)), - Some(Duration::from_millis(100u64)), - ]; - + let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)]; let handler_context = HandlerContext::new_for_test(mock_region_server); - for replay_timeout in waits { - let reply = UpgradeRegionsHandler + let reply = UpgradeRegionsHandler::new_test() .handle( &handler_context, vec![UpgradeRegion { @@ -330,31 +381,28 @@ mod tests { let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.ready); assert!(reply.exists); - assert!(reply.error.is_none()); + assert!(reply.error.is_none(), "error: {:?}", reply.error); } - let timer = Instant::now(); - let reply = UpgradeRegionsHandler + let reply = UpgradeRegionsHandler::new_test() .handle( &handler_context, vec![UpgradeRegion { region_id, - replay_timeout: Some(Duration::from_millis(500)), + replay_timeout: Duration::from_millis(500), ..Default::default() }], ) .await; - // Must less than 300 ms. 
- assert!(timer.elapsed().as_millis() < 300); - let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(reply.ready); assert!(reply.exists); - assert!(reply.error.is_none()); + assert!(reply.error.is_none(), "error: {:?}", reply.error); } #[tokio::test] async fn test_region_error() { + common_telemetry::init_default_ut_logging(); let mock_region_server = mock_region_server(); let region_id = RegionId::new(1024, 1); @@ -374,8 +422,7 @@ mod tests { mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); - - let reply = UpgradeRegionsHandler + let reply = UpgradeRegionsHandler::new_test() .handle( &handler_context, vec![UpgradeRegion { @@ -391,12 +438,12 @@ mod tests { assert!(reply.exists); assert!(reply.error.is_none()); - let reply = UpgradeRegionsHandler + let reply = UpgradeRegionsHandler::new_test() .handle( &handler_context, vec![UpgradeRegion { region_id, - replay_timeout: Some(Duration::from_millis(200)), + replay_timeout: Duration::from_millis(200), ..Default::default() }], ) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 70373ca10c..03f90bd0dc 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -66,7 +66,8 @@ use store_api::region_engine::{ SettableRegionRoleState, }; use store_api::region_request::{ - AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest, + AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest, + RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -191,6 +192,17 @@ impl RegionServer { .await } + #[tracing::instrument(skip_all)] + pub async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> { + self.inner + .handle_batch_catchup_requests(parallelism, requests) + .await + } + #[tracing::instrument(skip_all, fields(request_type = request.request_type()))] pub async fn handle_request( &self, @@ -399,6 +411,14 @@ impl RegionServer { #[cfg(test)] /// Registers a region for test purpose.
pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) { + { + let mut engines = self.inner.engines.write().unwrap(); + if !engines.contains_key(engine.name()) { + debug!("Registering test engine: {}", engine.name()); + engines.insert(engine.name().to_string(), engine.clone()); + } + } + self.inner .region_map .insert(region_id, RegionEngineWithStatus::Ready(engine)); @@ -972,6 +992,116 @@ impl RegionServerInner { .collect::<Vec<_>>()) } + pub async fn handle_batch_catchup_requests_inner( + &self, + engine: RegionEngineRef, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> { + for (region_id, _) in &requests { + self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup); + } + let region_ids = requests + .iter() + .map(|(region_id, _)| *region_id) + .collect::<Vec<_>>(); + let mut responses = Vec::with_capacity(requests.len()); + match engine + .handle_batch_catchup_requests(parallelism, requests) + .await + { + Ok(results) => { + for (region_id, result) in results { + match result { + Ok(_) => { + if let Err(e) = self + .set_region_status_ready( + region_id, + engine.clone(), + RegionChange::Catchup, + ) + .await + { + error!(e; "Failed to set region to ready: {}", region_id); + responses.push((region_id, Err(BoxedError::new(e)))); + } else { + responses.push((region_id, Ok(()))); + } + } + Err(e) => { + self.unset_region_status(region_id, &engine, RegionChange::Catchup); + error!(e; "Failed to catch up region: {}", region_id); + responses.push((region_id, Err(e))); + } + } + } + } + Err(e) => { + for region_id in region_ids { + self.unset_region_status(region_id, &engine, RegionChange::Catchup); + } + error!(e; "Failed to catch up batch regions"); + return error::UnexpectedSnafu { + violated: format!("Failed to catch up batch regions: {:?}", e), + } + .fail(); + } + } + + Ok(responses) + } + + pub async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> { + let mut engine_grouped_requests: HashMap<String, Vec<(RegionId, RegionCatchupRequest)>> = HashMap::new(); + + let mut responses = Vec::with_capacity(requests.len()); + for (region_id, request) in requests { + if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) { + match engine { + CurrentEngine::Engine(engine) => { + engine_grouped_requests + .entry(engine.name().to_string()) + .or_default() + .push((region_id, request)); + } + CurrentEngine::EarlyReturn(_) => { + return error::UnexpectedSnafu { + violated: format!("Unexpected engine type for region {}", region_id), + } + .fail(); + } + } + } else { + responses.push(( + region_id, + Err(BoxedError::new( + error::RegionNotFoundSnafu { region_id }.build(), + )), + )); + } + } + + for (engine, requests) in engine_grouped_requests { + let engine = self + .engines + .read() + .unwrap() + .get(&engine) + .with_context(|| RegionEngineNotFoundSnafu { name: &engine })? + .clone(); + responses.extend( + self.handle_batch_catchup_requests_inner(engine, parallelism, requests) + .await?, + ); + } + + Ok(responses) + } + // Handle requests in batch. // // limitation: all create requests must be in the same engine.
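Reviewer note: the new `RegionServer::handle_batch_catchup_requests` groups requests by engine, drives them with bounded parallelism, and reports one `(RegionId, Result<(), BoxedError>)` pair per region, while the upgrade handler above wraps the whole batch in a single `tokio::time::timeout` instead of one timer per region. A minimal caller-side sketch of that contract follows; the function name and log messages are illustrative only, and the imports assume the datanode crate:

```rust
use std::time::Duration;

use common_telemetry::{info, warn};
use store_api::region_request::RegionCatchupRequest;
use store_api::storage::RegionId;

use crate::region_server::RegionServer;

/// Illustrative only: drives the batch catchup API added above under one
/// shared timeout, mirroring what UpgradeRegionsHandler::handle now does.
async fn catch_up_regions(
    region_server: &RegionServer,
    parallelism: usize,
    replay_timeout: Duration,
    requests: Vec<(RegionId, RegionCatchupRequest)>,
) {
    let fut = region_server.handle_batch_catchup_requests(parallelism, requests);
    match tokio::time::timeout(replay_timeout, fut).await {
        // The timeout covers the whole batch, not each region individually.
        Err(_elapsed) => warn!("batch catchup timed out after {replay_timeout:?}"),
        // A batch-level error (e.g. an unknown engine) fails every region.
        Ok(Err(err)) => warn!("batch catchup failed: {err:?}"),
        Ok(Ok(responses)) => {
            for (region_id, result) in responses {
                match result {
                    Ok(()) => info!("region {region_id} caught up"),
                    // Per-region errors carry a BoxedError; the upgrade
                    // handler maps them onto the ready/exists reply flags.
                    Err(err) => warn!("region {region_id} failed to catch up: {err:?}"),
                }
            }
        }
    }
}
```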
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 7c3b3e4e19..155130db41 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -138,7 +138,7 @@ impl UpgradeCandidateRegion { region_id, last_entry_id, metadata_last_entry_id, - replay_timeout: Some(replay_timeout), + replay_timeout, location_id: Some(ctx.persistent_ctx.from_peer.id), replay_entry_id: None, metadata_replay_entry_id: None, diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 53bafb89f3..c601c10912 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -46,6 +46,7 @@ tracing.workspace = true common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true mito2 = { workspace = true, features = ["test"] } +common-wal = { workspace = true } [package.metadata.cargo-udeps.ignore] normal = ["aquamarine"] diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 6ba403e4c8..3eefb9e638 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -37,7 +37,7 @@ use common_error::status_code::StatusCode; use common_runtime::RepeatedTask; use mito2::engine::MitoEngine; pub(crate) use options::IndexOptions; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; pub(crate) use state::MetricEngineState; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; @@ -260,7 +260,25 @@ impl RegionEngine for MetricEngine { UnsupportedRegionRequestSnafu { request }.fail() } } - RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await, + RegionRequest::Catchup(req) => { + let mut response = self + .inner + .handle_batch_catchup_requests( + 1, + vec![(region_id, req)], + ) + .await + .map_err(BoxedError::new)?; + debug_assert_eq!(response.len(), 1); + let (resp_region_id, response) = response + .pop() + .context(error::UnexpectedRequestSnafu { + reason: "expected 1 response, but got zero responses", + }) + .map_err(BoxedError::new)?; + debug_assert_eq!(region_id, resp_region_id); + return response; + } RegionRequest::BulkInserts(_) => { // todo(hl): find a way to support bulk inserts in metric engine.
UnsupportedRegionRequestSnafu { request }.fail() } @@ -509,13 +527,17 @@ mod test { use std::collections::HashMap; use common_telemetry::info; + use common_wal::options::{KafkaWalOptions, WalOptions}; use mito2::sst::location::region_dir_from_table_dir; + use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store}; use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; + use store_api::mito_engine_options::WAL_OPTIONS_KEY; use store_api::region_request::{ PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, }; use super::*; + use crate::maybe_skip_kafka_log_store_integration_test; use crate::test_util::TestEnv; #[tokio::test] @@ -696,4 +718,128 @@ mod test { .unwrap_err(); assert_eq!(err.status_code(), StatusCode::RegionNotFound); } + + #[tokio::test] + async fn test_catchup_regions() { + common_telemetry::init_default_ut_logging(); + maybe_skip_kafka_log_store_integration_test!(); + let kafka_log_store_factory = kafka_log_store_factory().unwrap(); + let mito_env = mito2::test_util::TestEnv::new() + .await + .with_log_store_factory(kafka_log_store_factory.clone()); + let env = TestEnv::with_mito_env(mito_env).await; + let table_dir = |region_id| format!("table/{region_id}"); + let mut physical_region_ids = vec![]; + let mut logical_region_ids = vec![]; + + let num_topics = 3; + let num_physical_regions = 8; + let num_logical_regions = 16; + let parallelism = 2; + let mut topics = Vec::with_capacity(num_topics); + for _ in 0..num_topics { + let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory) + .await + .unwrap(); + topics.push(topic); + } + + let topic_idx = |id| (id as usize) % num_topics; + // Creates physical regions + for i in 0..num_physical_regions { + let physical_region_id = RegionId::new(1, i); + physical_region_ids.push(physical_region_id); + + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: topics[topic_idx(i)].clone(), + }); + env.create_physical_region( + physical_region_id, + &table_dir(physical_region_id), + vec![( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + )], + ) + .await; + // Creates logical regions for each physical region + for j in 0..num_logical_regions { + let logical_region_id = RegionId::new(1024 + i, j); + logical_region_ids.push(logical_region_id); + env.create_logical_region(physical_region_id, logical_region_id) + .await; + } + } + + let metric_engine = env.metric(); + // Closes all regions + for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) { + metric_engine + .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + } + + // Opens all regions and skips the WAL replay + let requests = physical_region_ids + .iter() + .enumerate() + .map(|(idx, region_id)| { + let mut options = HashMap::new(); + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: topics[topic_idx(idx as u32)].clone(), + }); + options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new()); + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&wal_options).unwrap(), + ); + ( + *region_id, + RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + table_dir: table_dir(*region_id), + path_type: PathType::Bare, + options: options.clone(), + skip_wal_replay: true, + checkpoint: None, + }, + ) + }) + .collect::<Vec<_>>(); + info!("Open batch regions with parallelism: {parallelism}"); + metric_engine + .handle_batch_open_requests(parallelism, requests) + .await + .unwrap();
{ + let state = metric_engine.inner.state.read().unwrap(); + for logical_region in &logical_region_ids { + assert!(!state.logical_regions().contains_key(logical_region)); + } + } + + let catch_requests = physical_region_ids + .iter() + .map(|region_id| { + ( + *region_id, + RegionCatchupRequest { + set_writable: true, + ..Default::default() + }, + ) + }) + .collect::<Vec<_>>(); + metric_engine + .handle_batch_catchup_requests(parallelism, catch_requests) + .await + .unwrap(); + { + let state = metric_engine.inner.state.read().unwrap(); + for logical_region in &logical_region_ids { + assert!(state.logical_regions().contains_key(logical_region)); + } + } + } } diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 1ae63915e9..e25ce43e3d 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -324,9 +324,9 @@ mod test { let physical_region_id2 = RegionId::new(1024, 1); let logical_region_id1 = RegionId::new(1025, 0); let logical_region_id2 = RegionId::new(1025, 1); - env.create_physical_region(physical_region_id1, "/test_dir1") + env.create_physical_region(physical_region_id1, "/test_dir1", vec![]) .await; - env.create_physical_region(physical_region_id2, "/test_dir2") + env.create_physical_region(physical_region_id2, "/test_dir2", vec![]) .await; let region_create_request1 = crate::test_util::create_logical_region_request( diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index d6544f6c9e..61e7591893 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -15,19 +15,13 @@ use std::collections::HashMap; use common_error::ext::BoxedError; -use common_telemetry::debug; use snafu::{OptionExt, ResultExt}; use store_api::region_engine::{BatchResponses, RegionEngine}; -use store_api::region_request::{ - AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint, -}; +use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint}; use store_api::storage::RegionId; use crate::engine::MetricEngineInner; -use crate::error::{ - BatchCatchupMitoRegionSnafu, MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, - UnsupportedRegionRequestSnafu, -}; +use crate::error::{BatchCatchupMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result}; use crate::utils; impl MetricEngineInner { @@ -119,71 +113,4 @@ impl MetricEngineInner { Ok(responses) } - - pub async fn catchup_region( - &self, - region_id: RegionId, - req: RegionCatchupRequest, - ) -> Result<AffectedRows> { - if !self.is_physical_region(region_id) { - return UnsupportedRegionRequestSnafu { - request: RegionRequest::Catchup(req), - } - .fail(); - } - let data_region_id = utils::to_data_region_id(region_id); - let physical_region_options = *self - .state - .read() - .unwrap() - .physical_region_states() - .get(&data_region_id) - .context(PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - })? - .options(); - - let metadata_region_id = utils::to_metadata_region_id(region_id); - // TODO(weny): improve the catchup, we can read the wal entries only once.
- debug!("Catchup metadata region {metadata_region_id}"); - self.mito - .handle_request( - metadata_region_id, - RegionRequest::Catchup(RegionCatchupRequest { - set_writable: req.set_writable, - entry_id: req.metadata_entry_id, - metadata_entry_id: None, - location_id: req.location_id, - checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { - entry_id: c.metadata_entry_id.unwrap_or_default(), - metadata_entry_id: None, - }), - }), - ) - .await - .context(MitoCatchupOperationSnafu)?; - - debug!("Catchup data region {data_region_id}"); - self.mito - .handle_request( - data_region_id, - RegionRequest::Catchup(RegionCatchupRequest { - set_writable: req.set_writable, - entry_id: req.entry_id, - metadata_entry_id: None, - location_id: req.location_id, - checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { - entry_id: c.entry_id, - metadata_entry_id: None, - }), - }), - ) - .await - .context(MitoCatchupOperationSnafu) - .map(|response| response.affected_rows)?; - - self.recover_states(region_id, physical_region_options) - .await?; - Ok(0) - } } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 2796d3652b..376aa8f5e9 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -828,9 +828,9 @@ mod test { let physical_region_id2 = RegionId::new(1024, 1); let logical_region_id1 = RegionId::new(1025, 0); let logical_region_id2 = RegionId::new(1025, 1); - env.create_physical_region(physical_region_id1, "/test_dir1") + env.create_physical_region(physical_region_id1, "/test_dir1", vec![]) .await; - env.create_physical_region(physical_region_id2, "/test_dir2") + env.create_physical_region(physical_region_id2, "/test_dir2", vec![]) .await; let region_create_request1 = diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index 9c06f94043..a962243525 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -76,7 +76,7 @@ mod tests { ]; for (phy_region_id, logi_region_ids) in &phy_to_logi { - env.create_physical_region(*phy_region_id, &TestEnv::default_table_dir()) + env.create_physical_region(*phy_region_id, &TestEnv::default_table_dir(), vec![]) .await; for logi_region_id in logi_region_ids { env.create_logical_region(*phy_region_id, *logi_region_id) diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index afbc9c9772..1028aed1d8 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -47,6 +47,7 @@ impl MetricEngineInner { for (region_id, request) in requests { if !request.is_physical_table() { + warn!("Skipping non-physical table open request: {region_id}"); continue; } let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?; diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index d5185bf4e4..91d2329ed7 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -156,13 +156,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Mito catchup operation fails"))] - MitoCatchupOperation { - source: BoxedError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Mito sync operation fails"))] MitoSyncOperation { source: BoxedError, @@ -364,7 +357,6 @@ impl ErrorExt for Error { | CloseMitoRegion { source, .. } | MitoReadOperation { source, .. } | MitoWriteOperation { source, .. } - | MitoCatchupOperation { source, .. } | MitoFlushOperation { source, .. 
} | MitoDeleteOperation { source, .. } | MitoSyncOperation { source, .. } diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index cc173e534c..d81240d47f 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -76,6 +76,17 @@ impl TestEnv { } } + /// Returns a new env using the given `mito_env` for test. + pub async fn with_mito_env(mut mito_env: MitoTestEnv) -> Self { + let mito = mito_env.create_engine(MitoConfig::default()).await; + let metric = MetricEngine::try_new(mito.clone(), EngineConfig::default()).unwrap(); + Self { + mito_env, + mito, + metric, + } + } + pub fn data_home(&self) -> String { let env_root = self.mito_env.data_home().to_string_lossy().to_string(); join_dir(&env_root, "data") } @@ -125,7 +136,12 @@ impl TestEnv { } /// Create regions in [MetricEngine] with specific `physical_region_id`. - pub async fn create_physical_region(&self, physical_region_id: RegionId, table_dir: &str) { + pub async fn create_physical_region( + &self, + physical_region_id: RegionId, + table_dir: &str, + options: Vec<(String, String)>, + ) { let region_create_request = RegionCreateRequest { engine: METRIC_ENGINE_NAME.to_string(), column_metadatas: vec![ @@ -151,6 +167,7 @@ impl TestEnv { primary_key: vec![], options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] .into_iter() + .chain(options.into_iter()) .collect(), table_dir: table_dir.to_string(), path_type: PathType::Bare, // Use Bare path type for engine regions @@ -231,7 +248,7 @@ impl TestEnv { /// under [`default_logical_region_id`]. pub async fn init_metric_region(&self) { let physical_region_id = self.default_physical_region_id(); - self.create_physical_region(physical_region_id, &Self::default_table_dir()) + self.create_physical_region(physical_region_id, &Self::default_table_dir(), vec![]) .await; let logical_region_id = self.default_logical_region_id(); self.create_logical_region(physical_region_id, logical_region_id) @@ -424,6 +441,22 @@ pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec<Row> { rows } +#[macro_export] +/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set. +/// +/// The format of the environment variable is: +/// ```text +/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093 +/// ``` +macro_rules! maybe_skip_kafka_log_store_integration_test { + () => { + if std::env::var("GT_KAFKA_ENDPOINTS").is_err() { + common_telemetry::warn!("The Kafka endpoints are not set, skipping the test"); + return; + } + }; +} + #[cfg(test)] mod test { use object_store::ObjectStore; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 11871f09d5..c63c7cf99b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -303,6 +303,11 @@ impl MitoEngine { self.inner.workers.is_region_opening(region_id) } + /// Returns true if the specific region is catching up. + pub fn is_region_catching_up(&self, region_id: RegionId) -> bool { + self.inner.workers.is_region_catching_up(region_id) + } + /// Returns the region disk/memory statistic.
pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> { self.find_region(region_id) diff --git a/src/mito2/src/engine/batch_catchup_test.rs b/src/mito2/src/engine/batch_catchup_test.rs index 6f641abe26..d8c744a733 100644 --- a/src/mito2/src/engine/batch_catchup_test.rs +++ b/src/mito2/src/engine/batch_catchup_test.rs @@ -53,20 +53,30 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) { ..Default::default() }) .await; - let topic = prepare_test_for_kafka_log_store(&factory).await; - // FIXME(weny): change region number to 3. - let num_regions = 2u32; + // Prepares 3 topics for 8 regions + let num_topic = 3; + let mut topics = vec![]; + for _ in 0..num_topic { + let topic = prepare_test_for_kafka_log_store(&factory).await.unwrap(); + topics.push(topic); + } + + let num_regions = 8u32; let table_dir_fn = |region_id| format!("test/{region_id}"); let mut region_schema = HashMap::new(); + let get_topic_idx = |id| (id - 1) % num_topic; + + // Creates 8 regions and puts data into them for id in 1..=num_regions { let engine = engine.clone(); - let topic = topic.clone(); + let topic_idx = get_topic_idx(id); + let topic = topics[topic_idx as usize].clone(); let region_id = RegionId::new(1, id); let request = CreateRequestBuilder::new() .table_dir(&table_dir_fn(region_id)) - .kafka_topic(topic.clone()) + .kafka_topic(Some(topic)) .build(); let column_schemas = rows_schema(&request); region_schema.insert(region_id, column_schemas); @@ -76,7 +86,9 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) { .unwrap(); } - for i in 0..10 { + // Puts data into regions + let rows = 30; + for i in 0..rows { for region_number in 1..=num_regions { let region_id = RegionId::new(1, region_number); let rows = Rows { @@ -103,7 +115,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) { expected.push_str( "+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n", ); - for row in 0..10 { + for row in 0..rows { expected.push_str(&format!( "| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n", i * 120 + row, @@ -129,25 +141,22 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) { ) .await; - let mut options = HashMap::new(); - if let Some(topic) = &topic { - options.insert( - WAL_OPTIONS_KEY.to_string(), - serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { - topic: topic.clone(), - })) - .unwrap(), - ); - }; let requests = (1..=num_regions) .map(|id| { let region_id = RegionId::new(1, id); + let topic_idx = get_topic_idx(id); + let topic = topics[topic_idx as usize].clone(); + let mut options = HashMap::new(); + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { topic })).unwrap(), + ); ( region_id, RegionOpenRequest { engine: String::new(), table_dir: table_dir_fn(region_id), - options: options.clone(), + options, skip_wal_replay: true, path_type: PathType::Bare, checkpoint: None, }, ) }) .collect::<Vec<_>>(); + + let parallelism = 2; let results = engine - .handle_batch_open_requests(4, requests) + .handle_batch_open_requests(parallelism, requests) .await .unwrap(); for (_, result) in results { @@ -180,7 +191,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) { .collect::<Vec<_>>(); let results = engine - .handle_batch_catchup_requests(4, requests) + .handle_batch_catchup_requests(parallelism, requests) .await .unwrap(); for (_, result) in results { diff --git
a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index f0ee6e13f5..0c7d058e4d 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -135,6 +135,7 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) { let region = follower_engine.get_region(region_id).unwrap(); assert!(!region.is_writable()); assert!(resp.is_ok()); + assert!(!follower_engine.is_region_catching_up(region_id)); // Scans let request = ScanRequest::default(); @@ -256,7 +257,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFactory>) { let err = err.as_any().downcast_ref::<Error>().unwrap(); - + assert!(!follower_engine.is_region_catching_up(region_id)); assert_matches!(err, Error::Unexpected { .. }); // It should ignore requests to writable regions. @@ -719,3 +720,33 @@ async fn test_catchup_not_exist_with_format(flat_format: bool) { .unwrap_err(); assert_matches!(err.status_code(), StatusCode::RegionNotFound); } + +#[tokio::test] +async fn test_catchup_region_busy() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + let worker = engine.inner.workers.worker(region_id); + let catchup_regions = worker.catchup_regions(); + catchup_regions.insert_region(region_id); + let err = engine + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: true, + ..Default::default() + }), + ) + .await + .unwrap_err(); + assert_matches!(err.status_code(), StatusCode::RegionBusy); +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index fd691888af..df63b5e4d8 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -14,6 +14,7 @@ //! Mito region. +pub mod catchup; pub mod opener; pub mod options; pub(crate) mod version; @@ -1150,6 +1151,34 @@ impl OpeningRegions { pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>; +/// The regions that are catching up. +#[derive(Debug, Default)] +pub(crate) struct CatchupRegions { + regions: RwLock<HashSet<RegionId>>, +} + +impl CatchupRegions { + /// Returns true if the region exists. + pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool { + let regions = self.regions.read().unwrap(); + regions.contains(&region_id) + } + + /// Inserts a new region into the set. + pub(crate) fn insert_region(&self, region_id: RegionId) { + let mut regions = self.regions.write().unwrap(); + regions.insert(region_id); + } + + /// Removes a region by id. + pub(crate) fn remove_region(&self, region_id: RegionId) { + let mut regions = self.regions.write().unwrap(); + regions.remove(&region_id); + } +} + +pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>; + /// Manifest stats. #[derive(Default, Debug, Clone)] pub(crate) struct ManifestStats { diff --git a/src/mito2/src/region/catchup.rs b/src/mito2/src/region/catchup.rs new file mode 100644 index 0000000000..0d8cfa8ed8 --- /dev/null +++ b/src/mito2/src/region/catchup.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Instant; + +use common_telemetry::{info, warn}; +use snafu::ensure; +use store_api::logstore::LogStore; + +use crate::error::{self, Result}; +use crate::region::MitoRegion; +use crate::region::opener::replay_memtable; +use crate::wal::Wal; +use crate::wal::entry_distributor::WalEntryReceiver; + +pub struct RegionCatchupTask<S> { + entry_receiver: Option<WalEntryReceiver>, + region: Arc<MitoRegion>, + replay_checkpoint_entry_id: Option<u64>, + expected_last_entry_id: Option<u64>, + allow_stale_entries: bool, + location_id: Option<u64>, + wal: Wal<S>, +} + +impl<S: LogStore> RegionCatchupTask<S> { + pub fn new(region: Arc<MitoRegion>, wal: Wal<S>, allow_stale_entries: bool) -> Self { + Self { + entry_receiver: None, + region, + replay_checkpoint_entry_id: None, + expected_last_entry_id: None, + allow_stale_entries, + location_id: None, + wal, + } + } + + /// Sets the location id. + pub(crate) fn with_location_id(mut self, location_id: Option<u64>) -> Self { + self.location_id = location_id; + self + } + + /// Sets the expected last entry id. + pub(crate) fn with_expected_last_entry_id( + mut self, + expected_last_entry_id: Option<u64>, + ) -> Self { + self.expected_last_entry_id = expected_last_entry_id; + self + } + + /// Sets the entry receiver. + pub(crate) fn with_entry_receiver(mut self, entry_receiver: Option<WalEntryReceiver>) -> Self { + self.entry_receiver = entry_receiver; + self + } + + /// Sets the replay checkpoint entry id. + pub(crate) fn with_replay_checkpoint_entry_id( + mut self, + replay_checkpoint_entry_id: Option<u64>, + ) -> Self { + self.replay_checkpoint_entry_id = replay_checkpoint_entry_id; + self + } + + pub async fn run(&mut self) -> Result<()> { + if self.region.provider.is_remote_wal() { + self.remote_wal_catchup().await + } else { + self.local_wal_catchup().await + } + } + + async fn remote_wal_catchup(&mut self) -> Result<()> { + let flushed_entry_id = self.region.version_control.current().last_entry_id; + let replay_from_entry_id = self + .replay_checkpoint_entry_id + .unwrap_or(flushed_entry_id) + .max(flushed_entry_id); + let region_id = self.region.region_id; + info!( + "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", + self.region.provider + ); + let timer = Instant::now(); + let wal_entry_reader = self + .entry_receiver + .take() + .map(|r| Box::new(r) as _) + .unwrap_or_else(|| { + self.wal + .wal_entry_reader(&self.region.provider, region_id, self.location_id) + }); + let on_region_opened = self.wal.on_region_opened(); + let last_entry_id = replay_memtable( + &self.region.provider, + wal_entry_reader, + region_id, + replay_from_entry_id, + &self.region.version_control, + self.allow_stale_entries, + on_region_opened, + ) + .await?; + info!( + "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished.
replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.", + timer.elapsed(), + self.region.provider, + self.expected_last_entry_id + ); + if let Some(expected_last_entry_id) = self.expected_last_entry_id { + ensure!( + // The replayed last entry id may be greater than the `expected_last_entry_id`. + last_entry_id >= expected_last_entry_id, + error::UnexpectedSnafu { + reason: format!( + "Failed to catchup region {}, it was expected to replay to {}, but actually replayed to {}", + region_id, expected_last_entry_id, last_entry_id, + ), + } + ) + } + Ok(()) + } + + async fn local_wal_catchup(&mut self) -> Result<()> { + let version = self.region.version_control.current(); + let mut flushed_entry_id = version.last_entry_id; + let region_id = self.region.region_id; + let latest_entry_id = self + .wal + .store() + .latest_entry_id(&self.region.provider) + .unwrap_or_default(); + info!( + "Skipping memtable replay for region: {}, flushed entry id: {}, latest entry id: {}", + region_id, flushed_entry_id, latest_entry_id + ); + + if latest_entry_id > flushed_entry_id { + warn!( + "Found latest entry id is greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}", + region_id, latest_entry_id, flushed_entry_id + ); + flushed_entry_id = latest_entry_id; + self.region.version_control.set_entry_id(flushed_entry_id); + } + let on_region_opened = self.wal.on_region_opened(); + on_region_opened(region_id, flushed_entry_id, &self.region.provider).await?; + Ok(()) + } +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 0f3acc93b9..cef934c450 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -90,7 +90,7 @@ pub(crate) fn raft_engine_log_store_factory() -> Option<LogStoreFactory> { Some(LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory)) } -pub(crate) fn kafka_log_store_factory() -> Option<LogStoreFactory> { +pub fn kafka_log_store_factory() -> Option<LogStoreFactory> { let _ = dotenv::dotenv(); let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else { warn!("env GT_KAFKA_ENDPOINTS not found"); @@ -130,7 +130,7 @@ pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFactory>) {} #[template] @@ -140,7 +140,7 @@ pub(crate) fn single_kafka_log_store_factory(#[case] factory: Option<LogStoreFactory>) {} #[derive(Clone)] -pub(crate) struct RaftEngineLogStoreFactory; +pub struct RaftEngineLogStoreFactory; impl RaftEngineLogStoreFactory { async fn create_log_store<P: AsRef<Path>>(&self, wal_path: P) -> RaftEngineLogStore { @@ -148,7 +148,7 @@ impl RaftEngineLogStoreFactory { } } -pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory) -> Option<String> { +pub async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory) -> Option<String> { if let LogStoreFactory::Kafka(factory) = factory { let topic = uuid::Uuid::new_v4().to_string(); let client = factory.client().await; @@ -186,7 +186,7 @@ pub(crate) async fn append_noop_record(client: &Client, topic: &str) { .unwrap(); } #[derive(Clone)] -pub(crate) struct KafkaLogStoreFactory { +pub struct KafkaLogStoreFactory { broker_endpoints: Vec<String>, } @@ -204,7 +204,7 @@ impl KafkaLogStoreFactory { } #[derive(Clone)] -pub(crate) enum LogStoreFactory { +pub enum LogStoreFactory { RaftEngine(RaftEngineLogStoreFactory), Kafka(KafkaLogStoreFactory), } @@ -268,7 +268,7 @@ impl TestEnv { } /// Overwrites the original `log_store_factory`.
- pub(crate) fn with_log_store_factory(mut self, log_store_factory: LogStoreFactory) -> TestEnv { + pub fn with_log_store_factory(mut self, log_store_factory: LogStoreFactory) -> TestEnv { self.log_store_factory = log_store_factory; self } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2d442da363..a37af4d472 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -62,7 +62,10 @@ use crate::gc::{GcLimiter, GcLimiterRef}; use crate::memtable::MemtableBuilderProvider; use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING}; use crate::region::opener::PartitionExprFetcherRef; -use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef}; +use crate::region::{ + CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, + RegionMapRef, +}; use crate::request::{ BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime, }; @@ -280,6 +283,11 @@ impl WorkerGroup { self.worker(region_id).is_region_opening(region_id) } + /// Returns true if the specific region is catching up. + pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool { + self.worker(region_id).is_region_catching_up(region_id) + } + /// Returns region of specific `region_id`. /// /// This method should not be public. @@ -487,6 +495,7 @@ impl WorkerStarter { fn start(self) -> Result<RegionWorker> { let regions = Arc::new(RegionMap::default()); let opening_regions = Arc::new(OpeningRegions::default()); + let catchup_regions = Arc::new(CatchupRegions::default()); let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); @@ -496,6 +505,7 @@ impl WorkerStarter { id: self.id, config: self.config.clone(), regions: regions.clone(), + catchup_regions: catchup_regions.clone(), dropping_regions: Arc::new(RegionMap::default()), opening_regions: opening_regions.clone(), sender: sender.clone(), @@ -548,6 +558,7 @@ impl WorkerStarter { id: self.id, regions, opening_regions, + catchup_regions, sender, handle: Mutex::new(Some(handle)), running, @@ -563,6 +574,8 @@ pub(crate) struct RegionWorker { regions: RegionMapRef, /// The opening regions. opening_regions: OpeningRegionsRef, + /// The catching up regions. + catchup_regions: CatchupRegionsRef, /// Request sender. sender: Sender<WorkerRequestWithTime>, /// Handle to the worker thread. @@ -633,6 +646,11 @@ impl RegionWorker { self.opening_regions.is_region_exists(region_id) } + /// Returns true if the region is catching up. + fn is_region_catching_up(&self, region_id: RegionId) -> bool { + self.catchup_regions.is_region_exists(region_id) + } + /// Returns region of specific `region_id`. fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> { self.regions.get_region(region_id) } @@ -643,6 +661,12 @@ impl RegionWorker { pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef { &self.opening_regions } + + #[cfg(test)] + /// Returns the [CatchupRegionsRef]. + pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef { + &self.catchup_regions + } } impl Drop for RegionWorker { @@ -740,6 +764,8 @@ struct RegionWorkerLoop<S> { dropping_regions: RegionMapRef, /// Regions that are opening. opening_regions: OpeningRegionsRef, + /// Regions that are catching up. + catchup_regions: CatchupRegionsRef, /// Request sender. sender: Sender<WorkerRequestWithTime>, /// Request receiver.
@@ -1028,8 +1054,9 @@ impl<S: LogStore> RegionWorkerLoop<S> { continue; } DdlRequest::Catchup((req, wal_entry_receiver)) => { - self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver) - .await + self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender) + .await; + continue; } }; diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 2cbdab75c2..8b744f7d6a 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -17,17 +17,17 @@ use std::sync::Arc; use common_telemetry::tracing::warn; -use common_telemetry::{debug, info}; -use snafu::ensure; +use common_telemetry::{debug, error, info}; use store_api::logstore::LogStore; use store_api::region_engine::{RegionRole, SettableRegionRoleState}; -use store_api::region_request::{AffectedRows, RegionCatchupRequest}; +use store_api::region_request::RegionCatchupRequest; use store_api::storage::RegionId; -use tokio::time::Instant; use crate::error::{self, Result}; use crate::region::MitoRegion; -use crate::region::opener::{RegionOpener, replay_memtable}; +use crate::region::catchup::RegionCatchupTask; +use crate::region::opener::RegionOpener; +use crate::request::OptionOutputTx; use crate::wal::entry_distributor::WalEntryReceiver; use crate::worker::RegionWorkerLoop; @@ -37,115 +37,67 @@ impl<S: LogStore> RegionWorkerLoop<S> { region_id: RegionId, request: RegionCatchupRequest, entry_receiver: Option<WalEntryReceiver>, - ) -> Result<AffectedRows> { + sender: OptionOutputTx, + ) { let Some(region) = self.regions.get_region(region_id) else { - return error::RegionNotFoundSnafu { region_id }.fail(); + sender.send(Err(error::RegionNotFoundSnafu { region_id }.build())); + return; }; if region.is_writable() { debug!("Region {region_id} is writable, skip catchup"); - return Ok(0); + sender.send(Ok(0)); + return; } - // Note: Currently, We protect the split brain by ensuring the mutable table is empty. - // It's expensive to execute catch-up requests without `set_writable=true` multiple times. - let version = region.version(); - let is_empty_memtable = version.memtables.is_empty(); - // Utilizes the short circuit evaluation. - let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? { - if !is_empty_memtable { - warn!( - "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}", - region.region_id, - region.manifest_ctx.manifest_version().await, - region.version_control.current().last_entry_id - ); + if self.catchup_regions.is_region_exists(region_id) { + warn!("Region {region_id} is already catching up"); + sender.send(Err(error::RegionBusySnafu { region_id }.build())); + return; + } + + // If the memtable is not empty or the manifest has been updated, we need to reopen the region. + let region = match self.reopen_region_if_needed(region).await { + Ok(region) => region, + Err(e) => { + sender.send(Err(e)); + return; } - self.reopen_region(&region).await?
- } else { - region }; + }; - if region.provider.is_remote_wal() { - let flushed_entry_id = region.version_control.current().last_entry_id; - let replay_from_entry_id = request - .checkpoint - .map(|c| c.entry_id) - .unwrap_or_default() - .max(flushed_entry_id); - info!( - "Trying to replay memtable for region: {region_id}, provider: {:?}, replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}", - region.provider - ); - let timer = Instant::now(); - let wal_entry_reader = entry_receiver.map(|r| Box::new(r) as _).unwrap_or_else(|| { - self.wal - .wal_entry_reader(&region.provider, region_id, request.location_id) - }); - let on_region_opened = self.wal.on_region_opened(); - let last_entry_id = replay_memtable( - &region.provider, - wal_entry_reader, - region_id, - replay_from_entry_id, - &region.version_control, - self.config.allow_stale_entries, - on_region_opened, - ) - .await?; - info!( - "Elapsed: {:?}, region: {region_id}, provider: {:?} catchup finished. replay from entry id: {replay_from_entry_id}, flushed entry id: {flushed_entry_id}, last entry id: {last_entry_id}, expected: {:?}.", - timer.elapsed(), - region.provider, - request.entry_id - ); - if let Some(expected_last_entry_id) = request.entry_id { - ensure!( - // The replayed last entry id may be greater than the `expected_last_entry_id`. - last_entry_id >= expected_last_entry_id, - error::UnexpectedSnafu { - reason: format!( - "failed to set region {} to writable, it was expected to replayed to {}, but actually replayed to {}", - region_id, expected_last_entry_id, last_entry_id, - ), + self.catchup_regions.insert_region(region_id); + let catchup_regions = self.catchup_regions.clone(); + let wal = self.wal.clone(); + let allow_stale_entries = self.config.allow_stale_entries; + common_runtime::spawn_global(async move { + let mut task = RegionCatchupTask::new(region.clone(), wal, allow_stale_entries) + .with_entry_receiver(entry_receiver) + .with_expected_last_entry_id(request.entry_id) + .with_location_id(request.location_id) + .with_replay_checkpoint_entry_id(request.checkpoint.map(|c| c.entry_id)); + + match task.run().await { + Ok(_) => { + if request.set_writable { + region.set_role(RegionRole::Leader); + // Finalize leadership: persist backfilled metadata.
+ if let Err(err) = region + .set_role_state_gracefully(SettableRegionRoleState::Leader) + .await + { + error!(err; "Failed to set region {region_id} to leader"); + } } - ) + sender.send(Ok(0)); + catchup_regions.remove_region(region_id); + } + Err(err) => { + error!(err; "Failed to catch up region {region_id}"); + sender.send(Err(err)); + catchup_regions.remove_region(region_id); + } } - } else { - let version = region.version_control.current(); - let mut flushed_entry_id = version.last_entry_id; - - let latest_entry_id = self - .wal - .store() - .latest_entry_id(&region.provider) - .unwrap_or_default(); - warn!( - "Skips to replay memtable for region: {}, flushed entry id: {}, latest entry id: {}", - region.region_id, flushed_entry_id, latest_entry_id - ); - - if latest_entry_id > flushed_entry_id { - warn!( - "Found latest entry id is greater than flushed entry id, using latest entry id as flushed entry id, region: {}, latest entry id: {}, flushed entry id: {}", - region_id, latest_entry_id, flushed_entry_id - ); - flushed_entry_id = latest_entry_id; - region.version_control.set_entry_id(flushed_entry_id); - } - let on_region_opened = self.wal.on_region_opened(); - on_region_opened(region_id, flushed_entry_id, &region.provider).await?; - } - - if request.set_writable { - region.set_role(RegionRole::Leader); - // Finalize leadership: persist backfilled metadata. - region - .set_role_state_gracefully(SettableRegionRoleState::Leader) - .await?; - } - - Ok(0) + }); } /// Reopens a region. @@ -184,4 +136,31 @@ impl<S: LogStore> RegionWorkerLoop<S> { Ok(reopened_region) } + + async fn reopen_region_if_needed( + &mut self, + region: Arc<MitoRegion>, + ) -> Result<Arc<MitoRegion>> { + // Note: Currently, we protect against split brain by ensuring the memtable is empty. + // It's expensive to execute catch-up requests without `set_writable=true` multiple times. + let version = region.version(); + let is_empty_memtable = version.memtables.is_empty(); + + // Utilizes the short circuit evaluation. + let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? { + if !is_empty_memtable { + warn!( + "Region {} memtables are not empty, which should not happen, manifest version: {}, last entry id: {}", + region.region_id, + region.manifest_ctx.manifest_version().await, + region.version_control.current().last_entry_id + ); + } + self.reopen_region(&region).await? + } else { + region + }; + + Ok(region) + } }
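Reviewer note: `handle_catchup_request` now returns immediately and completes the caller's `OptionOutputTx` from a spawned task, with `CatchupRegions` serving as the in-flight guard that turns a concurrent request for the same region into `RegionBusy` (exercised by `test_catchup_region_busy` above). The following is a self-contained model of that guard, assuming only `tokio` and std; the names are illustrative, not the patch's own types:

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Minimal stand-in for CatchupRegions: a shared set of region ids that are
/// currently catching up. Insert before spawning, remove when the task ends.
#[derive(Default)]
struct InFlight(RwLock<HashSet<u64>>);

impl InFlight {
    /// Returns false when the region is already catching up (-> RegionBusy).
    fn try_start(&self, region_id: u64) -> bool {
        self.0.write().unwrap().insert(region_id)
    }

    /// Called once catchup finishes, whether it succeeded or failed.
    fn finish(&self, region_id: u64) {
        self.0.write().unwrap().remove(&region_id);
    }
}

#[tokio::main]
async fn main() {
    let guard = Arc::new(InFlight::default());
    assert!(guard.try_start(1)); // first catchup request is admitted

    let task = {
        let guard = guard.clone();
        tokio::spawn(async move {
            // A second request arriving while the task runs is rejected,
            // which the worker reports as StatusCode::RegionBusy.
            assert!(!guard.try_start(1));
            guard.finish(1); // like catchup_regions.remove_region(region_id)
        })
    };
    task.await.unwrap();

    assert!(guard.try_start(1)); // after completion, catchup may run again
}
```

One design note: the model above inserts atomically, whereas the patch checks `is_region_exists` and then calls `insert_region` as two steps; that is safe there because both run on the single worker loop before the task is spawned, so requests for one region cannot interleave.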