Files
greptimedb/src/mito2/src/compaction.rs

2417 lines
87 KiB
Rust

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use arrow_schema::Schema;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::PageIndexPolicy;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::FlatSource;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::read_columns::ReadColumns;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
/// Region compaction request.
///
/// Bundles the region version captured at request creation together with the handles a
/// compaction picker/task needs, plus the waiters that must be notified once the request
/// is consumed.
pub struct CompactionRequest {
    /// Engine configuration.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Region version captured when the request was created.
    pub(crate) current_version: CompactionVersion,
    /// Access layer of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager shared with the compaction task.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Listener notified about compaction events.
    pub(crate) listener: WorkerListener,
    /// Used to resolve database-level (dynamic) compaction options and TTL.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Maximum parallelism forwarded to the compaction region.
    pub(crate) max_parallelism: usize,
}
impl CompactionRequest {
    /// Returns the id of the region targeted by this request, read from the metadata of
    /// the captured region version.
    pub(crate) fn region_id(&self) -> RegionId {
        let metadata = &self.current_version.metadata;
        metadata.region_id
    }
}
/// Compaction scheduler tracks and manages compaction tasks.
///
/// A region has an entry in `region_status` only while a compaction for it has been
/// accepted (or while it has pending waiters/requests attached to such a compaction).
pub(crate) struct CompactionScheduler {
    /// Job scheduler that local compaction tasks are submitted to.
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager handed to every compaction request.
    cache_manager: CacheManagerRef,
    /// Engine configuration handed to every compaction request.
    engine_config: Arc<MitoConfig>,
    /// Memory manager shared by local compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Policy applied by local compaction tasks when memory is exhausted.
    memory_policy: OnExhaustedPolicy,
    /// Listener notified about compaction events.
    listener: WorkerListener,
    /// Plugins for the compaction scheduler (e.g. an optional remote job scheduler).
    plugins: Plugins,
}
impl CompactionScheduler {
    /// Creates a scheduler that is not tracking any region yet.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    ///
    /// - A region in staging mode is skipped and `waiter` receives `Ok(0)`.
    /// - If the region is already compacting, a regular request only adds its waiter to
    ///   the region's waiters, while a strict-window (manual) request replaces any pending
    ///   manual request and is re-scheduled after the current task finishes.
    /// - Otherwise a compaction is picked and submitted; the region status is tracked only
    ///   if a task was actually accepted.
    ///
    /// `listener.on_compaction_scheduled` is called after every new schedule attempt,
    /// whatever its outcome.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // skip compaction if region is in staging state
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manually compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        let result = match self
            .schedule_compaction_request(request, compact_options)
            .await
        {
            Ok(Some(active_compaction)) => {
                // Publish CompactionStatus only after a task has been accepted by the scheduler.
                // This avoids exposing a half-initialized region status that could collect pending
                // DDL/compaction state even though no compaction is actually running.
                status.active_compaction = Some(active_compaction);
                self.region_status.insert(region_id, status);
                Ok(())
            }
            // Nothing to compact: the waiters were already notified while consuming the request.
            Ok(None) => Ok(()),
            Err(e) => Err(e),
        };

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Handles a pending manual compaction request for the region, if any.
    ///
    /// Returns `true` if the caller should return early (the region is not tracked, the
    /// pending request was scheduled, or scheduling it failed and the region was removed),
    /// and `false` if the caller should go on handling waiters and pending DDL requests.
    pub(crate) async fn handle_pending_compaction_request(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> bool {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return true;
        };

        // If there is a pending manual compaction request, schedule it.
        // and defer returning the pending DDL requests to the caller.
        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
            return false;
        };

        let PendingCompaction {
            options,
            waiter,
            max_parallelism,
        } = pending_request;

        // The new request also takes over all waiters accumulated on the status.
        let request = {
            status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            )
        };

        match self.schedule_compaction_request(request, options).await {
            Ok(Some(active_compaction)) => {
                // The status entry was present above and nothing removed it since.
                let status = self.region_status.get_mut(&region_id).unwrap();
                status.active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
                true
            }
            Ok(None) => {
                // We still need to handle the pending DDL requests.
                // So we can't return early here.
                false
            }
            Err(e) => {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
                true
            }
        }
    }

    /// Notifies the scheduler that the compaction job is finished successfully.
    ///
    /// In order, this:
    /// 1. clears the active compaction,
    /// 2. schedules a pending manual compaction if there is one,
    /// 3. notifies the remaining waiters with `Ok(0)`,
    /// 4. returns pending DDL requests (removing the region) if there are any,
    /// 5. otherwise tries to schedule the next regular compaction.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return Vec::new();
        };
        status.clear_running_task();

        // If there a pending compaction request, handle it first
        // and defer returning the pending DDL requests to the caller.
        if self
            .handle_pending_compaction_request(
                region_id,
                manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
        {
            return Vec::new();
        }

        let Some(status) = self.region_status.get_mut(&region_id) else {
            // The region status might be removed by the previous steps.
            // So we return empty DDL requests.
            return Vec::new();
        };

        for waiter in std::mem::take(&mut status.waiters) {
            waiter.send(Ok(0));
        }

        // If there are pending DDL requests, run them.
        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
        if !pending_ddl_requests.is_empty() {
            self.region_status.remove(&region_id);
            // If there are pending DDL requests, we should return them to the caller.
            // And skip try to schedule next compaction task.
            return pending_ddl_requests;
        }

        // We should always try to compact the region until picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        // Try to schedule next compaction task for this region.
        match self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            Ok(Some(active_compaction)) => {
                self.region_status
                    .get_mut(&region_id)
                    .unwrap()
                    .active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled next compaction for region id: {}",
                    region_id
                );
            }
            Ok(None) => {
                // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
                // All DDL requests and pending compaction requests have already been processed.
                // Safe to remove the region from status tracking.
                self.region_status.remove(&region_id);
            }
            Err(e) => {
                error!(e; "Failed to schedule next compaction for region {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
            }
        }

        Vec::new()
    }

    /// Notifies the scheduler that the compaction job is cancelled cooperatively.
    ///
    /// Removes the region, fails its waiters and returns its pending DDL requests.
    pub(crate) async fn on_compaction_cancelled(
        &mut self,
        region_id: RegionId,
    ) -> Vec<SenderDdlRequest> {
        self.remove_region_on_cancel(region_id)
    }

    /// Notifies the scheduler that the compaction job is failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Add ddl request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request compaction.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        debug!(
            "Added pending DDL request for region: {}, ddl: {:?}",
            request.region_id, request.request
        );
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddl_requests.push(request);
    }

    /// Returns whether the region has pending DDL requests (test-only helper).
    #[cfg(test)]
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map(|status| !status.pending_ddl_requests.is_empty())
            .unwrap_or(false);
        debug!(
            "Checked pending DDL requests for region: {}, has_pending: {}",
            region_id, has_pending
        );
        has_pending
    }

    /// Requests cancellation of the region's active compaction.
    ///
    /// Returns [RequestCancelResult::NotRunning] if the region is not tracked.
    pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return RequestCancelResult::NotRunning;
        };

        status.request_cancel()
    }

    /// Schedules a compaction request.
    ///
    /// Returns the active compaction state if the request is scheduled successfully.
    /// Returns `None` if no compaction task can be scheduled for this region.
    ///
    /// If the picker finds nothing to compact, all waiters of the request receive `Ok(0)`.
    /// If remote compaction is configured and a remote job scheduler plugin is present, the
    /// job is scheduled remotely; on failure it falls back to local compaction only when
    /// `fallback_to_local()` is set.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<Option<ActiveCompaction>> {
        let region_id = request.region_id();
        // Failing to resolve dynamic options is not fatal: fall back to the region options.
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id,
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        // NOTE(review): redundant re-binding; `region_id` is already bound to the same value above.
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;
        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        // The region options seen by the task use the resolved (dynamic) compaction options.
        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(None);
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(Some(ActiveCompaction::Remote));
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            // NOTE(review): `e.waiters` is not used on this path, so the waiters
                            // handed to the remote job are dropped without an explicit error
                            // being sent to them; confirm this is acceptable for callers.
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters back to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        // The same cancellation handle is shared by the task state and the compactor.
        let cancel_handle = Arc::new(CancellationHandle::default());
        let state = LocalCompactionState::new(cancel_handle.clone());
        let local_compaction_task = Box::new(CompactionTaskImpl {
            state: state.clone(),
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| Some(ActiveCompaction::Local { state }))
    }

    /// Submits a local compaction task to the job scheduler.
    ///
    /// The in-flight compaction gauge is incremented when the task starts running and
    /// decremented when it finishes. Submission errors are logged and returned.
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .inspect_err(
                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
            )
    }

    /// Stops tracking the region and fails all of its waiters, pending manual compaction
    /// and pending DDL requests with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }

    /// Stops tracking the region after cancellation, fails its waiters and returns its
    /// pending DDL requests so the caller can still run them.
    fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.remove(&region_id) else {
            return Vec::new();
        };

        status.on_cancel()
    }
}
/// State shared between a local compaction task and the scheduler, used to coordinate
/// cooperative cancellation with the task's commit phase.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle used to request cooperative cancellation of the task.
    cancel_handle: Arc<CancellationHandle>,
    /// Whether the task has entered its commit phase. The mutex also serializes
    /// cancellation requests against the commit transition.
    commit_started: Arc<Mutex<bool>>,
}
/// The compaction currently running for a region.
#[derive(Debug)]
enum ActiveCompaction {
    /// Compaction running in this process; its state allows cancellation.
    Local { state: LocalCompactionState },
    /// Compaction delegated to a remote job scheduler; it cannot be cancelled from here.
    Remote,
}
impl LocalCompactionState {
    /// Creates a state whose commit phase has not started yet.
    fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
        Self {
            cancel_handle,
            commit_started: Arc::new(Mutex::new(false)),
        }
    }

    /// Returns the cancellation handle for this compaction task.
    pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
        self.cancel_handle.clone()
    }

    /// Marks the compaction task as started to commit,
    /// which means the compaction task is in the final stage and is about to update region version and manifest.
    /// Once marked, later cancellation requests are rejected with
    /// [RequestCancelResult::TooLateToCancel].
    ///
    /// Returns `false` (and leaves the commit flag untouched) if cancellation has already
    /// been requested, in which case the task must not commit. Returns `true` otherwise,
    /// including when the commit phase was already marked by an earlier call.
    pub(crate) fn mark_commit_started(&self) -> bool {
        // Check the cancel handle while holding the lock so that `request_cancel`, which
        // takes the same lock, cannot slip in between the check and the flag update.
        let mut commit_started = self.commit_started.lock().unwrap();
        if self.cancel_handle.is_cancelled() {
            return false;
        }
        *commit_started = true;
        true
    }

    /// Request cancellation for this compaction task.
    ///
    /// Returns [RequestCancelResult::TooLateToCancel] once the commit phase has started,
    /// [RequestCancelResult::AlreadyCancelling] if cancellation was requested before, and
    /// [RequestCancelResult::CancelIssued] when this call issued the cancellation.
    pub(crate) fn request_cancel(&self) -> RequestCancelResult {
        // The cancel handle must be checked and updated while holding the `commit_started`
        // lock to avoid racing between cancellation and commit.
        let commit_started = self.commit_started.lock().unwrap();
        if *commit_started {
            return RequestCancelResult::TooLateToCancel;
        }
        if self.cancel_handle.is_cancelled() {
            return RequestCancelResult::AlreadyCancelling;
        }
        self.cancel_handle.cancel();
        RequestCancelResult::CancelIssued
    }
}
/// Outcome of a request to cancel a region's compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    /// Cancellation was issued by this request.
    CancelIssued,
    /// Cancellation had already been requested earlier.
    AlreadyCancelling,
    /// The compaction can no longer be cancelled (its commit phase started, or it runs remotely).
    TooLateToCancel,
    /// No compaction is running for the region.
    NotRunning,
}
impl Drop for CompactionScheduler {
    /// Fails every region still tracked by the scheduler with a "region closed" error,
    /// so that no waiter or pending request is left unanswered at shutdown.
    fn drop(&mut self) {
        self.region_status
            .drain()
            .for_each(|(region_id, status)| {
                status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
            });
    }
}
/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
///
/// Resolution rules:
/// - If the region overrides compaction options and has its own TTL, both are returned
///   without fetching database options.
/// - TTL: the region TTL if set, otherwise the database TTL (or the default).
/// - Compaction: the region options if `compaction_override` is set. Otherwise the
///   database `compaction.*` extra options are parsed, falling back to the region options
///   when there are none or they fail to parse.
///
/// # Errors
/// Returns an error if fetching the database options times out or fails.
async fn find_dynamic_options(
    region_id: RegionId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    let table_id = region_id.table_id();
    if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((region_options.compaction.clone(), ttl));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = match region_options.ttl {
        Some(ttl) => {
            debug!(
                "Use region TTL directly for table {}: ttl={:?}",
                table_id, region_options.ttl
            );
            ttl
        }
        None => db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into(),
    };

    let compaction = if region_options.compaction_override {
        // The region explicitly overrides compaction options; database options (which may
        // well exist) are only consulted for the TTL above.
        debug!(
            "Region overrides compaction options for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    } else if let Some(schema_opts) = db_options {
        let map: HashMap<String, String> = schema_opts
            .extra_options
            .iter()
            .filter(|(k, _)| k.starts_with("compaction."))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        if map.is_empty() {
            region_options.compaction.clone()
        } else {
            // Invalid database options must not block compaction: log and use the region's.
            crate::region::options::RegionOptions::try_from_options(region_id, &map)
                .map(|o| o.compaction)
                .unwrap_or_else(|e| {
                    error!(e; "Failed to create RegionOptions from map");
                    region_options.compaction.clone()
                })
        }
    } else {
        debug!(
            "DB options is None for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}
/// Status of running and pending region compaction tasks.
///
/// Tracked by [CompactionScheduler] per region while a compaction is active.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction. They are handed to the next compaction request
    /// or notified when the region leaves the scheduler.
    waiters: Vec<OutputTx>,
    /// Pending compactions that are supposed to run as soon as current compaction task finished.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// Active compaction state (`None` when no task is currently running).
    active_compaction: Option<ActiveCompaction>,
}
impl CompactionStatus {
/// Creates a new [CompactionStatus]
fn new(
region_id: RegionId,
version_control: VersionControlRef,
access_layer: AccessLayerRef,
) -> CompactionStatus {
CompactionStatus {
region_id,
version_control,
access_layer,
waiters: Vec::new(),
pending_request: None,
pending_ddl_requests: Vec::new(),
active_compaction: None,
}
}
#[cfg(test)]
fn start_local_task(&mut self) -> LocalCompactionState {
let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
self.active_compaction = Some(ActiveCompaction::Local {
state: state.clone(),
});
state
}
#[cfg(test)]
fn start_remote_task(&mut self) {
self.active_compaction = Some(ActiveCompaction::Remote);
}
fn request_cancel(&mut self) -> RequestCancelResult {
let Some(active_compaction) = &self.active_compaction else {
return RequestCancelResult::NotRunning;
};
match active_compaction {
ActiveCompaction::Local { state, .. } => state.request_cancel(),
ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
}
}
fn clear_running_task(&mut self) -> bool {
self.active_compaction.take().is_some()
}
/// Merge the waiter to the pending compaction.
fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
/// Set pending compaction request or replace current value if already exist.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
prev.waiter.send(ManualCompactionOverrideSnafu.fail());
}
}
fn on_failure(mut self, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
if let Some(pending_compaction) = self.pending_request {
pending_compaction
.waiter
.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
for pending_ddl in self.pending_ddl_requests {
pending_ddl
.sender
.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
#[must_use]
fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
for waiter in self.waiters.drain(..) {
waiter.send(CompactionCancelledSnafu.fail());
}
if let Some(pending_compaction) = self.pending_request {
pending_compaction.waiter.send(
Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
region_id: self.region_id,
}),
);
}
std::mem::take(&mut self.pending_ddl_requests)
}
/// Creates a new compaction request for compaction picker.
///
/// It consumes all pending compaction waiters.
#[allow(clippy::too_many_arguments)]
fn new_compaction_request(
&mut self,
request_sender: Sender<WorkerRequestWithTime>,
mut waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
waiters.extend(std::mem::take(&mut self.waiters));
if let Some(waiter) = waiter.take_inner() {
waiters.push(waiter);
}
CompactionRequest {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters,
start_time,
cache_manager,
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
max_parallelism,
}
}
}
/// One unit of work produced by a compaction picker: a set of input files merged into
/// files at `output_level`.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}
/// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Compaction output file level.
    output_level: Level,
    /// Metadata of the compaction input files.
    inputs: Vec<FileMeta>,
    /// Whether to remove deletion markers.
    filter_deleted: bool,
    /// Compaction output time range, if any.
    output_time_range: Option<TimestampRange>,
}
/// Builder that reads the input SST files of a compaction into a [FlatSource].
struct CompactionSstReaderBuilder<'a> {
    /// Region metadata used to build the projection and predicates.
    metadata: RegionMetadataRef,
    /// Access layer used to read the SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager, used with the compaction cache strategy.
    cache: CacheManagerRef,
    /// Input SST files to read.
    inputs: &'a [FileHandle],
    /// Whether the region is in append mode.
    append_mode: bool,
    /// Whether deleted rows should be filtered out.
    filter_deleted: bool,
    /// Optional time range, converted into a scan predicate.
    time_range: Option<TimestampRange>,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}
impl CompactionSstReaderBuilder<'_> {
/// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
/// for compaction. The schema of the [FlatSource] is unified.
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
let scan_input = self.build_scan_input().await?.with_compaction(true);
let schema = scan_input.mapper.input_arrow_schema(scan_input.compaction);
let stream = SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await?;
Ok(FlatSource::new_stream(schema, stream))
}
async fn build_scan_input(self) -> Result<ScanInput> {
let schema = self.metadata.schema.arrow_schema();
let json_type_hint = if schema.has_json_extension_field() {
let mut json_type_hint = schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), JsonNativeType::Null))
.collect::<HashMap<_, _>>();
let schemas = self.collect_arrow_schemas_from_parquet().await?;
for schema in schemas {
for field in schema.fields() {
let Some(merged) = json_type_hint.get_mut(field.name()) else {
continue;
};
let json_type = JsonNativeType::try_from(field.data_type())
.context(DataTypeMismatchSnafu)?;
merged.merge(&json_type);
}
}
Some(json_type_hint)
} else {
None
};
let projection = (0..self.metadata.column_metadatas.len()).collect();
let read_columns = ReadColumns::from_deduped_column_ids(
self.metadata.column_metadatas.iter().map(|x| x.column_id),
);
let mapper = FlatProjectionMapper::new_with_read_columns(
&self.metadata,
projection,
read_columns,
json_type_hint.as_ref(),
)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.
if let Some(time_range) = self.time_range {
scan_input =
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
Ok(scan_input)
}
async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
let mut schemas = Vec::with_capacity(self.inputs.len());
for file_handle in self.inputs {
let file_path =
file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
let file_size = file_handle.meta_ref().file_size;
let parquet_metadata = match self
.sst_layer
.read_sst(file_handle.clone())
.cache(CacheStrategy::Compaction(self.cache.clone()))
.read_parquet_metadata(
&file_path,
file_size,
&mut MetadataCacheMetrics::default(),
PageIndexPolicy::default(),
)
.await
.map(|x| x.0.parquet_metadata())
{
Ok(x) => x,
Err(e) if e.is_object_not_found() => {
warn!(
e;
"Input SST file does not exist during compaction metadata pre-read, skipping it (this can happen during concurrent deletion/cleanup), region_id: {}, file: {}",
file_handle.region_id(),
file_handle.file_id()
);
continue;
}
Err(e) => return Err(e),
};
let file_metadata = parquet_metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.context(ParquetToArrowSchemaSnafu { file: file_path })?;
schemas.push(schema);
}
Ok(schemas)
}
}
/// Converts time range to predicates so that rows outside the range will be filtered.
///
/// The range is half-open: rows need `ts >= start` and `ts < end` for whichever bounds are
/// present. An unbounded range yields the default (empty) predicate group.
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // safety: time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    if range.start().is_none() && range.end().is_none() {
        return Ok(PredicateGroup::default());
    }

    let ts_expr = || datafusion_expr::col(ts_col.column_schema.name.clone());
    let mut exprs = Vec::with_capacity(2);
    if let Some(start) = range.start() {
        exprs.push(ts_expr().gt_eq(ts_to_lit(*start, ts_col_unit)?));
    }
    if let Some(end) = range.end() {
        exprs.push(ts_expr().lt(ts_to_lit(*end, ts_col_unit)?));
    }

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}
/// Converts `ts` into `ts_col_unit` and wraps the raw value in a DataFusion timestamp
/// literal of that unit, with no timezone attached.
///
/// # Errors
///
/// Returns the error built by `TimeRangePredicateOverflowSnafu` when `ts` cannot be
/// converted into `ts_col_unit`.
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let converted = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let raw = Some(converted.value());
    let scalar = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(raw, None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(raw, None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(raw, None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(raw, None),
    };
    Ok(datafusion_expr::lit(scalar))
}
/// Collects, from every level, the SST files reported as expired under `ttl` at `now`.
///
/// Returns an empty list when no TTL is configured.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    match ttl {
        Some(ttl) => levels
            .iter()
            .flat_map(|level| level.get_expired_files(&now, &ttl))
            .collect(),
        None => Vec::new(),
    }
}
/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
picker_output
.outputs
.iter()
.flat_map(|output| output.inputs.iter())
.map(|file: &FileHandle| {
let meta = file.meta_ref();
meta.max_row_group_uncompressed_size
})
.sum()
}
/// Pending compaction request that is supposed to run after the current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options of the deferred request, e.g. [`Options::StrictWindow`] for a
    /// manual strict-window compaction. Regular options may also be stored here.
    pub(crate) options: compact_request::Options,
    /// Waiter for the pending request's result.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}
#[cfg(test)]
mod tests {
use std::assert_matches;
use std::time::Duration;
use api::v1::region::StrictWindow;
use common_datasource::compression::CompressionType;
use common_meta::key::schema_name::SchemaNameValue;
use common_time::DatabaseTimeToLive;
use tokio::sync::{Barrier, oneshot};
use super::*;
use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
use crate::error::InvalidSchedulerStateSnafu;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::schedule::scheduler::{Job, Scheduler};
use crate::sst::FormatType;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
struct FailingScheduler;
#[async_trait::async_trait]
impl Scheduler for FailingScheduler {
fn schedule(&self, _job: Job) -> Result<()> {
InvalidSchedulerStateSnafu.fail()
}
async fn stop(&self, _await_termination: bool) -> Result<()> {
Ok(())
}
}
#[tokio::test]
async fn test_find_compaction_options_db_level() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
let region_id = builder.region_id();
let table_id = region_id.table_id();
// Register table without ttl but with db-level compaction options
let mut schema_value = SchemaNameValue {
ttl: Some(DatabaseTimeToLive::default()),
..Default::default()
};
schema_value
.extra_options
.insert("compaction.type".to_string(), "twcs".to_string());
schema_value
.extra_options
.insert("compaction.twcs.time_window".to_string(), "2h".to_string());
schema_metadata_manager
.register_region_table_info(
table_id,
"t",
"c",
"s",
Some(schema_value),
kv_backend.clone(),
)
.await;
let version_control = Arc::new(builder.build());
let region_opts = version_control.current().version.options.clone();
let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
.await
.unwrap();
match opts {
crate::region::options::CompactionOptions::Twcs(t) => {
assert_eq!(t.time_window_seconds(), Some(2 * 3600));
}
}
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let (otx, _orx) = oneshot::channel();
let request = scheduler
.region_status
.entry(region_id)
.or_insert_with(|| {
crate::compaction::CompactionStatus::new(
region_id,
version_control.clone(),
env.access_layer.clone(),
)
})
.new_compaction_request(
scheduler.request_sender.clone(),
OptionOutputTx::new(Some(OutputTx::new(otx))),
scheduler.engine_config.clone(),
scheduler.cache_manager.clone(),
&manifest_ctx,
scheduler.listener.clone(),
schema_metadata_manager.clone(),
1,
);
scheduler
.schedule_compaction_request(
request,
compact_request::Options::Regular(Default::default()),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_find_compaction_options_priority() {
fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
let mut schema_value = SchemaNameValue {
ttl: Some(DatabaseTimeToLive::default()),
..Default::default()
};
schema_value
.extra_options
.insert("compaction.type".to_string(), "twcs".to_string());
schema_value.extra_options.insert(
"compaction.twcs.time_window".to_string(),
time_window.to_string(),
);
schema_value
}
let cases = [
(
"db options set and table override set",
Some(schema_value_with_twcs("2h")),
true,
Some(Duration::from_secs(5 * 3600)),
Some(5 * 3600),
),
(
"db options set and table override not set",
Some(schema_value_with_twcs("2h")),
false,
None,
Some(2 * 3600),
),
(
"db options not set and table override set",
None,
true,
Some(Duration::from_secs(4 * 3600)),
Some(4 * 3600),
),
(
"db options not set and table override not set",
None,
false,
None,
None,
),
];
for (case_name, schema_value, override_set, table_window, expected_window) in cases {
let builder = VersionControlBuilder::new();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
let region_id = builder.region_id();
let table_id = region_id.table_id();
schema_metadata_manager
.register_region_table_info(
table_id,
"t",
"c",
"s",
schema_value,
kv_backend.clone(),
)
.await;
let version_control = Arc::new(builder.build());
let mut region_opts = version_control.current().version.options.clone();
region_opts.compaction_override = override_set;
if let Some(window) = table_window {
let crate::region::options::CompactionOptions::Twcs(twcs) =
&mut region_opts.compaction;
twcs.time_window = Some(window);
}
let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
.await
.unwrap();
match opts {
crate::region::options::CompactionOptions::Twcs(t) => {
assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
}
}
}
}
#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
// Nothing to compact.
let version_control = Arc::new(builder.build());
let (output_tx, output_rx) = oneshot::channel();
let waiter = OptionOutputTx::from(output_tx);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
.schedule_compaction(
builder.region_id(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
// Only one file, picker won't compact it.
let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
let (output_tx, output_rx) = oneshot::channel();
let waiter = OptionOutputTx::from(output_tx);
scheduler
.schedule_compaction(
builder.region_id(),
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
assert!(scheduler.region_status.is_empty());
}
#[tokio::test]
async fn test_schedule_on_finished() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
// 5 files to compact.
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
let data = version_control.current();
let file_metas: Vec<_> = data.version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta_ref().clone())
.collect();
// 5 files for next compaction and removes old files.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&file_metas,
purger.clone(),
);
// The task is pending.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(
!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty()
);
// On compaction finished and schedule next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&[],
purger.clone(),
);
let (tx, _rx) = oneshot::channel();
// The task is pending.
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(
!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty()
);
}
#[tokio::test]
async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
common_telemetry::init_default_ut_logging();
let env = SchedulerEnv::new()
.await
.scheduler(Arc::new(FailingScheduler));
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
let result = scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await;
assert!(result.is_err());
assert!(!scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_manual_compaction_when_compaction_in_progress() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
// 5 files to compact.
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta_ref().clone())
.collect();
// 5 files for next compaction and removes old files.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&file_metas,
purger.clone(),
);
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(
scheduler
.region_status
.get(&region_id)
.unwrap()
.pending_request
.is_none()
);
// Schedule another manual compaction.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
// Current job num should be 1 since compaction is in progress.
assert_eq!(1, job_scheduler.num_jobs());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_some());
// On compaction finished and schedule next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_none());
}
#[tokio::test]
async fn test_compaction_bypass_in_staging_mode() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
// Create version control and manifest context for staging mode
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = version_control.current().version.metadata.region_id;
// Create staging manifest context using the same pattern as SchedulerEnv
let staging_manifest_ctx = {
let manager = RegionManifestManager::new(
version_control.current().version.metadata.clone(),
0,
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: env.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
remove_file_options: Default::default(),
manifest_cache: None,
},
FormatType::PrimaryKey,
&Default::default(),
)
.await
.unwrap();
Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Staging),
))
};
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
// Test regular compaction bypass in staging mode
let (tx, rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&staging_manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
let result = rx.await.unwrap();
assert_eq!(result.unwrap(), 0); // is there a better way to check this?
assert_eq!(0, scheduler.region_status.len());
}
#[tokio::test]
async fn test_add_ddl_request_to_pending() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
}
#[tokio::test]
async fn test_request_cancel_state_transitions() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let mut status =
CompactionStatus::new(region_id, version_control, env.access_layer.clone());
let state = status.start_local_task();
assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
assert!(state.cancel_handle().is_cancelled());
assert_eq!(
status.request_cancel(),
RequestCancelResult::AlreadyCancelling
);
assert!(!state.mark_commit_started());
assert_eq!(
status.request_cancel(),
RequestCancelResult::AlreadyCancelling
);
assert!(status.clear_running_task());
assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
}
#[tokio::test]
async fn test_request_cancel_remote_compaction_is_too_late() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let mut status =
CompactionStatus::new(region_id, version_control, env.access_layer.clone());
status.start_remote_task();
assert_eq!(
status.request_cancel(),
RequestCancelResult::TooLateToCancel
);
assert!(status.active_compaction.is_some());
}
#[tokio::test]
async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let _manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.has_pending_ddls(region_id));
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
}
#[tokio::test]
async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let _manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let status = scheduler.region_status.get_mut(&region_id).unwrap();
status.start_local_task();
let (manual_tx, manual_rx) = oneshot::channel();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
assert_matches!(manual_rx.await.unwrap(), Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_compaction_failed() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler
.on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_closed() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_closed(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_dropped() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_dropped(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_pending_ddl_request_failed_on_region_truncated() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
let (output_tx, output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
assert!(scheduler.has_pending_ddls(region_id));
scheduler.on_region_truncated(region_id);
assert!(!scheduler.has_pending_ddls(region_id));
let result = output_rx.await.unwrap();
assert_matches!(result, Err(_));
}
#[tokio::test]
async fn test_on_compaction_finished_returns_pending_ddl_requests() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
scheduler.region_status.insert(
region_id,
CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
);
scheduler
.region_status
.get_mut(&region_id)
.unwrap()
.start_local_task();
let (output_tx, _output_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(output_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.has_pending_ddls(region_id));
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(job_scheduler.num_jobs(), 0);
}
#[tokio::test]
async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.start_local_task();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
scheduler.region_status.insert(region_id, status);
let (ddl_tx, _ddl_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(ddl_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert_eq!(pending_ddls.len(), 1);
assert!(!scheduler.region_status.contains_key(&region_id));
assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
}
#[tokio::test]
async fn test_on_compaction_finished_returns_empty_when_region_absent() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let builder = VersionControlBuilder::new();
let region_id = builder.region_id();
let version_control = Arc::new(builder.build());
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
}
#[tokio::test]
async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
let env = SchedulerEnv::new()
.await
.scheduler(Arc::new(FailingScheduler));
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let region_id = builder.region_id();
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
let (manual_tx, manual_rx) = oneshot::channel();
let mut status =
CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
status.start_local_task();
status.set_pending_request(PendingCompaction {
options: compact_request::Options::Regular(Default::default()),
waiter: OptionOutputTx::from(manual_tx),
max_parallelism: 1,
});
scheduler.region_status.insert(region_id, status);
let (ddl_tx, ddl_rx) = oneshot::channel();
scheduler.add_ddl_request_to_pending(SenderDdlRequest {
region_id,
sender: OptionOutputTx::from(ddl_tx),
request: crate::request::DdlRequest::EnterStaging(
store_api::region_request::EnterStagingRequest {
partition_directive:
store_api::region_request::StagingPartitionDirective::RejectAllWrites,
},
),
});
let pending_ddls = scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
.await;
assert!(pending_ddls.is_empty());
assert!(!scheduler.region_status.contains_key(&region_id));
assert!(manual_rx.await.is_err());
assert_matches!(ddl_rx.await.unwrap(), Err(_));
}
#[tokio::test]
async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
    // No files are pushed into the version, so the follow-up scheduling attempt is
    // expected to be a no-op and the region's compaction status must be dropped.
    let env = SchedulerEnv::new().await;
    let (tx, _rx) = mpsc::channel(4);
    let mut scheduler = env.mock_compaction_scheduler(tx);

    let builder = VersionControlBuilder::new();
    let version_control = Arc::new(builder.build());
    let region_id = builder.region_id();
    let manifest_ctx = env
        .mock_manifest_context(version_control.current().version.metadata.clone())
        .await;
    let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

    // Mark a local compaction as running before the status is registered.
    let mut running = CompactionStatus::new(region_id, version_control, env.access_layer.clone());
    running.start_local_task();
    scheduler.region_status.insert(region_id, running);

    let pending_ddls = scheduler
        .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
        .await;

    assert!(pending_ddls.is_empty());
    assert!(!scheduler.region_status.contains_key(&region_id));
}
#[tokio::test]
async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
    // The scheduler rejects every job, so when the running compaction finishes and the
    // next regular compaction cannot be scheduled, the region status must be removed.
    let env = SchedulerEnv::new()
        .await
        .scheduler(Arc::new(FailingScheduler));
    let (tx, _rx) = mpsc::channel(4);
    let mut scheduler = env.mock_compaction_scheduler(tx);

    // Five L0 files sharing the same end timestamp.
    let mut builder = VersionControlBuilder::new();
    let end = 1000 * 1000;
    for start in [0, 10, 50, 80, 90] {
        builder.push_l0_file(start, end);
    }
    let version_control = Arc::new(builder.build());
    let region_id = builder.region_id();

    let manifest_ctx = env
        .mock_manifest_context(version_control.current().version.metadata.clone())
        .await;
    let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

    // Mark a local compaction as running before the status is registered.
    let mut running = CompactionStatus::new(region_id, version_control, env.access_layer.clone());
    running.start_local_task();
    scheduler.region_status.insert(region_id, running);

    let pending_ddls = scheduler
        .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
        .await;

    assert!(pending_ddls.is_empty());
    assert!(!scheduler.region_status.contains_key(&region_id));
}
// A multi-threaded runtime lets the spawned tasks actually race inside `try_acquire`.
// With the default current-thread runtime they would be polled one after another,
// and the test would not exercise any real contention.
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_concurrent_memory_competition() {
    let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];

    // Spawn 3 tasks competing for memory, each trying to acquire 2MB.
    for _ in 0..3 {
        let mgr = manager.clone();
        let bar = barrier.clone();
        let handle = tokio::spawn(async move {
            bar.wait().await; // Synchronize start so the acquisitions overlap.
            mgr.try_acquire(2 * 1024 * 1024)
        });
        handles.push(handle);
    }

    let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.expect("memory competition task panicked"))
        .collect();

    // Only 1 should succeed (3MB limit, 2MB request, can only fit one).
    let succeeded = results.iter().filter(|r| r.is_some()).count();
    let failed = results.iter().filter(|r| r.is_none()).count();
    assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
    assert_eq!(failed, 2, "Expected 2 tasks to fail");

    // The winning guard is still alive, so its reservation must be accounted for.
    assert_eq!(manager.used_bytes(), 2 * 1024 * 1024);

    // Dropping the guards must release every reserved byte.
    drop(results);
    assert_eq!(manager.used_bytes(), 0);
}
}