fix(mito2): ensure enter staging waits for compaction (#7776)

* fix: do not schedule compaction

Signed-off-by: WenyXu <wenymedia@gmail.com>

* mito2: add pending ddl queue primitives to compaction scheduler

Signed-off-by: WenyXu <wenymedia@gmail.com>

* mito2: hand off pending ddls on compaction finish

Signed-off-by: WenyXu <wenymedia@gmail.com>

* mito2: defer enter staging via compaction pending ddl queue

Signed-off-by: WenyXu <wenymedia@gmail.com>

* mito2: cover pending-ddl failure paths on region lifecycle events

Signed-off-by: WenyXu <wenymedia@gmail.com>

* mito2: replay pending ddls directly in compaction handler

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: styling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: styling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-03-10 21:49:40 +08:00
committed by GitHub
parent 04cd2c8a05
commit b1b91e88f4
5 changed files with 670 additions and 44 deletions

View File

@@ -65,7 +65,7 @@ use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
@@ -186,7 +186,7 @@ impl CompactionScheduler {
}
// The region can compact directly.
let mut status =
let mut status: CompactionStatus =
CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -198,34 +198,47 @@ impl CompactionScheduler {
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
.schedule_compaction_request(request, compact_options)
.await;
if matches!(result, Ok(true)) {
// Only if the compaction request is scheduled successfully,
// we insert the region into the status map.
self.region_status.insert(region_id, status);
}
self.listener.on_compaction_scheduled(region_id);
result
result.map(|_| ())
}
/// Notifies the scheduler that the compaction job is finished successfully.
pub(crate) async fn on_compaction_finished(
// Handle pending manual compaction request for the region.
//
// Returns true if should early return, false otherwise.
pub(crate) async fn handle_pending_compaction_request(
&mut self,
region_id: RegionId,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) {
) -> bool {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
return true;
};
if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
let PendingCompaction {
options,
waiter,
max_parallelism,
} = pending_request;
// If there is a pending manual compaction request, schedule it.
// and defer returning the pending DDL requests to the caller.
let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
return false;
};
let request = status.new_compaction_request(
let PendingCompaction {
options,
waiter,
max_parallelism,
} = pending_request;
let request = {
status.new_compaction_request(
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
@@ -234,17 +247,68 @@ impl CompactionScheduler {
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
)
};
if let Err(e) = self.schedule_compaction_request(request, options).await {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
} else {
match self.schedule_compaction_request(request, options).await {
Ok(true) => {
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
true
}
return;
Ok(false) => {
// We still need to handle the pending DDL requests.
// So we can't return early here.
false
}
Err(e) => {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
self.remove_region_on_failure(region_id, Arc::new(e));
true
}
}
}
/// Notifies the scheduler that the compaction job is finished successfully.
pub(crate) async fn on_compaction_finished(
&mut self,
region_id: RegionId,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Vec<SenderDdlRequest> {
// If there a pending compaction request, handle it first
// and defer returning the pending DDL requests to the caller.
if self
.handle_pending_compaction_request(
region_id,
manifest_ctx,
schema_metadata_manager.clone(),
)
.await
{
return Vec::new();
}
let Some(status) = self.region_status.get_mut(&region_id) else {
// The region status might be removed by the previous steps.
// So we return empty DDL requests.
return Vec::new();
};
// Notify all waiters that compaction is finished.
for waiter in std::mem::take(&mut status.waiters) {
waiter.send(Ok(0));
}
// If there are pending DDL requests, run them.
let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
if !pending_ddl_requests.is_empty() {
self.region_status.remove(&region_id);
// If there are pending DDL requests, we should return them to the caller.
// And skip try to schedule next compaction task.
return pending_ddl_requests;
}
// We should always try to compact the region until picker returns None.
@@ -258,28 +322,40 @@ impl CompactionScheduler {
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
match self
.schedule_compaction_request(
request,
compact_request::Options::Regular(Default::default()),
)
.await
{
error!(e; "Failed to schedule next compaction for region {}", region_id);
Ok(true) => {
debug!(
"Successfully scheduled next compaction for region id: {}",
region_id
);
}
Ok(false) => {
// No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
// All DDL requests and pending compaction requests have already been processed.
// Safe to remove the region from status tracking.
self.region_status.remove(&region_id);
}
Err(e) => {
error!(e; "Failed to schedule next compaction for region {}", region_id);
self.remove_region_on_failure(region_id, Arc::new(e));
}
}
Vec::new()
}
/// Notifies the scheduler that the compaction job is failed.
pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
};
// Fast fail: cancels all pending tasks and sends error to their waiters.
status.on_failure(err);
self.remove_region_on_failure(region_id, err);
}
/// Notifies the scheduler that the region is dropped.
@@ -303,14 +379,47 @@ impl CompactionScheduler {
);
}
/// Add ddl request to pending queue.
///
/// # Panics
/// Panics if region didn't request compaction.
pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
debug!(
"Added pending DDL request for region: {}, ddl: {:?}",
request.region_id, request.request
);
let status = self.region_status.get_mut(&request.region_id).unwrap();
status.pending_ddl_requests.push(request);
}
#[cfg(test)]
pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
let has_pending = self
.region_status
.get(&region_id)
.map(|status| !status.pending_ddl_requests.is_empty())
.unwrap_or(false);
debug!(
"Checked pending DDL requests for region: {}, has_pending: {}",
region_id, has_pending
);
has_pending
}
/// Returns true if the region is compacting.
pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
self.region_status.contains_key(&region_id)
}
/// Schedules a compaction request.
///
/// If the region has nothing to compact, it removes the region from the status map.
/// Returns true if the compaction request is scheduled successfully.
/// Returns false if no compaction task can be scheduled for this region.
async fn schedule_compaction_request(
&mut self,
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
) -> Result<bool> {
let region_id = request.region_id();
let (dynamic_compaction_opts, ttl) = find_dynamic_options(
region_id.table_id(),
@@ -383,8 +492,7 @@ impl CompactionScheduler {
for waiter in waiters {
waiter.send(Ok(0));
}
self.region_status.remove(&region_id);
return Ok(());
return Ok(false);
};
// If specified to run compaction remotely, we schedule the compaction job remotely.
@@ -415,7 +523,7 @@ impl CompactionScheduler {
job_id, region_id
);
INFLIGHT_COMPACTION_COUNT.inc();
return Ok(());
return Ok(true);
}
Err(e) => {
if !dynamic_compaction_opts.fallback_to_local() {
@@ -461,6 +569,7 @@ impl CompactionScheduler {
});
self.submit_compaction_task(local_compaction_task, region_id)
.map(|_| true)
}
fn submit_compaction_task(
@@ -474,11 +583,9 @@ impl CompactionScheduler {
task.run().await;
INFLIGHT_COMPACTION_COUNT.dec();
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
self.region_status.remove(&region_id);
e
})
.inspect_err(
|e| error!(e; "Failed to submit compaction request for region {}", region_id),
)
}
fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
@@ -597,6 +704,8 @@ struct CompactionStatus {
waiters: Vec<OutputTx>,
/// Pending compactions that are supposed to run as soon as current compaction task finished.
pending_request: Option<PendingCompaction>,
/// Pending DDL requests that should run when compaction is done.
pending_ddl_requests: Vec<SenderDdlRequest>,
}
impl CompactionStatus {
@@ -612,6 +721,7 @@ impl CompactionStatus {
access_layer,
waiters: Vec::new(),
pending_request: None,
pending_ddl_requests: Vec::new(),
}
}
@@ -647,6 +757,14 @@ impl CompactionStatus {
region_id: self.region_id,
}));
}
for pending_ddl in self.pending_ddl_requests {
pending_ddl
.sender
.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Creates a new compaction request for compaction picker.
@@ -869,6 +987,7 @@ struct PendingCompaction {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use api::v1::region::StrictWindow;
@@ -879,13 +998,28 @@ mod tests {
use super::*;
use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
use crate::error::InvalidSchedulerStateSnafu;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::schedule::scheduler::{Job, Scheduler};
use crate::sst::FormatType;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
struct FailingScheduler;
#[async_trait::async_trait]
impl Scheduler for FailingScheduler {
fn schedule(&self, _job: Job) -> Result<()> {
InvalidSchedulerStateSnafu.fail()
}
async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_find_compaction_options_db_level() {
let env = SchedulerEnv::new().await;
@@ -1406,6 +1540,396 @@ mod tests {
assert_eq!(0, scheduler.region_status.len());
}
#[tokio::test]
async fn test_add_ddl_request_to_pending() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_compaction_failed() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler
.on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_closed() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_closed(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_dropped() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_dropped(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_truncated() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_truncated(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_on_compaction_finished_returns_pending_ddl_requests() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.has_pending_ddls(region_id));
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
}
#[tokio::test]
async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
scheduler.region_status.insert(region_id, status);
let (ddl_tx, _ddl_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(ddl_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
}
#[tokio::test]
async fn test_on_compaction_finished_returns_empty_when_region_absent() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
}
#[tokio::test]
async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
let env = SchedulerEnv::new()
.await
.scheduler(Arc::new(FailingScheduler));
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
scheduler.region_status.insert(region_id, status);
let (ddl_tx, ddl_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(ddl_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
assert!(!scheduler.region_status.contains_key(&region_id));
assert!(manual_rx.await.is_err());
assert_matches!(ddl_rx.await.unwrap(), Err(_));
}
#[tokio::test]
async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
assert!(!scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
let env = SchedulerEnv::new()
.await
.scheduler(Arc::new(FailingScheduler));
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
assert!(!scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_concurrent_memory_competition() {
let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB

View File

@@ -24,8 +24,9 @@ use datatypes::arrow::datatypes::TimestampMillisecondType;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::AlterKind::SetRegionOptions;
use store_api::region_request::{
PathType, RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
EnterStagingRequest, PathType, RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest,
RegionFlushRequest, RegionOpenRequest, RegionRequest, SetRegionOption,
StagingPartitionDirective,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
@@ -648,6 +649,76 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) {
assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_enter_staging_deferred_by_inflight_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
MitoConfig {
max_background_purges: 1,
..Default::default()
},
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(2048, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 5..20).await;
listener.wait_handle_finished().await;
let engine_cloned = engine.clone();
let enter_staging = tokio::spawn(async move {
engine_cloned
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!enter_staging.is_finished());
listener.wake();
enter_staging.await.unwrap();
let region = engine.get_region(region_id).unwrap();
assert!(region.is_staging());
}
#[tokio::test]
async fn test_compaction_update_time_window() {
test_compaction_update_time_window_with_format(false).await;

View File

@@ -300,11 +300,16 @@ impl MitoRegion {
}
/// Returns whether the region is in staging mode.
#[allow(dead_code)]
pub(crate) fn is_staging(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
}
/// Returns whether the region is entering staging mode.
pub(crate) fn is_enter_staging(&self) -> bool {
self.manifest_ctx.state.load()
== RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
}
pub fn region_id(&self) -> RegionId {
self.region_id
}

View File

@@ -14,6 +14,7 @@
use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
@@ -68,7 +69,9 @@ impl<S> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
mut request: CompactionFinished,
) {
) where
S: LogStore,
{
let region = match self.regions.get_region(region_id) {
Some(region) => region,
None => {
@@ -105,13 +108,15 @@ impl<S> RegionWorkerLoop<S> {
}
// Schedule next compaction if necessary.
self.compaction_scheduler
let mut pending_ddls = self
.compaction_scheduler
.on_compaction_finished(
region_id,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
)
.await;
self.handle_ddl_requests(&mut pending_ddls).await;
}
/// When compaction fails, we simply log the error.
@@ -124,6 +129,13 @@ impl<S> RegionWorkerLoop<S> {
/// Schedule compaction for the region if necessary.
pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
if region.is_staging() || region.is_enter_staging() {
info!(
"Region {} is staging or entering staging, skip compaction",
region.region_id
);
return;
}
let now = self.time_provider.current_time_millis();
if now - region.last_compaction_millis()
>= self.config.min_compaction_interval.as_millis() as i64

View File

@@ -98,6 +98,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
if self.compaction_scheduler.is_compacting(region_id) {
// Safety: region is compacting, add ddl request to pending queue.
self.compaction_scheduler
.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender,
request: DdlRequest::EnterStaging(EnterStagingRequest {
partition_directive,
}),
});
return;
}
self.handle_enter_staging(region, partition_directive, sender);
}