diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 951ada1df8..0fe593be95 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -343,7 +343,8 @@ impl DatanodeBuilder {
         );

         let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
-        let engines = Self::build_store_engines(opts, object_store_manager).await?;
+        let engines =
+            Self::build_store_engines(opts, object_store_manager, self.plugins.clone()).await?;
         for engine in engines {
             region_server.register_engine(engine);
         }
@@ -357,14 +358,19 @@ impl DatanodeBuilder {
     async fn build_store_engines(
         opts: &DatanodeOptions,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<Vec<RegionEngineRef>> {
         let mut engines = vec![];
         for engine in &opts.region_engine {
             match engine {
                 RegionEngineConfig::Mito(config) => {
-                    let mito_engine =
-                        Self::build_mito_engine(opts, object_store_manager.clone(), config.clone())
-                            .await?;
+                    let mito_engine = Self::build_mito_engine(
+                        opts,
+                        object_store_manager.clone(),
+                        config.clone(),
+                        plugins.clone(),
+                    )
+                    .await?;

                     let metric_engine = MetricEngine::new(mito_engine.clone());
                     engines.push(Arc::new(mito_engine) as _);
@@ -387,6 +393,7 @@ impl DatanodeBuilder {
         opts: &DatanodeOptions,
         object_store_manager: ObjectStoreManagerRef,
         config: MitoConfig,
+        plugins: Plugins,
     ) -> Result<MitoEngine> {
         let mito_engine = match &opts.wal {
             DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
@@ -395,6 +402,7 @@ impl DatanodeBuilder {
                 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                     .await?,
                 object_store_manager,
+                plugins,
             )
             .await
             .context(BuildMitoEngineSnafu)?,
@@ -403,6 +411,7 @@ impl DatanodeBuilder {
                 config,
                 Self::build_kafka_log_store(kafka_config).await?,
                 object_store_manager,
+                plugins,
             )
             .await
             .context(BuildMitoEngineSnafu)?,
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index d9593bfe58..f1baffcb7d 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -27,7 +27,8 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};

 use api::v1::region::compact_request;
-use common_telemetry::{debug, error};
+use common_base::Plugins;
+use common_telemetry::{debug, error, info};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -59,6 +60,9 @@ use crate::region::options::MergeMode;
 use crate::region::version::{VersionControlRef, VersionRef};
 use crate::region::ManifestContextRef;
 use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
+use crate::schedule::remote_job_scheduler::{
+    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
+};
 use crate::schedule::scheduler::SchedulerRef;
 use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
 use crate::sst::version::LevelMeta;
@@ -103,6 +107,8 @@ pub(crate) struct CompactionScheduler {
     cache_manager: CacheManagerRef,
     engine_config: Arc<MitoConfig>,
     listener: WorkerListener,
+    /// Plugins for the compaction scheduler.
+    plugins: Plugins,
 }

 impl CompactionScheduler {
@@ -112,6 +118,7 @@ impl CompactionScheduler {
         cache_manager: CacheManagerRef,
         engine_config: Arc<MitoConfig>,
         listener: WorkerListener,
+        plugins: Plugins,
     ) -> Self {
         Self {
             scheduler,
@@ -120,12 +127,13 @@ impl CompactionScheduler {
             cache_manager,
             engine_config,
             listener,
+            plugins,
         }
     }

     /// Schedules a compaction for the region.
     #[allow(clippy::too_many_arguments)]
-    pub(crate) fn schedule_compaction(
+    pub(crate) async fn schedule_compaction(
         &mut self,
         region_id: RegionId,
         compact_options: compact_request::Options,
@@ -153,10 +161,11 @@ impl CompactionScheduler {
         );
         self.region_status.insert(region_id, status);
         self.schedule_compaction_request(request, compact_options)
+            .await
     }

     /// Notifies the scheduler that the compaction job is finished successfully.
-    pub(crate) fn on_compaction_finished(
+    pub(crate) async fn on_compaction_finished(
         &mut self,
         region_id: RegionId,
         manifest_ctx: &ManifestContextRef,
@@ -175,10 +184,13 @@ impl CompactionScheduler {
             self.listener.clone(),
         );
         // Try to schedule next compaction task for this region.
-        if let Err(e) = self.schedule_compaction_request(
-            request,
-            compact_request::Options::Regular(Default::default()),
-        ) {
+        if let Err(e) = self
+            .schedule_compaction_request(
+                request,
+                compact_request::Options::Regular(Default::default()),
+            )
+            .await
+        {
             error!(e; "Failed to schedule next compaction for region {}", region_id);
         }
     }
@@ -219,48 +231,13 @@ impl CompactionScheduler {
     /// Schedules a compaction request.
     ///
     /// If the region has nothing to compact, it removes the region from the status map.
-    fn schedule_compaction_request(
+    async fn schedule_compaction_request(
         &mut self,
         request: CompactionRequest,
         options: compact_request::Options,
     ) -> Result<()> {
+        let picker = new_picker(options.clone(), &request.current_version.options.compaction);
         let region_id = request.region_id();
-        let Some(mut task) = self.build_compaction_task(request, options) else {
-            // Nothing to compact, remove it from the region status map.
-            self.region_status.remove(&region_id);
-            return Ok(());
-        };
-
-        // Submit the compaction task.
-        self.scheduler
-            .schedule(Box::pin(async move {
-                task.run().await;
-            }))
-            .map_err(|e| {
-                error!(e; "Failed to submit compaction request for region {}", region_id);
-                // If failed to submit the job, we need to remove the region from the scheduler.
-                self.region_status.remove(&region_id);
-                e
-            })
-    }
-
-    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
-        // Remove this region.
-        let Some(status) = self.region_status.remove(&region_id) else {
-            return;
-        };
-
-        // Notifies all pending tasks.
-        status.on_failure(err);
-    }
-
-    fn build_compaction_task(
-        &self,
-        req: CompactionRequest,
-        options: compact_request::Options,
-    ) -> Option<Box<dyn CompactionTask>> {
-        let picker = new_picker(options, &req.current_version.options.compaction);
-        let region_id = req.region_id();
         let CompactionRequest {
             engine_config,
             current_version,
@@ -271,7 +248,7 @@ impl CompactionScheduler {
             cache_manager,
             manifest_ctx,
             listener,
-        } = req;
+        } = request;
         debug!(
             "Pick compaction strategy {:?} for region: {}",
             picker, region_id
         );
@@ -304,10 +281,58 @@ impl CompactionScheduler {
             for waiter in waiters {
                 waiter.send(Ok(0));
             }
-            return None;
+            self.region_status.remove(&region_id);
+            return Ok(());
         };

-        let task = CompactionTaskImpl {
+        // If specified to run compaction remotely, we schedule the compaction job remotely.
+        // It will fall back to local compaction if there is no remote job scheduler.
+        let waiters = if current_version.options.compaction.remote_compaction() {
+            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
+                let remote_compaction_job = CompactionJob {
+                    compaction_region: compaction_region.clone(),
+                    picker_output: picker_output.clone(),
+                    start_time,
+                    waiters,
+                };
+
+                let result = remote_job_scheduler
+                    .schedule(
+                        RemoteJob::CompactionJob(remote_compaction_job),
+                        Box::new(DefaultNotifier {
+                            request_sender: request_sender.clone(),
+                        }),
+                    )
+                    .await;
+
+                match result {
+                    Ok(job_id) => {
+                        info!(
+                            "Scheduled remote compaction job {} for region {}",
+                            job_id, region_id
+                        );
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
+
+                        // Return the waiters to the caller for local compaction.
+                        e.waiters
+                    }
+                }
+            } else {
+                debug!(
+                    "Remote compaction is not enabled, fallback to local compaction for region {}",
+                    region_id
+                );
+                waiters
+            }
+        } else {
+            waiters
+        };
+
+        // Create a local compaction task.
+        let mut local_compaction_task = Box::new(CompactionTaskImpl {
             request_sender,
             waiters,
             start_time,
@@ -315,9 +340,29 @@ impl CompactionScheduler {
             picker_output,
             compaction_region,
             compactor: Arc::new(DefaultCompactor {}),
+        });
+
+        // Submit the compaction task.
+        self.scheduler
+            .schedule(Box::pin(async move {
+                local_compaction_task.run().await;
+            }))
+            .map_err(|e| {
+                error!(e; "Failed to submit compaction request for region {}", region_id);
+                // If failed to submit the job, we need to remove the region from the scheduler.
+                self.region_status.remove(&region_id);
+                e
+            })
+    }
+
+    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
+        // Remove this region.
+        let Some(status) = self.region_status.remove(&region_id) else {
+            return;
         };

-        Some(Box::new(task))
+        // Notifies all pending tasks.
+        status.on_failure(err);
     }
 }
@@ -602,6 +647,7 @@ mod tests {
             waiter,
             &manifest_ctx,
         )
+        .await
         .unwrap();
         let output = output_rx.await.unwrap().unwrap();
         assert_eq!(output, 0);
@@ -620,6 +666,7 @@ mod tests {
             waiter,
             &manifest_ctx,
         )
+        .await
         .unwrap();
         let output = output_rx.await.unwrap().unwrap();
         assert_eq!(output, 0);
@@ -659,6 +706,7 @@ mod tests {
             OptionOutputTx::none(),
             &manifest_ctx,
         )
+        .await
         .unwrap();
         // Should schedule 1 compaction.
         assert_eq!(1, scheduler.region_status.len());
@@ -687,6 +735,7 @@ mod tests {
             OptionOutputTx::none(),
             &manifest_ctx,
         )
+        .await
         .unwrap();
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(1, job_scheduler.num_jobs());
@@ -698,7 +747,9 @@ mod tests {
             .is_some());

         // On compaction finished and schedule next compaction.
-        scheduler.on_compaction_finished(region_id, &manifest_ctx);
+        scheduler
+            .on_compaction_finished(region_id, &manifest_ctx)
+            .await;
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(2, job_scheduler.num_jobs());
         // 5 files for next compaction.
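
Reviewer note, not part of the patch: the `plugins.get::<RemoteJobSchedulerRef>()` lookup above means a deployment opts in by registering a scheduler in `Plugins` before the datanode builds the mito engine. Below is a minimal sketch of such a plugin; `NoopRemoteScheduler` and `register_remote_scheduler` are hypothetical names, and it assumes `common_base::Plugins::insert`, the counterpart of the `get` call this diff adds:

```rust
use std::sync::Arc;

use common_base::Plugins;
use mito2::schedule::remote_job_scheduler::{
    JobId, Notifier, RemoteJob, RemoteJobScheduler, RemoteJobSchedulerError, RemoteJobSchedulerRef,
};
use uuid::Uuid;

/// Hypothetical scheduler that accepts every job without dispatching it anywhere.
struct NoopRemoteScheduler;

#[async_trait::async_trait]
impl RemoteJobScheduler for NoopRemoteScheduler {
    async fn schedule(
        &self,
        job: RemoteJob,
        _notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError> {
        match job {
            RemoteJob::CompactionJob(_job) => {
                // A real implementation would hand `_job.compaction_region` and
                // `_job.picker_output` to a remote compaction service, then call
                // `_notifier.notify(...)` with the resulting `RegionEdit` once
                // the service reports back.
            }
        }
        // `JobId` exposes no constructor besides `parse_str`, so mint one from
        // a freshly generated UUID; parsing it back cannot fail.
        Ok(JobId::parse_str(&Uuid::new_v4().to_string()).expect("valid uuid"))
    }
}

/// After this call, `plugins.get::<RemoteJobSchedulerRef>()` inside
/// `schedule_compaction_request` returns `Some(...)`.
fn register_remote_scheduler(plugins: &Plugins) {
    plugins.insert::<RemoteJobSchedulerRef>(Arc::new(NoopRemoteScheduler));
}
```

If `schedule` fails, the waiters travel back in `RemoteJobSchedulerError::waiters` and the code above re-queues the work on the local in-process scheduler, so every caller still receives exactly one response.
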
@@ -718,6 +769,7 @@ mod tests {
             OptionOutputTx::none(),
             &manifest_ctx,
         )
+        .await
         .unwrap();
         assert_eq!(2, job_scheduler.num_jobs());
         assert!(scheduler
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 7af21da298..bbb9cfe36d 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -60,6 +60,7 @@ use std::time::Instant;

 use api::region::RegionResponse;
 use async_trait::async_trait;
+use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::tracing;
@@ -107,11 +108,14 @@ impl MitoEngine {
         mut config: MitoConfig,
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<MitoEngine> {
         config.sanitize(data_home)?;

         Ok(MitoEngine {
-            inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?),
+            inner: Arc::new(
+                EngineInner::new(config, log_store, object_store_manager, plugins).await?,
+            ),
         })
     }
@@ -273,11 +277,13 @@ impl EngineInner {
         config: MitoConfig,
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<EngineInner> {
         let config = Arc::new(config);
         let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
         Ok(EngineInner {
-            workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
+            workers: WorkerGroup::start(config.clone(), log_store, object_store_manager, plugins)
+                .await?,
             config,
             wal_raw_entry_reader,
         })
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 2f6e3ace34..57f4e957f4 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -754,6 +754,14 @@ pub enum Error {
         source: Arc<Error>,
     },

+    #[snafu(display("Failed to parse job id"))]
+    ParseJobId {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: uuid::Error,
+    },
+
     #[snafu(display("Operation is not supported: {}", err_msg))]
     UnsupportedOperation {
         err_msg: String,
@@ -812,7 +820,8 @@ impl ErrorExt for Error {
             | InvalidMetadata { .. }
             | InvalidRegionOptions { .. }
             | InvalidWalReadRequest { .. }
-            | PartitionOutOfRange { .. } => StatusCode::InvalidArguments,
+            | PartitionOutOfRange { .. }
+            | ParseJobId { .. } => StatusCode::InvalidArguments,

             InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index cdd2416940..cd0cb37102 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -38,7 +38,7 @@ pub mod region;
 mod region_write_ctx;
 pub mod request;
 pub mod row_converter;
-pub(crate) mod schedule;
+pub mod schedule;
 pub mod sst;
 mod time_provider;
 pub mod wal;
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index 838b4c2468..4624d6d007 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -162,6 +162,12 @@ impl CompactionOptions {
             CompactionOptions::Twcs(opts) => opts.time_window,
         }
     }
+
+    pub(crate) fn remote_compaction(&self) -> bool {
+        match self {
+            CompactionOptions::Twcs(opts) => opts.remote_compaction,
+        }
+    }
 }

 impl Default for CompactionOptions {
@@ -184,6 +190,9 @@ pub struct TwcsOptions {
     /// Compaction time window defined when creating tables.
     #[serde(with = "humantime_serde")]
     pub time_window: Option<Duration>,
+    /// Whether to use remote compaction.
+    #[serde_as(as = "DisplayFromStr")]
+    pub remote_compaction: bool,
 }

 with_prefix!(prefix_twcs "compaction.twcs.");
@@ -208,6 +217,7 @@ impl Default for TwcsOptions {
             max_active_window_runs: 1,
             max_inactive_window_runs: 1,
             time_window: None,
+            remote_compaction: false,
         }
     }
 }
@@ -567,6 +577,7 @@ mod tests {
             ("compaction.twcs.max_inactive_window_runs", "2"),
             ("compaction.twcs.time_window", "2h"),
             ("compaction.type", "twcs"),
+            ("compaction.twcs.remote_compaction", "false"),
             ("storage", "S3"),
             ("append_mode", "false"),
             ("index.inverted_index.ignore_column_ids", "1,2,3"),
@@ -588,6 +599,7 @@ mod tests {
                 max_active_window_runs: 8,
                 max_inactive_window_runs: 2,
                 time_window: Some(Duration::from_secs(3600 * 2)),
+                remote_compaction: false,
             }),
             storage: Some("S3".to_string()),
             append_mode: false,
@@ -616,6 +628,7 @@ mod tests {
                 max_active_window_runs: 8,
                 max_inactive_window_runs: 2,
                 time_window: Some(Duration::from_secs(3600 * 2)),
+                remote_compaction: false,
             }),
             storage: Some("S3".to_string()),
             append_mode: false,
@@ -676,6 +689,7 @@ mod tests {
                 max_active_window_runs: 8,
                 max_inactive_window_runs: 2,
                 time_window: Some(Duration::from_secs(3600 * 2)),
+                remote_compaction: false,
             }),
             storage: Some("S3".to_string()),
             append_mode: false,
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index ece3a49d63..25f8a6985d 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -387,7 +387,7 @@ pub(crate) fn validate_proto_value(

 /// Oneshot output result sender.
 #[derive(Debug)]
-pub(crate) struct OutputTx(Sender<Result<AffectedRows>>);
+pub struct OutputTx(Sender<Result<AffectedRows>>);

 impl OutputTx {
     /// Creates a new output sender.
diff --git a/src/mito2/src/schedule.rs b/src/mito2/src/schedule.rs
index c5762d87ba..45db45b76e 100644
--- a/src/mito2/src/schedule.rs
+++ b/src/mito2/src/schedule.rs
@@ -12,4 +12,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+pub mod remote_job_scheduler;
 pub mod scheduler;
diff --git a/src/mito2/src/schedule/remote_job_scheduler.rs b/src/mito2/src/schedule/remote_job_scheduler.rs
new file mode 100644
index 0000000000..ff87439e6a
--- /dev/null
+++ b/src/mito2/src/schedule/remote_job_scheduler.rs
@@ -0,0 +1,201 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt;
+use std::sync::Arc;
+use std::time::Instant;
+
+use common_telemetry::error;
+use serde::{Deserialize, Serialize};
+use snafu::{Location, ResultExt, Snafu};
+use store_api::storage::RegionId;
+use tokio::sync::mpsc::Sender;
+use uuid::Uuid;
+
+use crate::compaction::compactor::CompactionRegion;
+use crate::compaction::picker::PickerOutput;
+use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
+use crate::manifest::action::RegionEdit;
+use crate::metrics::COMPACTION_FAILURE_COUNT;
+use crate::request::{
+    BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
+};
+
+pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
+
+#[cfg_attr(doc, aquamarine::aquamarine)]
+/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
+/// For example, a compaction job can be scheduled remotely as the following workflow:
+/// ```mermaid
+/// sequenceDiagram
+///     participant User
+///     participant MitoEngine
+///     participant CompactionScheduler
+///     participant Plugins
+///     participant RemoteJobScheduler
+///
+///     User->>MitoEngine: Initiates compaction
+///     MitoEngine->>CompactionScheduler: schedule_compaction()
+///     CompactionScheduler->>Plugins: Handle plugins
+///     CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
+///     RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
+///     CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
+///     MitoEngine-->>User: Compaction task scheduled
+/// ```
+#[async_trait::async_trait]
+pub trait RemoteJobScheduler: Send + Sync + 'static {
+    /// Sends a job to the scheduler and returns a UUID for the job.
+    async fn schedule(
+        &self,
+        job: RemoteJob,
+        notifier: Box<dyn Notifier>,
+    ) -> Result<JobId, RemoteJobSchedulerError>;
+}
+
+#[derive(Snafu, Debug)]
+#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
+pub struct RemoteJobSchedulerError {
+    #[snafu(implicit)]
+    location: Location,
+    pub reason: String,
+    // Keep the waiters in the error so that we can notify them when falling back to local compaction.
+    pub waiters: Vec<OutputTx>,
+}
+
+/// Notifier is used to notify the mito engine when a remote job is completed.
+#[async_trait::async_trait]
+pub trait Notifier: Send + Sync + 'static {
+    /// Notify the mito engine that a remote job is completed.
+    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
+}
+
+/// Unique id for a remote job.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct JobId(Uuid);
+
+impl JobId {
+    /// Parses job id from string.
+    pub fn parse_str(input: &str) -> Result<JobId> {
+        Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
+    }
+}
+
+impl fmt::Display for JobId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
+#[allow(dead_code)]
+pub enum RemoteJob {
+    CompactionJob(CompactionJob),
+}
+
+/// CompactionJob is a remote job that compacts a set of files in a compaction service.
+#[allow(dead_code)]
+pub struct CompactionJob {
+    pub compaction_region: CompactionRegion,
+    pub picker_output: PickerOutput,
+    pub start_time: Instant,
+    /// Send the result of the compaction job to these waiters.
+    pub waiters: Vec<OutputTx>,
+}
+
+/// RemoteJobResult is the result of a remote job.
+#[allow(dead_code)]
+pub enum RemoteJobResult {
+    CompactionJobResult(CompactionJobResult),
+}
+
+/// CompactionJobResult is the result of a compaction job.
+#[allow(dead_code)]
+pub struct CompactionJobResult {
+    pub job_id: JobId,
+    pub region_id: RegionId,
+    pub start_time: Instant,
+    pub region_edit: Result<RegionEdit>,
+}
+
+/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
+pub(crate) struct DefaultNotifier {
+    /// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
+    pub(crate) request_sender: Sender<WorkerRequest>,
+}
+
+impl DefaultNotifier {
+    fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
+        COMPACTION_FAILURE_COUNT.inc();
+        for waiter in waiters.drain(..) {
+            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Notifier for DefaultNotifier {
+    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
+        match result {
+            RemoteJobResult::CompactionJobResult(result) => {
+                let notify = {
+                    match result.region_edit {
+                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
+                            region_id: result.region_id,
+                            senders: waiters,
+                            start_time: result.start_time,
+                            edit,
+                        }),
+                        Err(err) => {
+                            error!(
+                                "Compaction failed for region {}: {:?}",
+                                result.region_id, err
+                            );
+                            let err = Arc::new(err);
+                            self.on_failure(err.clone(), result.region_id, waiters);
+                            BackgroundNotify::CompactionFailed(CompactionFailed {
+                                region_id: result.region_id,
+                                err,
+                            })
+                        }
+                    }
+                };
+
+                if let Err(e) = self
+                    .request_sender
+                    .send(WorkerRequest::Background {
+                        region_id: result.region_id,
+                        notify,
+                    })
+                    .await
+                {
+                    error!(
+                        "Failed to notify compaction job status for region {}, error: {:?}",
+                        result.region_id, e
+                    );
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_job_id() {
+        let id = Uuid::new_v4().to_string();
+        let job_id = JobId::parse_str(&id).unwrap();
+        assert_eq!(job_id.to_string(), id);
+    }
+}
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 374e7548b0..f1d863aa38 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -32,6 +32,7 @@ use api::helper::ColumnDataTypeWrapper;
 use api::v1::value::ValueData;
 use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
+use common_base::Plugins;
 use common_datasource::compression::CompressionType;
 use common_telemetry::warn;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
@@ -256,16 +257,24 @@ impl TestEnv {
         let data_home = self.data_home().display().to_string();

         match log_store {
-            LogStoreImpl::RaftEngine(log_store) => {
-                MitoEngine::new(&data_home, config, log_store, object_store_manager)
-                    .await
-                    .unwrap()
-            }
-            LogStoreImpl::Kafka(log_store) => {
-                MitoEngine::new(&data_home, config, log_store, object_store_manager)
-                    .await
-                    .unwrap()
-            }
+            LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
+            LogStoreImpl::Kafka(log_store) => MitoEngine::new(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
         }
     }

@@ -274,16 +283,24 @@ impl TestEnv {
         let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
         let data_home = self.data_home().display().to_string();
         match self.log_store.as_ref().unwrap().clone() {
-            LogStoreImpl::RaftEngine(log_store) => {
-                MitoEngine::new(&data_home, config, log_store, object_store_manager)
-                    .await
-                    .unwrap()
-            }
-            LogStoreImpl::Kafka(log_store) => {
-                MitoEngine::new(&data_home, config, log_store, object_store_manager)
-                    .await
-                    .unwrap()
-            }
+            LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
+            LogStoreImpl::Kafka(log_store) => MitoEngine::new(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
         }
     }

@@ -434,6 +451,7 @@ impl TestEnv {
                 config,
                 log_store,
                 self.object_store_manager.clone().unwrap(),
+                Plugins::new(),
             )
             .await
             .unwrap(),
@@ -442,6 +460,7 @@ impl TestEnv {
                 config,
                 log_store,
                 self.object_store_manager.clone().unwrap(),
+                Plugins::new(),
             )
             .await
             .unwrap(),
@@ -456,6 +475,7 @@ impl TestEnv {
                 config,
                 log_store,
                 self.object_store_manager.clone().unwrap(),
+                Plugins::new(),
             )
             .await
             .unwrap(),
@@ -464,6 +484,7 @@ impl TestEnv {
                 config,
                 log_store,
                 self.object_store_manager.clone().unwrap(),
+                Plugins::new(),
             )
             .await
             .unwrap(),
@@ -484,16 +505,22 @@ impl TestEnv {
         config.sanitize(&data_home).unwrap();

         match log_store {
-            LogStoreImpl::RaftEngine(log_store) => {
-                WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager))
-                    .await
-                    .unwrap()
-            }
-            LogStoreImpl::Kafka(log_store) => {
-                WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager))
-                    .await
-                    .unwrap()
-            }
+            LogStoreImpl::RaftEngine(log_store) => WorkerGroup::start(
+                Arc::new(config),
+                log_store,
+                Arc::new(object_store_manager),
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
+            LogStoreImpl::Kafka(log_store) => WorkerGroup::start(
+                Arc::new(config),
+                log_store,
+                Arc::new(object_store_manager),
+                Plugins::new(),
+            )
+            .await
+            .unwrap(),
         }
     }
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index 5bb0bfe14a..590c66e08c 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -16,6 +16,7 @@

 use std::sync::{Arc, Mutex};

+use common_base::Plugins;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
 use object_store::services::Fs;
@@ -86,6 +87,7 @@ impl SchedulerEnv {
             Arc::new(CacheManager::default()),
             Arc::new(MitoConfig::default()),
             WorkerListener::default(),
+            Plugins::new(),
         )
     }
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 7ffb271515..2aa251fc10 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -30,6 +30,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;

+use common_base::Plugins;
 use common_runtime::JoinHandle;
 use common_telemetry::{error, info, warn};
 use futures::future::try_join_all;
@@ -126,6 +127,7 @@ impl WorkerGroup {
         config: Arc<MitoConfig>,
         log_store: Arc<S>,
         object_store_manager: ObjectStoreManagerRef,
+        plugins: Plugins,
     ) -> Result<WorkerGroup> {
         let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
             config.global_write_buffer_size.as_bytes() as usize,
@@ -171,6 +173,7 @@ impl WorkerGroup {
                     time_provider: time_provider.clone(),
                     flush_sender: flush_sender.clone(),
                     flush_receiver: flush_receiver.clone(),
+                    plugins: plugins.clone(),
                 }
                 .start()
             })
@@ -293,6 +296,7 @@ impl WorkerGroup {
                     time_provider: time_provider.clone(),
                     flush_sender: flush_sender.clone(),
                     flush_receiver: flush_receiver.clone(),
+                    plugins: Plugins::new(),
                 }
                 .start()
             })
@@ -363,6 +367,7 @@ struct WorkerStarter<S: LogStore> {
     flush_sender: watch::Sender<()>,
     /// Watch channel receiver to wait for background flush job.
     flush_receiver: watch::Receiver<()>,
+    plugins: Plugins,
 }

 impl<S: LogStore> WorkerStarter<S> {
@@ -398,6 +403,7 @@ impl<S: LogStore> WorkerStarter<S> {
                 self.cache_manager.clone(),
                 self.config,
                 self.listener.clone(),
+                self.plugins.clone(),
             ),
             stalled_requests: StalledRequests::default(),
             listener: self.listener,
@@ -740,7 +746,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     continue;
                 }
                 DdlRequest::Compact(req) => {
-                    self.handle_compaction_request(ddl.region_id, req, ddl.sender);
+                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
+                        .await;
                     continue;
                 }
                 DdlRequest::Truncate(_) => {
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index 1c5d968383..080c359784 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -24,7 +24,7 @@ use crate::worker::RegionWorkerLoop;

 impl<S: LogStore> RegionWorkerLoop<S> {
     /// Handles compaction request submitted to region worker.
-    pub(crate) fn handle_compaction_request(
+    pub(crate) async fn handle_compaction_request(
         &mut self,
         region_id: RegionId,
         req: RegionCompactRequest,
@@ -34,14 +34,18 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             return;
         };
         COMPACTION_REQUEST_COUNT.inc();
-        if let Err(e) = self.compaction_scheduler.schedule_compaction(
-            region.region_id,
-            req.options,
-            &region.version_control,
-            &region.access_layer,
-            sender,
-            &region.manifest_ctx,
-        ) {
+        if let Err(e) = self
+            .compaction_scheduler
+            .schedule_compaction(
+                region.region_id,
+                req.options,
+                &region.version_control,
+                &region.access_layer,
+                sender,
+                &region.manifest_ctx,
+            )
+            .await
+        {
             error!(e; "Failed to schedule compaction task for region: {}", region_id);
         } else {
             info!(
@@ -74,7 +78,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {

         // Schedule next compaction if necessary.
         self.compaction_scheduler
-            .on_compaction_finished(region_id, &region.manifest_ctx);
+            .on_compaction_finished(region_id, &region.manifest_ctx)
+            .await;
     }

     /// When compaction fails, we simply log the error.
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index b5d27e57ed..8acb289b24 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -242,14 +242,18 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         self.handle_stalled_requests().await;

         // Schedules compaction.
-        if let Err(e) = self.compaction_scheduler.schedule_compaction(
-            region.region_id,
-            compact_request::Options::Regular(Default::default()),
-            &region.version_control,
-            &region.access_layer,
-            OptionOutputTx::none(),
-            &region.manifest_ctx,
-        ) {
+        if let Err(e) = self
+            .compaction_scheduler
+            .schedule_compaction(
+                region.region_id,
+                compact_request::Options::Regular(Default::default()),
+                &region.version_control,
+                &region.access_layer,
+                OptionOutputTx::none(),
+                &region.manifest_ctx,
+            )
+            .await
+        {
             warn!(
                 "Failed to schedule compaction after flush, region: {}, err: {}",
                 region.region_id, e
diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs
index 3c9c115fc4..33dcc20931 100644
--- a/src/store-api/src/mito_engine_options.rs
+++ b/src/store-api/src/mito_engine_options.rs
@@ -25,6 +25,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
         "compaction.twcs.max_active_window_runs",
         "compaction.twcs.max_inactive_window_runs",
         "compaction.twcs.time_window",
+        "compaction.twcs.remote_compaction",
         "storage",
         "index.inverted_index.ignore_column_ids",
         "index.inverted_index.segment_row_count",
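
Reviewer note, not part of the patch: end to end, the knob is opt-in per table. `compaction.twcs.remote_compaction` defaults to `false`, is accepted by `is_mito_engine_option_key` above, and is parsed through `DisplayFromStr` like the other TWCS options. A sketch of the round trip, assuming the `TryFrom<&HashMap<String, String>>` impl for `RegionOptions` that the options.rs tests in this diff exercise, and that these types are reachable at the paths shown:

```rust
use std::collections::HashMap;

use mito2::region::options::{CompactionOptions, RegionOptions};

/// Mirrors what the new `CompactionOptions::remote_compaction()` helper does;
/// the helper itself is `pub(crate)`, so match the single TWCS variant here.
fn remote_compaction_requested(options: &RegionOptions) -> bool {
    match &options.compaction {
        CompactionOptions::Twcs(twcs) => twcs.remote_compaction,
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert("compaction.type".to_string(), "twcs".to_string());
    // `DisplayFromStr` parses the value with `bool::from_str`, so only
    // "true" or "false" are accepted.
    map.insert(
        "compaction.twcs.remote_compaction".to_string(),
        "true".to_string(),
    );

    let options = RegionOptions::try_from(&map).expect("valid region options");
    assert!(remote_compaction_requested(&options));
}
```

When the flag is true but no `RemoteJobSchedulerRef` plugin is registered, `schedule_compaction_request` logs the debug message above and falls back to local compaction, so the option is safe to set before a remote compaction service is rolled out.
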