feat: cancel local compaction for enter staging (#7885)

* feat(mito2): support cancelling active local compaction

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test(mito2): cover compaction cancellation return paths

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: cancel remaining tasks

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-16 11:56:37 +08:00
committed by GitHub
parent a5ebaa3e9a
commit 525e88bce4
9 changed files with 773 additions and 84 deletions

View File

@@ -24,12 +24,13 @@ mod twcs;
mod window;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
@@ -53,9 +54,9 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
@@ -186,7 +187,7 @@ impl CompactionScheduler {
}
// The region can compact directly.
let mut status: CompactionStatus =
let mut status =
CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -199,17 +200,25 @@ impl CompactionScheduler {
max_parallelism,
);
let result = self
let result = match self
.schedule_compaction_request(request, compact_options)
.await;
if matches!(result, Ok(true)) {
// Only if the compaction request is scheduled successfully,
// we insert the region into the status map.
self.region_status.insert(region_id, status);
}
.await
{
Ok(Some(active_compaction)) => {
// Publish CompactionStatus only after a task has been accepted by the scheduler.
// This avoids exposing a half-initialized region status that could collect pending
// DDL/compaction state even though no compaction is actually running.
status.active_compaction = Some(active_compaction);
self.region_status.insert(region_id, status);
Ok(())
}
Ok(None) => Ok(()),
Err(e) => Err(e),
};
self.listener.on_compaction_scheduled(region_id);
result.map(|_| ())
result
}
// Handle pending manual compaction request for the region.
@@ -251,14 +260,16 @@ impl CompactionScheduler {
};
match self.schedule_compaction_request(request, options).await {
Ok(true) => {
Ok(Some(active_compaction)) => {
let status = self.region_status.get_mut(&region_id).unwrap();
status.active_compaction = Some(active_compaction);
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
true
}
Ok(false) => {
Ok(None) => {
// We still need to handle the pending DDL requests.
// So we can't return early here.
false
@@ -278,6 +289,11 @@ impl CompactionScheduler {
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Vec<SenderDdlRequest> {
let Some(status) = self.region_status.get_mut(&region_id) else {
return Vec::new();
};
status.clear_running_task();
// If there is a pending compaction request, handle it first
// and defer returning the pending DDL requests to the caller.
if self
@@ -297,7 +313,6 @@ impl CompactionScheduler {
return Vec::new();
};
// Notify all waiters that compaction is finished.
for waiter in std::mem::take(&mut status.waiters) {
waiter.send(Ok(0));
}
@@ -331,13 +346,17 @@ impl CompactionScheduler {
)
.await
{
Ok(true) => {
Ok(Some(active_compaction)) => {
self.region_status
.get_mut(&region_id)
.unwrap()
.active_compaction = Some(active_compaction);
debug!(
"Successfully scheduled next compaction for region id: {}",
region_id
);
}
Ok(false) => {
Ok(None) => {
// No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
// All DDL requests and pending compaction requests have already been processed.
// Safe to remove the region from status tracking.
@@ -352,6 +371,14 @@ impl CompactionScheduler {
Vec::new()
}
/// Notifies the scheduler that the compaction job is cancelled cooperatively.
///
/// Delegates to `remove_region_on_cancel` and returns its result unchanged.
/// The function is declared `async` but its body contains no `.await`.
pub(crate) async fn on_compaction_cancelled(
&mut self,
region_id: RegionId,
) -> Vec<SenderDdlRequest> {
self.remove_region_on_cancel(region_id)
}
/// Notifies the scheduler that the compaction job is failed.
pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
@@ -406,20 +433,23 @@ impl CompactionScheduler {
has_pending
}
/// Returns true if the region is compacting.
pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
self.region_status.contains_key(&region_id)
pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
let Some(status) = self.region_status.get_mut(&region_id) else {
return RequestCancelResult::NotRunning;
};
status.request_cancel()
}
/// Schedules a compaction request.
///
/// Returns true if the compaction request is scheduled successfully.
/// Returns false if no compaction task can be scheduled for this region.
/// Returns the active compaction state if the request is scheduled successfully.
/// Returns `None` if no compaction task can be scheduled for this region.
async fn schedule_compaction_request(
&mut self,
request: CompactionRequest,
options: compact_request::Options,
) -> Result<bool> {
) -> Result<Option<ActiveCompaction>> {
let region_id = request.region_id();
let (dynamic_compaction_opts, ttl) = find_dynamic_options(
region_id.table_id(),
@@ -492,7 +522,7 @@ impl CompactionScheduler {
for waiter in waiters {
waiter.send(Ok(0));
}
return Ok(false);
return Ok(None);
};
// If specified to run compaction remotely, we schedule the compaction job remotely.
@@ -523,7 +553,7 @@ impl CompactionScheduler {
job_id, region_id
);
INFLIGHT_COMPACTION_COUNT.inc();
return Ok(true);
return Ok(Some(ActiveCompaction::Remote));
}
Err(e) => {
if !dynamic_compaction_opts.fallback_to_local() {
@@ -555,21 +585,25 @@ impl CompactionScheduler {
// Create a local compaction task.
let estimated_bytes = estimate_compaction_bytes(&picker_output);
let cancel_handle = Arc::new(CancellationHandle::default());
let state = LocalCompactionState::new(cancel_handle.clone());
let local_compaction_task = Box::new(CompactionTaskImpl {
state: state.clone(),
request_sender,
waiters,
start_time,
listener,
picker_output,
compaction_region,
compactor: Arc::new(DefaultCompactor::default()),
compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
memory_manager: self.memory_manager.clone(),
memory_policy: self.memory_policy,
estimated_memory_bytes: estimated_bytes,
});
self.submit_compaction_task(local_compaction_task, region_id)
.map(|_| true)
.map(|_| Some(ActiveCompaction::Local { state }))
}
fn submit_compaction_task(
@@ -597,6 +631,77 @@ impl CompactionScheduler {
// Notifies all pending tasks.
status.on_failure(err);
}
/// Stops tracking `region_id` after its compaction was cancelled.
///
/// The region's status entry is removed from `region_status` and consumed by
/// `CompactionStatus::on_cancel`, whose result is returned. An empty list is
/// returned when the region has no status entry.
fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
    match self.region_status.remove(&region_id) {
        Some(status) => status.on_cancel(),
        None => Vec::new(),
    }
}
}
/// State of a local compaction task that is consulted when cancellation is requested.
///
/// Cloning yields another handle to the same state: both fields are `Arc`s.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
/// Handle used to request cancellation of the task and to check whether it was requested.
cancel_handle: Arc<CancellationHandle>,
/// Set to `true` by `mark_commit_started`; once set, `request_cancel` answers
/// `TooLateToCancel`. The mutex also serializes `mark_commit_started` and `request_cancel`.
commit_started: Arc<Mutex<bool>>,
}
/// The compaction that is currently running for a region.
#[derive(Debug)]
enum ActiveCompaction {
/// A local compaction task; `state` is the cancellation state consulted by
/// `CompactionStatus::request_cancel`.
Local { state: LocalCompactionState },
/// A compaction job scheduled remotely. `CompactionStatus::request_cancel` answers
/// `TooLateToCancel` for this variant.
Remote,
}
impl LocalCompactionState {
    /// Creates the state for a new local compaction task around `cancel_handle`.
    ///
    /// The commit flag starts unset, so the task is cancellable.
    fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
        let commit_started = Arc::new(Mutex::new(false));
        Self {
            cancel_handle,
            commit_started,
        }
    }

    /// Returns another reference to the cancellation handle of this compaction task.
    pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
        Arc::clone(&self.cancel_handle)
    }

    /// Marks the compaction task as having started to commit, i.e. it is in its final
    /// stage and is about to update the region version and manifest.
    ///
    /// After this succeeds, [`Self::request_cancel`] rejects cancellation requests.
    ///
    /// Returns `false` (leaving the flag untouched) when cancellation has already been
    /// requested; returns `true` otherwise, including when the flag was already set.
    ///
    /// # Panics
    ///
    /// Panics if the `commit_started` mutex is poisoned.
    pub(crate) fn mark_commit_started(&self) -> bool {
        // The cancellation check happens under the lock so it cannot interleave
        // with a concurrent `request_cancel`.
        let mut started = self.commit_started.lock().unwrap();
        let cancelled = self.cancel_handle.is_cancelled();
        if !cancelled {
            *started = true;
        }
        !cancelled
    }

    /// Requests cancellation of this compaction task.
    ///
    /// Returns `TooLateToCancel` once the commit has started, `AlreadyCancelling` when
    /// cancellation was requested before, and `CancelIssued` when this call triggered it.
    ///
    /// # Panics
    ///
    /// Panics if the `commit_started` mutex is poisoned.
    pub(crate) fn request_cancel(&self) -> RequestCancelResult {
        // The cancel handle must only be touched while holding the `commit_started`
        // lock, to avoid racing between cancellation and commit.
        let started = self.commit_started.lock().unwrap();
        if *started {
            RequestCancelResult::TooLateToCancel
        } else if self.cancel_handle.is_cancelled() {
            RequestCancelResult::AlreadyCancelling
        } else {
            self.cancel_handle.cancel();
            RequestCancelResult::CancelIssued
        }
    }
}
/// Outcome of asking the scheduler to cancel the compaction of a region.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
/// This request triggered the cancellation.
CancelIssued,
/// Cancellation had already been requested earlier.
AlreadyCancelling,
/// The compaction can no longer be cancelled: a local task has started to commit,
/// or the active compaction is a remote one.
TooLateToCancel,
/// The region has no compaction status or no active compaction.
NotRunning,
}
impl Drop for CompactionScheduler {
@@ -703,6 +808,8 @@ struct CompactionStatus {
pending_request: Option<PendingCompaction>,
/// Pending DDL requests that should run when compaction is done.
pending_ddl_requests: Vec<SenderDdlRequest>,
/// Active compaction state.
active_compaction: Option<ActiveCompaction>,
}
impl CompactionStatus {
@@ -719,9 +826,39 @@ impl CompactionStatus {
waiters: Vec::new(),
pending_request: None,
pending_ddl_requests: Vec::new(),
active_compaction: None,
}
}
/// Test helper: records a local compaction with a fresh cancellation handle as the
/// active compaction and returns a clone of its state for the test to inspect.
#[cfg(test)]
fn start_local_task(&mut self) -> LocalCompactionState {
    let cancel_handle = Arc::new(CancellationHandle::default());
    let state = LocalCompactionState::new(cancel_handle);
    let tracked = state.clone();
    self.active_compaction = Some(ActiveCompaction::Local { state: tracked });
    state
}
/// Test helper: records a remote compaction as the active compaction of this region.
#[cfg(test)]
fn start_remote_task(&mut self) {
self.active_compaction = Some(ActiveCompaction::Remote);
}
/// Requests cancellation of the active compaction of this region.
///
/// Returns `NotRunning` when nothing is active and `TooLateToCancel` for a remote
/// compaction; for a local compaction the answer comes from its
/// `LocalCompactionState::request_cancel`.
fn request_cancel(&mut self) -> RequestCancelResult {
    match self.active_compaction.as_ref() {
        None => RequestCancelResult::NotRunning,
        Some(ActiveCompaction::Remote) => RequestCancelResult::TooLateToCancel,
        Some(ActiveCompaction::Local { state, .. }) => state.request_cancel(),
    }
}
/// Forgets the active compaction of this region.
///
/// Returns `true` if an active compaction was recorded before the call.
fn clear_running_task(&mut self) -> bool {
    let previous = self.active_compaction.take();
    previous.is_some()
}
/// Merge the waiter to the pending compaction.
fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
@@ -764,6 +901,23 @@ impl CompactionStatus {
}
}
/// Consumes the status of a region whose compaction was cancelled.
///
/// Every waiter receives a `CompactionCancelled` error, and the waiter of a pending
/// compaction request (if any) receives the same error wrapped in `CompactRegion`.
/// The pending DDL requests are handed back to the caller.
#[must_use]
fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
    let region_id = self.region_id;

    for waiter in std::mem::take(&mut self.waiters) {
        waiter.send(CompactionCancelledSnafu.fail());
    }

    if let Some(pending) = self.pending_request.take() {
        let cancelled = Arc::new(CompactionCancelledSnafu.build());
        pending
            .waiter
            .send(Err(cancelled).context(CompactRegionSnafu { region_id }));
    }

    std::mem::take(&mut self.pending_ddl_requests)
}
/// Creates a new compaction request for compaction picker.
///
/// It consumes all pending compaction waiters.
@@ -1362,6 +1516,58 @@ mod tests {
);
}
/// Verifies that `schedule_compaction` returns an error and leaves no entry in
/// `region_status` when it runs against a `FailingScheduler`.
#[tokio::test]
async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
common_telemetry::init_default_ut_logging();
// NOTE(review): `FailingScheduler` presumably rejects every submitted job — confirm against its definition.
let env = SchedulerEnv::new()
.await
.scheduler(Arc::new(FailingScheduler));
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let end = 1000 * 1000;
// Build a version with five L0 files; presumably enough for the picker to produce a task — TODO confirm.
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
let result = scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await;
// The call must fail, and the status must not have been published.
assert!(result.is_err());
assert!(!scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_manual_compaction_when_compaction_in_progress() {
common_telemetry::init_default_ut_logging();
@@ -1542,6 +1748,11 @@ mod tests {
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
@@ -1558,6 +1769,142 @@ mod tests {
assert!(scheduler.has_pending_ddls(region_id));
}
/// Walks `CompactionStatus::request_cancel` through its answers for a local task:
/// `CancelIssued`, then `AlreadyCancelling`, and `NotRunning` after the task is cleared.
#[tokio::test]
async fn test_request_cancel_state_transitions() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let mut status =
CompactionStatus::new(region_id, version_control, env.access_layer.clone());
let state = status.start_local_task();
// The first request triggers the cancellation.
assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
assert!(state.cancel_handle().is_cancelled());
// A repeated request reports that cancellation is already in progress.
assert_eq!(
status.request_cancel(),
RequestCancelResult::AlreadyCancelling
);
// A cancelled task must not be able to start committing.
assert!(!state.mark_commit_started());
assert_eq!(
status.request_cancel(),
RequestCancelResult::AlreadyCancelling
);
// Once the running task is cleared there is nothing left to cancel.
assert!(status.clear_running_task());
assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
}
/// Verifies that a cancel request against a remote compaction is answered with
/// `TooLateToCancel` and leaves the active compaction in place.
#[tokio::test]
async fn test_request_cancel_remote_compaction_is_too_late() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let mut status =
CompactionStatus::new(region_id, version_control, env.access_layer.clone());
status.start_remote_task();
assert_eq!(
status.request_cancel(),
RequestCancelResult::TooLateToCancel
);
// The remote compaction is still tracked as active.
assert!(status.active_compaction.is_some());
}
/// Verifies that `on_compaction_cancelled` hands back the pending DDL request, removes
/// the region from `region_status`, and schedules no job.
#[tokio::test]
async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
// NOTE(review): `_manifest_ctx` and `_schema_metadata_manager` are not passed to the call
// under test — confirm whether this setup is still needed.
let _manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
// Register the region with an active local compaction.
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
// Queue one DDL request behind the running compaction.
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.has_pending_ddls(region_id));
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
}
/// Verifies that, with both a pending manual compaction and a pending DDL request,
/// `on_compaction_cancelled` returns the DDL request, schedules no new job, and
/// answers the manual compaction waiter with an error.
#[tokio::test]
async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
// NOTE(review): `_manifest_ctx` and `_schema_metadata_manager` are not passed to the call
// under test — confirm whether this setup is still needed.
let _manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let status = scheduler.region_status.get_mut(&region_id).unwrap();
status.start_local_task();
// Park a manual compaction request behind the running task.
let (manual_tx, manual_rx) = oneshot::channel();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
// Queue one DDL request as well.
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
// The parked manual compaction is failed rather than rescheduled.
assert_matches!(manual_rx.await.unwrap(), Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_compaction_failed() {
let env = SchedulerEnv::new().await;
@@ -1713,6 +2060,11 @@ mod tests {
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
@@ -1752,6 +2104,7 @@ mod tests {
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.start_local_task();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
@@ -1827,6 +2180,7 @@ mod tests {
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.start_local_task();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
@@ -1873,6 +2227,11 @@ mod tests {
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
@@ -1910,6 +2269,11 @@ mod tests {
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)

View File

@@ -16,8 +16,9 @@ use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;
use common_base::cancellation::{CancellableFuture, CancellationHandle};
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_telemetry::{debug, info, warn};
use common_time::TimeToLive;
use either::Either;
use itertools::Itertools;
@@ -438,19 +439,25 @@ impl SstMerger for DefaultSstMerger {
/// implementations in tests.
pub struct DefaultCompactor<M = DefaultSstMerger> {
merger: M,
cancel_handle: Arc<CancellationHandle>,
}
impl Default for DefaultCompactor {
fn default() -> Self {
#[cfg(test)]
impl<M: SstMerger> DefaultCompactor<M> {
pub fn with_merger(merger: M) -> Self {
Self {
merger: DefaultSstMerger,
merger,
cancel_handle: Arc::new(CancellationHandle::default()),
}
}
}
impl<M: SstMerger> DefaultCompactor<M> {
pub fn with_merger(merger: M) -> Self {
Self { merger }
impl DefaultCompactor {
pub fn with_cancel_handle(cancel_handle: Arc<CancellationHandle>) -> Self {
Self {
merger: DefaultSstMerger,
cancel_handle,
}
}
}
@@ -503,7 +510,7 @@ where
chunk.push(task);
}
}
let spawned: Vec<_> = chunk
let mut spawned: Vec<_> = chunk
.into_iter()
.map(|(inputs, fut)| {
let handle = common_runtime::spawn_compact(fut);
@@ -511,30 +518,56 @@ where
})
.collect();
for (inputs, handle) in spawned {
match handle.await {
Ok(Ok(files)) => {
while let Some((inputs, handle)) = spawned.pop() {
let abort_handle = handle.abort_handle();
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
Ok(Ok(Ok(files))) => {
output_files.extend(files);
compacted_inputs.extend(inputs);
}
Ok(Err(e)) => {
Ok(Ok(Err(e))) => {
warn!(
e; "Region {} failed to merge compaction output with inputs: [{}], skipping",
e; "Failed to merge compaction output for region: {}, inputs: [{}]",
region_id,
inputs.iter().map(|f| f.file_id.to_string()).join(",")
);
}
Err(e) => {
Ok(Err(e)) => {
warn!(
"Region {} compaction task join error for inputs: [{}], skipping: {}",
region_id,
inputs.iter().map(|f| f.file_id.to_string()).join(","),
e
);
// If the cancel handle is cancelled,
// cancel the remaining tasks before returning the error.
if self.cancel_handle.is_cancelled() {
abort_handle.abort();
for (_, handle) in spawned {
handle.abort();
}
}
return Err(e).context(error::JoinSnafu);
}
Err(_) => {
debug!(
"Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks",
region_id,
spawned.len(),
);
abort_handle.abort();
for (_, handle) in spawned {
handle.abort();
}
break;
}
}
}
if self.cancel_handle.is_cancelled() {
info!("Compaction merge cancelled for region: {}", region_id);
break;
}
}
// Include expired SSTs in removals — these don't depend on merge success.
@@ -584,14 +617,17 @@ where
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use store_api::storage::{FileId, RegionId};
use tokio::time::sleep;
use super::*;
use super::{DefaultCompactor, *};
use crate::cache::CacheManager;
use crate::compaction::picker::PickerOutput;
use crate::error::Result;
use crate::sst::file::FileHandle;
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::version::SstVersion;
@@ -821,4 +857,85 @@ mod tests {
expired_meta.file_id
);
}
/// Test merger whose merge calls never complete; it only counts how many were started.
#[derive(Clone)]
struct BlockingMerger {
/// Number of `merge_single_output` calls that have started.
call_idx: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl SstMerger for BlockingMerger {
/// Records that a merge started, then waits on a future that never resolves.
async fn merge_single_output(
&self,
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
self.call_idx.fetch_add(1, Ordering::SeqCst);
// Never resolves: the call only ends if something drops or aborts it.
std::future::pending().await
}
}
/// Verifies that cancelling during `merge_ssts` stops the merge: with a parallelism of
/// two and three outputs, only the first two merge tasks ever start, and the returned
/// `MergeOutput` adds and removes no files.
#[tokio::test(flavor = "multi_thread")]
async fn test_merge_ssts_cancels_spawned_tasks() {
    common_telemetry::init_default_ut_logging();

    let mut compaction_region = new_test_compaction_region().await;
    compaction_region.max_parallelism = 2;
    let cancel_handle = Arc::new(CancellationHandle::default());
    let call_idx = Arc::new(AtomicUsize::new(0));
    let compactor = DefaultCompactor {
        merger: BlockingMerger {
            call_idx: call_idx.clone(),
        },
        cancel_handle: cancel_handle.clone(),
    };
    let picker_output = PickerOutput {
        outputs: vec![
            CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(dummy_file_meta())],
                filter_deleted: false,
                output_time_range: None,
            },
            CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(dummy_file_meta())],
                filter_deleted: false,
                output_time_range: None,
            },
            CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(dummy_file_meta())],
                filter_deleted: false,
                output_time_range: None,
            },
        ],
        expired_ssts: vec![],
        time_window_size: 3600,
        max_file_size: None,
    };

    let task = tokio::spawn(async move {
        compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
    });

    // Wait until both tasks of the first batch have entered the merger instead of
    // sleeping for a fixed time: on a slow machine a fixed delay may elapse before
    // the tasks start, which would make the `started == 2` assertion below flaky.
    tokio::time::timeout(Duration::from_secs(5), async {
        while call_idx.load(Ordering::SeqCst) < 2 {
            sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("the first batch of merge tasks should start before cancellation");
    cancel_handle.cancel();

    let merge_output = task
        .await
        .expect("merge_ssts should stop after cancellation")
        .unwrap();
    let started = call_idx.load(Ordering::SeqCst);

    assert!(merge_output.files_to_add.is_empty());
    assert!(merge_output.files_to_remove.is_empty());
    // The third output must never have been started.
    assert_eq!(started, 2);
}
}

View File

@@ -16,13 +16,15 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Instant;
use common_base::cancellation::CancellableFuture;
use common_memory_manager::OnExhaustedPolicy;
use common_telemetry::{error, info, warn};
use itertools::Itertools;
use snafu::ResultExt;
use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::LocalCompactionState;
use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
@@ -30,8 +32,8 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
WorkerRequest, WorkerRequestWithTime,
BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
RegionEditResult, WorkerRequest, WorkerRequestWithTime,
};
use crate::sst::file::FileMeta;
use crate::worker::WorkerListener;
@@ -41,6 +43,8 @@ use crate::{error, metrics};
pub const MAX_PARALLEL_COMPACTION: usize = 1;
pub(crate) struct CompactionTaskImpl {
/// Shared local-compaction state for cooperative cancellation.
pub(crate) state: LocalCompactionState,
pub compaction_region: CompactionRegion,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
@@ -184,9 +188,7 @@ impl CompactionTaskImpl {
);
}
async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
self.mark_files_compacting(true);
async fn handle_expiration(&mut self) {
// 1. In case of local compaction, we can delete expired ssts in advance.
if !self.picker_output.expired_ssts.is_empty() {
let remove_timer = COMPACTION_STAGE_ELAPSED
@@ -203,7 +205,9 @@ impl CompactionTaskImpl {
.await;
remove_timer.observe_duration();
}
}
async fn handle_compaction(&mut self) -> error::Result<MergeOutput> {
// 2. Merge inputs
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
@@ -239,6 +243,13 @@ impl CompactionTaskImpl {
.on_merge_ssts_finished(self.compaction_region.region_id)
.await;
Ok(compaction_result)
}
async fn update_manifest(
&self,
compaction_result: crate::compaction::compactor::MergeOutput,
) -> error::Result<RegionEdit> {
let _manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
@@ -296,14 +307,61 @@ impl CompactionTask for CompactionTaskImpl {
}
};
let notify = match self.handle_expiration_and_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
senders: std::mem::take(&mut self.waiters),
start_time: self.start_time,
edit,
}),
Err(e) => {
// Marks files compacting before compaction and unmark after compaction (even if compaction is cancelled or failed), so that they won't be picked by other compaction tasks.
self.mark_files_compacting(true);
self.handle_expiration().await;
let cancel_handle = self.state.cancel_handle();
// Run compaction with cooperative cancellation.
let notify = match CancellableFuture::new(
async { self.handle_compaction().await },
cancel_handle,
)
.await
{
Ok(Ok(merge_output)) => {
// Stop accepting cancellation once we are about to publish the compaction edit.
if !self.state.mark_commit_started() {
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionCancelled(CompactionCancelled {
region_id: self.compaction_region.region_id,
senders,
})
} else {
match self.update_manifest(merge_output).await {
Ok(edit) => {
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
senders,
start_time: self.start_time,
edit,
})
}
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
let err = Arc::new(e);
self.on_failure(err.clone());
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.compaction_region.region_id,
err,
})
}
}
}
}
Err(_) => {
info!(
"Compaction cancelled, region id: {}",
self.compaction_region.region_id
);
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionCancelled(CompactionCancelled {
region_id: self.compaction_region.region_id,
senders,
})
}
Ok(Err(e)) => {
error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
let err = Arc::new(e);
// notify compaction waiters
@@ -334,7 +392,7 @@ mod tests {
fn test_picker_output_with_expired_ssts() {
// Test that PickerOutput correctly includes expired_ssts
// This verifies that expired SSTs are properly identified and included
// in the picker output, which is then handled by handle_expiration_and_compaction
// in the picker output, which is then handled by handle_expiration()
let file_ids = (0..3).map(|_| FileId::random()).collect::<Vec<_>>();
let expired_ssts = vec![
@@ -382,6 +440,6 @@ mod tests {
//
// The behavior is tested indirectly through integration tests:
// - remove_expired() logs errors but doesn't stop compaction
// - handle_expiration_and_compaction() continues even if remove_expired() encounters errors
// - The function is designed to be non-blocking for compaction
// - handle_expiration() continues even if remove_expired() encounters errors
// - The expiration stage is designed to be non-blocking for compaction
}

View File

@@ -18,6 +18,8 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::{ColumnSchema, Rows};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::TimestampMillisecondType;
@@ -650,7 +652,7 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) {
}
#[tokio::test]
async fn test_enter_staging_deferred_by_inflight_compaction() {
async fn test_enter_staging_cancels_inflight_local_compaction_before_commit() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let listener = Arc::new(CompactionListener::default());
@@ -706,17 +708,91 @@ async fn test_enter_staging_deferred_by_inflight_compaction() {
}),
)
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!enter_staging.is_finished());
// Entering staging should have finished, and the compaction should have been cancelled.
assert!(enter_staging.is_finished());
let _ = enter_staging.await.unwrap().unwrap();
}
listener.wake();
enter_staging.await.unwrap();
/// Checks that a manual compaction request resolves with a `Cancelled` status code
/// when an enter-staging request is submitted for the same region while the
/// compaction request is still outstanding.
#[tokio::test]
async fn test_manual_compaction_returns_cancelled_when_enter_staging_cancels_it() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
// The listener is shared with the engine so the test can synchronize with compaction.
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
MitoConfig {
max_background_purges: 1,
..Default::default()
},
None,
Some(listener.clone()),
None,
)
.await;
// NOTE(review): `region_id` is read here before its `let` binding below, and the
// region is asserted to be staging before any request has been sent. These two
// lines look like they belong to the preceding test — confirm.
let region = engine.get_region(region_id).unwrap();
assert!(region.is_staging());
let region_id = RegionId::new(2050, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create the region with the `twcs` compaction type.
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Two flushes with overlapping ranges (0..10 and 5..20) give the compaction input to work on.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 5..20).await;
// Issue the manual compaction on its own task; its result is inspected at the end.
let engine_cloned = engine.clone();
let compact = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
});
// Presumably blocks until the listener reports that the compaction request has been
// handled and the job is in flight — confirm against `CompactionListener`.
listener.wait_handle_finished().await;
// Submit enter-staging while the compaction request above is still outstanding.
let engine_cloned = engine.clone();
let enter_staging = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
});
// Give both tasks a fixed 100ms window; both are expected to have completed within it.
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(compact.is_finished());
assert!(enter_staging.is_finished());
// The compaction request itself must fail, and the failure must map to `Cancelled`.
let err = compact.await.unwrap().unwrap_err();
assert_eq!(err.status_code(), StatusCode::Cancelled);
// Only a panic in the enter-staging task is checked here; the request's own result is discarded.
let _ = enter_staging.await.unwrap();
}
#[tokio::test]

View File

@@ -1073,6 +1073,9 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display("Compaction is cancelled."))]
CompactionCancelled {},
#[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))]
CompactionMemoryExhausted {
region_id: RegionId,
@@ -1389,7 +1392,7 @@ impl ErrorExt for Error {
#[cfg(feature = "vector_index")]
VectorIndexBuild { .. } | VectorIndexFinish { .. } => StatusCode::Internal,
ManualCompactionOverride {} => StatusCode::Cancelled,
ManualCompactionOverride {} | CompactionCancelled {} => StatusCode::Cancelled,
CompactionMemoryExhausted { source, .. } => source.status_code(),

View File

@@ -46,9 +46,9 @@ use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
Result, UnexpectedSnafu,
CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
MissingPartitionExprSnafu, Result, UnexpectedSnafu,
};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionEdit, TruncateKind};
@@ -895,6 +895,8 @@ pub(crate) enum BackgroundNotify {
IndexBuildFailed(IndexBuildFailed),
/// Compaction has finished.
CompactionFinished(CompactionFinished),
/// Compaction has been cancelled cooperatively.
CompactionCancelled(CompactionCancelled),
/// Compaction has failed.
CompactionFailed(CompactionFailed),
/// Truncate result.
@@ -991,6 +993,24 @@ pub(crate) struct CompactionFinished {
pub(crate) edit: RegionEdit,
}
/// Background notification describing a compaction job that stopped cooperatively
/// instead of running to completion.
#[derive(Debug)]
pub(crate) struct CompactionCancelled {
    /// Id of the region whose compaction was cancelled.
    pub(crate) region_id: RegionId,
    /// Waiters that receive the cancellation error from [`CompactionCancelled::on_success`].
    pub(crate) senders: Vec<OutputTx>,
}

impl CompactionCancelled {
    /// Consumes the notification, fails every waiter with a "compaction cancelled"
    /// error, and then logs the cancellation for the region.
    ///
    /// Despite the method name, no waiter receives a success value.
    pub(crate) fn on_success(self) {
        let Self { region_id, senders } = self;
        senders.into_iter().for_each(|waiter| {
            waiter.send(CompactionCancelledSnafu {}.fail());
        });
        info!("Compaction cancelled for region: {}", region_id);
    }
}
impl CompactionFinished {
pub fn on_success(self) {
// only update compaction time on success
@@ -1149,10 +1169,13 @@ pub(crate) struct CopyRegionFromRequest {
mod tests {
use api::v1::value::ValueData;
use api::v1::{Row, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use mito_codec::test_util::i64_value;
use store_api::metadata::RegionMetadataBuilder;
use tokio::sync::oneshot;
use super::*;
use crate::error::Error;
@@ -1216,6 +1239,21 @@ mod tests {
assert_eq!(None, request.column_index_by_name("c2"));
}
/// A waiter attached to a `CompactionCancelled` notification must observe the
/// dedicated cancellation error, which maps to `StatusCode::Cancelled`.
#[test]
fn test_compaction_cancelled_sends_cancelled_error() {
    // One waiter is enough to observe what the notification delivers.
    let (waiter_tx, waiter_rx) = oneshot::channel();
    let cancelled = CompactionCancelled {
        region_id: RegionId::new(1, 1),
        senders: vec![OutputTx::new(waiter_tx)],
    };

    cancelled.on_success();

    // The channel must yield a value, and that value must be an error.
    let delivered = waiter_rx.blocking_recv().unwrap();
    let err = delivered.unwrap_err();
    assert!(matches!(err, Error::CompactionCancelled { .. }));
    assert_eq!(err.status_code(), StatusCode::Cancelled);
}
#[test]
fn test_write_request_column_num() {
let rows = Rows {

View File

@@ -1196,6 +1196,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
BackgroundNotify::CompactionFinished(req) => {
self.handle_compaction_finished(region_id, req).await
}
BackgroundNotify::CompactionCancelled(req) => {
self.handle_compaction_cancelled(region_id, req).await
}
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
BackgroundNotify::RegionChange(req) => {

View File

@@ -23,7 +23,8 @@ use crate::error::RegionNotFoundSnafu;
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::region::MitoRegionRef;
use crate::request::{
BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx,
BuildIndexRequest, CompactionCancelled, CompactionFailed, CompactionFinished, OnFailure,
OptionOutputTx,
};
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;
@@ -119,6 +120,28 @@ impl<S> RegionWorkerLoop<S> {
self.handle_ddl_requests(&mut pending_ddls).await;
}
/// Handles the notification that a compaction job was cancelled cooperatively.
///
/// The waiters carried by `request` are failed first. If the region is still known
/// to this worker, the compaction scheduler is then informed of the cancellation and
/// the DDL requests it hands back are processed; otherwise there is nothing to process.
pub(crate) async fn handle_compaction_cancelled(
    &mut self,
    region_id: RegionId,
    request: CompactionCancelled,
) where
    S: LogStore,
{
    request.on_success();

    // Only consult the scheduler when the region still exists on this worker.
    let region_exists = self.regions.get_region(region_id).is_some();
    let mut ddl_requests = if region_exists {
        self.compaction_scheduler
            .on_compaction_cancelled(region_id)
            .await
    } else {
        Vec::new()
    };

    self.handle_ddl_requests(&mut ddl_requests).await;
}
/// When compaction fails, we simply log the error.
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
error!(req.err; "Failed to compact region: {}", req.region_id);

View File

@@ -19,6 +19,7 @@ use store_api::logstore::LogStore;
use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
use store_api::storage::RegionId;
use crate::compaction::RequestCancelResult;
use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
@@ -98,18 +99,24 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
if self.compaction_scheduler.is_compacting(region_id) {
// Safety: region is compacting, add ddl request to pending queue.
self.compaction_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest {
partition_directive,
}),
});
match self.compaction_scheduler.request_cancel(region_id) {
RequestCancelResult::CancelIssued
| RequestCancelResult::AlreadyCancelling
| RequestCancelResult::TooLateToCancel => {
// Safety: region is compacting or has entered the non-cancellable publish stage,
// keep the DDL pending until the current task finishes or acknowledges cancellation.
self.compaction_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest {
partition_directive,
}),
});
return;
return;
}
RequestCancelResult::NotRunning => {}
}
self.handle_enter_staging(region, partition_directive, sender);