From 3331e3158c9dc0e743ff468b2e15459326f4b4e5 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 11 Sep 2023 15:11:56 +0800 Subject: [PATCH] feat(mito2): compaction (#2317) * feat: compaction component * feat: mito2 compaction * Avoid building time range predicates when merge SST files since in TWCS we don't enforce strict time window. * fix: some CR comments * minor: change CompactionRequest::senders to an option * chore: handle compaction finish error * feat: integrate compaction into region worker * chore: rebase upstream * fix: Some CR comments * chore: Apply suggestions from code review * style: fix clippy --------- Co-authored-by: Yingwen --- Cargo.lock | 1 + src/mito2/Cargo.toml | 1 + src/mito2/src/access_layer.rs | 1 + src/mito2/src/compaction.rs | 94 +++ src/mito2/src/compaction/output.rs | 88 +++ src/mito2/src/compaction/picker.rs | 47 ++ src/mito2/src/compaction/test_util.rs | 41 ++ src/mito2/src/compaction/twcs.rs | 843 ++++++++++++++++++++++ src/mito2/src/engine.rs | 2 + src/mito2/src/engine/compaction_test.rs | 143 ++++ src/mito2/src/error.rs | 24 + src/mito2/src/lib.rs | 4 + src/mito2/src/read/merge.rs | 2 +- src/mito2/src/read/scan_region.rs | 4 +- src/mito2/src/read/seq_scan.rs | 54 +- src/mito2/src/request.rs | 61 +- src/mito2/src/sst/file.rs | 12 + src/mito2/src/sst/version.rs | 18 + src/mito2/src/test_util.rs | 14 +- src/mito2/src/worker.rs | 16 +- src/mito2/src/worker/handle_compaction.rs | 110 +++ src/storage/src/compaction.rs | 3 - src/store-api/src/storage/engine.rs | 16 +- 23 files changed, 1551 insertions(+), 48 deletions(-) create mode 100644 src/mito2/src/compaction.rs create mode 100644 src/mito2/src/compaction/output.rs create mode 100644 src/mito2/src/compaction/picker.rs create mode 100644 src/mito2/src/compaction/test_util.rs create mode 100644 src/mito2/src/compaction/twcs.rs create mode 100644 src/mito2/src/engine/compaction_test.rs create mode 100644 src/mito2/src/worker/handle_compaction.rs diff --git a/Cargo.lock b/Cargo.lock index f8e46be4ee..a9a627e712 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5479,6 +5479,7 @@ dependencies = [ "dashmap", "datafusion", "datafusion-common", + "datafusion-expr", "datatypes", "futures", "humantime-serde", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index f455c53a0e..3151f6ab51 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,6 +32,7 @@ common-test-util = { workspace = true, optional = true } common-time = { workspace = true } dashmap = "5.4" datafusion-common.workspace = true +datafusion-expr.workspace = true datafusion.workspace = true datatypes = { workspace = true } futures.workspace = true diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 8e382c9131..4c0e8bbde5 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -74,6 +74,7 @@ impl AccessLayer { } /// Returns a new parquet writer to write the SST for specific `file_id`. + // TODO(hl): maybe rename to [sst_writer]. pub(crate) fn write_sst( &self, file_id: FileId, diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs new file mode 100644 index 0000000000..5f55bf6ebe --- /dev/null +++ b/src/mito2/src/compaction.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod output; +mod picker; +#[cfg(test)] +mod test_util; +mod twcs; + +use std::sync::Arc; +use std::time::Duration; + +use common_query::Output; +use common_telemetry::debug; +pub use picker::CompactionPickerRef; +use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; +use tokio::sync::{mpsc, oneshot}; + +use crate::access_layer::AccessLayerRef; +use crate::compaction::twcs::TwcsPicker; +use crate::error; +use crate::error::Result; +use crate::region::version::VersionRef; +use crate::request::WorkerRequest; +use crate::schedule::scheduler::SchedulerRef; +use crate::sst::file_purger::FilePurgerRef; + +/// Region compaction request. +pub struct CompactionRequest { + pub(crate) current_version: VersionRef, + pub(crate) access_layer: AccessLayerRef, + pub(crate) ttl: Option, + pub(crate) compaction_time_window: Option, + pub(crate) request_sender: mpsc::Sender, + pub(crate) waiter: Option>>, + pub(crate) file_purger: FilePurgerRef, +} + +impl CompactionRequest { + pub(crate) fn region_id(&self) -> RegionId { + self.current_version.metadata.region_id + } +} + +/// Builds compaction picker according to [CompactionStrategy]. +pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef { + match strategy { + CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( + twcs_opts.max_active_window_files, + twcs_opts.max_inactive_window_files, + twcs_opts.time_window_seconds, + )) as Arc<_>, + } +} + +pub(crate) struct CompactionScheduler { + scheduler: SchedulerRef, + // TODO(hl): maybe tracks region compaction status in CompactionScheduler +} + +impl CompactionScheduler { + pub(crate) fn new(scheduler: SchedulerRef) -> Self { + Self { scheduler } + } + + /// Schedules a region compaction task. + pub(crate) fn schedule_compaction(&self, req: CompactionRequest) -> Result<()> { + self.scheduler.schedule(Box::pin(async { + // TODO(hl): build picker according to region options. + let picker = + compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + debug!( + "Pick compaction strategy {:?} for region: {}", + picker, + req.region_id() + ); + let Some(mut task) = picker.pick(req) else { + return; + }; + task.run().await; + })) + } +} diff --git a/src/mito2/src/compaction/output.rs b/src/mito2/src/compaction/output.rs new file mode 100644 index 0000000000..96d95874b7 --- /dev/null +++ b/src/mito2/src/compaction/output.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_base::readable_size::ReadableSize; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::access_layer::AccessLayerRef; +use crate::error; +use crate::read::projection::ProjectionMapper; +use crate::read::seq_scan::SeqScan; +use crate::read::{BoxedBatchReader, Source}; +use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::sst::parquet::{SstInfo, WriteOptions}; + +#[derive(Debug)] +pub(crate) struct CompactionOutput { + pub output_file_id: FileId, + /// Compaction output file level. + pub output_level: Level, + /// The left bound of time window. + pub time_window_bound: i64, + /// Time window size in seconds. + pub time_window_sec: i64, + /// Compaction input files. + pub inputs: Vec, +} + +impl CompactionOutput { + pub(crate) async fn build( + &self, + region_id: RegionId, + schema: RegionMetadataRef, + sst_layer: AccessLayerRef, + sst_write_buffer_size: ReadableSize, + ) -> error::Result> { + let reader = build_sst_reader(schema.clone(), sst_layer.clone(), &self.inputs).await?; + + let opts = WriteOptions { + write_buffer_size: sst_write_buffer_size, + ..Default::default() + }; + + // TODO(hl): measure merge elapsed time. + + let mut writer = sst_layer.write_sst(self.output_file_id, schema, Source::Reader(reader)); + let meta = writer.write_all(&opts).await?.map( + |SstInfo { + time_range, + file_size, + .. + }| { + FileMeta { + region_id, + file_id: self.output_file_id, + time_range, + level: self.output_level, + file_size, + } + }, + ); + + Ok(meta) + } +} + +/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. +async fn build_sst_reader( + schema: RegionMetadataRef, + sst_layer: AccessLayerRef, + inputs: &[FileHandle], +) -> error::Result { + SeqScan::new(sst_layer, ProjectionMapper::all(&schema)?) + .with_files(inputs.to_vec()) + .build_reader() + .await +} diff --git a/src/mito2/src/compaction/picker.rs b/src/mito2/src/compaction/picker.rs new file mode 100644 index 0000000000..1e986be0a6 --- /dev/null +++ b/src/mito2/src/compaction/picker.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::sync::Arc; + +use crate::compaction::CompactionRequest; + +pub type CompactionPickerRef = Arc; + +#[async_trait::async_trait] +pub trait CompactionTask: Debug + Send + Sync + 'static { + async fn run(&mut self); +} + +/// Picker picks input SST files and builds the compaction task. +/// Different compaction strategy may implement different pickers. +pub trait Picker: Debug + Send + 'static { + fn pick(&self, req: CompactionRequest) -> Option>; +} + +pub struct PickerContext { + compaction_time_window: Option, +} + +impl PickerContext { + pub fn with(compaction_time_window: Option) -> Self { + Self { + compaction_time_window, + } + } + + pub fn compaction_time_window(&self) -> Option { + self.compaction_time_window + } +} diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs new file mode 100644 index 0000000000..fefae906ba --- /dev/null +++ b/src/mito2/src/compaction/test_util.rs @@ -0,0 +1,41 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::Timestamp; + +use crate::sst::file::{FileHandle, FileId, FileMeta, Level}; +use crate::test_util::new_noop_file_purger; + +/// Test util to create file handles. +pub fn new_file_handle( + file_id: FileId, + start_ts_millis: i64, + end_ts_millis: i64, + level: Level, +) -> FileHandle { + let file_purger = new_noop_file_purger(); + FileHandle::new( + FileMeta { + region_id: 0.into(), + file_id, + time_range: ( + Timestamp::new_millisecond(start_ts_millis), + Timestamp::new_millisecond(end_ts_millis), + ), + level, + file_size: 0, + }, + file_purger, + ) +} diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs new file mode 100644 index 0000000000..a3e1ca7cd3 --- /dev/null +++ b/src/mito2/src/compaction/twcs.rs @@ -0,0 +1,843 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use common_query::Output; +use common_telemetry::{debug, error, info}; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use common_time::Timestamp; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; +use tokio::sync::mpsc; +use tokio::sync::oneshot::Sender; + +use crate::access_layer::AccessLayerRef; +use crate::compaction::output::CompactionOutput; +use crate::compaction::picker::{CompactionTask, Picker}; +use crate::compaction::CompactionRequest; +use crate::error; +use crate::error::CompactRegionSnafu; +use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, WorkerRequest}; +use crate::sst::file::{FileHandle, FileId, FileMeta}; +use crate::sst::file_purger::FilePurgerRef; +use crate::sst::version::LevelMeta; + +const MAX_PARALLEL_COMPACTION: usize = 8; + +/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction +/// candidates. +pub struct TwcsPicker { + max_active_window_files: usize, + max_inactive_window_files: usize, + time_window_seconds: Option, +} + +impl Debug for TwcsPicker { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TwcsPicker") + .field("max_active_window_files", &self.max_active_window_files) + .field("max_inactive_window_files", &self.max_inactive_window_files) + .finish() + } +} + +impl TwcsPicker { + pub fn new( + max_active_window_files: usize, + max_inactive_window_files: usize, + time_window_seconds: Option, + ) -> Self { + Self { + max_inactive_window_files, + max_active_window_files, + time_window_seconds, + } + } + + /// Builds compaction output from files. + /// For active writing window, we allow for at most `max_active_window_files` files to alleviate + /// fragmentation. For other windows, we allow at most 1 file at each window. + fn build_output( + &self, + time_windows: &BTreeMap>, + active_window: Option, + window_size: i64, + ) -> Vec { + let mut output = vec![]; + for (window, files) in time_windows { + if let Some(active_window) = active_window && *window == active_window { + if files.len() > self.max_active_window_files { + output.push(CompactionOutput { + output_file_id: FileId::random(), + output_level: 1, // we only have two levels and always compact to l1 + time_window_bound: *window, + time_window_sec: window_size, + inputs: files.clone(), + }); + } else { + debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window); + } + } else { + // not active writing window + if files.len() > self.max_inactive_window_files { + output.push(CompactionOutput { + output_file_id: FileId::random(), + output_level: 1, + time_window_bound: *window, + time_window_sec: window_size, + inputs: files.clone(), + }); + } else { + debug!("No enough files, current: {}, max_inactive_window_files: {}", files.len(), self.max_inactive_window_files) + } + } + } + output + } +} + +impl Picker for TwcsPicker { + fn pick(&self, req: CompactionRequest) -> Option> { + let CompactionRequest { + current_version, + access_layer, + ttl, + compaction_time_window, + request_sender, + waiter: waiters, + file_purger, + } = req; + + let region_metadata = current_version.metadata.clone(); + let region_id = region_metadata.region_id; + + let levels = current_version.ssts.levels(); + let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); + if !expired_ssts.is_empty() { + info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); + // here we mark expired SSTs as compacting to avoid them being picked. + expired_ssts.iter().for_each(|f| f.set_compacting(true)); + } + + let time_window_size = compaction_time_window + .or(self.time_window_seconds) + .unwrap_or_else(|| { + let inferred = infer_time_bucket(levels[0].files()); + info!( + "Compaction window for region {} is not present, inferring from files: {:?}", + region_id, inferred + ); + inferred + }); + + // Find active window from files in level 0. + let active_window = find_latest_window_in_seconds(levels[0].files(), time_window_size); + // Assign files to windows + let windows = assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size); + let outputs = self.build_output(&windows, active_window, time_window_size); + + if outputs.is_empty() && expired_ssts.is_empty() { + return None; + } + let task = TwcsCompactionTask { + region_id, + schema: region_metadata, + sst_layer: access_layer, + outputs, + expired_ssts, + sst_write_buffer_size: ReadableSize::mb(4), + compaction_time_window: None, + request_sender, + sender: waiters, + file_purger, + }; + Some(Box::new(task)) + } +} + +/// Assigns files to windows with predefined window size (in seconds) by their max timestamps. +fn assign_to_windows<'a>( + files: impl Iterator, + time_window_size: i64, +) -> BTreeMap> { + let mut windows: BTreeMap> = BTreeMap::new(); + // Iterates all files and assign to time windows according to max timestamp + for file in files { + let (_, end) = file.time_range(); + let time_window = end + .convert_to(TimeUnit::Second) + .unwrap() + .value() + .align_to_ceil_by_bucket(time_window_size) + .unwrap_or(i64::MIN); + windows.entry(time_window).or_default().push(file.clone()); + } + windows +} + +/// Finds the latest active writing window among all files. +/// Returns `None` when there are no files or all files are corrupted. +fn find_latest_window_in_seconds<'a>( + files: impl Iterator, + time_window_size: i64, +) -> Option { + let mut latest_timestamp = None; + for f in files { + let (_, end) = f.time_range(); + if let Some(latest) = latest_timestamp && end > latest { + latest_timestamp = Some(end); + } else { + latest_timestamp = Some(end); + } + } + latest_timestamp + .and_then(|ts| ts.convert_to_ceil(TimeUnit::Second)) + .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size)) +} + +pub(crate) struct TwcsCompactionTask { + pub region_id: RegionId, + pub schema: RegionMetadataRef, + pub sst_layer: AccessLayerRef, + pub outputs: Vec, + pub expired_ssts: Vec, + pub sst_write_buffer_size: ReadableSize, + pub compaction_time_window: Option, + pub file_purger: FilePurgerRef, + /// Request sender to notify the worker. + pub(crate) request_sender: mpsc::Sender, + /// Sender that are used to notify waiters waiting for pending compaction tasks. + pub sender: Option>>, +} + +impl Debug for TwcsCompactionTask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TwcsCompactionTask") + .field("region_id", &self.region_id) + .field("outputs", &self.outputs) + .field("expired_ssts", &self.expired_ssts) + .field("compaction_time_window", &self.compaction_time_window) + .finish() + } +} + +impl Drop for TwcsCompactionTask { + fn drop(&mut self) { + self.mark_files_compacting(false) + } +} + +impl TwcsCompactionTask { + fn mark_files_compacting(&self, compacting: bool) { + self.outputs + .iter() + .flat_map(|o| o.inputs.iter()) + .for_each(|f| f.set_compacting(compacting)) + } + + /// Merges all SST files. + /// Returns `(output files, input files)`. + async fn merge_ssts(&mut self) -> error::Result<(Vec, Vec)> { + let mut futs = Vec::with_capacity(self.outputs.len()); + let mut compacted_inputs = + Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); + let region_id = self.region_id; + for output in self.outputs.drain(..) { + let schema = self.schema.clone(); + let sst_layer = self.sst_layer.clone(); + let sst_write_buffer_size = self.sst_write_buffer_size; + compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); + + info!( + "Compaction output [{}]-> {}", + output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .collect::>() + .join(","), + output.output_file_id + ); + + // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism. + futs.push(async move { + output + .build(region_id, schema, sst_layer, sst_write_buffer_size) + .await + }); + } + + let mut outputs = Vec::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); + for _ in 0..MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(common_runtime::spawn_bg(task)); + } + } + let metas = futures::future::try_join_all(task_chunk) + .await + .context(error::JoinSnafu)? + .into_iter() + .collect::>>()?; + outputs.extend(metas.into_iter().flatten()); + } + + let inputs = compacted_inputs.into_iter().collect(); + Ok((outputs, inputs)) + } + + async fn handle_compaction(&mut self) -> error::Result<(Vec, Vec)> { + self.mark_files_compacting(true); + let (output, mut compacted) = self.merge_ssts().await.map_err(|e| { + error!(e; "Failed to compact region: {}", self.region_id); + e + })?; + compacted.extend(self.expired_ssts.iter().map(FileHandle::meta)); + Ok((output, compacted)) + } + + /// Handles compaction success. + fn early_success(&mut self) { + if let Some(sender) = self.sender.take() { + let _ = sender.send(Ok(Output::AffectedRows(0)).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } + } + + /// Handles compaction failure, notifies all waiters. + fn on_failure(&mut self, err: Arc) { + if let Some(sender) = self.sender.take() { + let _ = sender.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } + } + + /// Notifies region worker to handle post-compaction tasks. + async fn send_to_worker(&self, request: WorkerRequest) { + if let Err(e) = self.request_sender.send(request).await { + error!( + "Failed to notify compaction job status for region {}, request: {:?}", + self.region_id, e.0 + ); + } + } +} + +#[async_trait::async_trait] +impl CompactionTask for TwcsCompactionTask { + async fn run(&mut self) { + let notify = match self.handle_compaction().await { + Ok((added, deleted)) => { + info!( + "Compacted SST files, input: {:?}, output: {:?}, window: {:?}", + added, deleted, self.compaction_time_window + ); + self.early_success(); + + BackgroundNotify::CompactionFinished(CompactionFinished { + region_id: self.region_id, + compaction_outputs: added, + compacted_files: deleted, + sender: self.sender.take(), + file_purger: self.file_purger.clone(), + }) + } + Err(e) => { + error!(e; "Failed to compact region, region id: {}", self.region_id); + let err = Arc::new(e); + // notify compaction waiters + self.on_failure(err.clone()); + BackgroundNotify::CompactionFailed(CompactionFailed { + region_id: self.region_id, + err, + }) + } + }; + + self.send_to_worker(WorkerRequest::Background { + region_id: self.region_id, + notify, + }) + .await; + // TODO(hl): handle reschedule + } +} + +/// Infers the suitable time bucket duration. +/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span +/// into time bucket. +pub(crate) fn infer_time_bucket<'a>(files: impl Iterator) -> i64 { + let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second); + let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second); + + for f in files { + let (start, end) = f.time_range(); + min_ts = min_ts.min(start); + max_ts = max_ts.max(end); + } + + // safety: Convert whatever timestamp into seconds will not cause overflow. + let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); + let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); + + max_sec + .checked_sub(min_sec) + .map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow. + .unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty. +} + +/// Finds files that can be compacted in given level. +/// Currently they're files that is not currently under compaction. +#[inline] +fn find_compactable_files(level: &LevelMeta) -> Vec { + level.files().filter(|f| !f.compacting()).cloned().collect() +} + +/// Calculates timestamp span between start and end timestamp. +fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec { + assert!(start_sec <= end_sec); + + // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot + // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow. + let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + let end_aligned = end_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + + let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize); + while start_aligned < end_aligned { + res.push(start_aligned); + start_aligned += bucket_sec; + } + res.push(end_aligned); + res +} + +/// Calculates buckets for files. If file does not contain a time range in metadata, it will be +/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket) +/// so that all files without timestamp can be compacted together. +fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap> { + let mut buckets = HashMap::new(); + + for file in files { + let (start, end) = file.time_range(); + let bounds = file_time_bucket_span( + start.convert_to(TimeUnit::Second).unwrap().value(), + end.convert_to(TimeUnit::Second).unwrap().value(), + bucket_sec, + ); + for bound in bounds { + buckets + .entry(bound) + .or_insert_with(Vec::new) + .push(file.clone()); + } + } + buckets +} + +pub(crate) struct TimeBuckets([i64; 7]); + +impl TimeBuckets { + /// Fits a given time span into time bucket by find the minimum bucket that can cover the span. + /// Returns the max bucket if no such bucket can be found. + fn fit_time_bucket(&self, span_sec: i64) -> i64 { + assert!(span_sec >= 0); + match self.0.binary_search(&span_sec) { + Ok(idx) => self.0[idx], + Err(idx) => { + if idx < self.0.len() { + self.0[idx] + } else { + self.0.last().copied().unwrap() + } + } + } + } + + #[cfg(test)] + fn get(&self, idx: usize) -> i64 { + self.0[idx] + } + + fn max(&self) -> i64 { + self.0.last().copied().unwrap() + } +} + +/// A set of predefined time buckets. +pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([ + 60 * 60, // one hour + 2 * 60 * 60, // two hours + 12 * 60 * 60, // twelve hours + 24 * 60 * 60, // one day + 7 * 24 * 60 * 60, // one week + 365 * 24 * 60 * 60, // one year + 10 * 365 * 24 * 60 * 60, // ten years +]); + +/// Finds all expired SSTs across levels. +fn get_expired_ssts( + levels: &[LevelMeta], + ttl: Option, + now: Timestamp, +) -> Vec { + let Some(ttl) = ttl else { + return vec![]; + }; + + let expire_time = match now.sub_duration(ttl) { + Ok(expire_time) => expire_time, + Err(e) => { + error!(e; "Failed to calculate region TTL expire time"); + return vec![]; + } + }; + + levels + .iter() + .flat_map(|l| l.get_expired_files(&expire_time).into_iter()) + .collect() +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + use crate::compaction::test_util::new_file_handle; + use crate::sst::file::Level; + + #[test] + fn test_get_latest_window_in_seconds() { + assert_eq!( + Some(1), + find_latest_window_in_seconds([new_file_handle(FileId::random(), 0, 999, 0)].iter(), 1) + ); + assert_eq!( + Some(1), + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), 0, 1000, 0)].iter(), + 1 + ) + ); + + assert_eq!( + Some(-9223372036854000), + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), i64::MIN, i64::MIN + 1, 0)].iter(), + 3600, + ) + ); + + assert_eq!( + (i64::MAX / 10000000 + 1) * 10000, + find_latest_window_in_seconds( + [new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0)].iter(), + 10000, + ) + .unwrap() + ); + } + + #[test] + fn test_assign_to_windows() { + let windows = assign_to_windows( + [ + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + ] + .iter(), + 3, + ); + assert_eq!(5, windows.get(&0).unwrap().len()); + + let files = [FileId::random(); 3]; + let windows = assign_to_windows( + [ + new_file_handle(files[0], -2000, -3, 0), + new_file_handle(files[1], 0, 2999, 0), + new_file_handle(files[2], 50, 10001, 0), + ] + .iter(), + 3, + ); + assert_eq!(files[0], windows.get(&0).unwrap().get(0).unwrap().file_id()); + assert_eq!(files[1], windows.get(&3).unwrap().get(0).unwrap().file_id()); + assert_eq!( + files[2], + windows.get(&12).unwrap().get(0).unwrap().file_id() + ); + } + + struct CompactionPickerTestCase { + window_size: i64, + input_files: Vec, + expected_outputs: Vec, + } + + impl CompactionPickerTestCase { + fn check(&self) { + let windows = assign_to_windows(self.input_files.iter(), self.window_size); + let active_window = + find_latest_window_in_seconds(self.input_files.iter(), self.window_size); + let output = + TwcsPicker::new(4, 1, None).build_output(&windows, active_window, self.window_size); + + let output = output + .iter() + .map(|o| { + let input_file_ids = + o.inputs.iter().map(|f| f.file_id()).collect::>(); + ( + input_file_ids, + o.output_level, + o.time_window_sec, + o.time_window_bound, + ) + }) + .collect::>(); + + let expected = self + .expected_outputs + .iter() + .map(|o| { + let input_file_ids = o + .input_files + .iter() + .map(|idx| self.input_files[*idx].file_id()) + .collect::>(); + ( + input_file_ids, + o.output_level, + o.time_window_sec, + o.time_window_bound, + ) + }) + .collect::>(); + assert_eq!(expected, output); + } + } + + struct ExpectedOutput { + input_files: Vec, + output_level: Level, + time_window_sec: i64, + time_window_bound: i64, + strict_window: bool, + } + + #[test] + fn test_build_twcs_output() { + let file_ids = (0..4).map(|_| FileId::random()).collect::>(); + + CompactionPickerTestCase { + window_size: 3, + input_files: [ + new_file_handle(file_ids[0], -2000, -3, 0), + new_file_handle(file_ids[1], -3000, -100, 0), + new_file_handle(file_ids[2], 0, 2999, 0), //active windows + new_file_handle(file_ids[3], 50, 2998, 0), //active windows + ] + .to_vec(), + expected_outputs: vec![ExpectedOutput { + input_files: vec![0, 1], + output_level: 1, + time_window_sec: 3, + time_window_bound: 0, + strict_window: false, + }], + } + .check(); + + let file_ids = (0..6).map(|_| FileId::random()).collect::>(); + CompactionPickerTestCase { + window_size: 3, + input_files: [ + new_file_handle(file_ids[0], -2000, -3, 0), + new_file_handle(file_ids[1], -3000, -100, 0), + new_file_handle(file_ids[2], 0, 2999, 0), + new_file_handle(file_ids[3], 50, 2998, 0), + new_file_handle(file_ids[4], 11, 2990, 0), + new_file_handle(file_ids[5], 50, 4998, 0), + ] + .to_vec(), + expected_outputs: vec![ + ExpectedOutput { + input_files: vec![0, 1], + output_level: 1, + time_window_sec: 3, + time_window_bound: 0, + strict_window: false, + }, + ExpectedOutput { + input_files: vec![2, 3, 4], + output_level: 1, + time_window_sec: 3, + time_window_bound: 3, + strict_window: false, + }, + ], + } + .check(); + } + + #[test] + fn test_time_bucket() { + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1)); + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60)); + assert_eq!( + TIME_BUCKETS.get(1), + TIME_BUCKETS.fit_time_bucket(60 * 60 + 1) + ); + + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1) + ); + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2)) + ); + assert_eq!( + TIME_BUCKETS.get(3), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1) + ); + assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX)); + } + + #[test] + fn test_infer_time_buckets() { + assert_eq!( + TIME_BUCKETS.get(0), + infer_time_bucket( + [ + new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0), + new_file_handle(FileId::random(), 1, 10_000, 0) + ] + .iter() + ) + ); + } + + fn check_bucket_calculation( + bucket_sec: i64, + files: Vec, + expected: &[(i64, &[FileId])], + ) { + let res = calculate_time_buckets(bucket_sec, &files); + + let expected = expected + .iter() + .map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::>())) + .collect::>(); + + for (bucket, file_ids) in expected { + let actual = res + .get(&bucket) + .unwrap() + .iter() + .map(|f| f.file_id()) + .collect(); + assert_eq!( + file_ids, actual, + "bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}", + ); + } + } + + #[test] + fn test_calculate_time_buckets() { + let file_id_a = FileId::random(); + let file_id_b = FileId::random(); + // simple case, files with disjoint + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]), + &[(0, &[file_id_a]), (10, &[file_id_b])], + ); + + // files across buckets + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]), + &[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])], + ); + check_bucket_calculation( + 10, + new_file_handles(&[(file_id_a, 0, 10000)]), + &[(0, &[file_id_a]), (10, &[file_id_a])], + ); + + // file with an large time range + let file_id_array = &[file_id_a]; + let expected = (0..(TIME_BUCKETS.get(4) / TIME_BUCKETS.get(0))) + .map(|b| (b * TIME_BUCKETS.get(0), file_id_array as _)) + .collect::>(); + check_bucket_calculation( + TIME_BUCKETS.get(0), + new_file_handles(&[(file_id_a, 0, TIME_BUCKETS.get(4) * 1000)]), + &expected, + ); + } + + #[test] + fn test_time_bucket_span() { + assert_eq!(vec![0], file_time_bucket_span(1, 9, 10)); + assert_eq!(vec![0, 10], file_time_bucket_span(1, 10, 10)); + assert_eq!(vec![-10], file_time_bucket_span(-10, -1, 10)); + assert_eq!(vec![-10, 0], file_time_bucket_span(-10, 0, 10)); + } + + #[test] + fn test_time_bucket_span_large() { + assert_eq!( + vec![ + (i64::MAX - 10).align_by_bucket(10).unwrap(), + i64::MAX.align_by_bucket(10).unwrap(), + ], + file_time_bucket_span(i64::MAX - 10, i64::MAX, 10) + ); + + for bucket in 1..100 { + assert_eq!( + vec![ + i64::MIN, + (i64::MIN + bucket).align_by_bucket(bucket).unwrap() + ], + file_time_bucket_span(i64::MIN, i64::MIN + bucket, bucket) + ); + } + } + + fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec { + input + .iter() + .map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end, 0)) + .collect() + } + + // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. +} diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c30b0d8c4f..d2266be289 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -17,6 +17,8 @@ #[cfg(test)] mod close_test; #[cfg(test)] +mod compaction_test; +#[cfg(test)] mod create_test; #[cfg(test)] mod drop_test; diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs new file mode 100644 index 0000000000..584d21e7b5 --- /dev/null +++ b/src/mito2/src/engine/compaction_test.rs @@ -0,0 +1,143 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::ops::Range; + +use api::v1::{ColumnSchema, Rows}; +use common_query::Output; +use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use datatypes::prelude::ScalarVector; +use datatypes::vectors::TimestampMillisecondVector; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest, +}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::test_util::{ + build_rows, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv, +}; + +async fn put_and_flush( + engine: &MitoEngine, + region_id: RegionId, + column_schemas: &[ColumnSchema], + rows: Range, +) { + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows(rows.start, rows.end), + }; + put_rows(engine, region_id, rows).await; + + let Output::AffectedRows(rows) = engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap() + else { + unreachable!() + }; + assert_eq!(0, rows); +} + +async fn delete_and_flush( + engine: &MitoEngine, + region_id: RegionId, + column_schemas: &[ColumnSchema], + rows: Range, +) { + let row_cnt = rows.len(); + let rows = Rows { + schema: column_schemas.to_vec(), + rows: build_rows(rows.start, rows.end), + }; + + let deleted = engine + .handle_request( + region_id, + RegionRequest::Delete(RegionDeleteRequest { rows }), + ) + .await + .unwrap(); + + let Output::AffectedRows(rows_affected) = deleted else { + unreachable!() + }; + assert_eq!(row_cnt, rows_affected); + + let Output::AffectedRows(rows) = engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap() + else { + unreachable!() + }; + assert_eq!(0, rows); +} + +async fn collect_stream_ts(stream: SendableRecordBatchStream) -> HashSet { + let mut res = HashSet::new(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + for batch in batches { + let ts_col = batch + .column_by_name("ts") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + res.extend(ts_col.iter_data().map(|t| t.unwrap().0.value())); + } + res +} + +#[tokio::test] +async fn test_compaction_region() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 20..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 25..30).await; + + let output = engine + .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let stream = engine + .handle_query(region_id, ScanRequest::default()) + .await + .unwrap(); + + let vec = collect_stream_ts(stream).await; + assert_eq!((0..25).map(|v| v * 1000).collect::>(), vec); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 66335122b3..bd17311f42 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -435,6 +435,28 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to build time range predicate for compaction, location: {}, source: {}", + location, + source + ))] + BuildCompactionPredicate { + source: table::error::Error, + location: Location, + }, + + #[snafu(display( + "Failed to compact region {}, location: {}, source:{}", + region_id, + location, + source + ))] + CompactRegion { + region_id: RegionId, + source: Arc, + location: Location, + }, + #[snafu(display( "Failed to compat readers for region {}, reason: {}, location: {}", region_id, @@ -511,7 +533,9 @@ impl ErrorExt for Error { FlushRegion { source, .. } => source.status_code(), RegionDropped { .. } => StatusCode::Cancelled, RegionClosed { .. } => StatusCode::Cancelled, + BuildCompactionPredicate { .. } => StatusCode::Internal, RejectWrite { .. } => StatusCode::StorageUnavailable, + CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index aa0ac55a7f..a2a3252462 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -16,11 +16,15 @@ //! //! Mito is the a region engine to store timeseries data. +#![feature(let_chains)] + #[cfg(any(test, feature = "test"))] pub mod test_util; // TODO(yingwen): Remove all `allow(dead_code)` after finish refactoring mito. mod access_layer; +#[allow(dead_code)] +mod compaction; pub mod config; pub mod engine; pub mod error; diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 219ae6dab9..e5b8651fa7 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -116,7 +116,7 @@ impl MergeReaderBuilder { MergeReaderBuilder::default() } - /// Pushs a batch reader to sources. + /// Pushes a batch reader to sources. pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self { self.sources.push(Source::Reader(reader)); self diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a421057c22..6c743675ba 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -39,7 +39,7 @@ impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results. pub(crate) async fn scan(&self) -> Result { match self { - Scanner::Seq(seq_scan) => seq_scan.build().await, + Scanner::Seq(seq_scan) => seq_scan.build_stream().await, } } } @@ -158,7 +158,7 @@ impl ScanRegion { None => ProjectionMapper::all(&self.version.metadata)?, }; - let seq_scan = SeqScan::new(self.access_layer.clone(), mapper, self.request) + let seq_scan = SeqScan::new(self.access_layer.clone(), mapper) .with_time_range(Some(time_range)) .with_predicate(Some(predicate)) .with_memtables(memtables) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 5e3381cd1e..9e34bee476 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -22,7 +22,6 @@ use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_time::range::TimestampRange; use snafu::ResultExt; -use store_api::storage::ScanRequest; use table::predicate::Predicate; use crate::access_layer::AccessLayerRef; @@ -31,7 +30,7 @@ use crate::memtable::MemtableRef; use crate::read::compat::{self, CompatReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::projection::ProjectionMapper; -use crate::read::BatchReader; +use crate::read::{BatchReader, BoxedBatchReader}; use crate::sst::file::FileHandle; /// Scans a region and returns rows in a sorted sequence. @@ -42,10 +41,6 @@ pub struct SeqScan { access_layer: AccessLayerRef, /// Maps projected Batches to RecordBatches. mapper: Arc, - /// Original scan request to scan memtable. - // TODO(yingwen): Remove this if memtable::iter() takes another struct. - request: ScanRequest, - /// Time range filter for time index. time_range: Option, /// Predicate to push down. @@ -59,11 +54,7 @@ pub struct SeqScan { impl SeqScan { /// Creates a new [SeqScan]. #[must_use] - pub(crate) fn new( - access_layer: AccessLayerRef, - mapper: ProjectionMapper, - request: ScanRequest, - ) -> SeqScan { + pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> SeqScan { SeqScan { access_layer, mapper: Arc::new(mapper), @@ -71,7 +62,6 @@ impl SeqScan { predicate: None, memtables: Vec::new(), files: Vec::new(), - request, } } @@ -104,11 +94,32 @@ impl SeqScan { } /// Builds a stream for the query. - pub async fn build(&self) -> Result { + pub async fn build_stream(&self) -> Result { + // Scans all memtables and SSTs. Builds a merge reader to merge results. + let mut reader = self.build_reader().await?; + + // Creates a stream to poll the batch reader and convert batch into record batch. + let mapper = self.mapper.clone(); + let stream = try_stream! { + while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? { + yield mapper.convert(&batch)?; + } + }; + let stream = Box::pin(RecordBatchStreamAdaptor::new( + self.mapper.output_schema(), + Box::pin(stream), + )); + + Ok(stream) + } + + /// Builds a [BoxedBatchReader] from sequential scan. + pub async fn build_reader(&self) -> Result { // Scans all memtables and SSTs. Builds a merge reader to merge results. let mut builder = MergeReaderBuilder::new(); for mem in &self.memtables { - let iter = mem.iter(Some(self.mapper.column_ids()), &self.request.filters); + // TODO(hl): pass filters once memtable supports filter pushdown. + let iter = mem.iter(Some(self.mapper.column_ids()), &[]); builder.push_batch_iter(iter); } for file in &self.files { @@ -130,20 +141,7 @@ impl SeqScan { builder.push_batch_reader(Box::new(compat_reader)); } } - let mut reader = builder.build().await?; - // Creates a stream to poll the batch reader and convert batch into record batch. - let mapper = self.mapper.clone(); - let stream = try_stream! { - while let Some(batch) = reader.next_batch().await.map_err(BoxedError::new).context(ExternalSnafu)? { - yield mapper.convert(&batch)?; - } - }; - let stream = Box::pin(RecordBatchStreamAdaptor::new( - self.mapper.output_schema(), - Box::pin(stream), - )); - - Ok(stream) + Ok(Box::new(builder.build().await?)) } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index c4ec010f99..e90ba20915 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -25,6 +25,9 @@ use api::helper::{ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_base::readable_size::ReadableSize; use common_query::Output; +use common_query::Output::AffectedRows; +use common_telemetry::tracing::log::info; +use common_telemetry::warn; use datatypes::prelude::DataType; use prost::Message; use smallvec::SmallVec; @@ -64,7 +67,7 @@ impl Default for RegionOptions { RegionOptions { write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE), ttl: None, - compaction_strategy: CompactionStrategy::LeveledTimeWindow, + compaction_strategy: CompactionStrategy::default(), } } } @@ -503,10 +506,14 @@ pub(crate) struct SenderDdlRequest { /// Notification from a background job. #[derive(Debug)] pub(crate) enum BackgroundNotify { - /// Flush is finished. + /// Flush has finished. FlushFinished(FlushFinished), - /// Flush is failed. + /// Flush has failed. FlushFailed(FlushFailed), + /// Compaction has finished. + CompactionFinished(CompactionFinished), + /// Compaction has failed. + CompactionFailed(CompactionFailed), } /// Notifies a flush job is finished. @@ -559,6 +566,54 @@ pub(crate) struct FlushFailed { pub(crate) err: Arc, } +/// Notifies a compaction job has finished. +#[derive(Debug)] +pub(crate) struct CompactionFinished { + /// Region id. + pub(crate) region_id: RegionId, + /// Compaction output files that are to be added to region version. + pub(crate) compaction_outputs: Vec, + /// Compacted files that are to be removed from region version. + pub(crate) compacted_files: Vec, + /// Compaction result sender. + pub(crate) sender: Option>>, + /// File purger for cleaning files on failure. + pub(crate) file_purger: FilePurgerRef, +} + +impl CompactionFinished { + pub fn on_success(self) { + if let Some(sender) = self.sender { + let _ = sender.send(Ok(AffectedRows(0))); + } + info!("Successfully compacted region: {}", self.region_id); + } + + /// Compaction succeeded but failed to update manifest or region's already been dropped, + /// clean compaction output files. + pub fn on_failure(self, _err: Error) { + for file in &self.compacted_files { + let file_id = file.file_id; + warn!( + "Cleaning region {} compaction output file: {}", + self.region_id, file_id + ); + self.file_purger.send_request(PurgeRequest { + region_id: self.region_id, + file_id, + }); + } + } +} + +/// A failing compaction result. +#[derive(Debug)] +pub(crate) struct CompactionFailed { + pub(crate) region_id: RegionId, + /// The error source of the failure. + pub(crate) err: Arc, +} + #[cfg(test)] mod tests { use api::v1::{Row, SemanticType}; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 05da526a20..ecf961854b 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -137,6 +137,18 @@ impl FileHandle { pub fn mark_deleted(&self) { self.inner.deleted.store(true, Ordering::Relaxed); } + + pub fn compacting(&self) -> bool { + self.inner.compacting.load(Ordering::Relaxed) + } + + pub fn set_compacting(&self, compacting: bool) { + self.inner.compacting.store(compacting, Ordering::Relaxed); + } + + pub fn meta(&self) -> FileMeta { + self.inner.meta.clone() + } } /// Inner data of [FileHandle]. diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index be71ebb1e2..b066af7ba3 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -17,6 +17,8 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use common_time::Timestamp; + use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL}; use crate::sst::file_purger::FilePurgerRef; @@ -104,6 +106,22 @@ impl LevelMeta { files: HashMap::new(), } } + + /// Returns expired SSTs from current level. + pub fn get_expired_files(&self, expire_time: &Timestamp) -> Vec { + self.files + .values() + .filter(|v| { + let (_, end) = v.time_range(); + &end < expire_time + }) + .cloned() + .collect() + } + + pub fn files(&self) -> impl Iterator { + self.files.values() + } } impl fmt::Debug for LevelMeta { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index a4a01d290d..cbc2e8896e 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -46,8 +46,20 @@ use crate::error::Result; use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; +use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest}; use crate::worker::WorkerGroup; +#[derive(Debug)] +pub(crate) struct NoopFilePurger; + +impl FilePurger for NoopFilePurger { + fn send_request(&self, _request: PurgeRequest) {} +} + +pub(crate) fn new_noop_file_purger() -> FilePurgerRef { + Arc::new(NoopFilePurger {}) +} + /// Env to test mito engine. pub struct TestEnv { /// Path to store data. @@ -447,7 +459,7 @@ impl WriteBufferManager for MockWriteBufferManager { } } -fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { +pub(crate) fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema { api::v1::ColumnSchema { column_name: metadata.column_schema.name.clone(), datatype: ColumnDataTypeWrapper::try_from(metadata.column_schema.data_type.clone()) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 9545d6fc72..3c9f4e7803 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -15,6 +15,7 @@ //! Structs and utilities for writing regions. mod handle_close; +mod handle_compaction; mod handle_create; mod handle_drop; mod handle_flush; @@ -36,6 +37,7 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; +use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; @@ -246,7 +248,8 @@ impl WorkerStarter { ))), scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, - flush_scheduler: FlushScheduler::new(self.scheduler), + flush_scheduler: FlushScheduler::new(self.scheduler.clone()), + compaction_scheduler: CompactionScheduler::new(self.scheduler), stalled_requests: StalledRequests::default(), listener: self.listener, }; @@ -397,6 +400,8 @@ struct RegionWorkerLoop { write_buffer_manager: WriteBufferManagerRef, /// Schedules background flush requests. flush_scheduler: FlushScheduler, + /// Scheduler for compaction tasks. + compaction_scheduler: CompactionScheduler, /// Stalled write requests. stalled_requests: StalledRequests, /// Event listener for tests. @@ -489,7 +494,10 @@ impl RegionWorkerLoop { self.handle_flush_request(ddl.region_id, ddl.sender).await; continue; } - DdlRequest::Compact(_) => todo!(), + DdlRequest::Compact(_) => { + self.handle_compaction_request(ddl.region_id, ddl.sender); + continue; + } }; if let Some(sender) = ddl.sender { @@ -506,6 +514,10 @@ impl RegionWorkerLoop { self.handle_flush_finished(region_id, req).await } BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await, + BackgroundNotify::CompactionFinished(req) => { + self.handle_compaction_finished(region_id, req).await + } + BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, } } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs new file mode 100644 index 0000000000..052bc419b1 --- /dev/null +++ b/src/mito2/src/worker/handle_compaction.rs @@ -0,0 +1,110 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::Output; +use common_telemetry::{error, info}; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; +use tokio::sync::oneshot; + +use crate::compaction::CompactionRequest; +use crate::error::{RegionNotFoundSnafu, Result}; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::region::MitoRegionRef; +use crate::request::{CompactionFailed, CompactionFinished}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + /// Handles compaction request submitted to region worker. + pub(crate) fn handle_compaction_request( + &mut self, + region_id: RegionId, + sender: Option>>, + ) { + let Some(region) = self.regions.get_region(region_id) else { + if let Some(sender) = sender { + let _ = sender.send(RegionNotFoundSnafu { region_id }.fail()); + } + return; + }; + + let request = self.new_compaction_request(®ion, sender); + if let Err(e) = self.compaction_scheduler.schedule_compaction(request) { + error!(e; "Failed to schedule compaction task for region: {}", region_id); + } else { + info!( + "Successfully scheduled compaction task for region: {}", + region_id + ); + } + } + + /// Handles compaction finished, update region version and manifest, deleted compacted files. + pub(crate) async fn handle_compaction_finished( + &mut self, + region_id: RegionId, + mut request: CompactionFinished, + ) { + let Some(region) = self.regions.get_region(region_id) else { + request.on_failure(RegionNotFoundSnafu { region_id }.build()); + return; + }; + + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: std::mem::take(&mut request.compaction_outputs), + files_to_remove: std::mem::take(&mut request.compacted_files), + compaction_time_window: None, // TODO(hl): update window maybe + flushed_entry_id: None, + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + if let Err(e) = region.manifest_manager.update(action_list).await { + error!(e; "Failed to update manifest, region: {}", region_id); + request.on_failure(e); + return; + } + + // Apply edit to region's version. + region + .version_control + .apply_edit(edit, region.file_purger.clone()); + request.on_success(); + } + + /// When compaction fails, we simply log the error. + pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { + error!(req.err; "Failed to compact region: {}", req.region_id); + } + + /// Creates a new compaction request. + fn new_compaction_request( + &self, + region: &MitoRegionRef, + waiter: Option>>, + ) -> CompactionRequest { + let current_version = region.version_control.current().version; + let access_layer = region.access_layer.clone(); + let file_purger = region.file_purger.clone(); + + CompactionRequest { + current_version, + access_layer, + ttl: None, // TODO(hl): get TTL info from region metadata + compaction_time_window: None, // TODO(hl): get persisted region compaction time window + request_sender: self.sender.clone(), + waiter, + file_purger, + } + } +} diff --git a/src/storage/src/compaction.rs b/src/storage/src/compaction.rs index 3553e4e7c0..d63af5cf95 100644 --- a/src/storage/src/compaction.rs +++ b/src/storage/src/compaction.rs @@ -112,9 +112,6 @@ pub fn compaction_strategy_to_picker( strategy: &CompactionStrategy, ) -> CompactionPickerRef { match strategy { - CompactionStrategy::LeveledTimeWindow => { - Arc::new(LeveledTimeWindowPicker::default()) as Arc<_> - } CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( twcs_opts.max_active_window_files, twcs_opts.max_inactive_window_files, diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs index 92cd6808db..8f72db19f1 100644 --- a/src/store-api/src/storage/engine.rs +++ b/src/store-api/src/storage/engine.rs @@ -28,7 +28,6 @@ use crate::storage::descriptors::RegionDescriptor; use crate::storage::region::Region; const COMPACTION_STRATEGY_KEY: &str = "compaction"; -const COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE: &str = "LTW"; const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS"; const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files"; const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds"; @@ -125,15 +124,18 @@ pub struct CloseOptions { } /// Options for compactions -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub enum CompactionStrategy { - /// Leveled time window compaction strategy - #[default] - LeveledTimeWindow, /// TWCS Twcs(TwcsOptions), } +impl Default for CompactionStrategy { + fn default() -> Self { + Self::Twcs(TwcsOptions::default()) + } +} + /// TWCS compaction options. #[derive(Debug, Clone)] pub struct TwcsOptions { @@ -160,9 +162,7 @@ impl From<&HashMap> for CompactionStrategy { let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else { return CompactionStrategy::default(); }; - if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_LEVELED_TIME_WINDOW_VALUE) { - CompactionStrategy::LeveledTimeWindow - } else if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) { + if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) { let mut twcs_opts = TwcsOptions::default(); if let Some(max_active_window_files) = opts .get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)