feat(mito2): compaction (#2317)

* feat: compaction component

* feat: mito2 compaction

* Avoid building time range predicates when merge SST files since in TWCS we don't enforce strict time window.

* fix: some CR comments

* minor: change CompactionRequest::senders to an option

* chore: handle compaction finish error

* feat: integrate compaction into region worker

* chore: rebase upstream

* fix: Some CR comments

* chore: Apply suggestions from code review

* style: fix clippy

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Lei, HUANG
2023-09-11 15:11:56 +08:00
committed by Ruihang Xia
parent a4604afde5
commit 3331e3158c
23 changed files with 1551 additions and 48 deletions

1
Cargo.lock generated
View File

@@ -5479,6 +5479,7 @@ dependencies = [
"dashmap",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"futures",
"humantime-serde",

View File

@@ -32,6 +32,7 @@ common-test-util = { workspace = true, optional = true }
common-time = { workspace = true }
dashmap = "5.4"
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion.workspace = true
datatypes = { workspace = true }
futures.workspace = true

View File

@@ -74,6 +74,7 @@ impl AccessLayer {
}
/// Returns a new parquet writer to write the SST for specific `file_id`.
// TODO(hl): maybe rename to [sst_writer].
pub(crate) fn write_sst(
&self,
file_id: FileId,

View File

@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod output;
mod picker;
#[cfg(test)]
mod test_util;
mod twcs;
use std::sync::Arc;
use std::time::Duration;
use common_query::Output;
use common_telemetry::debug;
pub use picker::CompactionPickerRef;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use tokio::sync::{mpsc, oneshot};
use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error;
use crate::error::Result;
use crate::region::version::VersionRef;
use crate::request::WorkerRequest;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::FilePurgerRef;
/// Region compaction request.
pub struct CompactionRequest {
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) waiter: Option<oneshot::Sender<error::Result<Output>>>,
pub(crate) file_purger: FilePurgerRef,
}
impl CompactionRequest {
pub(crate) fn region_id(&self) -> RegionId {
self.current_version.metadata.region_id
}
}
/// Builds compaction picker according to [CompactionStrategy].
pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
)) as Arc<_>,
}
}
pub(crate) struct CompactionScheduler {
scheduler: SchedulerRef,
// TODO(hl): maybe tracks region compaction status in CompactionScheduler
}
impl CompactionScheduler {
pub(crate) fn new(scheduler: SchedulerRef) -> Self {
Self { scheduler }
}
/// Schedules a region compaction task.
pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> {
self.scheduler.schedule(Box::pin(async {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
debug!(
"Pick compaction strategy {:?} for region: {}",
picker,
req.region_id()
);
let Some(mut task) = picker.pick(req) else {
return;
};
task.run().await;
}))
}
}

View File

@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::error;
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::sst::parquet::{SstInfo, WriteOptions};
#[derive(Debug)]
pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// The left bound of time window.
pub time_window_bound: i64,
/// Time window size in seconds.
pub time_window_sec: i64,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
}
impl CompactionOutput {
pub(crate) async fn build(
&self,
region_id: RegionId,
schema: RegionMetadataRef,
sst_layer: AccessLayerRef,
sst_write_buffer_size: ReadableSize,
) -> error::Result<Option<FileMeta>> {
let reader = build_sst_reader(schema.clone(), sst_layer.clone(), &self.inputs).await?;
let opts = WriteOptions {
write_buffer_size: sst_write_buffer_size,
..Default::default()
};
// TODO(hl): measure merge elapsed time.
let mut writer = sst_layer.write_sst(self.output_file_id, schema, Source::Reader(reader));
let meta = writer.write_all(&opts).await?.map(
|SstInfo {
time_range,
file_size,
..
}| {
FileMeta {
region_id,
file_id: self.output_file_id,
time_range,
level: self.output_level,
file_size,
}
},
);
Ok(meta)
}
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
schema: RegionMetadataRef,
sst_layer: AccessLayerRef,
inputs: &[FileHandle],
) -> error::Result<BoxedBatchReader> {
SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?)
.with_files(inputs.to_vec())
.build_reader()
.await
}

View File

@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::Arc;
use crate::compaction::CompactionRequest;
pub type CompactionPickerRef = Arc<dyn Picker + Send + Sync>;
#[async_trait::async_trait]
pub trait CompactionTask: Debug + Send + Sync + 'static {
async fn run(&mut self);
}
/// Picker picks input SST files and builds the compaction task.
/// Different compaction strategy may implement different pickers.
pub trait Picker: Debug + Send + 'static {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>>;
}
pub struct PickerContext {
compaction_time_window: Option<i64>,
}
impl PickerContext {
pub fn with(compaction_time_window: Option<i64>) -> Self {
Self {
compaction_time_window,
}
}
pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
}

View File

@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::Timestamp;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level};
use crate::test_util::new_noop_file_purger;
/// Test util to create file handles.
pub fn new_file_handle(
file_id: FileId,
start_ts_millis: i64,
end_ts_millis: i64,
level: Level,
) -> FileHandle {
let file_purger = new_noop_file_purger();
FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id,
time_range: (
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
),
level,
file_size: 0,
},
file_purger,
)
}

View File

@@ -0,0 +1,843 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_query::Output;
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use tokio::sync::oneshot::Sender;
use crate::access_layer::AccessLayerRef;
use crate::compaction::output::CompactionOutput;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, WorkerRequest};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
const MAX_PARALLEL_COMPACTION: usize = 8;
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
pub struct TwcsPicker {
max_active_window_files: usize,
max_inactive_window_files: usize,
time_window_seconds: Option<i64>,
}
impl Debug for TwcsPicker {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsPicker")
.field("max_active_window_files", &self.max_active_window_files)
.field("max_inactive_window_files", &self.max_inactive_window_files)
.finish()
}
}
impl TwcsPicker {
pub fn new(
max_active_window_files: usize,
max_inactive_window_files: usize,
time_window_seconds: Option<i64>,
) -> Self {
Self {
max_inactive_window_files,
max_active_window_files,
time_window_seconds,
}
}
/// Builds compaction output from files.
/// For active writing window, we allow for at most `max_active_window_files` files to alleviate
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
time_windows: &BTreeMap<i64, Vec<FileHandle>>,
active_window: Option<i64>,
window_size: i64,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
if let Some(active_window) = active_window && *window == active_window {
if files.len() > self.max_active_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1, // we only have two levels and always compact to l1
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
}
} else {
// not active writing window
if files.len() > self.max_inactive_window_files {
output.push(CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
time_window_bound: *window,
time_window_sec: window_size,
inputs: files.clone(),
});
} else {
debug!("No enough files, current: {}, max_inactive_window_files: {}", files.len(), self.max_inactive_window_files)
}
}
}
output
}
}
impl Picker for TwcsPicker {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
let CompactionRequest {
current_version,
access_layer,
ttl,
compaction_time_window,
request_sender,
waiter: waiters,
file_purger,
} = req;
let region_metadata = current_version.metadata.clone();
let region_id = region_metadata.region_id;
let levels = current_version.ssts.levels();
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
// here we mark expired SSTs as compacting to avoid them being picked.
expired_ssts.iter().for_each(|f| f.set_compacting(true));
}
let time_window_size = compaction_time_window
.or(self.time_window_seconds)
.unwrap_or_else(|| {
let inferred = infer_time_bucket(levels[0].files());
info!(
"Compaction window for region {} is not present, inferring from files: {:?}",
region_id, inferred
);
inferred
});
// Find active window from files in level 0.
let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size);
// Assign files to windows
let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&windows, active_window, time_window_size);
if outputs.is_empty() && expired_ssts.is_empty() {
return None;
}
let task = TwcsCompactionTask {
region_id,
schema: region_metadata,
sst_layer: access_layer,
outputs,
expired_ssts,
sst_write_buffer_size: ReadableSize::mb(4),
compaction_time_window: None,
request_sender,
sender: waiters,
file_purger,
};
Some(Box::new(task))
}
}
/// Assigns files to windows with predefined window size (in seconds) by their max timestamps.
fn assign_to_windows<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> BTreeMap<i64, Vec<FileHandle>> {
let mut windows: BTreeMap<i64, Vec<FileHandle>> = BTreeMap::new();
// Iterates all files and assign to time windows according to max timestamp
for file in files {
let (_, end) = file.time_range();
let time_window = end
.convert_to(TimeUnit::Second)
.unwrap()
.value()
.align_to_ceil_by_bucket(time_window_size)
.unwrap_or(i64::MIN);
windows.entry(time_window).or_default().push(file.clone());
}
windows
}
/// Finds the latest active writing window among all files.
/// Returns `None` when there are no files or all files are corrupted.
fn find_latest_window_in_seconds<'a>(
files: impl Iterator<Item = &'a FileHandle>,
time_window_size: i64,
) -> Option<i64> {
let mut latest_timestamp = None;
for f in files {
let (_, end) = f.time_range();
if let Some(latest) = latest_timestamp && end > latest {
latest_timestamp = Some(end);
} else {
latest_timestamp = Some(end);
}
}
latest_timestamp
.and_then(|ts| ts.convert_to_ceil(TimeUnit::Second))
.and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
}
pub(crate) struct TwcsCompactionTask {
pub region_id: RegionId,
pub schema: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option<i64>,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Sender that are used to notify waiters waiting for pending compaction tasks.
pub sender: Option<Sender<error::Result<Output>>>,
}
impl Debug for TwcsCompactionTask {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsCompactionTask")
.field("region_id", &self.region_id)
.field("outputs", &self.outputs)
.field("expired_ssts", &self.expired_ssts)
.field("compaction_time_window", &self.compaction_time_window)
.finish()
}
}
impl Drop for TwcsCompactionTask {
fn drop(&mut self) {
self.mark_files_compacting(false)
}
}
impl TwcsCompactionTask {
fn mark_files_compacting(&self, compacting: bool) {
self.outputs
.iter()
.flat_map(|o| o.inputs.iter())
.for_each(|f| f.set_compacting(compacting))
}
/// Merges all SST files.
/// Returns `(output files, input files)`.
async fn merge_ssts(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());
let region_id = self.region_id;
for output in self.outputs.drain(..) {
let schema = self.schema.clone();
let sst_layer = self.sst_layer.clone();
let sst_write_buffer_size = self.sst_write_buffer_size;
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
"Compaction output [{}]-> {}",
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
futs.push(async move {
output
.build(region_id, schema, sst_layer, sst_write_buffer_size)
.await
});
}
let mut outputs = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
.await
.context(error::JoinSnafu)?
.into_iter()
.collect::<error::Result<Vec<_>>>()?;
outputs.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((outputs, inputs))
}
async fn handle_compaction(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
self.mark_files_compacting(true);
let (output, mut compacted) = self.merge_ssts().await.map_err(|e| {
error!(e; "Failed to compact region: {}", self.region_id);
e
})?;
compacted.extend(self.expired_ssts.iter().map(FileHandle::meta));
Ok((output, compacted))
}
/// Handles compaction success.
fn early_success(&mut self) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Ok(Output::AffectedRows(0)).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Notifies region worker to handle post-compaction tasks.
async fn send_to_worker(&self, request: WorkerRequest) {
if let Err(e) = self.request_sender.send(request).await {
error!(
"Failed to notify compaction job status for region {}, request: {:?}",
self.region_id, e.0
);
}
}
}
#[async_trait::async_trait]
impl CompactionTask for TwcsCompactionTask {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
Ok((added, deleted)) => {
info!(
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
added, deleted, self.compaction_time_window
);
self.early_success();
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
compaction_outputs: added,
compacted_files: deleted,
sender: self.sender.take(),
file_purger: self.file_purger.clone(),
})
}
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.region_id);
let err = Arc::new(e);
// notify compaction waiters
self.on_failure(err.clone());
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.region_id,
err,
})
}
};
self.send_to_worker(WorkerRequest::Background {
region_id: self.region_id,
notify,
})
.await;
// TODO(hl): handle reschedule
}
}
/// Infers the suitable time bucket duration.
/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span
/// into time bucket.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
let (start, end) = f.time_range();
min_ts = min_ts.min(start);
max_ts = max_ts.max(end);
}
// safety: Convert whatever timestamp into seconds will not cause overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}
/// Finds files that can be compacted in given level.
/// Currently they're files that is not currently under compaction.
#[inline]
fn find_compactable_files(level: &LevelMeta) -> Vec<FileHandle> {
level.files().filter(|f| !f.compacting()).cloned().collect()
}
/// Calculates timestamp span between start and end timestamp.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<i64> {
assert!(start_sec <= end_sec);
// if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot
// be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned < end_aligned {
res.push(start_aligned);
start_aligned += bucket_sec;
}
res.push(end_aligned);
res
}
/// Calculates buckets for files. If file does not contain a time range in metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamp can be compacted together.
fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64, Vec<FileHandle>> {
let mut buckets = HashMap::new();
for file in files {
let (start, end) = file.time_range();
let bounds = file_time_bucket_span(
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for bound in bounds {
buckets
.entry(bound)
.or_insert_with(Vec::new)
.push(file.clone());
}
}
buckets
}
pub(crate) struct TimeBuckets([i64; 7]);
impl TimeBuckets {
/// Fits a given time span into time bucket by find the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(&self, span_sec: i64) -> i64 {
assert!(span_sec >= 0);
match self.0.binary_search(&span_sec) {
Ok(idx) => self.0[idx],
Err(idx) => {
if idx < self.0.len() {
self.0[idx]
} else {
self.0.last().copied().unwrap()
}
}
}
}
#[cfg(test)]
fn get(&self, idx: usize) -> i64 {
self.0[idx]
}
fn max(&self) -> i64 {
self.0.last().copied().unwrap()
}
}
/// A set of predefined time buckets.
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
]);
/// Finds all expired SSTs across levels.
fn get_expired_ssts(
levels: &[LevelMeta],
ttl: Option<Duration>,
now: Timestamp,
) -> Vec<FileHandle> {
let Some(ttl) = ttl else {
return vec![];
};
let expire_time = match now.sub_duration(ttl) {
Ok(expire_time) => expire_time,
Err(e) => {
error!(e; "Failed to calculate region TTL expire time");
return vec![];
}
};
levels
.iter()
.flat_map(|l| l.get_expired_files(&expire_time).into_iter())
.collect()
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::Level;
#[test]
fn test_get_latest_window_in_seconds() {
assert_eq!(
Some(1),
find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1)
);
assert_eq!(
Some(1),
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), 0, 1000, 0)].iter(),
1
)
);
assert_eq!(
Some(-9223372036854000),
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(),
3600,
)
);
assert_eq!(
(i64::MAX / 10000000 + 1) * 10000,
find_latest_window_in_seconds(
[new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(),
10000,
)
.unwrap()
);
}
#[test]
fn test_assign_to_windows() {
let windows = assign_to_windows(
[
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
new_file_handle(FileId::random(), 0, 999, 0),
]
.iter(),
3,
);
assert_eq!(5, windows.get(&0).unwrap().len());
let files = [FileId::random(); 3];
let windows = assign_to_windows(
[
new_file_handle(files[0], -2000, -3, 0),
new_file_handle(files[1], 0, 2999, 0),
new_file_handle(files[2], 50, 10001, 0),
]
.iter(),
3,
);
assert_eq!(files[0], windows.get(&0).unwrap().get(0).unwrap().file_id());
assert_eq!(files[1], windows.get(&3).unwrap().get(0).unwrap().file_id());
assert_eq!(
files[2],
windows.get(&12).unwrap().get(0).unwrap().file_id()
);
}
struct CompactionPickerTestCase {
window_size: i64,
input_files: Vec<FileHandle>,
expected_outputs: Vec<ExpectedOutput>,
}
impl CompactionPickerTestCase {
fn check(&self) {
let windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output =
TwcsPicker::new(4, 1, None).build_output(&windows, active_window, self.window_size);
let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
})
.collect::<Vec<_>>();
let expected = self
.expected_outputs
.iter()
.map(|o| {
let input_file_ids = o
.input_files
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
(
input_file_ids,
o.output_level,
o.time_window_sec,
o.time_window_bound,
)
})
.collect::<Vec<_>>();
assert_eq!(expected, output);
}
}
struct ExpectedOutput {
input_files: Vec<usize>,
output_level: Level,
time_window_sec: i64,
time_window_bound: i64,
strict_window: bool,
}
#[test]
fn test_build_twcs_output() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0), //active windows
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
}],
}
.check();
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
input_files: [
new_file_handle(file_ids[0], -2000, -3, 0),
new_file_handle(file_ids[1], -3000, -100, 0),
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle(file_ids[5], 50, 4998, 0),
]
.to_vec(),
expected_outputs: vec![
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
time_window_sec: 3,
time_window_bound: 0,
strict_window: false,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
output_level: 1,
time_window_sec: 3,
time_window_bound: 3,
strict_window: false,
},
],
}
.check();
}
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1));
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60));
assert_eq!(
TIME_BUCKETS.get(1),
TIME_BUCKETS.fit_time_bucket(60 * 60 + 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2))
);
assert_eq!(
TIME_BUCKETS.get(3),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
);
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS.get(0),
infer_time_bucket(
[
new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
new_file_handle(FileId::random(), 1, 10_000, 0)
]
.iter()
)
);
}
fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);
let expected = expected
.iter()
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();
for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_id())
.collect();
assert_eq!(
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}
#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
// simple case, files with disjoint
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);
// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);
// file with an large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0)))
.map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS.get(0),
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]),
&expected,
);
}
#[test]
fn test_time_bucket_span() {
assert_eq!(vec![0], file_time_bucket_span(1, 9, 10));
assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10));
assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10));
assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10));
}
#[test]
fn test_time_bucket_span_large() {
assert_eq!(
vec![
(i64::MAX - 10).align_by_bucket(10).unwrap(),
i64::MAX.align_by_bucket(10).unwrap(),
],
file_time_bucket_span(i64::MAX - 10, i64::MAX, 10)
);
for bucket in 1..100 {
assert_eq!(
vec![
i64::MIN,
(i64::MIN + bucket).align_by_bucket(bucket).unwrap()
],
file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket)
);
}
}
fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0))
.collect()
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}

View File

@@ -17,6 +17,8 @@
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;

View File

@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::ops::Range;
use api::v1::{ColumnSchema, Rows};
use common_query::Output;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
};
async fn put_and_flush(
engine: &MitoEngine,
region_id: RegionId,
column_schemas: &[ColumnSchema],
rows: Range<usize>,
) {
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
};
put_rows(engine, region_id, rows).await;
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap()
else {
unreachable!()
};
assert_eq!(0, rows);
}
async fn delete_and_flush(
engine: &MitoEngine,
region_id: RegionId,
column_schemas: &[ColumnSchema],
rows: Range<usize>,
) {
let row_cnt = rows.len();
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
};
let deleted = engine
.handle_request(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
.await
.unwrap();
let Output::AffectedRows(rows_affected) = deleted else {
unreachable!()
};
assert_eq!(row_cnt, rows_affected);
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.await
.unwrap()
else {
unreachable!()
};
assert_eq!(0, rows);
}
async fn collect_stream_ts(stream: SendableRecordBatchStream) -> HashSet<i64> {
let mut res = HashSet::new();
let batches = RecordBatches::try_collect(stream).await.unwrap();
for batch in batches {
let ts_col = batch
.column_by_name("ts")
.unwrap()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
res.extend(ts_col.iter_data().map(|t| t.unwrap().0.value()));
}
res
}
#[tokio::test]
async fn test_compaction_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 25..30).await;
let output = engine
.handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let stream = engine
.handle_query(region_id, ScanRequest::default())
.await
.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<HashSet<_>>(), vec);
}

View File

@@ -435,6 +435,28 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Failed to build time range predicate for compaction, location: {}, source: {}",
location,
source
))]
BuildCompactionPredicate {
source: table::error::Error,
location: Location,
},
#[snafu(display(
"Failed to compact region {}, location: {}, source:{}",
region_id,
location,
source
))]
CompactRegion {
region_id: RegionId,
source: Arc<Error>,
location: Location,
},
#[snafu(display(
"Failed to compat readers for region {}, reason: {}, location: {}",
region_id,
@@ -511,7 +533,9 @@ impl ErrorExt for Error {
FlushRegion { source, .. } => source.status_code(),
RegionDropped { .. } => StatusCode::Cancelled,
RegionClosed { .. } => StatusCode::Cancelled,
BuildCompactionPredicate { .. } => StatusCode::Internal,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
}
}

View File

@@ -16,11 +16,15 @@
//!
//! Mito is the a region engine to store timeseries data.
#![feature(let_chains)]
#[cfg(any(test, feature = "test"))]
pub mod test_util;
// TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito.
mod access_layer;
#[allow(dead_code)]
mod compaction;
pub mod config;
pub mod engine;
pub mod error;

View File

@@ -116,7 +116,7 @@ impl MergeReaderBuilder {
MergeReaderBuilder::default()
}
/// Pushs a batch reader to sources.
/// Pushes a batch reader to sources.
pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
self.sources.push(Source::Reader(reader));
self

View File

@@ -39,7 +39,7 @@ impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build().await,
Scanner::Seq(seq_scan) => seq_scan.build_stream().await,
}
}
}
@@ -158,7 +158,7 @@ impl ScanRegion {
None => ProjectionMapper::all(&self.version.metadata)?,
};
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper, self.request)
let seq_scan = SeqScan::new(self.access_layer.clone(), mapper)
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)

View File

@@ -22,7 +22,6 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::predicate::Predicate;
use crate::access_layer::AccessLayerRef;
@@ -31,7 +30,7 @@ use crate::memtable::MemtableRef;
use crate::read::compat::{self, CompatReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::BatchReader;
use crate::read::{BatchReader, BoxedBatchReader};
use crate::sst::file::FileHandle;
/// Scans a region and returns rows in a sorted sequence.
@@ -42,10 +41,6 @@ pub struct SeqScan {
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
mapper: Arc<ProjectionMapper>,
/// Original scan request to scan memtable.
// TODO(yingwen): Remove this if memtable::iter() takes another struct.
request: ScanRequest,
/// Time range filter for time index.
time_range: Option<TimestampRange>,
/// Predicate to push down.
@@ -59,11 +54,7 @@ pub struct SeqScan {
impl SeqScan {
/// Creates a new [SeqScan].
#[must_use]
pub(crate) fn new(
access_layer: AccessLayerRef,
mapper: ProjectionMapper,
request: ScanRequest,
) -> SeqScan {
pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> SeqScan {
SeqScan {
access_layer,
mapper: Arc::new(mapper),
@@ -71,7 +62,6 @@ impl SeqScan {
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
request,
}
}
@@ -104,11 +94,32 @@ impl SeqScan {
}
/// Builds a stream for the query.
pub async fn build(&self) -> Result<SendableRecordBatchStream> {
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut reader = self.build_reader().await?;
// Creates a stream to poll the batch reader and convert batch into record batch.
let mapper = self.mapper.clone();
let stream = try_stream! {
while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield mapper.convert(&batch)?;
}
};
let stream = Box::pin(RecordBatchStreamAdaptor::new(
self.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
/// Builds a [BoxedBatchReader] from sequential scan.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut builder = MergeReaderBuilder::new();
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), &self.request.filters);
// TODO(hl): pass filters once memtable supports filter pushdown.
let iter = mem.iter(Some(self.mapper.column_ids()), &[]);
builder.push_batch_iter(iter);
}
for file in &self.files {
@@ -130,20 +141,7 @@ impl SeqScan {
builder.push_batch_reader(Box::new(compat_reader));
}
}
let mut reader = builder.build().await?;
// Creates a stream to poll the batch reader and convert batch into record batch.
let mapper = self.mapper.clone();
let stream = try_stream! {
while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? {
yield mapper.convert(&batch)?;
}
};
let stream = Box::pin(RecordBatchStreamAdaptor::new(
self.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
Ok(Box::new(builder.build().await?))
}
}

View File

@@ -25,6 +25,9 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_base::readable_size::ReadableSize;
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use datatypes::prelude::DataType;
use prost::Message;
use smallvec::SmallVec;
@@ -64,7 +67,7 @@ impl Default for RegionOptions {
RegionOptions {
write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE),
ttl: None,
compaction_strategy: CompactionStrategy::LeveledTimeWindow,
compaction_strategy: CompactionStrategy::default(),
}
}
}
@@ -503,10 +506,14 @@ pub(crate) struct SenderDdlRequest {
/// Notification from a background job.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
/// Flush is finished.
/// Flush has finished.
FlushFinished(FlushFinished),
/// Flush is failed.
/// Flush has failed.
FlushFailed(FlushFailed),
/// Compaction has finished.
CompactionFinished(CompactionFinished),
/// Compaction has failed.
CompactionFailed(CompactionFailed),
}
/// Notifies a flush job is finished.
@@ -559,6 +566,54 @@ pub(crate) struct FlushFailed {
pub(crate) err: Arc<Error>,
}
/// Notifies a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
/// Region id.
pub(crate) region_id: RegionId,
/// Compaction output files that are to be added to region version.
pub(crate) compaction_outputs: Vec<FileMeta>,
/// Compacted files that are to be removed from region version.
pub(crate) compacted_files: Vec<FileMeta>,
/// Compaction result sender.
pub(crate) sender: Option<oneshot::Sender<Result<Output>>>,
/// File purger for cleaning files on failure.
pub(crate) file_purger: FilePurgerRef,
}
impl CompactionFinished {
pub fn on_success(self) {
if let Some(sender) = self.sender {
let _ = sender.send(Ok(AffectedRows(0)));
}
info!("Successfully compacted region: {}", self.region_id);
}
/// Compaction succeeded but failed to update manifest or region's already been dropped,
/// clean compaction output files.
pub fn on_failure(self, _err: Error) {
for file in &self.compacted_files {
let file_id = file.file_id;
warn!(
"Cleaning region {} compaction output file: {}",
self.region_id, file_id
);
self.file_purger.send_request(PurgeRequest {
region_id: self.region_id,
file_id,
});
}
}
}
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
pub(crate) region_id: RegionId,
/// The error source of the failure.
pub(crate) err: Arc<Error>,
}
#[cfg(test)]
mod tests {
use api::v1::{Row, SemanticType};

View File

@@ -137,6 +137,18 @@ impl FileHandle {
pub fn mark_deleted(&self) {
self.inner.deleted.store(true, Ordering::Relaxed);
}
pub fn compacting(&self) -> bool {
self.inner.compacting.load(Ordering::Relaxed)
}
pub fn set_compacting(&self, compacting: bool) {
self.inner.compacting.store(compacting, Ordering::Relaxed);
}
pub fn meta(&self) -> FileMeta {
self.inner.meta.clone()
}
}
/// Inner data of [FileHandle].

View File

@@ -17,6 +17,8 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use common_time::Timestamp;
use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL};
use crate::sst::file_purger::FilePurgerRef;
@@ -104,6 +106,22 @@ impl LevelMeta {
files: HashMap::new(),
}
}
/// Returns expired SSTs from current level.
pub fn get_expired_files(&self, expire_time: &Timestamp) -> Vec<FileHandle> {
self.files
.values()
.filter(|v| {
let (_, end) = v.time_range();
&end < expire_time
})
.cloned()
.collect()
}
pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
self.files.values()
}
}
impl fmt::Debug for LevelMeta {

View File

@@ -46,8 +46,20 @@ use crate::error::Result;
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::worker::WorkerGroup;
#[derive(Debug)]
pub(crate) struct NoopFilePurger;
impl FilePurger for NoopFilePurger {
fn send_request(&self, _request: PurgeRequest) {}
}
pub(crate) fn new_noop_file_purger() -> FilePurgerRef {
Arc::new(NoopFilePurger {})
}
/// Env to test mito engine.
pub struct TestEnv {
/// Path to store data.
@@ -447,7 +459,7 @@ impl WriteBufferManager for MockWriteBufferManager {
}
}
fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
api::v1::ColumnSchema {
column_name: metadata.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone())

View File

@@ -15,6 +15,7 @@
//! Structs and utilities for writing regions.
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
@@ -36,6 +37,7 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
@@ -246,7 +248,8 @@ impl<S: LogStore> WorkerStarter<S> {
))),
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler),
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
compaction_scheduler: CompactionScheduler::new(self.scheduler),
stalled_requests: StalledRequests::default(),
listener: self.listener,
};
@@ -397,6 +400,8 @@ struct RegionWorkerLoop<S> {
write_buffer_manager: WriteBufferManagerRef,
/// Schedules background flush requests.
flush_scheduler: FlushScheduler,
/// Scheduler for compaction tasks.
compaction_scheduler: CompactionScheduler,
/// Stalled write requests.
stalled_requests: StalledRequests,
/// Event listener for tests.
@@ -489,7 +494,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_flush_request(ddl.region_id, ddl.sender).await;
continue;
}
DdlRequest::Compact(_) => todo!(),
DdlRequest::Compact(_) => {
self.handle_compaction_request(ddl.region_id, ddl.sender);
continue;
}
};
if let Some(sender) = ddl.sender {
@@ -506,6 +514,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_flush_finished(region_id, req).await
}
BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
BackgroundNotify::CompactionFinished(req) => {
self.handle_compaction_finished(region_id, req).await
}
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
}
}
}

View File

@@ -0,0 +1,110 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::Output;
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::compaction::CompactionRequest;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles compaction request submitted to region worker.
pub(crate) fn handle_compaction_request(
&mut self,
region_id: RegionId,
sender: Option<oneshot::Sender<Result<Output>>>,
) {
let Some(region) = self.regions.get_region(region_id) else {
if let Some(sender) = sender {
let _ = sender.send(RegionNotFoundSnafu { region_id }.fail());
}
return;
};
let request = self.new_compaction_request(&region, sender);
if let Err(e) = self.compaction_scheduler.schedule_compaction(request) {
error!(e; "Failed to schedule compaction task for region: {}", region_id);
} else {
info!(
"Successfully scheduled compaction task for region: {}",
region_id
);
}
}
/// Handles compaction finished, update region version and manifest, deleted compacted files.
pub(crate) async fn handle_compaction_finished(
&mut self,
region_id: RegionId,
mut request: CompactionFinished,
) {
let Some(region) = self.regions.get_region(region_id) else {
request.on_failure(RegionNotFoundSnafu { region_id }.build());
return;
};
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: std::mem::take(&mut request.compaction_outputs),
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: None, // TODO(hl): update window maybe
flushed_entry_id: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
error!(e; "Failed to update manifest, region: {}", region_id);
request.on_failure(e);
return;
}
// Apply edit to region's version.
region
.version_control
.apply_edit(edit, region.file_purger.clone());
request.on_success();
}
/// When compaction fails, we simply log the error.
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
error!(req.err; "Failed to compact region: {}", req.region_id);
}
/// Creates a new compaction request.
fn new_compaction_request(
&self,
region: &MitoRegionRef,
waiter: Option<oneshot::Sender<Result<Output>>>,
) -> CompactionRequest {
let current_version = region.version_control.current().version;
let access_layer = region.access_layer.clone();
let file_purger = region.file_purger.clone();
CompactionRequest {
current_version,
access_layer,
ttl: None, // TODO(hl): get TTL info from region metadata
compaction_time_window: None, // TODO(hl): get persisted region compaction time window
request_sender: self.sender.clone(),
waiter,
file_purger,
}
}
}

View File

@@ -112,9 +112,6 @@ pub fn compaction_strategy_to_picker<S: LogStore>(
strategy: &CompactionStrategy,
) -> CompactionPickerRef<S> {
match strategy {
CompactionStrategy::LeveledTimeWindow => {
Arc::new(LeveledTimeWindowPicker::default()) as Arc<_>
}
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,

View File

@@ -28,7 +28,6 @@ use crate::storage::descriptors::RegionDescriptor;
use crate::storage::region::Region;
const COMPACTION_STRATEGY_KEY: &str = "compaction";
const COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE: &str = "LTW";
const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS";
const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files";
const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds";
@@ -125,15 +124,18 @@ pub struct CloseOptions {
}
/// Options for compactions
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub enum CompactionStrategy {
/// Leveled time window compaction strategy
#[default]
LeveledTimeWindow,
/// TWCS
Twcs(TwcsOptions),
}
impl Default for CompactionStrategy {
fn default() -> Self {
Self::Twcs(TwcsOptions::default())
}
}
/// TWCS compaction options.
#[derive(Debug, Clone)]
pub struct TwcsOptions {
@@ -160,9 +162,7 @@ impl From<&HashMap<String, String>> for CompactionStrategy {
let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else {
return CompactionStrategy::default();
};
if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE) {
CompactionStrategy::LeveledTimeWindow
} else if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
let mut twcs_opts = TwcsOptions::default();
if let Some(max_active_window_files) = opts
.get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)