diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 1258db1b0c..6d51d1dd59 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -65,7 +65,7 @@ use crate::read::{BoxedBatchReader, BoxedRecordBatchStream}; use crate::region::options::{MergeMode, RegionOptions}; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState}; -use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime}; +use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime}; use crate::schedule::remote_job_scheduler::{ CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef, }; @@ -186,7 +186,7 @@ impl CompactionScheduler { } // The region can compact directly. - let mut status = + let mut status: CompactionStatus = CompactionStatus::new(region_id, version_control.clone(), access_layer.clone()); let request = status.new_compaction_request( self.request_sender.clone(), @@ -198,34 +198,47 @@ impl CompactionScheduler { schema_metadata_manager, max_parallelism, ); - self.region_status.insert(region_id, status); + let result = self .schedule_compaction_request(request, compact_options) .await; + if matches!(result, Ok(true)) { + // Only if the compaction request is scheduled successfully, + // we insert the region into the status map. + self.region_status.insert(region_id, status); + } self.listener.on_compaction_scheduled(region_id); - result + result.map(|_| ()) } - /// Notifies the scheduler that the compaction job is finished successfully. - pub(crate) async fn on_compaction_finished( + // Handle pending manual compaction request for the region. + // + // Returns true if should early return, false otherwise. + pub(crate) async fn handle_pending_compaction_request( &mut self, region_id: RegionId, manifest_ctx: &ManifestContextRef, schema_metadata_manager: SchemaMetadataManagerRef, - ) { + ) -> bool { let Some(status) = self.region_status.get_mut(®ion_id) else { - return; + return true; }; - if let Some(pending_request) = std::mem::take(&mut status.pending_request) { - let PendingCompaction { - options, - waiter, - max_parallelism, - } = pending_request; + // If there is a pending manual compaction request, schedule it. + // and defer returning the pending DDL requests to the caller. + let Some(pending_request) = std::mem::take(&mut status.pending_request) else { + return false; + }; - let request = status.new_compaction_request( + let PendingCompaction { + options, + waiter, + max_parallelism, + } = pending_request; + + let request = { + status.new_compaction_request( self.request_sender.clone(), waiter, self.engine_config.clone(), @@ -234,17 +247,68 @@ impl CompactionScheduler { self.listener.clone(), schema_metadata_manager, max_parallelism, - ); + ) + }; - if let Err(e) = self.schedule_compaction_request(request, options).await { - error!(e; "Failed to continue pending manual compaction for region id: {}", region_id); - } else { + match self.schedule_compaction_request(request, options).await { + Ok(true) => { debug!( "Successfully scheduled manual compaction for region id: {}", region_id ); + true } - return; + Ok(false) => { + // We still need to handle the pending DDL requests. + // So we can't return early here. + false + } + Err(e) => { + error!(e; "Failed to continue pending manual compaction for region id: {}", region_id); + self.remove_region_on_failure(region_id, Arc::new(e)); + true + } + } + } + + /// Notifies the scheduler that the compaction job is finished successfully. + pub(crate) async fn on_compaction_finished( + &mut self, + region_id: RegionId, + manifest_ctx: &ManifestContextRef, + schema_metadata_manager: SchemaMetadataManagerRef, + ) -> Vec { + // If there a pending compaction request, handle it first + // and defer returning the pending DDL requests to the caller. + if self + .handle_pending_compaction_request( + region_id, + manifest_ctx, + schema_metadata_manager.clone(), + ) + .await + { + return Vec::new(); + } + + let Some(status) = self.region_status.get_mut(®ion_id) else { + // The region status might be removed by the previous steps. + // So we return empty DDL requests. + return Vec::new(); + }; + + // Notify all waiters that compaction is finished. + for waiter in std::mem::take(&mut status.waiters) { + waiter.send(Ok(0)); + } + + // If there are pending DDL requests, run them. + let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests); + if !pending_ddl_requests.is_empty() { + self.region_status.remove(®ion_id); + // If there are pending DDL requests, we should return them to the caller. + // And skip try to schedule next compaction task. + return pending_ddl_requests; } // We should always try to compact the region until picker returns None. @@ -258,28 +322,40 @@ impl CompactionScheduler { schema_metadata_manager, MAX_PARALLEL_COMPACTION, ); + // Try to schedule next compaction task for this region. - if let Err(e) = self + match self .schedule_compaction_request( request, compact_request::Options::Regular(Default::default()), ) .await { - error!(e; "Failed to schedule next compaction for region {}", region_id); + Ok(true) => { + debug!( + "Successfully scheduled next compaction for region id: {}", + region_id + ); + } + Ok(false) => { + // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region. + // All DDL requests and pending compaction requests have already been processed. + // Safe to remove the region from status tracking. + self.region_status.remove(®ion_id); + } + Err(e) => { + error!(e; "Failed to schedule next compaction for region {}", region_id); + self.remove_region_on_failure(region_id, Arc::new(e)); + } } + + Vec::new() } /// Notifies the scheduler that the compaction job is failed. pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc) { error!(err; "Region {} failed to compact, cancel all pending tasks", region_id); - // Remove this region. - let Some(status) = self.region_status.remove(®ion_id) else { - return; - }; - - // Fast fail: cancels all pending tasks and sends error to their waiters. - status.on_failure(err); + self.remove_region_on_failure(region_id, err); } /// Notifies the scheduler that the region is dropped. @@ -303,14 +379,47 @@ impl CompactionScheduler { ); } + /// Add ddl request to pending queue. + /// + /// # Panics + /// Panics if region didn't request compaction. + pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) { + debug!( + "Added pending DDL request for region: {}, ddl: {:?}", + request.region_id, request.request + ); + let status = self.region_status.get_mut(&request.region_id).unwrap(); + status.pending_ddl_requests.push(request); + } + + #[cfg(test)] + pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool { + let has_pending = self + .region_status + .get(®ion_id) + .map(|status| !status.pending_ddl_requests.is_empty()) + .unwrap_or(false); + debug!( + "Checked pending DDL requests for region: {}, has_pending: {}", + region_id, has_pending + ); + has_pending + } + + /// Returns true if the region is compacting. + pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool { + self.region_status.contains_key(®ion_id) + } + /// Schedules a compaction request. /// - /// If the region has nothing to compact, it removes the region from the status map. + /// Returns true if the compaction request is scheduled successfully. + /// Returns false if no compaction task can be scheduled for this region. async fn schedule_compaction_request( &mut self, request: CompactionRequest, options: compact_request::Options, - ) -> Result<()> { + ) -> Result { let region_id = request.region_id(); let (dynamic_compaction_opts, ttl) = find_dynamic_options( region_id.table_id(), @@ -383,8 +492,7 @@ impl CompactionScheduler { for waiter in waiters { waiter.send(Ok(0)); } - self.region_status.remove(®ion_id); - return Ok(()); + return Ok(false); }; // If specified to run compaction remotely, we schedule the compaction job remotely. @@ -415,7 +523,7 @@ impl CompactionScheduler { job_id, region_id ); INFLIGHT_COMPACTION_COUNT.inc(); - return Ok(()); + return Ok(true); } Err(e) => { if !dynamic_compaction_opts.fallback_to_local() { @@ -461,6 +569,7 @@ impl CompactionScheduler { }); self.submit_compaction_task(local_compaction_task, region_id) + .map(|_| true) } fn submit_compaction_task( @@ -474,11 +583,9 @@ impl CompactionScheduler { task.run().await; INFLIGHT_COMPACTION_COUNT.dec(); })) - .map_err(|e| { - error!(e; "Failed to submit compaction request for region {}", region_id); - self.region_status.remove(®ion_id); - e - }) + .inspect_err( + |e| error!(e; "Failed to submit compaction request for region {}", region_id), + ) } fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc) { @@ -597,6 +704,8 @@ struct CompactionStatus { waiters: Vec, /// Pending compactions that are supposed to run as soon as current compaction task finished. pending_request: Option, + /// Pending DDL requests that should run when compaction is done. + pending_ddl_requests: Vec, } impl CompactionStatus { @@ -612,6 +721,7 @@ impl CompactionStatus { access_layer, waiters: Vec::new(), pending_request: None, + pending_ddl_requests: Vec::new(), } } @@ -647,6 +757,14 @@ impl CompactionStatus { region_id: self.region_id, })); } + + for pending_ddl in self.pending_ddl_requests { + pending_ddl + .sender + .send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } } /// Creates a new compaction request for compaction picker. @@ -869,6 +987,7 @@ struct PendingCompaction { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::time::Duration; use api::v1::region::StrictWindow; @@ -879,13 +998,28 @@ mod tests { use super::*; use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager}; + use crate::error::InvalidSchedulerStateSnafu; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::ManifestContext; + use crate::schedule::scheduler::{Job, Scheduler}; use crate::sst::FormatType; use crate::test_util::mock_schema_metadata_manager; use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; use crate::test_util::version_util::{VersionControlBuilder, apply_edit}; + struct FailingScheduler; + + #[async_trait::async_trait] + impl Scheduler for FailingScheduler { + fn schedule(&self, _job: Job) -> Result<()> { + InvalidSchedulerStateSnafu.fail() + } + + async fn stop(&self, _await_termination: bool) -> Result<()> { + Ok(()) + } + } + #[tokio::test] async fn test_find_compaction_options_db_level() { let env = SchedulerEnv::new().await; @@ -1406,6 +1540,396 @@ mod tests { assert_eq!(0, scheduler.region_status.len()); } + #[tokio::test] + async fn test_add_ddl_request_to_pending() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, _output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + assert!(scheduler.has_pending_ddls(region_id)); + } + + #[tokio::test] + async fn test_pending_ddl_request_failed_on_compaction_failed() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + assert!(scheduler.has_pending_ddls(region_id)); + scheduler + .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build())); + + assert!(!scheduler.has_pending_ddls(region_id)); + let result = output_rx.await.unwrap(); + assert_matches!(result, Err(_)); + } + + #[tokio::test] + async fn test_pending_ddl_request_failed_on_region_closed() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + assert!(scheduler.has_pending_ddls(region_id)); + scheduler.on_region_closed(region_id); + + assert!(!scheduler.has_pending_ddls(region_id)); + let result = output_rx.await.unwrap(); + assert_matches!(result, Err(_)); + } + + #[tokio::test] + async fn test_pending_ddl_request_failed_on_region_dropped() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + assert!(scheduler.has_pending_ddls(region_id)); + scheduler.on_region_dropped(region_id); + + assert!(!scheduler.has_pending_ddls(region_id)); + let result = output_rx.await.unwrap(); + assert_matches!(result, Err(_)); + } + + #[tokio::test] + async fn test_pending_ddl_request_failed_on_region_truncated() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + assert!(scheduler.has_pending_ddls(region_id)); + scheduler.on_region_truncated(region_id); + + assert!(!scheduler.has_pending_ddls(region_id)); + let result = output_rx.await.unwrap(); + assert_matches!(result, Err(_)); + } + + #[tokio::test] + async fn test_on_compaction_finished_returns_pending_ddl_requests() { + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let (output_tx, _output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert_eq!(pending_ddls.len(), 1); + assert!(!scheduler.has_pending_ddls(region_id)); + assert!(!scheduler.region_status.contains_key(®ion_id)); + assert_eq!(job_scheduler.num_jobs(), 0); + } + + #[tokio::test] + async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + let (manual_tx, manual_rx) = oneshot::channel(); + let mut status = + CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone()); + status.set_pending_request(PendingCompaction { + options: compact_request::Options::Regular(Default::default()), + waiter: OptionOutputTx::from(manual_tx), + max_parallelism: 1, + }); + scheduler.region_status.insert(region_id, status); + + let (ddl_tx, _ddl_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(ddl_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert_eq!(pending_ddls.len(), 1); + assert!(!scheduler.region_status.contains_key(®ion_id)); + assert_eq!(manual_rx.await.unwrap().unwrap(), 0); + } + + #[tokio::test] + async fn test_on_compaction_finished_returns_empty_when_region_absent() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let region_id = builder.region_id(); + let version_control = Arc::new(builder.build()); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert!(pending_ddls.is_empty()); + } + + #[tokio::test] + async fn test_on_compaction_finished_manual_schedule_error_cleans_status() { + let env = SchedulerEnv::new() + .await + .scheduler(Arc::new(FailingScheduler)); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let end = 1000 * 1000; + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + let (manual_tx, manual_rx) = oneshot::channel(); + let mut status = + CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone()); + status.set_pending_request(PendingCompaction { + options: compact_request::Options::Regular(Default::default()), + waiter: OptionOutputTx::from(manual_tx), + max_parallelism: 1, + }); + scheduler.region_status.insert(region_id, status); + + let (ddl_tx, ddl_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(ddl_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert!(pending_ddls.is_empty()); + assert!(!scheduler.region_status.contains_key(®ion_id)); + assert!(manual_rx.await.is_err()); + assert_matches!(ddl_rx.await.unwrap(), Err(_)); + } + + #[tokio::test] + async fn test_on_compaction_finished_next_schedule_noop_removes_status() { + let env = SchedulerEnv::new().await; + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert!(pending_ddls.is_empty()); + assert!(!scheduler.region_status.contains_key(®ion_id)); + } + + #[tokio::test] + async fn test_on_compaction_finished_next_schedule_error_cleans_status() { + let env = SchedulerEnv::new() + .await + .scheduler(Arc::new(FailingScheduler)); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let end = 1000 * 1000; + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + + let pending_ddls = scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) + .await; + + assert!(pending_ddls.is_empty()); + assert!(!scheduler.region_status.contains_key(®ion_id)); + } + #[tokio::test] async fn test_concurrent_memory_competition() { let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index fa957de670..df8521535f 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -24,8 +24,9 @@ use datatypes::arrow::datatypes::TimestampMillisecondType; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::AlterKind::SetRegionOptions; use store_api::region_request::{ - PathType, RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, - RegionOpenRequest, RegionRequest, SetRegionOption, + EnterStagingRequest, PathType, RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, + RegionFlushRequest, RegionOpenRequest, RegionRequest, SetRegionOption, + StagingPartitionDirective, }; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::Notify; @@ -648,6 +649,76 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) { assert_eq!((0..20).map(|v| v * 1000).collect::>(), vec); } +#[tokio::test] +async fn test_enter_staging_deferred_by_inflight_compaction() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + let listener = Arc::new(CompactionListener::default()); + let engine = env + .create_engine_with( + MitoConfig { + max_background_purges: 1, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) + .await; + + let region_id = RegionId::new(2048, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .build(); + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 5..20).await; + + listener.wait_handle_finished().await; + + let engine_cloned = engine.clone(); + let enter_staging = tokio::spawn(async move { + engine_cloned + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(!enter_staging.is_finished()); + + listener.wake(); + enter_staging.await.unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert!(region.is_staging()); +} + #[tokio::test] async fn test_compaction_update_time_window() { test_compaction_update_time_window_with_format(false).await; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index bef5b9ce07..de8927c4de 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -300,11 +300,16 @@ impl MitoRegion { } /// Returns whether the region is in staging mode. - #[allow(dead_code)] pub(crate) fn is_staging(&self) -> bool { self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging) } + /// Returns whether the region is entering staging mode. + pub(crate) fn is_enter_staging(&self) -> bool { + self.manifest_ctx.state.load() + == RegionRoleState::Leader(RegionLeaderState::EnteringStaging) + } + pub fn region_id(&self) -> RegionId { self.region_id } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index b50134c372..841d10025d 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -14,6 +14,7 @@ use api::v1::region::compact_request; use common_telemetry::{error, info, warn}; +use store_api::logstore::LogStore; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; @@ -68,7 +69,9 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, mut request: CompactionFinished, - ) { + ) where + S: LogStore, + { let region = match self.regions.get_region(region_id) { Some(region) => region, None => { @@ -105,13 +108,15 @@ impl RegionWorkerLoop { } // Schedule next compaction if necessary. - self.compaction_scheduler + let mut pending_ddls = self + .compaction_scheduler .on_compaction_finished( region_id, ®ion.manifest_ctx, self.schema_metadata_manager.clone(), ) .await; + self.handle_ddl_requests(&mut pending_ddls).await; } /// When compaction fails, we simply log the error. @@ -124,6 +129,13 @@ impl RegionWorkerLoop { /// Schedule compaction for the region if necessary. pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) { + if region.is_staging() || region.is_enter_staging() { + info!( + "Region {} is staging or entering staging, skip compaction", + region.region_id + ); + return; + } let now = self.time_provider.current_time_millis(); if now - region.last_compaction_millis() >= self.config.min_compaction_interval.as_millis() as i64 diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 7f926cce80..8b75fdd24f 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -98,6 +98,20 @@ impl RegionWorkerLoop { return; } + if self.compaction_scheduler.is_compacting(region_id) { + // Safety: region is compacting, add ddl request to pending queue. + self.compaction_scheduler + .add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender, + request: DdlRequest::EnterStaging(EnterStagingRequest { + partition_directive, + }), + }); + + return; + } + self.handle_enter_staging(region, partition_directive, sender); }