refactor: add Compactor trait to abstract the compaction (#4097)

* refactor: add Compactor trait

* chore: add compact() in Compactor trait and expose compaction module

* refactor: add CompactionRequest and open_compaction_region

* refactor: export the compaction api

* refactor: add DefaultCompactor::new_from_request

* refactor: no need to pass mito_config in open_compaction_region()

* refactor: CompactionRequest -> &CompactionRequest

* fix: typo

* docs: add docs for public apis

* refactor: remove 'Picker' from Compactor

* chore: add logs

* chore: change pub attribute for Picker

* refactor: remove do_merge_ssts()

* refactor: update comments

* refactor: use CompactionRegion argument in Picker

* chore: make compaction module public and remove unnecessary clone

* refactor: move build_compaction_task() into CompactionScheduler

* chore: use  in open_compaction_region() and add some comments for public structure

* refactor: add 'manifest_dir()' in store-api

* refactor: move the default implementation to DefaultCompactor

* refactor: remove Options from MergeOutput

* chore: minor modification

* fix: clippy errors

* fix: unit test errors

* refactor: remove 'manifest_dir()' from store-api crate (one already exists in opener)

* refactor: use 'region_dir' in CompactionRequest

* refactor: refine naming

* refactor: refine naming

* refactor: remove clone()

* chore: add comments

* refactor: add PickerOutput field in CompactorRequest
This commit is contained in:
zyy17
2024-06-17 11:03:47 +08:00
committed by GitHub
parent a2e3532a57
commit 5390603855
8 changed files with 604 additions and 313 deletions
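
To orient the diffs that follow, here is a minimal sketch, not part of the change itself, of how the compaction API exposed by this PR could be driven end to end. It assumes the mito2 crate as a dependency; the CompactorRequest and MitoConfig values are taken as inputs because constructing them is caller-specific, and compact_region_standalone is a hypothetical helper name.

// Hedged sketch only: drives the compaction API exposed by this change.
// `compact_region_standalone` is a hypothetical helper; the request/config
// values are inputs because constructing them depends on the caller.
use mito2::compaction::compactor::{
    open_compaction_region, Compactor, CompactorRequest, DefaultCompactor,
};
use mito2::config::MitoConfig;
use mito2::error::Result;
use object_store::manager::ObjectStoreManager;

async fn compact_region_standalone(
    req: &CompactorRequest,
    mito_config: &MitoConfig,
    object_store_manager: ObjectStoreManager,
) -> Result<()> {
    // Rebuild a compaction-only view of the region; this is the simplified
    // RegionOpener::open() introduced as open_compaction_region().
    let compaction_region =
        open_compaction_region(req, mito_config, object_store_manager).await?;

    // DefaultCompactor::compact() picks input files, merges the SSTs and
    // updates the manifest in one go (noted in the code below as a local
    // path, mainly for testing).
    let compactor = DefaultCompactor {};
    compactor
        .compact(&compaction_region, req.compaction_options.clone())
        .await
}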

View File

@@ -13,7 +13,8 @@
// limitations under the License.
mod buckets;
mod picker;
pub mod compactor;
pub mod picker;
mod task;
#[cfg(test)]
mod test_util;
@@ -31,7 +32,6 @@ use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
pub use picker::CompactionPickerRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -40,8 +40,9 @@ use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::compaction::window::WindowedCompactionPicker;
use crate::compaction::compactor::{CompactionRegion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
@@ -52,7 +53,6 @@ use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
@@ -90,17 +90,6 @@ impl CompactionRequest {
}
}
/// Builds compaction picker according to [CompactionOptions].
pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef {
match strategy {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
}
/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
scheduler: SchedulerRef,
@@ -232,34 +221,13 @@ impl CompactionScheduler {
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
let picker = if let compact_request::Options::StrictWindow(window) = &options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
compaction_options_to_picker(&request.current_version.options.compaction)
};
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);
let pick_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick"])
.start_timer();
let Some(mut task) = picker.pick(request) else {
let Some(mut task) = self.build_compaction_task(request, options) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};
drop(pick_timer);
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
@@ -282,6 +250,70 @@ impl CompactionScheduler {
// Notifies all pending tasks.
status.on_failure(err);
}
fn build_compaction_task(
&self,
req: CompactionRequest,
options: compact_request::Options,
) -> Option<Box<dyn CompactionTask>> {
let picker = new_picker(options, &req.current_version.options.compaction);
let region_id = req.region_id();
let CompactionRequest {
engine_config,
current_version,
access_layer,
request_sender,
waiters,
start_time,
cache_manager,
manifest_ctx,
listener,
} = req;
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);
let compaction_region = CompactionRegion {
region_id,
current_version: current_version.clone(),
region_options: current_version.options.clone(),
engine_config: engine_config.clone(),
region_metadata: current_version.metadata.clone(),
cache_manager: cache_manager.clone(),
access_layer: access_layer.clone(),
manifest_ctx: manifest_ctx.clone(),
};
let picker_output = {
let _pick_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["pick"])
.start_timer();
picker.pick(&compaction_region)
};
let picker_output = if let Some(picker_output) = picker_output {
picker_output
} else {
// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
for waiter in waiters {
waiter.send(Ok(0));
}
return None;
};
let task = CompactionTaskImpl {
request_sender,
waiters,
start_time,
listener,
picker_output,
compaction_region,
compactor: Arc::new(DefaultCompactor {}),
};
Some(Box::new(task))
}
}
impl Drop for CompactionScheduler {
@@ -395,8 +427,8 @@ impl CompactionStatus {
}
}
#[derive(Debug)]
pub(crate) struct CompactionOutput {
#[derive(Debug, Clone)]
pub struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,

View File

@@ -0,0 +1,416 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::region::compact_request;
use common_telemetry::info;
use object_store::manager::ObjectStoreManager;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::build_sst_reader;
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionRef};
use crate::region::ManifestContext;
use crate::region::RegionState::Writable;
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::file::{FileMeta, IndexType};
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::parquet::WriteOptions;
/// CompactionRegion represents a region that needs to be compacted.
/// It's a subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
pub region_id: RegionId,
pub region_options: RegionOptions,
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) region_metadata: RegionMetadataRef,
pub(crate) cache_manager: CacheManagerRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) manifest_ctx: Arc<ManifestContext>,
pub(crate) current_version: VersionRef,
}
/// CompactorRequest represents the request to compact a region.
#[derive(Debug, Clone)]
pub struct CompactorRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: HashMap<String, String>,
pub compaction_options: compact_request::Options,
pub picker_output: PickerOutput,
}
/// Open a compaction region from a compaction request.
/// It's a simplified version of RegionOpener::open().
pub async fn open_compaction_region(
req: &CompactorRequest,
mito_config: &MitoConfig,
object_store_manager: ObjectStoreManager,
) -> Result<CompactionRegion> {
let region_options = RegionOptions::try_from(&req.region_options)?;
let object_store = {
let name = &region_options.storage;
if let Some(name) = name {
object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?
} else {
object_store_manager.default_object_store()
}
};
let access_layer = {
let intermediate_manager =
IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone())
.await?;
Arc::new(AccessLayer::new(
req.region_dir.as_str(),
object_store.clone(),
intermediate_manager,
))
};
let manifest_manager = {
let region_manifest_options = RegionManifestOptions {
manifest_dir: new_manifest_dir(req.region_dir.as_str()),
object_store: object_store.clone(),
compress_type: manifest_compress_type(mito_config.compress_manifest),
checkpoint_distance: mito_config.manifest_checkpoint_distance,
};
RegionManifestManager::open(region_manifest_options, Default::default())
.await?
.context(EmptyRegionDirSnafu {
region_id: req.region_id,
region_dir: req.region_dir.as_str(),
})?
};
let manifest = manifest_manager.manifest();
let region_metadata = manifest.metadata.clone();
let manifest_ctx = Arc::new(ManifestContext::new(manifest_manager, Writable));
let file_purger = {
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),
None,
))
};
let current_version = {
let memtable_builder = MemtableBuilderProvider::new(None, Arc::new(mito_config.clone()))
.builder_for_options(
region_options.memtable.as_ref(),
!region_options.append_mode,
);
// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
region_metadata.clone(),
memtable_builder.clone(),
0,
region_options.compaction.time_window(),
));
let version = VersionBuilder::new(region_metadata.clone(), mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(region_options.clone())
.build();
let version_control = Arc::new(VersionControl::new(version));
version_control.current().version
};
Ok(CompactionRegion {
region_options: region_options.clone(),
manifest_ctx,
access_layer,
current_version,
region_id: req.region_id,
cache_manager: Arc::new(CacheManager::default()),
engine_config: Arc::new(mito_config.clone()),
region_metadata: region_metadata.clone(),
})
}
/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug)]
pub struct MergeOutput {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
}
impl MergeOutput {
pub fn is_empty(&self) -> bool {
self.files_to_add.is_empty() && self.files_to_remove.is_empty()
}
}
/// Compactor is the trait that defines the compaction logic.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
/// Merge SST files for a region.
async fn merge_ssts(
&self,
compaction_region: &CompactionRegion,
picker_output: PickerOutput,
) -> Result<MergeOutput>;
/// Update the manifest after merging SST files.
async fn update_manifest(
&self,
compaction_region: &CompactionRegion,
merge_output: MergeOutput,
) -> Result<RegionEdit>;
/// Execute compaction for a region.
async fn compact(
&self,
compaction_region: &CompactionRegion,
compact_request_options: compact_request::Options,
) -> Result<()>;
}
/// DefaultCompactor is the default implementation of Compactor.
pub struct DefaultCompactor;
#[async_trait::async_trait]
impl Compactor for DefaultCompactor {
async fn merge_ssts(
&self,
compaction_region: &CompactionRegion,
mut picker_output: PickerOutput,
) -> Result<MergeOutput> {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
info!(
"Compaction region {} output [{}]-> {}",
compaction_region.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = compaction_region
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = compaction_region
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
compaction_region
.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
let region_id = compaction_region.region_id;
let file_id = output.output_file_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
.current_version
.options
.index_options
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
futs.push(async move {
let reader = build_sst_reader(
region_metadata.clone(),
sst_layer.clone(),
Some(cache_manager.clone()),
&output.inputs,
append_mode,
output.filter_deleted,
output.output_time_range,
)
.await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id,
metadata: region_metadata,
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
.await?
.map(|sst_info| FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
});
Ok(file_meta_opt)
});
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk =
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
.await
.context(JoinSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?;
output_files.extend(metas.into_iter().flatten());
}
let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
inputs.extend(
picker_output
.expired_ssts
.iter()
.map(|f| f.meta_ref().clone()),
);
Ok(MergeOutput {
files_to_add: output_files,
files_to_remove: inputs,
compaction_time_window: Some(picker_output.time_window_size),
})
}
async fn update_manifest(
&self,
compaction_region: &CompactionRegion,
merge_output: MergeOutput,
) -> Result<RegionEdit> {
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: merge_output.files_to_add,
files_to_remove: merge_output.files_to_remove,
compaction_time_window: merge_output
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
compaction_region
.manifest_ctx
.update_manifest(Writable, action_list)
.await?;
Ok(edit)
}
// The default implementation of compact combines the merge_ssts and update_manifest functions.
// Note: It's a local compaction and is only used for testing purposes.
async fn compact(
&self,
compaction_region: &CompactionRegion,
compact_request_options: compact_request::Options,
) -> Result<()> {
let picker_output = {
let picker_output = new_picker(
compact_request_options,
&compaction_region.region_options.compaction,
)
.pick(compaction_region);
if let Some(picker_output) = picker_output {
picker_output
} else {
info!(
"No files to compact for region_id: {}",
compaction_region.region_id
);
return Ok(());
}
};
let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
if merge_output.is_empty() {
info!(
"No files to compact for region_id: {}",
compaction_region.region_id
);
return Ok(());
}
self.update_manifest(compaction_region, merge_output)
.await?;
Ok(())
}
}
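
Taken together with the picker module below, the new trait splits compaction into pick, merge_ssts, and update_manifest. A sketch of that flow, written with the crate-internal paths used in the file above; pick_and_compact is a hypothetical helper and is not added by this PR (DefaultCompactor::compact() bundles the same steps).

// Hedged sketch: the pick -> merge_ssts -> update_manifest flow spelled out.
// `pick_and_compact` is a hypothetical helper for illustration only.
use api::v1::region::compact_request;

use crate::compaction::compactor::{CompactionRegion, Compactor, DefaultCompactor};
use crate::compaction::picker::{new_picker, Picker};
use crate::error::Result;

async fn pick_and_compact(
    compaction_region: &CompactionRegion,
    options: compact_request::Options,
) -> Result<()> {
    // A Picker now only plans the work; it no longer builds the task itself.
    let picker = new_picker(options, &compaction_region.region_options.compaction);
    let Some(picker_output) = picker.pick(compaction_region) else {
        // Nothing to compact.
        return Ok(());
    };

    // The Compactor executes the plan, then persists the resulting region edit.
    let compactor = DefaultCompactor {};
    let merge_output = compactor
        .merge_ssts(compaction_region, picker_output)
        .await?;
    if merge_output.is_empty() {
        return Ok(());
    }
    let _edit = compactor
        .update_manifest(compaction_region, merge_output)
        .await?;
    Ok(())
}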

View File

@@ -15,17 +15,55 @@
use std::fmt::Debug;
use std::sync::Arc;
use crate::compaction::CompactionRequest;
use api::v1::region::compact_request;
pub type CompactionPickerRef = Arc<dyn Picker + Send + Sync>;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::twcs::TwcsPicker;
use crate::compaction::window::WindowedCompactionPicker;
use crate::compaction::CompactionOutput;
use crate::region::options::CompactionOptions;
use crate::sst::file::FileHandle;
#[async_trait::async_trait]
pub trait CompactionTask: Debug + Send + Sync + 'static {
pub(crate) trait CompactionTask: Debug + Send + Sync + 'static {
async fn run(&mut self);
}
/// Picker picks input SST files and builds the compaction task.
/// Picker picks input SST files for compaction.
/// Different compaction strategies may implement different pickers.
pub trait Picker: Debug + Send + 'static {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
pub trait Picker: Debug + Send + Sync + 'static {
/// Picks input SST files for compaction.
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput>;
}
/// PickerOutput is the output of a [`Picker`].
/// It contains the outputs of the compaction and the expired SST files.
#[derive(Default, Clone, Debug)]
pub struct PickerOutput {
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub time_window_size: i64,
}
/// Create a new picker based on the compaction request options and compaction options.
pub fn new_picker(
compact_request_options: compact_request::Options,
compaction_options: &CompactionOptions,
) -> Arc<dyn Picker> {
if let compact_request::Options::StrictWindow(window) = &compact_request_options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
}
}
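
Because a Picker now returns a plain PickerOutput instead of a boxed task, an existing picker can be wrapped and its plan post-processed. A sketch under that assumption, using the crate-internal paths above; CappedPicker is hypothetical and not part of this PR.

// Hedged sketch: a hypothetical wrapper picker that delegates to an inner
// picker and caps how many outputs a single pick may plan.
use std::sync::Arc;

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};

#[derive(Debug)]
pub(crate) struct CappedPicker {
    inner: Arc<dyn Picker>,
    max_outputs: usize,
}

impl Picker for CappedPicker {
    fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
        let mut output = self.inner.pick(compaction_region)?;
        // PickerOutput fields are public, so the plan can be trimmed before
        // it reaches the Compactor.
        output.outputs.truncate(self.max_outputs);
        if output.outputs.is_empty() && output.expired_ssts.is_empty() {
            // Behave like the built-in pickers: return None when there is
            // nothing left to compact.
            return None;
        }
        Some(output)
    }
}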

View File

@@ -14,71 +14,51 @@
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use common_telemetry::{error, info};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::CompactionTask;
use crate::compaction::{build_sst_reader, CompactionOutput};
use crate::config::MitoConfig;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::{ManifestContextRef, RegionState};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileMeta, IndexType};
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
const MAX_PARALLEL_COMPACTION: usize = 8;
/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 8;
pub(crate) struct CompactionTaskImpl {
pub engine_config: Arc<MitoConfig>,
pub region_id: RegionId,
pub metadata: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub compaction_time_window: Option<i64>,
pub compaction_region: CompactionRegion,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
pub start_time: Instant,
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
/// The region is using append mode.
pub(crate) append_mode: bool,
/// Manifest context.
pub(crate) manifest_ctx: ManifestContextRef,
/// Event listener.
pub(crate) listener: WorkerListener,
/// Compactor to handle compaction.
pub(crate) compactor: Arc<dyn Compactor>,
/// Output of the picker.
pub(crate) picker_output: PickerOutput,
}
impl Debug for CompactionTaskImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsCompactionTask")
.field("region_id", &self.region_id)
.field("outputs", &self.outputs)
.field("expired_ssts", &self.expired_ssts)
.field("compaction_time_window", &self.compaction_time_window)
.field("append_mode", &self.append_mode)
.field("region_id", &self.compaction_region.region_id)
.field("picker_output", &self.picker_output)
.field(
"append_mode",
&self.compaction_region.region_options.append_mode,
)
.finish()
}
}
@@ -91,174 +71,54 @@ impl Drop for CompactionTaskImpl {
impl CompactionTaskImpl {
fn mark_files_compacting(&self, compacting: bool) {
self.outputs
self.picker_output
.outputs
.iter()
.flat_map(|o| o.inputs.iter())
.for_each(|f| f.set_compacting(compacting))
}
/// Merges all SST files.
/// Returns `(output files, input files)`.
async fn merge_ssts(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());
for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
info!(
"Compaction region {} output [{}]-> {}",
self.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = self
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
let append_mode = self.append_mode;
futs.push(async move {
let reader = build_sst_reader(
metadata.clone(),
sst_layer.clone(),
Some(cache_manager.clone()),
&output.inputs,
append_mode,
output.filter_deleted,
output.output_time_range,
)
.await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id,
metadata,
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
.await?
.map(|sst_info| FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
});
Ok(file_meta_opt)
});
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
.await
.context(error::JoinSnafu)?
.into_iter()
.collect::<error::Result<Vec<_>>>()?;
output_files.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((output_files, inputs))
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
}
async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
self.mark_files_compacting(true);
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
let (added, mut deleted) = match self.merge_ssts().await {
let compaction_result = match self
.compactor
.merge_ssts(&self.compaction_region, self.picker_output.clone())
.await
{
Ok(v) => v,
Err(e) => {
error!(e; "Failed to compact region: {}", self.region_id);
error!(e; "Failed to compact region: {}", self.compaction_region.region_id);
merge_timer.stop_and_discard();
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone()));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.region_id,
deleted,
added,
self.compaction_time_window,
self.compaction_region.region_id,
compaction_result.files_to_remove,
compaction_result.files_to_add,
compaction_result.compaction_time_window,
self.waiters.len(),
merge_time,
);
self.listener.on_merge_ssts_finished(self.region_id).await;
self.listener
.on_merge_ssts_finished(self.compaction_region.region_id)
.await;
let _manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: added,
files_to_remove: deleted,
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We might leak files if we fail to update manifest. We can add a cleanup task to
// remove them later.
self.manifest_ctx
.update_manifest(RegionState::Writable, action_list)
.await?;
Ok(edit)
self.compactor
.update_manifest(&self.compaction_region, compaction_result)
.await
}
/// Handles compaction failure, notifies all waiters.
@@ -266,7 +126,7 @@ impl CompactionTaskImpl {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
region_id: self.compaction_region.region_id,
}));
}
}
@@ -276,7 +136,7 @@ impl CompactionTaskImpl {
if let Err(e) = self.request_sender.send(request).await {
error!(
"Failed to notify compaction job status for region {}, request: {:?}",
self.region_id, e.0
self.compaction_region.region_id, e.0
);
}
}
@@ -287,25 +147,25 @@ impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
region_id: self.compaction_region.region_id,
senders: std::mem::take(&mut self.waiters),
start_time: self.start_time,
edit,
}),
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.region_id);
error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
let err = Arc::new(e);
// notify compaction waiters
self.on_failure(err.clone());
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.region_id,
region_id: self.compaction_region.region_id,
err,
})
}
};
self.send_to_worker(WorkerRequest::Background {
region_id: self.region_id,
region_id: self.compaction_region.region_id,
notify,
})
.await;
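
The task now carries an Arc<dyn Compactor> next to the CompactionRegion and PickerOutput, so the merge and manifest logic is swappable. A sketch of what an alternative implementation might look like, again with crate-internal paths; LoggingCompactor is hypothetical, and this PR only wires DefaultCompactor into the task.

// Hedged sketch: a hypothetical Compactor that wraps DefaultCompactor and
// logs the size of each merge. Not part of this PR.
use api::v1::region::compact_request;
use common_telemetry::info;

use crate::compaction::compactor::{
    CompactionRegion, Compactor, DefaultCompactor, MergeOutput,
};
use crate::compaction::picker::PickerOutput;
use crate::error::Result;
use crate::manifest::action::RegionEdit;

pub(crate) struct LoggingCompactor {
    inner: DefaultCompactor,
}

#[async_trait::async_trait]
impl Compactor for LoggingCompactor {
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput> {
        let output = self
            .inner
            .merge_ssts(compaction_region, picker_output)
            .await?;
        info!(
            "Compaction merge for region {}: {} files to add, {} files to remove",
            compaction_region.region_id,
            output.files_to_add.len(),
            output.files_to_remove.len()
        );
        Ok(output)
    }

    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit> {
        self.inner
            .update_manifest(compaction_region, merge_output)
            .await
    }

    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()> {
        self.inner
            .compact(compaction_region, compact_request_options)
            .await
    }
}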

View File

@@ -22,9 +22,9 @@ use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest};
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, FileId};
use crate::sst::version::LevelMeta;
@@ -110,25 +110,10 @@ impl TwcsPicker {
}
impl Picker for TwcsPicker {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
let CompactionRequest {
engine_config,
current_version,
access_layer,
request_sender,
waiters,
start_time,
cache_manager,
manifest_ctx,
listener,
..
} = req;
let region_metadata = current_version.metadata.clone();
let region_id = region_metadata.region_id;
let levels = current_version.ssts.levels();
let ttl = current_version.options.ttl;
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
let region_id = compaction_region.region_id;
let levels = compaction_region.current_version.ssts.levels();
let ttl = compaction_region.current_version.options.ttl;
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
@@ -136,7 +121,8 @@ impl Picker for TwcsPicker {
expired_ssts.iter().for_each(|f| f.set_compacting(true));
}
let compaction_time_window = current_version
let compaction_time_window = compaction_region
.current_version
.compaction_time_window
.map(|window| window.as_secs() as i64);
let time_window_size = compaction_time_window
@@ -157,31 +143,14 @@ impl Picker for TwcsPicker {
let outputs = self.build_output(&windows, active_window);
if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
for waiter in waiters {
waiter.send(Ok(0));
}
return None;
}
let task = CompactionTaskImpl {
engine_config,
region_id,
metadata: region_metadata,
sst_layer: access_layer,
Some(PickerOutput {
outputs,
expired_ssts,
compaction_time_window: Some(time_window_size),
request_sender,
waiters,
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
append_mode: current_version.options.append_mode,
manifest_ctx,
listener,
};
Some(Box::new(task))
time_window_size,
})
}
}

View File

@@ -23,9 +23,9 @@ use common_time::Timestamp;
use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest};
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::region::version::VersionRef;
use crate::sst::file::{FileHandle, FileId};
@@ -101,42 +101,18 @@ impl WindowedCompactionPicker {
}
impl Picker for WindowedCompactionPicker {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
let region_id = req.region_id();
let CompactionRequest {
engine_config,
current_version,
access_layer,
request_sender,
waiters,
start_time,
cache_manager,
manifest_ctx,
listener,
} = req;
fn pick(&self, compaction_region: &CompactionRegion) -> Option<PickerOutput> {
let (outputs, expired_ssts, time_window) = self.pick_inner(
compaction_region.current_version.metadata.region_id,
&compaction_region.current_version,
Timestamp::current_millis(),
);
let (outputs, expired_ssts, time_window) =
self.pick_inner(region_id, &current_version, Timestamp::current_millis());
let task = CompactionTaskImpl {
engine_config: engine_config.clone(),
region_id,
metadata: current_version.metadata.clone().clone(),
sst_layer: access_layer.clone(),
Some(PickerOutput {
outputs,
expired_ssts,
compaction_time_window: Some(time_window),
request_sender,
waiters,
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
append_mode: current_version.options.append_mode,
manifest_ctx,
listener,
};
Some(Box::new(task))
time_window_size: time_window,
})
}
}

View File

@@ -25,7 +25,7 @@ pub mod test_util;
mod access_layer;
mod cache;
mod compaction;
pub mod compaction;
pub mod config;
pub mod engine;
pub mod error;

View File

@@ -530,6 +530,6 @@ where
}
/// Returns the directory to the manifest files.
fn new_manifest_dir(region_dir: &str) -> String {
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
join_dir(region_dir, "manifest")
}