fix: immediately reschedule a compaction after compaction (#1882)

* fix: immediately reschedule a compaction after compaction

* refactor: add WriterCompactRequest

* feat: reschedule compaction

* fix: only reschedule compaction when it's triggered by flush

* fix: remove max_files_in_l0

---------

Co-authored-by: evenyag <realevenyag@gmail.com>
Lei, HUANG
2023-07-10 15:05:31 +08:00
committed by GitHub
parent c3db99513a
commit 553530cff4
12 changed files with 219 additions and 114 deletions

View File

@@ -30,7 +30,7 @@ use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::task::{CompactionOutput, CompactionTask, CompactionTaskImpl};
use crate::error::{Result, TtlCalculationSnafu};
use crate::scheduler::Request;
use crate::sst::{FileHandle, LevelMeta};
use crate::sst::{FileHandle, FileId, LevelMeta};
/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategy may implement different pickers.
@@ -152,6 +152,7 @@ impl<S: LogStore> Picker for LeveledTimeWindowPicker<S> {
expired_ssts,
sst_write_buffer_size: req.sst_write_buffer_size,
compaction_time_window,
reschedule_on_finish: req.reschedule_on_finish,
}));
}
@@ -186,6 +187,7 @@ impl<S> LeveledTimeWindowPicker<S> {
debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
results.extend(buckets.into_iter().map(|(bound, files)| CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
time_window_bound: bound,
time_window_sec: time_window,
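
The picker now pre-assigns the output SST's `FileId` when it builds a `CompactionOutput`, instead of generating it at write time, so the task can log the planned output before any data is written. Below is a minimal, std-only sketch of that idea; `FileId`, `CompactionOutput`, and the nanosecond-based `random()` are simplified stand-ins, not the storage crate's real types.

```rust
use std::fmt;

#[derive(Clone, Copy, Debug)]
struct FileId(u128);

impl FileId {
    // Hypothetical stand-in for `FileId::random()`; the real type likely wraps a UUID.
    fn random() -> Self {
        use std::time::{SystemTime, UNIX_EPOCH};
        FileId(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos())
    }
}

impl fmt::Display for FileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

struct CompactionOutput {
    output_file_id: FileId, // decided by the picker, consumed later when the SST is written
    output_level: u8,
}

fn main() {
    let output = CompactionOutput {
        output_file_id: FileId::random(),
        output_level: 1,
    };
    println!(
        "planned output file {} at level {}",
        output.output_file_id, output.output_level
    );
}
```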

View File

@@ -56,7 +56,7 @@ impl<S: LogStore> Request for CompactionRequestImpl<S> {
pub struct CompactionRequestImpl<S: LogStore> {
pub region_id: RegionId,
pub sst_layer: AccessLayerRef,
pub writer: RegionWriterRef,
pub writer: RegionWriterRef<S>,
pub shared: SharedDataRef,
pub manifest: RegionManifest,
pub wal: Wal<S>,
@@ -66,6 +66,8 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub sender: Option<Sender<Result<()>>>,
pub picker: CompactionPickerRef<S>,
pub sst_write_buffer_size: ReadableSize,
/// Whether to immediately reschedule another compaction when finished.
pub reschedule_on_finish: bool,
}
impl<S: LogStore> CompactionRequestImpl<S> {
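
The new `reschedule_on_finish` field records who scheduled the compaction: flush-triggered requests set it, manual requests do not. A small sketch of that contract follows, with hypothetical constructor helpers (`request_from_flush`, `request_from_manual_compact`) that are not part of the real code.

```rust
// Simplified stand-in for CompactionRequestImpl: only the flag under discussion is kept.
struct CompactionRequest {
    region_id: u64,
    reschedule_on_finish: bool,
}

fn request_from_flush(region_id: u64) -> CompactionRequest {
    // Compaction triggered by flush always asks for a follow-up round.
    CompactionRequest { region_id, reschedule_on_finish: true }
}

fn request_from_manual_compact(region_id: u64) -> CompactionRequest {
    // Manual compaction runs exactly once.
    CompactionRequest { region_id, reschedule_on_finish: false }
}

fn main() {
    assert!(request_from_flush(1).reschedule_on_finish);
    assert!(!request_from_manual_compact(1).reschedule_on_finish);
}
```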

View File

@@ -17,14 +17,17 @@ use std::fmt::{Debug, Formatter};
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info, timer};
use itertools::Itertools;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::compaction::writer::build_sst_reader;
use crate::error;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::manifest::region::RegionManifest;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::region::{CompactContext, RegionWriterRef, SharedDataRef, WriterCompactRequest};
use crate::schema::RegionSchemaRef;
use crate::sst::{
AccessLayerRef, FileHandle, FileId, FileMeta, Level, Source, SstInfo, WriteOptions,
@@ -42,13 +45,14 @@ pub struct CompactionTaskImpl<S: LogStore> {
pub schema: RegionSchemaRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub writer: RegionWriterRef,
pub writer: RegionWriterRef<S>,
pub shared_data: SharedDataRef,
pub wal: Wal<S>,
pub manifest: RegionManifest,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option<i64>,
pub reschedule_on_finish: bool,
}
impl<S: LogStore> Debug for CompactionTaskImpl<S> {
@@ -77,6 +81,16 @@ impl<S: LogStore> CompactionTaskImpl<S> {
let sst_write_buffer_size = self.sst_write_buffer_size;
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
"Compaction output [{}]-> {}",
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(","),
output.output_file_id
);
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
futs.push(async move {
output
@@ -90,10 +104,14 @@ impl<S: LogStore> CompactionTaskImpl<S> {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(task);
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk).await?;
let metas = futures::future::try_join_all(task_chunk)
.await
.context(error::JoinSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?;
outputs.extend(metas.into_iter().flatten());
}
@@ -155,12 +173,40 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
"Compacting SST files, input: {:?}, output: {:?}, window: {:?}",
input_ids, output_ids, self.compaction_time_window
);
self.write_manifest_and_apply(output, compacted)
let no_output = output.is_empty();
let write_result = self
.write_manifest_and_apply(output, compacted)
.await
.map_err(|e| {
error!(e; "Failed to update region manifest: {}", self.shared_data.name());
e
})
});
if !no_output && self.reschedule_on_finish {
// only reschedule another compaction if current compaction has output and it's
// triggered by flush.
if let Err(e) = self
.writer
.compact(WriterCompactRequest {
shared_data: self.shared_data.clone(),
sst_layer: self.sst_layer.clone(),
manifest: self.manifest.clone(),
wal: self.wal.clone(),
region_writer: self.writer.clone(),
compact_ctx: CompactContext { wait: false },
})
.await
{
error!(e; "Failed to schedule a compaction after compaction, region id: {}", self.shared_data.id());
} else {
info!(
"Immediately schedule another compaction for region: {}",
self.shared_data.id()
);
}
}
write_result
}
}
@@ -168,6 +214,7 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
/// and a many-to-one compaction from level n+1 to level n+1.
#[derive(Debug)]
pub struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// The left bound of time window.
@@ -206,13 +253,12 @@ impl CompactionOutput {
)
.await?;
let output_file_id = FileId::random();
let opts = WriteOptions {
sst_write_buffer_size,
};
Ok(sst_layer
.write_sst(output_file_id, Source::Reader(reader), &opts)
let _timer = timer!(crate::metrics::MERGE_ELAPSED);
let meta = sst_layer
.write_sst(self.output_file_id, Source::Reader(reader), &opts)
.await?
.map(
|SstInfo {
@@ -221,12 +267,13 @@ impl CompactionOutput {
..
}| FileMeta {
region_id,
file_id: output_file_id,
file_id: self.output_file_id,
time_range,
level: self.output_level,
file_size,
},
))
);
Ok(meta)
}
}
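
This hunk is the core of the change: each output merge is spawned onto a background task and joined, and once the manifest edit is applied, a flush-triggered compaction that actually produced output asks the writer for another round without waiting on it. A condensed, std-only sketch of that control flow is below; threads stand in for `common_runtime::spawn_bg`, and plain strings for the real error type.

```rust
use std::thread;

struct Output { id: u64 }

fn merge(output: &Output) -> Result<u64, String> {
    // Stand-in for writing one merged SST; returns the new file "meta".
    Ok(output.id)
}

fn run_compaction(outputs: Vec<Output>, reschedule_on_finish: bool) -> Result<(), String> {
    // Fan the merges out to workers (the real code spawns futures on a background
    // runtime and joins them with `try_join_all`, mapping join failures to JoinError).
    let handles: Vec<_> = outputs
        .into_iter()
        .map(|o| thread::spawn(move || merge(&o)))
        .collect();
    let mut metas = Vec::new();
    for h in handles {
        metas.push(h.join().map_err(|_| "join error".to_string())??);
    }

    let no_output = metas.is_empty();
    // ... write the manifest edit and apply the version change here ...

    if !no_output && reschedule_on_finish {
        // Fire-and-forget: a failed reschedule is only logged, it does not fail
        // the compaction that just finished.
        if let Err(e) = schedule_next_compaction() {
            eprintln!("failed to schedule follow-up compaction: {e}");
        }
    }
    Ok(())
}

fn schedule_next_compaction() -> Result<(), String> {
    Ok(()) // stand-in for `writer.compact(WriterCompactRequest { wait: false, .. })`
}

fn main() {
    run_compaction(vec![Output { id: 1 }, Output { id: 2 }], true).unwrap();
}
```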

View File

@@ -28,7 +28,7 @@ use store_api::logstore::LogStore;
use crate::compaction::picker::get_expired_ssts;
use crate::compaction::task::CompactionOutput;
use crate::compaction::{infer_time_bucket, CompactionRequestImpl, CompactionTaskImpl, Picker};
use crate::sst::{FileHandle, LevelMeta};
use crate::sst::{FileHandle, FileId, LevelMeta};
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
@@ -76,6 +76,7 @@ impl<S> TwcsPicker<S> {
if let Some(active_window) = active_window && *window == active_window {
if files.len() > self.max_active_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
time_window_bound: *window,
time_window_sec: window_size,
@@ -85,18 +86,21 @@ impl<S> TwcsPicker<S> {
strict_window: false,
});
} else {
debug!("Active window not present or no enough files in active window {:?}", active_window);
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
}
} else {
// not active writing window
if files.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
strict_window: false,
});
} else {
debug!("No enough files, current: {}, max_inactive_window_files: {}", files.len(), self.max_inactive_window_files)
}
}
}
@@ -142,6 +146,10 @@ impl<S: LogStore> Picker for TwcsPicker<S> {
);
let outputs = self.build_output(&windows, active_window, time_window_size);
if outputs.is_empty() && expired_ssts.is_empty() {
return Ok(None);
}
let task = CompactionTaskImpl {
schema: req.schema(),
sst_layer: req.sst_layer.clone(),
@@ -153,6 +161,7 @@ impl<S: LogStore> Picker for TwcsPicker<S> {
expired_ssts,
sst_write_buffer_size: req.sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
reschedule_on_finish: req.reschedule_on_finish,
};
Ok(Some(task))
}
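
The TWCS picker now returns `None` when there is nothing to merge and nothing expired, so no empty task gets scheduled; combined with the `no_output` check in the task, a flush-triggered compaction stops rescheduling itself once its windows are fully compacted. A tiny sketch of the guard, using plain `Vec<u32>` placeholders for the real output and SST types:

```rust
fn pick(outputs: Vec<u32>, expired: Vec<u32>) -> Option<(Vec<u32>, Vec<u32>)> {
    if outputs.is_empty() && expired.is_empty() {
        return None; // nothing to do for this region
    }
    Some((outputs, expired))
}

fn main() {
    assert!(pick(vec![], vec![]).is_none());
    assert!(pick(vec![1], vec![]).is_some());
}
```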

View File

@@ -528,6 +528,16 @@ pub enum Error {
source: table::error::Error,
location: Location,
},
#[snafu(display(
"Failed to join spawned tasks, source: {}, location: {}",
source,
location
))]
JoinError {
source: JoinError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -628,6 +638,7 @@ impl ErrorExt for Error {
TtlCalculation { source, .. } => source.status_code(),
ConvertColumnsToRows { .. } | SortArrays { .. } => StatusCode::Unexpected,
BuildPredicate { source, .. } => source.status_code(),
JoinError { .. } => StatusCode::Unexpected,
}
}
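
Spawning the merges means the join itself can fail (panic or cancellation), separately from the compaction work, hence the new `JoinError` variant mapped to `StatusCode::Unexpected`. The sketch below shows the two error layers with a hand-rolled enum rather than the crate's snafu-generated types.

```rust
use std::fmt;

#[derive(Debug)]
enum Error {
    JoinError(String),  // the spawned task panicked or was cancelled
    Compaction(String), // the merge itself failed
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::JoinError(s) => write!(f, "Failed to join spawned tasks: {s}"),
            Error::Compaction(s) => write!(f, "Compaction failed: {s}"),
        }
    }
}

fn join_all(results: Vec<Result<Result<(), Error>, String>>) -> Result<(), Error> {
    for r in results {
        // Outer layer: did the spawned task complete at all?
        // Inner layer: did the compaction work inside it succeed?
        r.map_err(Error::JoinError)??;
    }
    Ok(())
}

fn main() {
    assert!(join_all(vec![Ok(Ok(()))]).is_ok());
    assert!(matches!(
        join_all(vec![Err("cancelled".into())]),
        Err(Error::JoinError(_))
    ));
}
```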

View File

@@ -225,7 +225,7 @@ pub struct FlushJob<S: LogStore> {
/// Sst access layer of the region.
pub sst_layer: AccessLayerRef,
/// Region writer, used to persist log entry that points to the latest manifest file.
pub writer: RegionWriterRef,
pub writer: RegionWriterRef<S>,
/// Region write-ahead logging, used to write data/meta to the log file.
pub wal: Wal<S>,
/// Region manifest service, used to persist metadata.

View File

@@ -96,7 +96,7 @@ pub struct FlushRegionRequest<S: LogStore> {
/// Sst access layer of the region.
pub sst_layer: AccessLayerRef,
/// Region writer, used to persist log entry that points to the latest manifest file.
pub writer: RegionWriterRef,
pub writer: RegionWriterRef<S>,
/// Region write-ahead logging, used to write data/meta to the log file.
pub wal: Wal<S>,
/// Region manifest service, used to persist metadata.
@@ -149,6 +149,8 @@ impl<S: LogStore> From<&FlushRegionRequest<S>> for CompactionRequestImpl<S> {
sender: None,
picker: req.compaction_picker.clone(),
sst_write_buffer_size: req.engine_config.sst_write_buffer_size,
// compaction triggered by flush always reschedules
reschedule_on_finish: true,
}
}
}
@@ -331,13 +333,22 @@ async fn execute_flush_region<S: LogStore>(
let max_files_in_l0 = req.engine_config.max_files_in_l0;
let shared_data = req.shared.clone();
// If flush is success, schedule a compaction request for this region.
let _ = region::schedule_compaction(
shared_data,
compaction_scheduler,
compaction_request,
max_files_in_l0,
);
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
logging::debug!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
} else {
// If flush is success, schedule a compaction request for this region.
let _ =
region::schedule_compaction(shared_data, compaction_scheduler, compaction_request);
}
// Complete the request.
FlushRequest::Region { req, sender }.complete(Ok(()));
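
The level-0 file-count check that used to live inside `schedule_compaction` now guards only the flush path, so manual compactions (and the follow-up compactions a finished task schedules) are no longer throttled by `max_files_in_l0`. A minimal sketch of the relocated guard, with plain integers standing in for the version/SST accessors:

```rust
fn schedule_after_flush(level0_file_num: usize, max_files_in_l0: usize) -> bool {
    if level0_file_num <= max_files_in_l0 {
        // Not enough SSTs in level 0 yet; skip scheduling.
        return false;
    }
    // Otherwise schedule a compaction request for this region
    // (with reschedule_on_finish = true in the real code).
    true
}

fn main() {
    assert!(!schedule_after_flush(1, 8));
    assert!(schedule_after_flush(9, 8));
}
```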

View File

@@ -32,6 +32,8 @@ pub const REGION_COUNT: &str = "storage.region_count";
pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
/// Elapsed time of a compact job.
pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
/// Elapsed time for merging SST files.
pub const MERGE_ELAPSED: &str = "storage.compaction.merge.elapsed";
/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
/// Elapsed time of inserting memtable.

View File

@@ -51,7 +51,9 @@ use crate::memtable::MemtableBuilderRef;
use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
pub(crate) use crate::region::writer::schedule_compaction;
use crate::region::writer::DropContext;
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
pub use crate::region::writer::{
AlterContext, RegionWriter, RegionWriterRef, WriterCompactRequest, WriterContext,
};
use crate::schema::compat::CompatWrite;
use crate::snapshot::SnapshotImpl;
use crate::sst::AccessLayerRef;
@@ -176,16 +178,11 @@ pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRe
pub struct CompactContext {
/// Whether to wait the compaction result.
pub wait: bool,
/// Max file number in level 0.
pub max_files_in_l0: usize,
}
impl Default for CompactContext {
fn default() -> CompactContext {
CompactContext {
wait: true,
max_files_in_l0: 1,
}
CompactContext { wait: true }
}
}
@@ -238,6 +235,7 @@ impl<S: LogStore> RegionImpl<S> {
let version_control = VersionControl::with_version(version);
let wal = Wal::new(id, store_config.log_store);
let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
let inner = Arc::new(RegionInner {
shared: Arc::new(SharedData {
id,
@@ -250,12 +248,14 @@ impl<S: LogStore> RegionImpl<S> {
store_config.engine_config.clone(),
store_config.ttl,
store_config.write_buffer_size,
store_config.compaction_scheduler.clone(),
compaction_picker.clone(),
)),
wal,
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
compaction_scheduler: store_config.compaction_scheduler,
compaction_picker: compaction_strategy_to_picker(&store_config.compaction_strategy),
compaction_picker,
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
@@ -334,14 +334,16 @@ impl<S: LogStore> RegionImpl<S> {
last_flush_millis: AtomicI64::new(0),
});
let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
let writer = Arc::new(RegionWriter::new(
store_config.memtable_builder,
store_config.engine_config.clone(),
store_config.ttl,
store_config.write_buffer_size,
store_config.compaction_scheduler.clone(),
compaction_picker.clone(),
));
let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
let writer_ctx = WriterContext {
shared: &shared,
flush_strategy: &store_config.flush_strategy,
@@ -646,7 +648,7 @@ pub type SharedDataRef = Arc<SharedData>;
struct RegionInner<S: LogStore> {
shared: SharedDataRef,
writer: RegionWriterRef,
writer: RegionWriterRef<S>,
wal: Wal<S>,
flush_strategy: FlushStrategyRef,
flush_scheduler: FlushSchedulerRef<S>,
@@ -763,18 +765,16 @@ impl<S: LogStore> RegionInner<S> {
}
/// Compact the region manually.
async fn compact(&self, ctx: CompactContext) -> Result<()> {
let writer_ctx = WriterContext {
shared: &self.shared,
flush_strategy: &self.flush_strategy,
flush_scheduler: &self.flush_scheduler,
compaction_scheduler: &self.compaction_scheduler,
sst_layer: &self.sst_layer,
wal: &self.wal,
writer: &self.writer,
manifest: &self.manifest,
compaction_picker: self.compaction_picker.clone(),
};
self.writer.compact(writer_ctx, ctx).await
async fn compact(&self, compact_ctx: CompactContext) -> Result<()> {
self.writer
.compact(WriterCompactRequest {
shared_data: self.shared.clone(),
sst_layer: self.sst_layer.clone(),
manifest: self.manifest.clone(),
wal: self.wal.clone(),
region_writer: self.writer.clone(),
compact_ctx,
})
.await
}
}
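
`CompactContext` shrinks to just `wait`, and `compact` now takes an owned `WriterCompactRequest` built from `Arc` clones instead of a borrowed `WriterContext`; because the request owns everything it needs, it can also be constructed from inside a compaction task that has just finished. A simplified sketch of that ownership shift (the structs here are stand-ins, not the crate's definitions):

```rust
use std::sync::Arc;

struct SharedData { id: u64 }

#[derive(Clone, Copy, Debug)]
struct CompactContext { wait: bool } // max_files_in_l0 is gone

struct WriterCompactRequest {
    shared_data: Arc<SharedData>,
    compact_ctx: CompactContext,
}

fn compact(req: WriterCompactRequest) {
    println!("compact region {} (wait = {})", req.shared_data.id, req.compact_ctx.wait);
}

fn main() {
    let shared = Arc::new(SharedData { id: 42 });
    // A manual compaction waits for the result ...
    compact(WriterCompactRequest {
        shared_data: shared.clone(),
        compact_ctx: CompactContext { wait: true },
    });
    // ... while a follow-up compaction scheduled from a finished task does not.
    compact(WriterCompactRequest {
        shared_data: shared,
        compact_ctx: CompactContext { wait: false },
    });
}
```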

View File

@@ -50,13 +50,13 @@ use crate::version::{VersionControl, VersionControlRef, VersionEdit};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
pub type RegionWriterRef = Arc<RegionWriter>;
pub type RegionWriterRef<S> = Arc<RegionWriter<S>>;
// TODO(yingwen): Add benches for write and support group commit to improve write throughput.
/// Region writer manages all write operations to the region.
#[derive(Debug)]
pub struct RegionWriter {
pub struct RegionWriter<S: LogStore> {
// To avoid dead lock, we need to ensure the lock order is: inner -> version_mutex.
/// Inner writer guarded by write lock, the write lock is used to ensure
/// all write operations are serialized.
@@ -65,15 +65,23 @@ pub struct RegionWriter {
///
/// Increasing committed sequence should be guarded by this lock.
version_mutex: Mutex<()>,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_picker: CompactionPickerRef<S>,
}
impl RegionWriter {
impl<S> RegionWriter<S>
where
S: LogStore,
{
pub fn new(
memtable_builder: MemtableBuilderRef,
config: Arc<EngineConfig>,
ttl: Option<Duration>,
write_buffer_size: usize,
) -> RegionWriter {
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_picker: CompactionPickerRef<S>,
) -> RegionWriter<S> {
RegionWriter {
inner: Mutex::new(WriterInner::new(
memtable_builder,
@@ -82,11 +90,13 @@ impl RegionWriter {
write_buffer_size,
)),
version_mutex: Mutex::new(()),
compaction_scheduler,
compaction_picker,
}
}
/// Write to region in the write lock.
pub async fn write<S: LogStore>(
pub async fn write(
&self,
ctx: &WriteContext,
request: WriteBatch,
@@ -102,7 +112,7 @@ impl RegionWriter {
}
/// Replay data to memtables.
pub async fn replay<S: LogStore>(
pub async fn replay(
&self,
recovered_metadata: RecoveredMetadataMap,
writer_ctx: WriterContext<'_, S>,
@@ -114,7 +124,7 @@ impl RegionWriter {
}
/// Write and apply the region edit.
pub(crate) async fn write_edit_and_apply<S: LogStore>(
pub(crate) async fn write_edit_and_apply(
&self,
wal: &Wal<S>,
shared: &SharedDataRef,
@@ -172,11 +182,7 @@ impl RegionWriter {
}
/// Alter schema of the region.
pub async fn alter<S: LogStore>(
&self,
alter_ctx: AlterContext<'_, S>,
request: AlterRequest,
) -> Result<()> {
pub async fn alter(&self, alter_ctx: AlterContext<'_, S>, request: AlterRequest) -> Result<()> {
// To alter the schema, we need to acquire the write lock first, so we could
// avoid other writers write to the region and switch the memtable safely.
// Another potential benefit is that the write lock also protect against concurrent
@@ -239,7 +245,7 @@ impl RegionWriter {
/// Allocate a sequence and persist the manifest version using that sequence to the wal.
///
/// This method should be protected by the `version_mutex`.
async fn persist_manifest_version<S: LogStore>(
async fn persist_manifest_version(
&self,
wal: &Wal<S>,
version_control: &VersionControlRef,
@@ -279,7 +285,7 @@ impl RegionWriter {
Ok(())
}
pub async fn on_drop<S: LogStore>(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
pub async fn on_drop(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
// 1. Acquires the write lock.
// 2. Close writer reject any potential writing.
// 3. Waits or cancels the flush job.
@@ -346,11 +352,7 @@ impl RegionWriter {
}
/// Flush task manually
pub async fn flush<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,
ctx: &FlushContext,
) -> Result<()> {
pub async fn flush(&self, writer_ctx: WriterContext<'_, S>, ctx: &FlushContext) -> Result<()> {
let mut inner = self.inner.lock().await;
if !ctx.force {
@@ -369,17 +371,19 @@ impl RegionWriter {
}
/// Compact manually.
pub async fn compact<S: LogStore>(
&self,
writer_ctx: WriterContext<'_, S>,
ctx: CompactContext,
) -> Result<()> {
pub async fn compact(&self, request: WriterCompactRequest<S>) -> Result<()> {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
let sst_write_buffer_size = inner.engine_config.sst_write_buffer_size;
inner
.manual_compact(writer_ctx, ctx, sst_write_buffer_size)
.manual_compact(
request,
self.compaction_picker.clone(),
self.compaction_scheduler.clone(),
sst_write_buffer_size,
)
.await
}
@@ -397,12 +401,25 @@ impl RegionWriter {
// Methods for tests.
#[cfg(test)]
impl RegionWriter {
impl<S> RegionWriter<S>
where
S: LogStore,
{
pub(crate) async fn write_buffer_size(&self) -> usize {
self.inner.lock().await.write_buffer_size
}
}
/// Structs needed by triggering a compaction.
pub struct WriterCompactRequest<S: LogStore> {
pub shared_data: SharedDataRef,
pub sst_layer: AccessLayerRef,
pub manifest: RegionManifest,
pub wal: Wal<S>,
pub region_writer: RegionWriterRef<S>,
pub compact_ctx: CompactContext,
}
pub struct WriterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub flush_strategy: &'a FlushStrategyRef,
@@ -410,7 +427,7 @@ pub struct WriterContext<'a, S: LogStore> {
pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
pub sst_layer: &'a AccessLayerRef,
pub wal: &'a Wal<S>,
pub writer: &'a RegionWriterRef,
pub writer: &'a RegionWriterRef<S>,
pub manifest: &'a RegionManifest,
pub compaction_picker: CompactionPickerRef<S>,
}
@@ -797,49 +814,49 @@ impl WriterInner {
async fn manual_compact<S: LogStore>(
&mut self,
writer_ctx: WriterContext<'_, S>,
compact_ctx: CompactContext,
request: WriterCompactRequest<S>,
compaction_picker: CompactionPickerRef<S>,
compaction_scheduler: CompactionSchedulerRef<S>,
sst_write_buffer_size: ReadableSize,
) -> Result<()> {
let region_id = writer_ctx.shared.id();
let compaction_time_window = writer_ctx
.shared
let region_id = request.shared_data.id();
let compaction_time_window = request
.shared_data
.version_control
.current()
.ssts()
.compaction_time_window();
let mut compaction_request = CompactionRequestImpl {
region_id,
sst_layer: writer_ctx.sst_layer.clone(),
writer: writer_ctx.writer.clone(),
shared: writer_ctx.shared.clone(),
manifest: writer_ctx.manifest.clone(),
wal: writer_ctx.wal.clone(),
sst_layer: request.sst_layer,
writer: request.region_writer,
shared: request.shared_data.clone(),
manifest: request.manifest,
wal: request.wal,
ttl: self.ttl,
compaction_time_window,
sender: None,
picker: writer_ctx.compaction_picker.clone(),
picker: compaction_picker,
sst_write_buffer_size,
// manual compaction does not reschedule itself.
reschedule_on_finish: false,
};
let compaction_scheduler = writer_ctx.compaction_scheduler.clone();
let shared_data = writer_ctx.shared.clone();
let compaction_scheduler = compaction_scheduler.clone();
logging::info!(
"Manual compact, region_id: {}, compact_ctx: {:?}",
region_id,
compact_ctx
request.compact_ctx
);
if compact_ctx.wait {
if request.compact_ctx.wait {
let (sender, receiver) = oneshot::channel();
compaction_request.sender = Some(sender);
if schedule_compaction(
shared_data,
request.shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
) {
receiver
.await
@@ -847,10 +864,9 @@ impl WriterInner {
}
} else {
let _ = schedule_compaction(
shared_data,
request.shared_data,
compaction_scheduler,
compaction_request,
compact_ctx.max_files_in_l0,
);
}
@@ -882,23 +898,9 @@ pub(crate) fn schedule_compaction<S: LogStore>(
shared_data: SharedDataRef,
compaction_scheduler: CompactionSchedulerRef<S>,
compaction_request: CompactionRequestImpl<S>,
max_files_in_l0: usize,
) -> bool {
let region_id = shared_data.id();
let level0_file_num = shared_data
.version_control
.current()
.ssts()
.level(0)
.file_num();
if level0_file_num <= max_files_in_l0 {
logging::debug!(
"No enough SST files in level 0 (threshold: {}), skip compaction",
max_files_in_l0
);
return false;
}
match compaction_scheduler.schedule(compaction_request) {
Ok(scheduled) => {
logging::info!(
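
Making `RegionWriter` generic over the log store and giving it its own compaction scheduler and picker is what lets any holder of a `RegionWriterRef<S>`, including a finished compaction task, trigger the next compaction without borrowing a `WriterContext`. A toy sketch of that shape, with a stub `LogStore` trait and a scheduler that only prints:

```rust
use std::marker::PhantomData;
use std::sync::Arc;

trait LogStore: Send + Sync + 'static {}

struct Scheduler;
impl Scheduler {
    fn schedule(&self, region_id: u64) -> bool {
        println!("scheduled compaction for region {region_id}");
        true
    }
}

struct RegionWriter<S: LogStore> {
    scheduler: Arc<Scheduler>,
    _log_store: PhantomData<S>,
}

impl<S: LogStore> RegionWriter<S> {
    fn compact(&self, region_id: u64) -> bool {
        // The writer carries its own scheduler and picker; callers only supply the request.
        self.scheduler.schedule(region_id)
    }
}

struct NoopLogStore;
impl LogStore for NoopLogStore {}

fn main() {
    let writer: Arc<RegionWriter<NoopLogStore>> = Arc::new(RegionWriter {
        scheduler: Arc::new(Scheduler),
        _log_store: PhantomData,
    });
    // A clone of this Arc can live inside a compaction task and reschedule on finish.
    assert!(writer.compact(7));
}
```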

View File

@@ -18,6 +18,7 @@ mod stream_writer;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -226,11 +227,25 @@ fn new_level_meta_vec() -> LevelMetaVec {
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
}
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct FileHandle {
inner: Arc<FileHandleInner>,
}
impl Debug for FileHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("FileHandle")
.field("file_id", &self.inner.meta.file_id)
.field("region_id", &self.inner.meta.region_id)
.field("time_range", &self.inner.meta.time_range)
.field("size", &self.inner.meta.file_size)
.field("level", &self.inner.meta.level)
.field("compacting", &self.inner.compacting)
.field("deleted", &self.inner.deleted)
.finish()
}
}
impl FileHandle {
pub fn new(
meta: FileMeta,
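
`FileHandle` switches from a derived `Debug` to a hand-written one so logs show only the fields that matter for compaction instead of dumping the whole `Arc<FileHandleInner>`. A cut-down sketch of the same pattern; the inner struct here keeps only three of the real fields.

```rust
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;

struct FileHandleInner {
    file_id: u64,
    level: u8,
    deleted: bool,
}

#[derive(Clone)]
struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl Debug for FileHandle {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Print selected fields of the inner struct instead of deriving.
        f.debug_struct("FileHandle")
            .field("file_id", &self.inner.file_id)
            .field("level", &self.inner.level)
            .field("deleted", &self.inner.deleted)
            .finish()
    }
}

fn main() {
    let handle = FileHandle {
        inner: Arc::new(FileHandleInner { file_id: 1, level: 0, deleted: false }),
    };
    println!("{handle:?}"); // FileHandle { file_id: 1, level: 0, deleted: false }
}
```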

View File

@@ -20,8 +20,9 @@ use log_store::LogConfig;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::manifest::Manifest;
use store_api::storage::{CompactionStrategy, TwcsOptions};
use crate::compaction::noop::NoopCompactionScheduler;
use crate::compaction::CompactionHandler;
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
use crate::engine::{self, RegionMap};
use crate::file_purger::noop::NoopFilePurgeHandler;
@@ -33,7 +34,7 @@ use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
fn log_store_dir(store_dir: &str) -> String {
format!("{store_dir}/logstore")
format!("{store_dir}/wal")
}
/// Create a new StoreConfig for test.
@@ -96,7 +97,10 @@ pub async fn new_store_config_with_object_store(
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let compaction_scheduler = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
CompactionHandler::default(),
));
// We use an empty region map so actually the background worker of the picker is disabled.
let regions = Arc::new(RegionMap::new());
let flush_scheduler = Arc::new(
@@ -125,7 +129,7 @@ pub async fn new_store_config_with_object_store(
file_purger,
ttl: None,
write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
compaction_strategy: Default::default(),
compaction_strategy: CompactionStrategy::Twcs(TwcsOptions::default()),
},
regions,
)
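
The test config now wires a real `LocalScheduler` around a `CompactionHandler` instead of the no-op scheduler, so compactions scheduled during flush (and the follow-up compactions this PR adds) actually run in tests. A rough sketch of that wiring with stand-in types; the real `LocalScheduler`/`CompactionHandler` signatures differ.

```rust
use std::sync::Arc;

trait Handler {
    fn handle(&self, region_id: u64);
}

#[derive(Default)]
struct CompactionHandler;
impl Handler for CompactionHandler {
    fn handle(&self, region_id: u64) {
        println!("compacting region {region_id}");
    }
}

struct LocalScheduler<H: Handler> {
    handler: H,
}
impl<H: Handler> LocalScheduler<H> {
    fn new(handler: H) -> Self {
        Self { handler }
    }
    fn schedule(&self, region_id: u64) {
        // A real scheduler queues the request; this stub runs it inline.
        self.handler.handle(region_id);
    }
}

fn main() {
    let scheduler = Arc::new(LocalScheduler::new(CompactionHandler::default()));
    scheduler.schedule(1);
}
```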