feat(mito): Implements compaction scheduler (#2413)

* feat: allow multiple waiters in compaction request

* feat: compaction status wip

* feat: track region status in compaction scheduler

* feat: impl compaction scheduler

* feat: call compaction scheduler

* feat: remove status if nothing to compact

* feat: schedule compaction after flush

* feat: set compacting to false after compaction finished

* refactor: flush status only needs region id and version control

* refactor: schedule_compaction doesn't need region as argument

* test: test flush/scheduler for empty requests

* test: trigger compaction in test

* feat: notify scheduler on truncated

* chore: Apply suggestions from code review

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Yingwen
2023-09-17 17:15:11 +08:00
committed by GitHub
parent 693e8de83a
commit 55ae5e5b66
20 changed files with 768 additions and 167 deletions

View File

@@ -18,19 +18,23 @@ mod picker;
mod test_util;
mod twcs;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::debug;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error::Result;
use crate::region::version::VersionRef;
use crate::request::{OptionOutputTx, WorkerRequest};
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;
@@ -40,8 +44,10 @@ pub struct CompactionRequest {
pub(crate) access_layer: AccessLayerRef,
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) waiter: OptionOutputTx,
/// Waiters of the compaction request.
pub(crate) waiters: Vec<OutputTx>,
pub(crate) file_purger: FilePurgerRef,
}
@@ -49,6 +55,13 @@ impl CompactionRequest {
pub(crate) fn region_id(&self) -> RegionId {
self.current_version.metadata.region_id
}
/// Push waiter to the request.
pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
}
/// Builds compaction picker according to [CompactionStrategy].
@@ -62,31 +75,307 @@ pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> Compactio
}
}
/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
scheduler: SchedulerRef,
// TODO(hl): maybe tracks region compaction status in CompactionScheduler
/// Compacting regions.
region_status: HashMap<RegionId, CompactionStatus>,
/// Request sender of the worker that this scheduler belongs to.
request_sender: Sender<WorkerRequest>,
}
impl CompactionScheduler {
pub(crate) fn new(scheduler: SchedulerRef) -> Self {
Self { scheduler }
pub(crate) fn new(scheduler: SchedulerRef, request_sender: Sender<WorkerRequest>) -> Self {
Self {
scheduler,
region_status: HashMap::new(),
request_sender,
}
}
/// Schedules a region compaction task.
pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> {
self.scheduler.schedule(Box::pin(async {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
debug!(
"Pick compaction strategy {:?} for region: {}",
picker,
req.region_id()
);
let Some(mut task) = picker.pick(req) else {
return;
};
task.run().await;
}))
/// Schedules a compaction for the region.
pub(crate) fn schedule_compaction(
&mut self,
region_id: RegionId,
version_control: &VersionControlRef,
access_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
waiter: OptionOutputTx,
) -> Result<()> {
let status = self.region_status.entry(region_id).or_insert_with(|| {
CompactionStatus::new(
region_id,
version_control.clone(),
access_layer.clone(),
file_purger.clone(),
)
});
if status.compacting {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
return Ok(());
}
// The region can compact directly.
let request = status.new_compaction_request(self.request_sender.clone(), waiter);
// Mark the region as compacting.
status.compacting = true;
self.schedule_compaction_request(request)
}
/// Notifies the scheduler that the compaction job is finished successfully.
pub(crate) fn on_compaction_finished(&mut self, region_id: RegionId) {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
};
status.compacting = false;
// We should always try to compact the region until picker returns None.
let request =
status.new_compaction_request(self.request_sender.clone(), OptionOutputTx::none());
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(request) {
error!(e; "Failed to schedule next compaction for region {}", region_id);
}
}
/// Notifies the scheduler that the compaction job is failed.
pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
};
// Fast fail: cancels all pending tasks and sends error to their waiters.
status.on_failure(err);
}
/// Notifies the scheduler that the region is dropped.
pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
self.remove_region_on_failure(
region_id,
Arc::new(RegionDroppedSnafu { region_id }.build()),
);
}
/// Notifies the scheduler that the region is closed.
pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
}
/// Notifies the scheduler that the region is truncated.
pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
self.remove_region_on_failure(
region_id,
Arc::new(RegionTruncatedSnafu { region_id }.build()),
);
}
/// Schedules a compaction request.
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
picker, region_id
);
let Some(mut task) = picker.pick(request) else {
// Nothing to compact, remove it from the region status map.
self.region_status.remove(&region_id);
return Ok(());
};
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
task.run().await;
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);
e
})
}
fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
// Remove this region.
let Some(status) = self.region_status.remove(&region_id) else {
return;
};
// Notifies all pending tasks.
status.on_failure(err);
}
}
impl Drop for CompactionScheduler {
fn drop(&mut self) {
for (region_id, status) in self.region_status.drain() {
// We are shutting down so notify all pending tasks.
status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
}
}
}
/// Waiters of compaction requests that arrived while a compaction task for
/// the same region was already running. They are merged into the next
/// compaction request once the running task finishes.
struct PendingCompaction {
    /// Senders to notify when the merged compaction completes or fails.
    waiters: Vec<OutputTx>,
}

impl PendingCompaction {
    /// Push waiter to the request.
    ///
    /// An empty `OptionOutputTx` (no inner sender) is silently ignored.
    fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Send compaction error to all waiters, draining the pending list.
    fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
        }
    }
}
/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// File purger of the region.
    file_purger: FilePurgerRef,
    /// Whether a compaction task is running.
    compacting: bool,
    /// Compaction pending to schedule.
    ///
    /// For simplicity, we merge all pending compaction requests into one.
    pending_compaction: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus] that is not compacting and has no
    /// pending request.
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
        file_purger: FilePurgerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            file_purger,
            compacting: false,
            pending_compaction: None,
        }
    }

    /// Merge the waiter to the pending compaction, creating the pending
    /// entry on first use.
    fn merge_waiter(&mut self, waiter: OptionOutputTx) {
        let pending = self
            .pending_compaction
            .get_or_insert_with(|| PendingCompaction {
                waiters: Vec::new(),
            });
        pending.push_waiter(waiter);
    }

    /// Consumes the status and sends `err` to every waiter of the pending
    /// compaction (if any).
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut pending) = self.pending_compaction {
            pending.on_failure(self.region_id, err.clone());
        }
    }

    /// Creates a new compaction request for compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequest>,
        waiter: OptionOutputTx,
    ) -> CompactionRequest {
        // Snapshot the current version so the picker sees a consistent file set.
        let current_version = self.version_control.current().version;
        let mut req = CompactionRequest {
            current_version,
            access_layer: self.access_layer.clone(),
            // TODO(hl): get TTL info from region metadata
            ttl: None,
            // TODO(hl): get persisted region compaction time window
            compaction_time_window: None,
            request_sender: request_sender.clone(),
            waiters: Vec::new(),
            file_purger: self.file_purger.clone(),
        };
        // Pending waiters go first, then the newly supplied waiter (if any),
        // so notification order follows arrival order.
        if let Some(pending) = self.pending_compaction.take() {
            req.waiters = pending.waiters;
        }
        req.push_waiter(waiter);

        req
    }
}
#[cfg(test)]
mod tests {
    use common_query::Output;
    use tokio::sync::oneshot;

    use super::*;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;

    /// When a region has nothing for the picker to compact, scheduling must
    /// still resolve the waiter (with 0 affected rows) and must not leave a
    /// stale entry in `region_status`.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new();
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();

        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                &version_control,
                &env.access_layer,
                &purger,
                waiter,
            )
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert!(matches!(output, Output::AffectedRows(0)));
        // Region must be removed from the status map when there is no task.
        assert!(scheduler.region_status.is_empty());

        // Only one file, picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                &version_control,
                &env.access_layer,
                &purger,
                waiter,
            )
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert!(matches!(output, Output::AffectedRows(0)));
        assert!(scheduler.region_status.is_empty());
    }
}

View File

@@ -35,7 +35,7 @@ use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OptionOutputTx, WorkerRequest,
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
@@ -123,7 +123,7 @@ impl Picker for TwcsPicker {
ttl,
compaction_time_window,
request_sender,
waiter,
waiters,
file_purger,
} = req;
@@ -156,8 +156,10 @@ impl Picker for TwcsPicker {
let outputs = self.build_output(&windows, active_window, time_window_size);
if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
waiter.send(Ok(Output::AffectedRows(0)));
// Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
for waiter in waiters {
waiter.send(Ok(Output::AffectedRows(0)));
}
return None;
}
let task = TwcsCompactionTask {
@@ -169,7 +171,7 @@ impl Picker for TwcsPicker {
sst_write_buffer_size: ReadableSize::mb(4),
compaction_time_window: None,
request_sender,
sender: waiter,
waiters,
file_purger,
};
Some(Box::new(task))
@@ -227,8 +229,8 @@ pub(crate) struct TwcsCompactionTask {
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: OptionOutputTx,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
}
impl Debug for TwcsCompactionTask {
@@ -321,10 +323,11 @@ impl TwcsCompactionTask {
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
self.sender
.send_mut(Err(err.clone()).context(CompactRegionSnafu {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Notifies region worker to handle post-compaction tasks.
@@ -352,7 +355,7 @@ impl CompactionTask for TwcsCompactionTask {
region_id: self.region_id,
compaction_outputs: added,
compacted_files: deleted,
sender: self.sender.take(),
senders: std::mem::take(&mut self.waiters),
file_purger: self.file_purger.clone(),
})
}

View File

@@ -121,10 +121,12 @@ async fn test_compaction_region() {
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 5 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 25..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
let output = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
@@ -132,10 +134,14 @@ async fn test_compaction_region() {
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let stream = engine
.handle_query(region_id, ScanRequest::default())
.await
.unwrap();
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);

View File

@@ -402,8 +402,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region {} is truncating, location: {}", region_id, location))]
RegionTruncating {
#[snafu(display("Region {} is truncated, location: {}", region_id, location))]
RegionTruncated {
region_id: RegionId,
location: Location,
},
@@ -516,7 +516,7 @@ impl ErrorExt for Error {
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
RegionTruncating { .. } => StatusCode::Cancelled,
RegionTruncated { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,

View File

@@ -26,12 +26,11 @@ use tokio::sync::mpsc;
use crate::access_layer::AccessLayerRef;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatingSnafu, Result,
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::memtable::MemtableBuilderRef;
use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
SenderWriteRequest, WorkerRequest,
@@ -218,10 +217,10 @@ impl RegionFlushTask {
/// Converts the flush task into a background job.
///
/// We must call this in the region worker.
fn into_flush_job(mut self, region: &MitoRegionRef) -> Job {
fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
// Get a version of this region before creating a job to get current
// wal entry id, sequence and immutable memtables.
let version_data = region.version_control.current();
let version_data = version_control.current();
Box::pin(async move {
self.do_flush(version_data).await;
@@ -353,14 +352,15 @@ impl FlushScheduler {
/// Schedules a flush `task` for specific `region`.
pub(crate) fn schedule_flush(
&mut self,
region: &MitoRegionRef,
region_id: RegionId,
version_control: &VersionControlRef,
task: RegionFlushTask,
) -> Result<()> {
debug_assert_eq!(region.region_id, task.region_id);
debug_assert_eq!(region_id, task.region_id);
let version = region.version_control.current().version;
let version = version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
debug_assert!(!self.region_status.contains_key(&region.region_id));
debug_assert!(!self.region_status.contains_key(&region_id));
// The region has nothing to flush.
task.on_success();
return Ok(());
@@ -369,8 +369,8 @@ impl FlushScheduler {
// Add this region to status map.
let flush_status = self
.region_status
.entry(region.region_id)
.or_insert_with(|| FlushStatus::new(region.clone()));
.entry(region_id)
.or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
// Checks whether we can flush the region now.
if flush_status.flushing {
// There is already a flush job running.
@@ -378,6 +378,7 @@ impl FlushScheduler {
return Ok(());
}
// TODO(yingwen): We can merge with pending and execute directly.
// If there are pending tasks, then we should push it to pending list.
if flush_status.pending_task.is_some() {
flush_status.merge_task(task);
@@ -385,18 +386,16 @@ impl FlushScheduler {
}
// Now we can flush the region directly.
region
.version_control
.freeze_mutable(&task.memtable_builder);
version_control.freeze_mutable(&task.memtable_builder);
// Submit a flush job.
let job = task.into_flush_job(region);
let job = task.into_flush_job(version_control);
if let Err(e) = self.scheduler.schedule(job) {
// If scheduler returns error, senders in the job will be dropped and waiters
// can get recv errors.
error!(e; "Failed to schedule flush job for region {}", region.region_id);
error!(e; "Failed to schedule flush job for region {}", region_id);
// Remove from region status if we can't submit the task.
self.region_status.remove(&region.region_id);
self.region_status.remove(&region_id);
return Err(e);
}
flush_status.flushing = true;
@@ -435,7 +434,7 @@ impl FlushScheduler {
pending_requests
}
/// Notifies the scheduler that the flush job is finished.
/// Notifies the scheduler that the flush job is failed.
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
@@ -466,15 +465,15 @@ impl FlushScheduler {
self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
}
/// Notifies the scheduler that the region is truncating.
pub(crate) fn on_region_truncating(&mut self, region_id: RegionId) {
/// Notifies the scheduler that the region is truncated.
pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
self.remove_region_on_failure(
region_id,
Arc::new(RegionTruncatingSnafu { region_id }.build()),
Arc::new(RegionTruncatedSnafu { region_id }.build()),
);
}
pub(crate) fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {
return;
@@ -518,7 +517,7 @@ impl FlushScheduler {
debug_assert!(self
.region_status
.values()
.all(|status| !status.flushing && status.pending_task.is_some()));
.all(|status| status.flushing || status.pending_task.is_some()));
// Get the first region from status map.
let Some(flush_status) = self
@@ -530,9 +529,10 @@ impl FlushScheduler {
};
debug_assert!(!flush_status.flushing);
let task = flush_status.pending_task.take().unwrap();
let region = flush_status.region.clone();
let region_id = flush_status.region_id;
let version_control = flush_status.version_control.clone();
self.schedule_flush(&region, task)
self.schedule_flush(region_id, &version_control, task)
}
}
@@ -550,8 +550,13 @@ impl Drop for FlushScheduler {
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
/// Current region.
region: MitoRegionRef,
region_id: RegionId,
/// Version control of the region.
version_control: VersionControlRef,
/// There is a flush task running.
///
/// It is possible that a region is not flushing but has pending task if the scheduler
/// doesn't schedules this region.
flushing: bool,
/// Task waiting for next flush.
pending_task: Option<RegionFlushTask>,
@@ -562,9 +567,10 @@ struct FlushStatus {
}
impl FlushStatus {
fn new(region: MitoRegionRef) -> FlushStatus {
fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
FlushStatus {
region,
region_id,
version_control,
flushing: false,
pending_task: None,
pending_ddls: Vec::new(),
@@ -587,14 +593,14 @@ impl FlushStatus {
}
for ddl in self.pending_ddls {
ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
region_id: self.region_id,
}));
}
for write_req in self.pending_writes {
write_req
.sender
.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
region_id: self.region_id,
}));
}
}
@@ -602,7 +608,11 @@ impl FlushStatus {
#[cfg(test)]
mod tests {
use tokio::sync::oneshot;
use super::*;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
#[test]
fn test_get_mutable_limit() {
@@ -656,4 +666,33 @@ mod tests {
manager.reserve_mem(100);
assert!(manager.should_flush_engine());
}
#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new();
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_flush_scheduler();
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let (output_tx, output_rx) = oneshot::channel();
let mut task = RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
senders: Vec::new(),
request_sender: tx,
access_layer: env.access_layer.clone(),
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
assert!(scheduler.region_status.is_empty());
let output = output_rx.await.unwrap().unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
assert!(scheduler.region_status.is_empty());
}
}

View File

@@ -20,7 +20,7 @@ pub mod key_values;
pub(crate) mod version;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use common_query::logical_plan::Expr;
@@ -89,45 +89,6 @@ pub trait MemtableBuilder: Send + Sync + fmt::Debug {
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
// TODO(yingwen): Remove it once we port the memtable.
/// Empty memtable for test.
#[derive(Debug, Default)]
pub(crate) struct EmptyMemtable {
/// Id of this memtable.
id: MemtableId,
}
impl EmptyMemtable {
/// Returns a new memtable with specific `id`.
pub(crate) fn new(id: MemtableId) -> EmptyMemtable {
EmptyMemtable { id }
}
}
impl Memtable for EmptyMemtable {
fn id(&self) -> MemtableId {
self.id
}
fn write(&self, _kvs: &KeyValues) -> Result<()> {
Ok(())
}
fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
Box::new(std::iter::empty())
}
fn is_empty(&self) -> bool {
true
}
fn mark_immutable(&self) {}
fn stats(&self) -> MemtableStats {
MemtableStats::default()
}
}
/// Memtable memory allocation tracker.
#[derive(Default)]
pub struct AllocTracker {
@@ -205,21 +166,6 @@ impl Drop for AllocTracker {
}
}
/// Default memtable builder.
#[derive(Debug, Default)]
pub(crate) struct DefaultMemtableBuilder {
/// Next memtable id.
next_id: AtomicU32,
}
impl MemtableBuilder for DefaultMemtableBuilder {
fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(EmptyMemtable::new(
self.next_id.fetch_add(1, Ordering::Relaxed),
))
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -59,6 +59,13 @@ impl Scanner {
Scanner::Seq(seq_scan) => seq_scan.num_memtables(),
}
}
/// Returns SST file ids to scan.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
match self {
Scanner::Seq(seq_scan) => seq_scan.file_ids(),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]

View File

@@ -156,4 +156,9 @@ impl SeqScan {
pub(crate) fn num_files(&self) -> usize {
self.files.len()
}
/// Returns SST file ids to scan.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
self.files.iter().map(|file| file.file_id()).collect()
}
}

View File

@@ -42,7 +42,8 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{
CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result,
CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu,
InvalidRequestSnafu, Result,
};
use crate::memtable::MemtableId;
use crate::sst::file::FileMeta;
@@ -661,15 +662,17 @@ pub(crate) struct CompactionFinished {
pub(crate) compaction_outputs: Vec<FileMeta>,
/// Compacted files that are to be removed from region version.
pub(crate) compacted_files: Vec<FileMeta>,
/// Compaction result sender.
pub(crate) sender: OptionOutputTx,
/// Compaction result senders.
pub(crate) senders: Vec<OutputTx>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
}
impl CompactionFinished {
pub fn on_success(self) {
self.sender.send(Ok(AffectedRows(0)));
for sender in self.senders {
sender.send(Ok(AffectedRows(0)));
}
info!("Successfully compacted region: {}", self.region_id);
}
}
@@ -678,7 +681,12 @@ impl OnFailure for CompactionFinished {
/// Compaction succeeded but failed to update manifest or region's already been dropped,
/// clean compaction output files.
fn on_failure(&mut self, err: Error) {
self.sender.send_mut(Err(err));
let err = Arc::new(err);
for sender in self.senders.drain(..) {
sender.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
for file in &self.compacted_files {
let file_id = file.file_id;
warn!(

View File

@@ -14,6 +14,10 @@
//! Utilities for testing.
pub mod memtable_util;
pub mod scheduler_util;
pub mod version_util;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

View File

@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Memtable test utilities.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use common_query::logical_plan::Expr;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
MemtableStats,
};
/// Empty memtable for test.
///
/// Accepts writes but stores nothing; iteration always yields no batches.
#[derive(Debug, Default)]
pub(crate) struct EmptyMemtable {
    /// Id of this memtable.
    id: MemtableId,
}

impl EmptyMemtable {
    /// Returns a new memtable with specific `id`.
    pub(crate) fn new(id: MemtableId) -> EmptyMemtable {
        EmptyMemtable { id }
    }
}

impl Memtable for EmptyMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }

    // Discards all key/values written to it.
    fn write(&self, _kvs: &KeyValues) -> Result<()> {
        Ok(())
    }

    // Always returns an empty iterator regardless of projection/filters.
    fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
        Box::new(std::iter::empty())
    }

    fn is_empty(&self) -> bool {
        true
    }

    fn mark_immutable(&self) {}

    fn stats(&self) -> MemtableStats {
        MemtableStats::default()
    }
}
/// Empty memtable builder.
///
/// Builds [EmptyMemtable]s with monotonically increasing ids.
#[derive(Debug, Default)]
pub(crate) struct EmptyMemtableBuilder {
    /// Next memtable id.
    next_id: AtomicU32,
}

impl MemtableBuilder for EmptyMemtableBuilder {
    fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
        // Relaxed is sufficient: we only need unique ids, not ordering.
        Arc::new(EmptyMemtable::new(
            self.next_id.fetch_add(1, Ordering::Relaxed),
        ))
    }
}

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to mock flush and compaction schedulers.
use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use object_store::ObjectStore;
use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::compaction::CompactionScheduler;
use crate::flush::FlushScheduler;
use crate::request::WorkerRequest;
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
/// Scheduler mocker.
pub(crate) struct SchedulerEnv {
    /// Temporary directory that backs the access layer's object store.
    ///
    /// Held only to keep the directory alive for the lifetime of the env;
    /// it is removed when the env is dropped.
    #[allow(unused)]
    path: TempDir,
    /// Mock access layer for test.
    pub(crate) access_layer: AccessLayerRef,
    /// Optional shared scheduler; when unset, a fresh single-threaded
    /// `LocalScheduler` is created per call to `get_scheduler`.
    scheduler: Option<SchedulerRef>,
}

impl SchedulerEnv {
    /// Creates a new mocker with a file-system object store rooted in a
    /// fresh temporary directory.
    pub(crate) fn new() -> SchedulerEnv {
        let path = create_temp_dir("");
        let mut builder = Fs::default();
        builder.root(path.path().to_str().unwrap());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let access_layer = Arc::new(AccessLayer::new("", object_store.clone()));

        SchedulerEnv {
            // Store the directory that actually backs the object store.
            // Creating another temp dir here would drop `path` at the end of
            // this function and delete the store's root while it is in use.
            path,
            access_layer,
            scheduler: None,
        }
    }

    /// Creates a new compaction scheduler.
    pub(crate) fn mock_compaction_scheduler(
        &self,
        request_sender: Sender<WorkerRequest>,
    ) -> CompactionScheduler {
        let scheduler = self.get_scheduler();

        CompactionScheduler::new(scheduler, request_sender)
    }

    /// Creates a new flush scheduler.
    pub(crate) fn mock_flush_scheduler(&self) -> FlushScheduler {
        let scheduler = self.get_scheduler();

        FlushScheduler::new(scheduler)
    }

    /// Returns the configured scheduler, or a new single-worker
    /// `LocalScheduler` if none was set.
    fn get_scheduler(&self) -> SchedulerRef {
        self.scheduler
            .clone()
            .unwrap_or_else(|| Arc::new(LocalScheduler::new(1)))
    }
}

View File

@@ -0,0 +1,115 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities to mock version.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
use crate::region::version::{Version, VersionBuilder, VersionControl};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::new_noop_file_purger;
/// Builds test region metadata with a millisecond timestamp column `ts`
/// (column id 1) and one string tag column `tag_0` (column id 2) that forms
/// the primary key.
fn new_region_metadata(region_id: RegionId) -> RegionMetadata {
    let ts_column = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    let tag_column = ColumnMetadata {
        column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id: 2,
    };

    let mut metadata_builder = RegionMetadataBuilder::new(region_id);
    metadata_builder
        .push_column_metadata(ts_column)
        .push_column_metadata(tag_column)
        .primary_key(vec![2]);
    metadata_builder.build().unwrap()
}
/// Builder to mock a version control.
pub(crate) struct VersionControlBuilder {
    // Metadata of the mocked region; `new()` fixes the id to `(1, 1)`.
    metadata: RegionMetadata,
    // File purger attached to every file added to the built version.
    file_purger: FilePurgerRef,
    // Builder producing the (empty) mutable memtable of the built version.
    memtable_builder: Arc<EmptyMemtableBuilder>,
    // SST files to include in the built version, keyed by file id.
    files: HashMap<FileId, FileMeta>,
}
impl VersionControlBuilder {
    /// Creates a builder seeded with test metadata for region `(1, 1)`,
    /// a no-op file purger, an empty-memtable builder and no SST files.
    pub(crate) fn new() -> VersionControlBuilder {
        Self {
            metadata: new_region_metadata(RegionId::new(1, 1)),
            file_purger: new_noop_file_purger(),
            memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
            files: HashMap::new(),
        }
    }

    /// Returns the id of the mocked region.
    pub(crate) fn region_id(&self) -> RegionId {
        self.metadata.region_id
    }

    /// Returns the file purger shared with built versions.
    pub(crate) fn file_purger(&self) -> FilePurgerRef {
        self.file_purger.clone()
    }

    /// Returns the memtable builder as a trait object.
    pub(crate) fn memtable_builder(&self) -> MemtableBuilderRef {
        self.memtable_builder.clone()
    }

    /// Registers a level-0 file covering `[start_ms, end_ms]` (millisecond
    /// timestamps) under a random file id; returns `self` for chaining.
    pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self {
        let new_file_id = FileId::random();
        let meta = FileMeta {
            region_id: self.metadata.region_id,
            file_id: new_file_id,
            time_range: (
                Timestamp::new_millisecond(start_ms),
                Timestamp::new_millisecond(end_ms),
            ),
            level: 0,
            // File size is irrelevant for these tests.
            file_size: 0,
        };
        self.files.insert(new_file_id, meta);
        self
    }

    /// Builds a version containing every pushed file and a fresh (empty)
    /// mutable memtable.
    pub(crate) fn build_version(&self) -> Version {
        let region_meta = Arc::new(self.metadata.clone());
        let mutable_memtable = self.memtable_builder.build(&region_meta);
        VersionBuilder::new(region_meta, mutable_memtable)
            .add_files(self.file_purger.clone(), self.files.values().cloned())
            .build()
    }

    /// Builds a version control wrapping the version from `build_version`.
    pub(crate) fn build(&self) -> VersionControl {
        VersionControl::new(self.build_version())
    }
}

View File

@@ -251,7 +251,7 @@ impl<S: LogStore> WorkerStarter<S> {
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
compaction_scheduler: CompactionScheduler::new(self.scheduler),
compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()),
stalled_requests: StalledRequests::default(),
listener: self.listener,
};

View File

@@ -55,7 +55,10 @@ impl<S> RegionWorkerLoop<S> {
// Try to submit a flush task.
let task = self.new_flush_task(&region, FlushReason::Alter);
if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
{
// Unable to flush the region, send error to waiter.
sender.send(Err(e));
return;

View File

@@ -33,6 +33,8 @@ impl<S> RegionWorkerLoop<S> {
self.regions.remove_region(region_id);
// Clean flush status.
self.flush_scheduler.on_region_closed(region_id);
// Clean compaction status.
self.compaction_scheduler.on_region_closed(region_id);
info!("Region {} closed", region_id);

View File

@@ -16,9 +16,7 @@ use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::compaction::CompactionRequest;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
@@ -33,8 +31,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
let request = self.new_compaction_request(&region, sender);
if let Err(e) = self.compaction_scheduler.schedule_compaction(request) {
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
&region.version_control,
&region.access_layer,
&region.file_purger,
sender,
) {
error!(e; "Failed to schedule compaction task for region: {}", region_id);
} else {
info!(
@@ -74,31 +77,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.version_control
.apply_edit(edit, &[], region.file_purger.clone());
request.on_success();
// Schedule next compaction if necessary.
self.compaction_scheduler.on_compaction_finished(region_id);
}
/// When compaction fails, we simply log the error.
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
error!(req.err; "Failed to compact region: {}", req.region_id);
}
/// Creates a new compaction request.
fn new_compaction_request(
&self,
region: &MitoRegionRef,
waiter: OptionOutputTx,
) -> CompactionRequest {
let current_version = region.version_control.current().version;
let access_layer = region.access_layer.clone();
let file_purger = region.file_purger.clone();
CompactionRequest {
current_version,
access_layer,
ttl: None, // TODO(hl): get TTL info from region metadata
compaction_time_window: None, // TODO(hl): get persisted region compaction time window
request_sender: self.sender.clone(),
waiter,
file_purger,
}
self.compaction_scheduler
.on_compaction_failed(req.region_id, req.err);
}
}

View File

@@ -52,6 +52,8 @@ impl<S> RegionWorkerLoop<S> {
self.dropping_regions.insert_region(region.clone());
// Notifies flush scheduler.
self.flush_scheduler.on_region_dropped(region_id);
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_dropped(region_id);
// mark region version as dropped
region.version_control.mark_dropped();

View File

@@ -14,12 +14,12 @@
//! Handling flush related requests.
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{RegionTruncatingSnafu, Result};
use crate::error::{RegionTruncatedSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
@@ -39,7 +39,10 @@ impl<S> RegionWorkerLoop<S> {
let mut task = self.new_flush_task(&region, FlushReason::Manual);
task.push_sender(sender);
if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)
{
error!(e; "Failed to schedule flush task for region {}", region.region_id);
}
}
@@ -90,7 +93,11 @@ impl<S> RegionWorkerLoop<S> {
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task = self.new_flush_task(region, FlushReason::EngineFull);
self.flush_scheduler.schedule_flush(region, task)?;
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
task,
)?;
}
}
@@ -99,7 +106,11 @@ impl<S> RegionWorkerLoop<S> {
if let Some(region) = max_mem_region {
if !self.flush_scheduler.is_flush_requested(region.region_id) {
let task = self.new_flush_task(region, FlushReason::EngineFull);
self.flush_scheduler.schedule_flush(region, task)?;
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
task,
)?;
}
}
@@ -141,7 +152,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let version_data = region.version_control.current();
if let Some(truncated_entry_id) = version_data.version.truncated_entry_id {
if truncated_entry_id >= request.flushed_entry_id {
request.on_failure(RegionTruncatingSnafu { region_id }.build());
request.on_failure(RegionTruncatedSnafu { region_id }.build());
return;
}
}
@@ -198,6 +209,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
&region.version_control,
&region.access_layer,
&region.file_purger,
OptionOutputTx::none(),
) {
warn!(
"Failed to schedule compaction after flush, region: {}, err: {}",
region.region_id, e
);
}
self.listener.on_flush_success(region_id);
}
}

View File

@@ -44,8 +44,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.manifest_manager.update(action_list).await?;
// Notifies flush scheduler.
self.flush_scheduler.on_region_truncating(region_id);
// TODO(DevilExileSu): Notifies compaction scheduler.
self.flush_scheduler.on_region_truncated(region_id);
// Notifies compaction scheduler.
self.compaction_scheduler.on_region_truncated(region_id);
// Reset region's version and mark all SSTs deleted.
region.version_control.truncate(