diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs
index 957d012473..794789d71d 100644
--- a/src/storage/src/compaction/picker.rs
+++ b/src/storage/src/compaction/picker.rs
@@ -30,7 +30,7 @@ use crate::compaction::scheduler::CompactionRequestImpl;
 use crate::compaction::task::{CompactionOutput, CompactionTask, CompactionTaskImpl};
 use crate::error::{Result, TtlCalculationSnafu};
 use crate::scheduler::Request;
-use crate::sst::{FileHandle, LevelMeta};
+use crate::sst::{FileHandle, FileId, LevelMeta};
 
 /// Picker picks input SST files and builds the compaction task.
 /// Different compaction strategy may implement different pickers.
@@ -152,6 +152,7 @@ impl Picker for LeveledTimeWindowPicker {
             expired_ssts,
             sst_write_buffer_size: req.sst_write_buffer_size,
             compaction_time_window,
+            reschedule_on_finish: req.reschedule_on_finish,
         }));
     }
 
@@ -186,6 +187,7 @@ impl LeveledTimeWindowPicker {
         debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
 
         results.extend(buckets.into_iter().map(|(bound, files)| CompactionOutput {
+            output_file_id: FileId::random(),
            output_level: 1,
             time_window_bound: bound,
             time_window_sec: time_window,
diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs
index 1a37bdbcea..170b9c2f53 100644
--- a/src/storage/src/compaction/scheduler.rs
+++ b/src/storage/src/compaction/scheduler.rs
@@ -56,7 +56,7 @@ impl Request for CompactionRequestImpl {
 pub struct CompactionRequestImpl<S: LogStore> {
     pub region_id: RegionId,
     pub sst_layer: AccessLayerRef,
-    pub writer: RegionWriterRef,
+    pub writer: RegionWriterRef<S>,
     pub shared: SharedDataRef,
     pub manifest: RegionManifest,
     pub wal: Wal<S>,
@@ -66,6 +66,8 @@
     pub sender: Option<Sender<Result<()>>>,
     pub picker: CompactionPickerRef<S>,
     pub sst_write_buffer_size: ReadableSize,
+    /// Whether to immediately reschedule another compaction when finished.
+    pub reschedule_on_finish: bool,
 }
 
 impl<S: LogStore> CompactionRequestImpl<S> {
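The `reschedule_on_finish` flag added above is the core of this change: a flush-triggered compaction that produced output immediately queues a follow-up, and the follow-up is submitted like a manual compaction, which never reschedules again. A minimal, self-contained sketch of that contract (toy `Request` type and queue, not the engine's):

```rust
// Toy model of the reschedule contract; `Request` stands in for
// CompactionRequestImpl and the Vec for the real scheduler queue.
struct Request {
    reschedule_on_finish: bool,
}

fn run_compaction(_req: &Request) -> bool {
    // Pretend the merge produced at least one output file.
    true
}

fn main() {
    let mut queue = vec![Request { reschedule_on_finish: true }];
    while let Some(req) = queue.pop() {
        let produced_output = run_compaction(&req);
        if produced_output && req.reschedule_on_finish {
            // The follow-up is scheduled like a manual compaction, so it
            // carries `reschedule_on_finish: false` and the loop terminates.
            queue.push(Request { reschedule_on_finish: false });
        }
    }
    println!("no more compaction work");
}
```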
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index b7cec2a36d..eec4aecf12 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -17,14 +17,17 @@ use std::fmt::{Debug, Formatter};
 
 use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, error, info, timer};
+use itertools::Itertools;
+use snafu::ResultExt;
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
 use crate::compaction::writer::build_sst_reader;
+use crate::error;
 use crate::error::Result;
 use crate::manifest::action::RegionEdit;
 use crate::manifest::region::RegionManifest;
-use crate::region::{RegionWriterRef, SharedDataRef};
+use crate::region::{CompactContext, RegionWriterRef, SharedDataRef, WriterCompactRequest};
 use crate::schema::RegionSchemaRef;
 use crate::sst::{
     AccessLayerRef, FileHandle, FileId, FileMeta, Level, Source, SstInfo, WriteOptions,
@@ -42,13 +45,14 @@ pub struct CompactionTaskImpl {
     pub schema: RegionSchemaRef,
     pub sst_layer: AccessLayerRef,
     pub outputs: Vec<CompactionOutput>,
-    pub writer: RegionWriterRef,
+    pub writer: RegionWriterRef<S>,
     pub shared_data: SharedDataRef,
     pub wal: Wal<S>,
     pub manifest: RegionManifest,
     pub expired_ssts: Vec<FileHandle>,
     pub sst_write_buffer_size: ReadableSize,
     pub compaction_time_window: Option<i64>,
+    pub reschedule_on_finish: bool,
 }
 
 impl<S: LogStore> Debug for CompactionTaskImpl<S> {
@@ -77,6 +81,16 @@ impl CompactionTaskImpl {
             let sst_write_buffer_size = self.sst_write_buffer_size;
             compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
 
+            info!(
+                "Compaction output [{}] -> {}",
+                output
+                    .inputs
+                    .iter()
+                    .map(|f| f.file_id().to_string())
+                    .join(","),
+                output.output_file_id
+            );
+
             // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
             futs.push(async move {
                 output
@@ -90,10 +104,14 @@ impl CompactionTaskImpl {
             let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
             for _ in 0..MAX_PARALLEL_COMPACTION {
                 if let Some(task) = futs.pop() {
-                    task_chunk.push(task);
+                    task_chunk.push(common_runtime::spawn_bg(task));
                 }
             }
-            let metas = futures::future::try_join_all(task_chunk).await?;
+            let metas = futures::future::try_join_all(task_chunk)
+                .await
+                .context(error::JoinSnafu)?
+                .into_iter()
+                .collect::<Result<Vec<_>>>()?;
             outputs.extend(metas.into_iter().flatten());
         }
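The hunk above swaps in-place polling for spawned tasks, which introduces a second error layer: `try_join_all` now yields `Result<Vec<Result<_>>, JoinError>`. A sketch of the same flattening with plain `tokio::spawn` standing in for `common_runtime::spawn_bg` (assumes the `tokio` and `futures` crates):

```rust
use futures::future::try_join_all;

async fn merge_part(i: usize) -> Result<usize, String> {
    // Stand-in for one CompactionOutput::build call.
    Ok(i * 10)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Spawning gives real parallelism across runtime threads; merely pushing
    // futures into try_join_all would interleave them on a single task.
    let handles: Vec<_> = (0..4).map(|i| tokio::spawn(merge_part(i))).collect();

    // Outer `?`: the spawned task panicked or was cancelled (JoinError).
    // Inner collect: the merge itself failed. Same shape as the diff's
    // `.context(error::JoinSnafu)? ... .collect::<Result<Vec<_>>>()?`.
    let metas = try_join_all(handles)
        .await
        .map_err(|e| format!("join error: {e}"))?
        .into_iter()
        .collect::<Result<Vec<_>, String>>()?;

    println!("merged metas: {metas:?}");
    Ok(())
}
```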
@@ -155,12 +173,40 @@ impl CompactionTask for CompactionTaskImpl {
             "Compacting SST files, input: {:?}, output: {:?}, window: {:?}",
             input_ids, output_ids, self.compaction_time_window
         );
-        self.write_manifest_and_apply(output, compacted)
+
+        let no_output = output.is_empty();
+        let write_result = self
+            .write_manifest_and_apply(output, compacted)
             .await
             .map_err(|e| {
                 error!(e; "Failed to update region manifest: {}", self.shared_data.name());
                 e
-            })
+            });
+
+        if !no_output && self.reschedule_on_finish {
+            // Only reschedule another compaction if the current one produced
+            // output and was triggered by flush.
+            if let Err(e) = self
+                .writer
+                .compact(WriterCompactRequest {
+                    shared_data: self.shared_data.clone(),
+                    sst_layer: self.sst_layer.clone(),
+                    manifest: self.manifest.clone(),
+                    wal: self.wal.clone(),
+                    region_writer: self.writer.clone(),
+                    compact_ctx: CompactContext { wait: false },
+                })
+                .await
+            {
+                error!(e; "Failed to schedule a follow-up compaction, region id: {}", self.shared_data.id());
+            } else {
+                info!(
+                    "Immediately scheduled another compaction for region: {}",
+                    self.shared_data.id()
+                );
+            }
+        }
+        write_result
     }
 }
 
@@ -168,6 +214,7 @@
 /// and a many-to-one compaction from level n+1 to level n+1.
 #[derive(Debug)]
 pub struct CompactionOutput {
+    pub output_file_id: FileId,
     /// Compaction output file level.
     pub output_level: Level,
     /// The left bound of time window.
@@ -206,13 +253,12 @@ impl CompactionOutput {
         )
         .await?;
 
-        let output_file_id = FileId::random();
         let opts = WriteOptions {
             sst_write_buffer_size,
         };
-
-        Ok(sst_layer
-            .write_sst(output_file_id, Source::Reader(reader), &opts)
+        let _timer = timer!(crate::metrics::MERGE_ELAPSED);
+        let meta = sst_layer
+            .write_sst(self.output_file_id, Source::Reader(reader), &opts)
             .await?
             .map(
                 |SstInfo {
                      time_range,
                      file_size,
                      ..
                  }| FileMeta {
                     region_id,
-                    file_id: output_file_id,
+                    file_id: self.output_file_id,
                     time_range,
                     level: self.output_level,
                     file_size,
                 },
-            ))
+            );
+        Ok(meta)
     }
 }
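Because `output_file_id` is now chosen at pick time rather than inside `build`, the task can log the input-to-output mapping before any bytes are merged, and the picker, task, and log all see the same id. A sketch of the new log line's shape (hypothetical `FileId` newtype; the real one is UUID-backed):

```rust
use itertools::Itertools; // provides `join` on iterators of Display items

// Hypothetical stand-in for the engine's UUID-backed FileId.
#[derive(Clone, Copy)]
struct FileId(u64);

impl std::fmt::Display for FileId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:016x}", self.0)
    }
}

fn main() {
    let inputs = [FileId(0x101), FileId(0x102), FileId(0x103)];
    let output = FileId(0xbeef);
    // Mirrors the `info!("Compaction output [{}] -> {}", ...)` added above.
    println!(
        "Compaction output [{}] -> {}",
        inputs.iter().map(|f| f.to_string()).join(","),
        output
    );
}
```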
diff --git a/src/storage/src/compaction/twcs.rs b/src/storage/src/compaction/twcs.rs
index 1d327d3dca..17b7096aba 100644
--- a/src/storage/src/compaction/twcs.rs
+++ b/src/storage/src/compaction/twcs.rs
@@ -28,7 +28,7 @@ use store_api::logstore::LogStore;
 use crate::compaction::picker::get_expired_ssts;
 use crate::compaction::task::CompactionOutput;
 use crate::compaction::{infer_time_bucket, CompactionRequestImpl, CompactionTaskImpl, Picker};
-use crate::sst::{FileHandle, LevelMeta};
+use crate::sst::{FileHandle, FileId, LevelMeta};
 
 /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
 /// candidates.
@@ -76,6 +76,7 @@ impl TwcsPicker {
         if let Some(active_window) = active_window && *window == active_window {
             if files.len() > self.max_active_window_files {
                 output.push(CompactionOutput {
+                    output_file_id: FileId::random(),
                     output_level: 1, // we only have two levels and always compact to l1
                     time_window_bound: *window,
                     time_window_sec: window_size,
@@ -85,18 +86,21 @@
                     strict_window: false,
                 });
             } else {
-                debug!("Active window not present or no enough files in active window {:?}", active_window);
+                debug!("Active window not present or not enough files in active window {:?}, window: {}", active_window, *window);
             }
         } else {
             // not active writing window
             if files.len() > self.max_inactive_window_files {
                 output.push(CompactionOutput {
+                    output_file_id: FileId::random(),
                     output_level: 1,
                     time_window_bound: *window,
                     time_window_sec: window_size,
                     inputs: files.clone(),
                     strict_window: false,
                 });
+            } else {
+                debug!("Not enough files, current: {}, max_inactive_window_files: {}", files.len(), self.max_inactive_window_files)
             }
         }
     }
@@ -142,6 +146,10 @@ impl Picker for TwcsPicker {
         );
 
         let outputs = self.build_output(&windows, active_window, time_window_size);
+
+        if outputs.is_empty() && expired_ssts.is_empty() {
+            return Ok(None);
+        }
         let task = CompactionTaskImpl {
             schema: req.schema(),
             sst_layer: req.sst_layer.clone(),
@@ -153,6 +161,7 @@
             expired_ssts,
             sst_write_buffer_size: req.sst_write_buffer_size,
             compaction_time_window: Some(time_window_size),
+            reschedule_on_finish: req.reschedule_on_finish,
         };
         Ok(Some(task))
     }
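To make the windowing logic above concrete, here is a simplified model of `TwcsPicker::build_output` (toy types, window map and thresholds only): each time window becomes a compaction output when its file count exceeds that window's threshold, and an empty output list is what lets the picker short-circuit with `Ok(None)`.

```rust
use std::collections::BTreeMap;

// Window lower bound -> file ids in that window (toy representation).
type Windows = BTreeMap<i64, Vec<u64>>;

fn build_output(
    windows: &Windows,
    active_window: Option<i64>,
    max_active_window_files: usize,
    max_inactive_window_files: usize,
) -> Vec<(i64, Vec<u64>)> {
    let mut outputs = Vec::new();
    for (window, files) in windows {
        // The actively written window tolerates more files before compacting.
        let threshold = if Some(*window) == active_window {
            max_active_window_files
        } else {
            max_inactive_window_files
        };
        if files.len() > threshold {
            outputs.push((*window, files.clone()));
        }
    }
    outputs
}

fn main() {
    let mut windows = Windows::new();
    windows.insert(0, vec![1, 2, 3]); // inactive window with 3 files
    windows.insert(3600, vec![4, 5]); // active window with 2 files

    let outputs = build_output(&windows, Some(3600), 4, 1);
    // Only the inactive window exceeds its threshold of 1.
    assert_eq!(outputs, vec![(0, vec![1, 2, 3])]);
    println!("picked {} window(s)", outputs.len());
}
```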
diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index c483022fe1..039698504f 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -528,6 +528,16 @@
         source: table::error::Error,
         location: Location,
     },
+
+    #[snafu(display(
+        "Failed to join spawned tasks, source: {}, location: {}",
+        source,
+        location
+    ))]
+    JoinError {
+        source: JoinError,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -628,6 +638,7 @@ impl ErrorExt for Error {
             TtlCalculation { source, .. } => source.status_code(),
             ConvertColumnsToRows { .. } | SortArrays { .. } => StatusCode::Unexpected,
             BuildPredicate { source, .. } => source.status_code(),
+            JoinError { .. } => StatusCode::Unexpected,
         }
     }
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 873b8015de..901d7c2851 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -225,7 +225,7 @@ pub struct FlushJob {
     /// Sst access layer of the region.
     pub sst_layer: AccessLayerRef,
     /// Region writer, used to persist log entry that points to the latest manifest file.
-    pub writer: RegionWriterRef,
+    pub writer: RegionWriterRef<S>,
     /// Region write-ahead logging, used to write data/meta to the log file.
     pub wal: Wal<S>,
     /// Region manifest service, used to persist metadata.
diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs
index b319b67477..6c7c8bb5ac 100644
--- a/src/storage/src/flush/scheduler.rs
+++ b/src/storage/src/flush/scheduler.rs
@@ -96,7 +96,7 @@ pub struct FlushRegionRequest {
     /// Sst access layer of the region.
     pub sst_layer: AccessLayerRef,
     /// Region writer, used to persist log entry that points to the latest manifest file.
-    pub writer: RegionWriterRef,
+    pub writer: RegionWriterRef<S>,
     /// Region write-ahead logging, used to write data/meta to the log file.
     pub wal: Wal<S>,
     /// Region manifest service, used to persist metadata.
@@ -149,6 +149,8 @@ impl From<&FlushRegionRequest> for CompactionRequestImpl {
             sender: None,
             picker: req.compaction_picker.clone(),
             sst_write_buffer_size: req.engine_config.sst_write_buffer_size,
+            // A compaction triggered by flush always reschedules itself on finish.
+            reschedule_on_finish: true,
         }
     }
 }
@@ -331,13 +333,22 @@
         let max_files_in_l0 = req.engine_config.max_files_in_l0;
         let shared_data = req.shared.clone();
 
-        // If flush is success, schedule a compaction request for this region.
-        let _ = region::schedule_compaction(
-            shared_data,
-            compaction_scheduler,
-            compaction_request,
-            max_files_in_l0,
-        );
+        let level0_file_num = shared_data
+            .version_control
+            .current()
+            .ssts()
+            .level(0)
+            .file_num();
+        if level0_file_num <= max_files_in_l0 {
+            logging::debug!(
+                "Not enough SST files in level 0 (threshold: {}), skip compaction",
+                max_files_in_l0
+            );
+        } else {
+            // If the flush succeeded, schedule a compaction request for this region.
+            let _ =
+                region::schedule_compaction(shared_data, compaction_scheduler, compaction_request);
+        }
 
         // Complete the request.
         FlushRequest::Region { req, sender }.complete(Ok(()));
diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs
index 9cc4b37a04..d05c3176e6 100644
--- a/src/storage/src/metrics.rs
+++ b/src/storage/src/metrics.rs
@@ -32,6 +32,8 @@ pub const REGION_COUNT: &str = "storage.region_count";
 pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
 /// Elapsed time of a compact job.
 pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
+/// Elapsed time for merging SST files.
+pub const MERGE_ELAPSED: &str = "storage.compaction.merge.elapsed";
 /// Global write buffer size in bytes.
 pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
 /// Elapsed time of inserting memtable.
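The relocation above changes who applies the level-0 gate: the flush path now checks the file count before building a request, while `schedule_compaction` itself no longer filters, so manual compactions always reach the scheduler. The gate itself reduces to a threshold comparison:

```rust
// The check the flush path now applies itself (schedule_compaction no longer
// filters), sketched as a pure function.
fn l0_needs_compaction(level0_file_num: usize, max_files_in_l0: usize) -> bool {
    level0_file_num > max_files_in_l0
}

fn main() {
    assert!(!l0_needs_compaction(3, 8)); // flush skips scheduling
    assert!(l0_needs_compaction(9, 8)); // flush schedules a compaction
    println!("gate behaves as expected");
}
```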
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index de75b8e7b9..a94aa4f3a4 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -51,7 +51,9 @@ use crate::memtable::MemtableBuilderRef;
 use crate::metadata::{RegionMetaImpl, RegionMetadata, RegionMetadataRef};
 pub(crate) use crate::region::writer::schedule_compaction;
 use crate::region::writer::DropContext;
-pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
+pub use crate::region::writer::{
+    AlterContext, RegionWriter, RegionWriterRef, WriterCompactRequest, WriterContext,
+};
 use crate::schema::compat::CompatWrite;
 use crate::snapshot::SnapshotImpl;
 use crate::sst::AccessLayerRef;
@@ -176,16 +178,11 @@ pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
 pub struct CompactContext {
     /// Whether to wait for the compaction result.
     pub wait: bool,
-    /// Max files in level 0.
-    pub max_files_in_l0: usize,
 }
 
 impl Default for CompactContext {
     fn default() -> CompactContext {
-        CompactContext {
-            wait: true,
-            max_files_in_l0: 1,
-        }
+        CompactContext { wait: true }
     }
 }
@@ -238,6 +235,7 @@
         let version_control = VersionControl::with_version(version);
         let wal = Wal::new(id, store_config.log_store);
 
+        let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
         let inner = Arc::new(RegionInner {
             shared: Arc::new(SharedData {
                 id,
@@ -250,12 +248,14 @@
                 store_config.engine_config.clone(),
                 store_config.ttl,
                 store_config.write_buffer_size,
+                store_config.compaction_scheduler.clone(),
+                compaction_picker.clone(),
             )),
             wal,
             flush_strategy: store_config.flush_strategy,
             flush_scheduler: store_config.flush_scheduler,
             compaction_scheduler: store_config.compaction_scheduler,
-            compaction_picker: compaction_strategy_to_picker(&store_config.compaction_strategy),
+            compaction_picker,
             sst_layer: store_config.sst_layer,
             manifest: store_config.manifest,
         });
@@ -334,14 +334,16 @@
             last_flush_millis: AtomicI64::new(0),
         });
 
+        let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
         let writer = Arc::new(RegionWriter::new(
             store_config.memtable_builder,
             store_config.engine_config.clone(),
             store_config.ttl,
             store_config.write_buffer_size,
+            store_config.compaction_scheduler.clone(),
+            compaction_picker.clone(),
         ));
 
-        let compaction_picker = compaction_strategy_to_picker(&store_config.compaction_strategy);
         let writer_ctx = WriterContext {
             shared: &shared,
             flush_strategy: &store_config.flush_strategy,
@@ -646,7 +648,7 @@ pub type SharedDataRef = Arc<SharedData>;
 
 struct RegionInner<S: LogStore> {
     shared: SharedDataRef,
-    writer: RegionWriterRef,
+    writer: RegionWriterRef<S>,
     wal: Wal<S>,
     flush_strategy: FlushStrategyRef,
     flush_scheduler: FlushSchedulerRef,
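After the simplification above, `CompactContext` carries a single knob. A restated sketch with a usage note (doc text is mine, not the source's):

```rust
/// The context after this PR: only the blocking behavior remains; the
/// level-0 threshold moved to the flush scheduler.
#[derive(Debug, Clone, Copy)]
pub struct CompactContext {
    /// Whether the caller blocks until the compaction finishes.
    pub wait: bool,
}

impl Default for CompactContext {
    fn default() -> CompactContext {
        CompactContext { wait: true }
    }
}

fn main() {
    // Manual compactions wait by default; the self-reschedule in task.rs
    // passes `wait: false` so a compaction never blocks on its successor.
    assert!(CompactContext::default().wait);
    let follow_up = CompactContext { wait: false };
    assert!(!follow_up.wait);
}
```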
@@ -763,18 +765,16 @@ impl RegionInner {
     }
 
     /// Compact the region manually.
-    async fn compact(&self, ctx: CompactContext) -> Result<()> {
-        let writer_ctx = WriterContext {
-            shared: &self.shared,
-            flush_strategy: &self.flush_strategy,
-            flush_scheduler: &self.flush_scheduler,
-            compaction_scheduler: &self.compaction_scheduler,
-            sst_layer: &self.sst_layer,
-            wal: &self.wal,
-            writer: &self.writer,
-            manifest: &self.manifest,
-            compaction_picker: self.compaction_picker.clone(),
-        };
-        self.writer.compact(writer_ctx, ctx).await
+    async fn compact(&self, compact_ctx: CompactContext) -> Result<()> {
+        self.writer
+            .compact(WriterCompactRequest {
+                shared_data: self.shared.clone(),
+                sst_layer: self.sst_layer.clone(),
+                manifest: self.manifest.clone(),
+                wal: self.wal.clone(),
+                region_writer: self.writer.clone(),
+                compact_ctx,
+            })
+            .await
     }
 }
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index 9e67eff6f0..78f425fc50 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -50,13 +50,13 @@ use crate::version::{VersionControl, VersionControlRef, VersionEdit};
 use crate::wal::Wal;
 use crate::write_batch::WriteBatch;
 
-pub type RegionWriterRef = Arc<RegionWriter>;
+pub type RegionWriterRef<S> = Arc<RegionWriter<S>>;
 
 // TODO(yingwen): Add benches for write and support group commit to improve write throughput.
 /// Region writer manages all write operations to the region.
 #[derive(Debug)]
-pub struct RegionWriter {
+pub struct RegionWriter<S: LogStore> {
     // To avoid dead lock, we need to ensure the lock order is: inner -> version_mutex.
     /// Inner writer guarded by write lock, the write lock is used to ensure
     /// all write operations are serialized.
@@ -65,15 +65,23 @@
     ///
     /// Increasing committed sequence should be guarded by this lock.
     version_mutex: Mutex<()>,
+
+    compaction_scheduler: CompactionSchedulerRef<S>,
+    compaction_picker: CompactionPickerRef<S>,
 }
 
-impl RegionWriter {
+impl<S> RegionWriter<S>
+where
+    S: LogStore,
+{
     pub fn new(
         memtable_builder: MemtableBuilderRef,
         config: Arc<EngineConfig>,
         ttl: Option<Duration>,
         write_buffer_size: usize,
-    ) -> RegionWriter {
+        compaction_scheduler: CompactionSchedulerRef<S>,
+        compaction_picker: CompactionPickerRef<S>,
+    ) -> RegionWriter<S> {
         RegionWriter {
             inner: Mutex::new(WriterInner::new(
                 memtable_builder,
@@ -82,11 +90,13 @@
                 write_buffer_size,
             )),
             version_mutex: Mutex::new(()),
+            compaction_scheduler,
+            compaction_picker,
         }
     }
 
     /// Write to region in the write lock.
-    pub async fn write<S: LogStore>(
+    pub async fn write(
         &self,
         ctx: &WriteContext,
         request: WriteBatch,
@@ -102,7 +112,7 @@
     }
 
     /// Replay data to memtables.
-    pub async fn replay<S: LogStore>(
+    pub async fn replay(
         &self,
         recovered_metadata: RecoveredMetadataMap,
         writer_ctx: WriterContext<'_, S>,
@@ -114,7 +124,7 @@
     }
 
     /// Write and apply the region edit.
-    pub(crate) async fn write_edit_and_apply<S: LogStore>(
+    pub(crate) async fn write_edit_and_apply(
         &self,
         wal: &Wal<S>,
         shared: &SharedDataRef,
@@ -172,11 +182,7 @@
     }
 
     /// Alter schema of the region.
-    pub async fn alter<S: LogStore>(
-        &self,
-        alter_ctx: AlterContext<'_, S>,
-        request: AlterRequest,
-    ) -> Result<()> {
+    pub async fn alter(&self, alter_ctx: AlterContext<'_, S>, request: AlterRequest) -> Result<()> {
         // To alter the schema, we need to acquire the write lock first, so we could
         // avoid other writers write to the region and switch the memtable safely.
         // Another potential benefit is that the write lock also protect against concurrent
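The mechanical change running through writer.rs is moving the `S: LogStore` parameter from individual methods onto the type itself, so the writer can store scheduler and picker handles parameterized by `S`. A compile-checked miniature of the alias pattern (toy traits and types, not the engine's):

```rust
use std::marker::PhantomData;
use std::sync::Arc;

trait LogStore: Send + Sync + 'static {}

struct CompactionSchedulerRef<S: LogStore>(PhantomData<S>);

// Before: `pub type RegionWriterRef = Arc<RegionWriter>;` with every method
// declaring its own `<S: LogStore>`. After: the parameter lives on the type,
// so fields like the scheduler handle can mention S.
struct RegionWriter<S: LogStore> {
    _scheduler: CompactionSchedulerRef<S>,
}

type RegionWriterRef<S> = Arc<RegionWriter<S>>;

struct NoopLogStore;
impl LogStore for NoopLogStore {}

fn main() {
    let writer: RegionWriterRef<NoopLogStore> = Arc::new(RegionWriter {
        _scheduler: CompactionSchedulerRef(PhantomData),
    });
    println!("writer refs: {}", Arc::strong_count(&writer));
}
```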
@@ -239,7 +245,7 @@
     /// Allocate a sequence and persist the manifest version using that sequence to the wal.
     ///
     /// This method should be protected by the `version_mutex`.
-    async fn persist_manifest_version<S: LogStore>(
+    async fn persist_manifest_version(
         &self,
         wal: &Wal<S>,
         version_control: &VersionControlRef,
@@ -279,7 +285,7 @@
         Ok(())
     }
 
-    pub async fn on_drop<S: LogStore>(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
+    pub async fn on_drop(&self, drop_ctx: DropContext<'_, S>) -> Result<()> {
         // 1. Acquires the write lock.
         // 2. Close writer reject any potential writing.
         // 3. Waits or cancels the flush job.
@@ -346,11 +352,7 @@
     }
 
     /// Flush task manually
-    pub async fn flush<S: LogStore>(
-        &self,
-        writer_ctx: WriterContext<'_, S>,
-        ctx: &FlushContext,
-    ) -> Result<()> {
+    pub async fn flush(&self, writer_ctx: WriterContext<'_, S>, ctx: &FlushContext) -> Result<()> {
         let mut inner = self.inner.lock().await;
 
         if !ctx.force {
@@ -369,17 +371,19 @@
     }
 
     /// Compact manually.
-    pub async fn compact<S: LogStore>(
-        &self,
-        writer_ctx: WriterContext<'_, S>,
-        ctx: CompactContext,
-    ) -> Result<()> {
+    pub async fn compact(&self, request: WriterCompactRequest<S>) -> Result<()> {
         let mut inner = self.inner.lock().await;
         ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
 
         let sst_write_buffer_size = inner.engine_config.sst_write_buffer_size;
+
         inner
-            .manual_compact(writer_ctx, ctx, sst_write_buffer_size)
+            .manual_compact(
+                request,
+                self.compaction_picker.clone(),
+                self.compaction_scheduler.clone(),
+                sst_write_buffer_size,
+            )
             .await
     }
@@ -397,12 +401,25 @@
 // Methods for tests.
 #[cfg(test)]
-impl RegionWriter {
+impl<S> RegionWriter<S>
+where
+    S: LogStore,
+{
     pub(crate) async fn write_buffer_size(&self) -> usize {
         self.inner.lock().await.write_buffer_size
     }
 }
 
+/// Data needed to trigger a compaction.
+pub struct WriterCompactRequest<S: LogStore> {
+    pub shared_data: SharedDataRef,
+    pub sst_layer: AccessLayerRef,
+    pub manifest: RegionManifest,
+    pub wal: Wal<S>,
+    pub region_writer: RegionWriterRef<S>,
+    pub compact_ctx: CompactContext,
+}
+
 pub struct WriterContext<'a, S: LogStore> {
     pub shared: &'a SharedDataRef,
     pub flush_strategy: &'a FlushStrategyRef,
@@ -410,7 +427,7 @@ pub struct WriterContext<'a, S: LogStore> {
     pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
     pub sst_layer: &'a AccessLayerRef,
     pub wal: &'a Wal<S>,
-    pub writer: &'a RegionWriterRef,
+    pub writer: &'a RegionWriterRef<S>,
     pub manifest: &'a RegionManifest,
     pub compaction_picker: CompactionPickerRef<S>,
 }
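`WriterCompactRequest` replaces the borrowed `WriterContext<'_, S>` on the compaction path for an ownership reason: a compaction task that wants to reschedule itself must hand the request to background work, and only owned (`'static`) data can cross that boundary. A toy demonstration of the design choice:

```rust
use std::sync::Arc;
use std::thread;

struct SharedData; // stand-in for the region's shared state

// Owned Arc handles instead of `&'a` borrows make the request 'static.
struct WriterCompactRequest {
    shared_data: Arc<SharedData>,
}

fn schedule_in_background(req: WriterCompactRequest) {
    // A borrowed WriterContext<'_, S> could not move into this closure.
    thread::spawn(move || {
        let _ = Arc::strong_count(&req.shared_data);
    })
    .join()
    .unwrap();
}

fn main() {
    let shared = Arc::new(SharedData);
    schedule_in_background(WriterCompactRequest {
        shared_data: Arc::clone(&shared),
    });
    println!("request consumed by background work");
}
```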
@@ -797,49 +814,49 @@ impl WriterInner {
     async fn manual_compact<S: LogStore>(
         &mut self,
-        writer_ctx: WriterContext<'_, S>,
-        compact_ctx: CompactContext,
+        request: WriterCompactRequest<S>,
+        compaction_picker: CompactionPickerRef<S>,
+        compaction_scheduler: CompactionSchedulerRef<S>,
         sst_write_buffer_size: ReadableSize,
     ) -> Result<()> {
-        let region_id = writer_ctx.shared.id();
-        let compaction_time_window = writer_ctx
-            .shared
+        let region_id = request.shared_data.id();
+        let compaction_time_window = request
+            .shared_data
             .version_control
             .current()
             .ssts()
             .compaction_time_window();
         let mut compaction_request = CompactionRequestImpl {
             region_id,
-            sst_layer: writer_ctx.sst_layer.clone(),
-            writer: writer_ctx.writer.clone(),
-            shared: writer_ctx.shared.clone(),
-            manifest: writer_ctx.manifest.clone(),
-            wal: writer_ctx.wal.clone(),
+            sst_layer: request.sst_layer,
+            writer: request.region_writer,
+            shared: request.shared_data.clone(),
+            manifest: request.manifest,
+            wal: request.wal,
             ttl: self.ttl,
             compaction_time_window,
             sender: None,
-            picker: writer_ctx.compaction_picker.clone(),
+            picker: compaction_picker,
             sst_write_buffer_size,
+            // Manual compaction does not reschedule itself.
+            reschedule_on_finish: false,
         };
-        let compaction_scheduler = writer_ctx.compaction_scheduler.clone();
-        let shared_data = writer_ctx.shared.clone();
-
+        let compaction_scheduler = compaction_scheduler.clone();
         logging::info!(
             "Manual compact, region_id: {}, compact_ctx: {:?}",
             region_id,
-            compact_ctx
+            request.compact_ctx
         );
 
-        if compact_ctx.wait {
+        if request.compact_ctx.wait {
             let (sender, receiver) = oneshot::channel();
             compaction_request.sender = Some(sender);
 
             if schedule_compaction(
-                shared_data,
+                request.shared_data,
                 compaction_scheduler,
                 compaction_request,
-                compact_ctx.max_files_in_l0,
             ) {
                 receiver
                     .await
@@ -847,10 +864,9 @@
             }
         } else {
             let _ = schedule_compaction(
-                shared_data,
+                request.shared_data,
                 compaction_scheduler,
                 compaction_request,
-                compact_ctx.max_files_in_l0,
            );
         }
 
         Ok(())
@@ -882,23 +898,9 @@ pub(crate) fn schedule_compaction(
     shared_data: SharedDataRef,
     compaction_scheduler: CompactionSchedulerRef<S>,
     compaction_request: CompactionRequestImpl<S>,
-    max_files_in_l0: usize,
 ) -> bool {
     let region_id = shared_data.id();
 
-    let level0_file_num = shared_data
-        .version_control
-        .current()
-        .ssts()
-        .level(0)
-        .file_num();
-    if level0_file_num <= max_files_in_l0 {
-        logging::debug!(
-            "No enough SST files in level 0 (threshold: {}), skip compaction",
-            max_files_in_l0
-        );
-        return false;
-    }
     match compaction_scheduler.schedule(compaction_request) {
         Ok(scheduled) => {
             logging::info!(
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index 3236b456fd..0412ae560d 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -18,6 +18,7 @@ mod stream_writer;
 
 use std::collections::HashMap;
 use std::fmt;
+use std::fmt::{Debug, Formatter};
 use std::str::FromStr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -226,11 +227,25 @@ fn new_level_meta_vec() -> LevelMetaVec {
         .unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
 }
 
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct FileHandle {
     inner: Arc<FileHandleInner>,
 }
 
+impl Debug for FileHandle {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FileHandle")
+            .field("file_id", &self.inner.meta.file_id)
+            .field("region_id", &self.inner.meta.region_id)
+            .field("time_range", &self.inner.meta.time_range)
+            .field("size", &self.inner.meta.file_size)
+            .field("level", &self.inner.meta.level)
+            .field("compacting", &self.inner.compacting)
+            .field("deleted", &self.inner.deleted)
+            .finish()
+    }
+}
+
 impl FileHandle {
     pub fn new(
         meta: FileMeta,
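The hand-written `Debug` above replaces the derive so the handle prints the inner metadata instead of requiring (and dumping) `Debug` for everything behind the `Arc`. The same `debug_struct` technique on toy types:

```rust
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;

struct Inner {
    file_id: u64,
    level: u8,
}

#[derive(Clone)]
struct Handle {
    inner: Arc<Inner>,
}

impl Debug for Handle {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Surface only the fields readers care about, reaching through the Arc.
        f.debug_struct("Handle")
            .field("file_id", &self.inner.file_id)
            .field("level", &self.inner.level)
            .finish()
    }
}

fn main() {
    let h = Handle {
        inner: Arc::new(Inner { file_id: 42, level: 1 }),
    };
    println!("{h:?}"); // Handle { file_id: 42, level: 1 }
}
```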
diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs
index ac94a19bd1..45ddc177b5 100644
--- a/src/storage/src/test_util/config_util.rs
+++ b/src/storage/src/test_util/config_util.rs
@@ -20,8 +20,9 @@ use log_store::LogConfig;
 use object_store::services::Fs;
 use object_store::ObjectStore;
 use store_api::manifest::Manifest;
+use store_api::storage::{CompactionStrategy, TwcsOptions};
 
-use crate::compaction::noop::NoopCompactionScheduler;
+use crate::compaction::CompactionHandler;
 use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
 use crate::engine::{self, RegionMap};
 use crate::file_purger::noop::NoopFilePurgeHandler;
@@ -33,7 +34,7 @@ use crate::scheduler::{LocalScheduler, SchedulerConfig};
 use crate::sst::FsAccessLayer;
 
 fn log_store_dir(store_dir: &str) -> String {
-    format!("{store_dir}/logstore")
+    format!("{store_dir}/wal")
 }
 
 /// Create a new StoreConfig for test.
@@ -96,7 +97,10 @@ pub async fn new_store_config_with_object_store(
     };
     let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
-    let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
+    let compaction_scheduler = Arc::new(LocalScheduler::new(
+        SchedulerConfig::default(),
+        CompactionHandler::default(),
+    ));
     // We use an empty region map so actually the background worker of the picker is disabled.
     let regions = Arc::new(RegionMap::new());
     let flush_scheduler = Arc::new(
@@ -125,7 +129,7 @@
         file_purger,
         ttl: None,
         write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
-        compaction_strategy: Default::default(),
+        compaction_strategy: CompactionStrategy::Twcs(TwcsOptions::default()),
     },
     regions,
 )