feat: introduce the interface of RemoteJobScheduler (#4181)

* refactor: add Compactor trait

* chore: add compact() in Compactor trait and expose compaction module

* refactor: add CompactionRequest and open_compaction_region

* refactor: export the compaction api

* refactor: add DefaultCompactor::new_from_request

* refactor: no need to pass mito_config in open_compaction_region()

* refactor: CompactionRequest -> &CompactionRequest

* fix: typo

* docs: add docs for public apis

* refactor: remove 'Picker' from Compactor

* chore: add logs

* chore: change pub attribute for Picker

* refactor: remove do_merge_ssts()

* refactor: update comments

* refactor: use CompactionRegion argument in Picker

* chore: make compaction module public and remove unnecessary clone

* refactor: move build_compaction_task() into CompactionScheduler

* chore: use  in open_compaction_region() and add some comments for public structure

* refactor: add 'manifest_dir()' in store-api

* refactor: move the default implementation to DefaultCompactor

* refactor: remove Options from MergeOutput

* chore: minor modification

* fix: clippy errors

* fix: unit test errors

* refactor: remove 'manifest_dir()' from the store-api crate (one already exists in opener)

* refactor: use 'region_dir' in CompactionRequest

* refactor: refine naming

* refactor: refine naming

* refactor: remove clone()

* chore: add comments

* refactor: add PickerOutput field in CompactorRequest

* feat: introduce RemoteJobScheduler

* feat: add RemoteJobScheduler in schedule_compaction_request()

* refactor: use Option type for senders field of CompactionFinished

* refactor: modify CompactionJob

* refactor: schedule remote compaction job by options

* refactor: remove unused Options

* build: remove unused log

* refactor: fall back to local compaction if remote compaction fails

* fix: clippy errors

* refactor: add plugins in mito2

* refactor: add from_u64() for JobId

* refactor: make schedule module public

* refactor: add error for RemoteJobScheduler

* refactor: add Notifier

* refactor: use Arc for Notifier

* refactor: add 'remote_compaction' in compaction options

* fix: clippy errors

* fix: unrecognized table option

* refactor: add 'start_time' in CompactionJob

* refactor: modify error type of RemoteJobScheduler

* chore: revert changes for request

* refactor: refactor code per review comments

* refactor: use string type for JobId

* refactor: add 'waiters' field in DefaultNotifier

* fix: build error

* refactor: address coderabbit's review comments

* refactor: use uuid::Uuid as JobId

* refactor: return waiters when scheduling fails and add on_failure for DefaultNotifier

* refactor: move waiters from notifier to Job

* refactor: use ObjectStoreManagerRef in open_compaction_region()

* refactor: implement fmt::Display for JobId and add related unit tests

* fix: failing unit tests

* refactor: add RemoteJobSchedulerError
Author: zyy17
Date: 2024-07-02 15:08:43 +08:00
Committed by: GitHub
Parent: db5d1162f0
Commit: f2c08b8ddd
15 changed files with 445 additions and 107 deletions


@@ -343,7 +343,8 @@ impl DatanodeBuilder {
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines = Self::build_store_engines(opts, object_store_manager).await?;
let engines =
Self::build_store_engines(opts, object_store_manager, self.plugins.clone()).await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -357,14 +358,19 @@ impl DatanodeBuilder {
async fn build_store_engines(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut engines = vec![];
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
let mito_engine =
Self::build_mito_engine(opts, object_store_manager.clone(), config.clone())
.await?;
let mito_engine = Self::build_mito_engine(
opts,
object_store_manager.clone(),
config.clone(),
plugins.clone(),
)
.await?;
let metric_engine = MetricEngine::new(mito_engine.clone());
engines.push(Arc::new(mito_engine) as _);
@@ -387,6 +393,7 @@ impl DatanodeBuilder {
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
plugins: Plugins,
) -> Result<MitoEngine> {
let mito_engine = match &opts.wal {
DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
@@ -395,6 +402,7 @@ impl DatanodeBuilder {
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,
@@ -403,6 +411,7 @@ impl DatanodeBuilder {
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
plugins,
)
.await
.context(BuildMitoEngineSnafu)?,


@@ -27,7 +27,8 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::region::compact_request;
use common_telemetry::{debug, error};
use common_base::Plugins;
use common_telemetry::{debug, error, info};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -59,6 +60,9 @@ use crate::region::options::MergeMode;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::version::LevelMeta;
@@ -103,6 +107,8 @@ pub(crate) struct CompactionScheduler {
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
/// Plugins for the compaction scheduler.
plugins: Plugins,
}
impl CompactionScheduler {
@@ -112,6 +118,7 @@ impl CompactionScheduler {
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
plugins: Plugins,
) -> Self {
Self {
scheduler,
@@ -120,12 +127,13 @@ impl CompactionScheduler {
cache_manager,
engine_config,
listener,
plugins,
}
}
/// Schedules a compaction for the region.
#[allow(clippy::too_many_arguments)]
pub(crate) fn schedule_compaction(
pub(crate) async fn schedule_compaction(
&mut self,
region_id: RegionId,
compact_options: compact_request::Options,
@@ -153,10 +161,11 @@ impl CompactionScheduler {
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request, compact_options)
.await
}
/// Notifies the scheduler that the compaction job is finished successfully.
pub(crate) fn on_compaction_finished(
pub(crate) async fn on_compaction_finished(
&mut self,
region_id: RegionId,
manifest_ctx: &ManifestContextRef,
@@ -175,10 +184,13 @@ impl CompactionScheduler {
self.listener.clone(),
);
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(
request,
compact_request::Options::Regular(Default::default()),
) {
if let Err(e) = self
.schedule_compaction_request(
request,
compact_request::Options::Regular(Default::default()),
)
.await
{
error!(e; "Failed to schedule next compaction for region {}", region_id);
}
}
@@ -219,48 +231,13 @@ impl CompactionScheduler {
/// Schedules a compaction request.
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(
async fn schedule_compaction_request(
&mut self,
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
let picker = new_picker(options.clone(), &request.current_version.options.compaction);
let region_id = request.region_id();
let Some(mut task) = self.build_compaction_task(request, options) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
task.run().await;
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);
e
})
}
fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
};
// Notifies all pending tasks.
status.on_failure(err);
}
fn build_compaction_task(
&self,
req: CompactionRequest,
options: compact_request::Options,
) -> Option<Box<dyn CompactionTask>> {
let picker = new_picker(options, &req.current_version.options.compaction);
let region_id = req.region_id();
let CompactionRequest {
engine_config,
current_version,
@@ -271,7 +248,7 @@ impl CompactionScheduler {
cache_manager,
manifest_ctx,
listener,
} = req;
} = request;
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
@@ -304,10 +281,58 @@ impl CompactionScheduler {
for waiter in waiters {
waiter.send(Ok(0));
}
return None;
self.region_status.remove(&region_id);
return Ok(());
};
let task = CompactionTaskImpl {
// If specified to run compaction remotely, we schedule the compaction job remotely.
// It will fall back to local compaction if there is no remote job scheduler.
let waiters = if current_version.options.compaction.remote_compaction() {
if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
let remote_compaction_job = CompactionJob {
compaction_region: compaction_region.clone(),
picker_output: picker_output.clone(),
start_time,
waiters,
};
let result = remote_job_scheduler
.schedule(
RemoteJob::CompactionJob(remote_compaction_job),
Box::new(DefaultNotifier {
request_sender: request_sender.clone(),
}),
)
.await;
match result {
Ok(job_id) => {
info!(
"Scheduled remote compaction job {} for region {}",
job_id, region_id
);
return Ok(());
}
Err(e) => {
error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
// Return the waiters back to the caller for local compaction.
e.waiters
}
}
} else {
debug!(
"Remote compaction is not enabled, fallback to local compaction for region {}",
region_id
);
waiters
}
} else {
waiters
};
// Create a local compaction task.
let mut local_compaction_task = Box::new(CompactionTaskImpl {
request_sender,
waiters,
start_time,
@@ -315,9 +340,29 @@ impl CompactionScheduler {
picker_output,
compaction_region,
compactor: Arc::new(DefaultCompactor {}),
});
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
local_compaction_task.run().await;
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);
e
})
}
fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
};
Some(Box::new(task))
// Notifies all pending tasks.
status.on_failure(err);
}
}
@@ -602,6 +647,7 @@ mod tests {
waiter,
&manifest_ctx,
)
.await
.unwrap();
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
@@ -620,6 +666,7 @@ mod tests {
waiter,
&manifest_ctx,
)
.await
.unwrap();
let output = output_rx.await.unwrap().unwrap();
assert_eq!(output, 0);
@@ -659,6 +706,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
@@ -687,6 +735,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
@@ -698,7 +747,9 @@ mod tests {
.is_some());
// On compaction finished and schedule next compaction.
scheduler.on_compaction_finished(region_id, &manifest_ctx);
scheduler
.on_compaction_finished(region_id, &manifest_ctx)
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
@@ -718,6 +769,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(scheduler


@@ -60,6 +60,7 @@ use std::time::Instant;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
@@ -107,11 +108,14 @@ impl MitoEngine {
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
Ok(MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?),
inner: Arc::new(
EngineInner::new(config, log_store, object_store_manager, plugins).await?,
),
})
}
@@ -273,11 +277,13 @@ impl EngineInner {
config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager, plugins)
.await?,
config,
wal_raw_entry_reader,
})


@@ -754,6 +754,14 @@ pub enum Error {
source: Arc<Error>,
},
#[snafu(display("Failed to parse job id"))]
ParseJobId {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: uuid::Error,
},
#[snafu(display("Operation is not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
@@ -812,7 +820,8 @@ impl ErrorExt for Error {
| InvalidMetadata { .. }
| InvalidRegionOptions { .. }
| InvalidWalReadRequest { .. }
| PartitionOutOfRange { .. } => StatusCode::InvalidArguments,
| PartitionOutOfRange { .. }
| ParseJobId { .. } => StatusCode::InvalidArguments,
InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,


@@ -38,7 +38,7 @@ pub mod region;
mod region_write_ctx;
pub mod request;
pub mod row_converter;
pub(crate) mod schedule;
pub mod schedule;
pub mod sst;
mod time_provider;
pub mod wal;


@@ -162,6 +162,12 @@ impl CompactionOptions {
CompactionOptions::Twcs(opts) => opts.time_window,
}
}
pub(crate) fn remote_compaction(&self) -> bool {
match self {
CompactionOptions::Twcs(opts) => opts.remote_compaction,
}
}
}
impl Default for CompactionOptions {
@@ -184,6 +190,9 @@ pub struct TwcsOptions {
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
/// Whether to use remote compaction.
#[serde_as(as = "DisplayFromStr")]
pub remote_compaction: bool,
}
with_prefix!(prefix_twcs "compaction.twcs.");
@@ -208,6 +217,7 @@ impl Default for TwcsOptions {
max_active_window_runs: 1,
max_inactive_window_runs: 1,
time_window: None,
remote_compaction: false,
}
}
}
@@ -567,6 +577,7 @@ mod tests {
("compaction.twcs.max_inactive_window_runs", "2"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("compaction.twcs.remote_compaction", "false"),
("storage", "S3"),
("append_mode", "false"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
@@ -588,6 +599,7 @@ mod tests {
max_active_window_runs: 8,
max_inactive_window_runs: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
storage: Some("S3".to_string()),
append_mode: false,
@@ -616,6 +628,7 @@ mod tests {
max_active_window_runs: 8,
max_inactive_window_runs: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
storage: Some("S3".to_string()),
append_mode: false,
@@ -676,6 +689,7 @@ mod tests {
max_active_window_runs: 8,
max_inactive_window_runs: 2,
time_window: Some(Duration::from_secs(3600 * 2)),
remote_compaction: false,
}),
storage: Some("S3".to_string()),
append_mode: false,
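
Since `remote_compaction()` is `pub(crate)`, the new gate can only be exercised from inside mito2. The following unit-test style sketch is illustrative only (not part of this diff); it shows the flag that `schedule_compaction_request()` consults before trying the remote scheduler:

```rust
// Sketch assuming it lives inside mito2's options module, so crate-private items are visible.
#[cfg(test)]
mod remote_compaction_gate_tests {
    use super::*;

    #[test]
    fn remote_compaction_flag_gates_the_remote_path() {
        // Enabling the option on TWCS turns the gate on.
        let opts = CompactionOptions::Twcs(TwcsOptions {
            remote_compaction: true,
            ..TwcsOptions::default()
        });
        assert!(opts.remote_compaction());

        // The default stays off, so existing tables keep using local compaction.
        assert!(!CompactionOptions::default().remote_compaction());
    }
}
```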


@@ -387,7 +387,7 @@ pub(crate) fn validate_proto_value(
/// Oneshot output result sender.
#[derive(Debug)]
pub(crate) struct OutputTx(Sender<Result<AffectedRows>>);
pub struct OutputTx(Sender<Result<AffectedRows>>);
impl OutputTx {
/// Creates a new output sender.


@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod remote_job_scheduler;
pub mod scheduler;


@@ -0,0 +1,201 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{Location, ResultExt, Snafu};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::COMPACTION_FAILURE_COUNT;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
/// For example, a compaction job can be scheduled remotely as the following workflow:
/// ```mermaid
/// participant User
/// participant MitoEngine
/// participant CompactionScheduler
/// participant Plugins
/// participant RemoteJobScheduler
///
/// User->>MitoEngine: Initiates compaction
/// MitoEngine->>CompactionScheduler: schedule_compaction()
/// CompactionScheduler->>Plugins: Handle plugins
/// CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
/// RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
/// CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
/// MitoEngine-->>User: Compaction task scheduled
/// ```
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
/// Sends a job to the scheduler and returns a UUID for the job.
async fn schedule(
&self,
job: RemoteJob,
notifier: Box<dyn Notifier>,
) -> Result<JobId, RemoteJobSchedulerError>;
}
#[derive(Snafu, Debug)]
#[snafu(display("Internal error occurred in remote job scheduler: {}", reason))]
pub struct RemoteJobSchedulerError {
#[snafu(implicit)]
location: Location,
pub reason: String,
// Keep the waiters in the error so that we can notify them when fallback to the local compaction.
pub waiters: Vec<OutputTx>,
}
/// Notifier is used to notify the mito engine when a remote job is completed.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
/// Notify the mito engine that a remote job is completed.
async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
}
/// Unique id for a remote job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct JobId(Uuid);
impl JobId {
/// Parses job id from string.
pub fn parse_str(input: &str) -> Result<JobId> {
Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
}
}
impl fmt::Display for JobId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
#[allow(dead_code)]
pub enum RemoteJob {
CompactionJob(CompactionJob),
}
/// CompactionJob is a remote job that compacts a set of files in a compaction service.
#[allow(dead_code)]
pub struct CompactionJob {
pub compaction_region: CompactionRegion,
pub picker_output: PickerOutput,
pub start_time: Instant,
/// Send the result of the compaction job to these waiters.
pub waiters: Vec<OutputTx>,
}
/// RemoteJobResult is the result of a remote job.
#[allow(dead_code)]
pub enum RemoteJobResult {
CompactionJobResult(CompactionJobResult),
}
/// CompactionJobResult is the result of a compaction job.
#[allow(dead_code)]
pub struct CompactionJobResult {
pub job_id: JobId,
pub region_id: RegionId,
pub start_time: Instant,
pub region_edit: Result<RegionEdit>,
}
/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
pub(crate) struct DefaultNotifier {
/// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
pub(crate) request_sender: Sender<WorkerRequest>,
}
impl DefaultNotifier {
fn on_failure(&self, err: Arc<Error>, region_id: RegionId, mut waiters: Vec<OutputTx>) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
}
}
}
#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
match result {
RemoteJobResult::CompactionJobResult(result) => {
let notify = {
match result.region_edit {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: result.region_id,
senders: waiters,
start_time: result.start_time,
edit,
}),
Err(err) => {
error!(
"Compaction failed for region {}: {:?}",
result.region_id, err
);
let err = Arc::new(err);
self.on_failure(err.clone(), result.region_id, waiters);
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: result.region_id,
err,
})
}
}
};
if let Err(e) = self
.request_sender
.send(WorkerRequest::Background {
region_id: result.region_id,
notify,
})
.await
{
error!(
"Failed to notify compaction job status for region {}, error: {:?}",
result.region_id, e
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_job_id() {
let id = Uuid::new_v4().to_string();
let job_id = JobId::parse_str(&id).unwrap();
assert_eq!(job_id.to_string(), id);
}
}
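
To make the new surface concrete, here is a minimal consumer-side sketch plus its registration. It is not part of this PR: `LoggingRemoteJobScheduler` is a placeholder, the `mito2::schedule::remote_job_scheduler` import path follows the module layout shown above, and `Plugins::insert` is assumed to be the registration counterpart of the `plugins.get::<RemoteJobSchedulerRef>()` call in the compaction scheduler. A real implementation would hand the `CompactionJob` to an external compaction service and report back through the `Notifier`.

```rust
// A hypothetical sketch, not the shipped implementation.
use std::sync::Arc;

use common_base::Plugins;
use common_telemetry::info;
use mito2::schedule::remote_job_scheduler::{
    JobId, Notifier, RemoteJob, RemoteJobScheduler, RemoteJobSchedulerError,
    RemoteJobSchedulerRef,
};
use uuid::Uuid;

struct LoggingRemoteJobScheduler;

#[async_trait::async_trait]
impl RemoteJobScheduler for LoggingRemoteJobScheduler {
    async fn schedule(
        &self,
        job: RemoteJob,
        _notifier: Box<dyn Notifier>,
    ) -> Result<JobId, RemoteJobSchedulerError> {
        // JobId wraps a Uuid and parse_str() is its public constructor, so a fresh
        // v4 UUID is generated here purely for illustration.
        let job_id = JobId::parse_str(&Uuid::new_v4().to_string())
            .expect("a freshly generated UUID always parses");

        match job {
            RemoteJob::CompactionJob(compaction_job) => {
                // A real scheduler would submit the job to a remote compaction service
                // and call `notifier.notify()` with the resulting RegionEdit (or the
                // failure) once that service reports back; the waiters travel inside
                // the job and must eventually be completed.
                info!(
                    "Would offload compaction job {} ({} waiters) to a remote service",
                    job_id,
                    compaction_job.waiters.len()
                );
            }
        }

        Ok(job_id)
    }
}

/// Registration as the datanode would see it: the plugins are threaded from
/// DatanodeBuilder down to the CompactionScheduler (see the datanode.rs and
/// worker.rs hunks), which fetches the scheduler via get::<RemoteJobSchedulerRef>().
fn register_remote_compaction(plugins: &Plugins) {
    let scheduler: RemoteJobSchedulerRef = Arc::new(LoggingRemoteJobScheduler);
    // Assumption: Plugins exposes insert() to register typed plugin instances.
    plugins.insert::<RemoteJobSchedulerRef>(scheduler);
}
```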


@@ -32,6 +32,7 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_telemetry::warn;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
@@ -256,16 +257,24 @@ impl TestEnv {
let data_home = self.data_home().display().to_string();
match log_store {
LogStoreImpl::RaftEngine(log_store) => {
MitoEngine::new(&data_home, config, log_store, object_store_manager)
.await
.unwrap()
}
LogStoreImpl::Kafka(log_store) => {
MitoEngine::new(&data_home, config, log_store, object_store_manager)
.await
.unwrap()
}
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
Plugins::new(),
)
.await
.unwrap(),
}
}
@@ -274,16 +283,24 @@ impl TestEnv {
let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
let data_home = self.data_home().display().to_string();
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => {
MitoEngine::new(&data_home, config, log_store, object_store_manager)
.await
.unwrap()
}
LogStoreImpl::Kafka(log_store) => {
MitoEngine::new(&data_home, config, log_store, object_store_manager)
.await
.unwrap()
}
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
Plugins::new(),
)
.await
.unwrap(),
}
}
@@ -434,6 +451,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
Plugins::new(),
)
.await
.unwrap(),
@@ -442,6 +460,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
Plugins::new(),
)
.await
.unwrap(),
@@ -456,6 +475,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
Plugins::new(),
)
.await
.unwrap(),
@@ -464,6 +484,7 @@ impl TestEnv {
config,
log_store,
self.object_store_manager.clone().unwrap(),
Plugins::new(),
)
.await
.unwrap(),
@@ -484,16 +505,22 @@ impl TestEnv {
config.sanitize(&data_home).unwrap();
match log_store {
LogStoreImpl::RaftEngine(log_store) => {
WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager))
.await
.unwrap()
}
LogStoreImpl::Kafka(log_store) => {
WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager))
.await
.unwrap()
}
LogStoreImpl::RaftEngine(log_store) => WorkerGroup::start(
Arc::new(config),
log_store,
Arc::new(object_store_manager),
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => WorkerGroup::start(
Arc::new(config),
log_store,
Arc::new(object_store_manager),
Plugins::new(),
)
.await
.unwrap(),
}
}


@@ -16,6 +16,7 @@
use std::sync::{Arc, Mutex};
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
@@ -86,6 +87,7 @@ impl SchedulerEnv {
Arc::new(CacheManager::default()),
Arc::new(MitoConfig::default()),
WorkerListener::default(),
Plugins::new(),
)
}


@@ -30,6 +30,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
@@ -126,6 +127,7 @@ impl WorkerGroup {
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<WorkerGroup> {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
@@ -171,6 +173,7 @@ impl WorkerGroup {
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
plugins: plugins.clone(),
}
.start()
})
@@ -293,6 +296,7 @@ impl WorkerGroup {
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
plugins: Plugins::new(),
}
.start()
})
@@ -363,6 +367,7 @@ struct WorkerStarter<S> {
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
plugins: Plugins,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -398,6 +403,7 @@ impl<S: LogStore> WorkerStarter<S> {
self.cache_manager.clone(),
self.config,
self.listener.clone(),
self.plugins.clone(),
),
stalled_requests: StalledRequests::default(),
listener: self.listener,
@@ -740,7 +746,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
continue;
}
DdlRequest::Compact(req) => {
self.handle_compaction_request(ddl.region_id, req, ddl.sender);
self.handle_compaction_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Truncate(_) => {


@@ -24,7 +24,7 @@ use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
pub(crate) fn handle_compaction_request(
pub(crate) async fn handle_compaction_request(
&mut self,
region_id: RegionId,
req: RegionCompactRequest,
@@ -34,14 +34,18 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
COMPACTION_REQUEST_COUNT.inc();
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
req.options,
&region.version_control,
&region.access_layer,
sender,
&region.manifest_ctx,
) {
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
req.options,
&region.version_control,
&region.access_layer,
sender,
&region.manifest_ctx,
)
.await
{
error!(e; "Failed to schedule compaction task for region: {}", region_id);
} else {
info!(
@@ -74,7 +78,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Schedule next compaction if necessary.
self.compaction_scheduler
.on_compaction_finished(region_id, &region.manifest_ctx);
.on_compaction_finished(region_id, &region.manifest_ctx)
.await;
}
/// When compaction fails, we simply log the error.


@@ -242,14 +242,18 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_stalled_requests().await;
// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
) {
if let Err(e) = self
.compaction_scheduler
.schedule_compaction(
region.region_id,
compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
OptionOutputTx::none(),
&region.manifest_ctx,
)
.await
{
warn!(
"Failed to schedule compaction after flush, region: {}, err: {}",
region.region_id, e


@@ -25,6 +25,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",