feat: manual compaction (#3988)

* add compaction udf params

* wip: pass compaction options through grpc

* wip: pass compaction options all the way down to region server

* wip: window compaction task

* feat: trigger major compaction

* refactor: optimize compaction parameter parsing

* chore: rebase main

* chore: update proto

* chore: add some tests

* feat: validate catalog

* chore: fix typo and rebase main

* fix: some cr comments

* fix: file_time_bucket_span

* fix: avoid upper bound overflow

* chore: update proto
Authored by Lei, HUANG on 2024-05-22 17:42:21 +08:00; committed by GitHub.
Parent 9e1af79637, commit 090b59e8d6.
21 changed files with 1410 additions and 541 deletions

Cargo.lock generated
View File

@@ -4233,7 +4233,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11db14b8502f55ca5348917fd18e6fcf140f55e#a11db14b8502f55ca5348917fd18e6fcf140f55e"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3cd71167ee067c5679a7fb17cf58bdfbb5487a0d#3cd71167ee067c5679a7fb17cf58bdfbb5487a0d"
dependencies = [
"prost 0.12.4",
"serde",
@@ -10538,6 +10538,7 @@ dependencies = [
"datatypes",
"derive_builder 0.12.0",
"futures",
"greptime-proto",
"humantime",
"humantime-serde",
"parquet",

View File

@@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use std::fmt;
use std::str::FromStr;
+use api::v1::region::{compact_request, StrictWindow};
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
@@ -22,7 +24,7 @@ use common_query::error::{
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
-use common_telemetry::error;
+use common_telemetry::{error, info};
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
@@ -34,71 +36,78 @@ use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
-macro_rules! define_table_function {
-    ($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
-        /// A function to $func table, such as `$display_name(table_name)`.
-        #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
-        pub(crate) async fn $display_name(
-            table_mutation_handler: &TableMutationHandlerRef,
-            query_ctx: &QueryContextRef,
-            params: &[ValueRef<'_>],
-        ) -> Result<Value> {
-            ensure!(
-                params.len() == 1,
-                InvalidFuncArgsSnafu {
-                    err_msg: format!(
-                        "The length of the args is not correct, expect 1, have: {}",
-                        params.len()
-                    ),
-                }
-            );
-            let ValueRef::String(table_name) = params[0] else {
-                return UnsupportedInputDataTypeSnafu {
-                    function: $display_name_str,
-                    datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
-                }
-                .fail();
-            };
-            let (catalog_name, schema_name, table_name) =
-                table_name_to_full_name(table_name, &query_ctx)
-                    .map_err(BoxedError::new)
-                    .context(TableMutationSnafu)?;
-            let affected_rows = table_mutation_handler
-                .$func(
-                    $request {
-                        catalog_name,
-                        schema_name,
-                        table_name,
-                    },
-                    query_ctx.clone(),
-                )
-                .await?;
-            Ok(Value::from(affected_rows as u64))
-        }
-    };
-}
-define_table_function!(
-    "FlushTableFunction",
-    "flush_table",
-    flush_table,
-    flush,
-    FlushTableRequest
-);
-define_table_function!(
-    "CompactTableFunction",
-    "compact_table",
-    compact_table,
-    compact,
-    CompactTableRequest
-);
+/// Compact type: strict window.
+const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";
+#[admin_fn(
+    name = "FlushTableFunction",
+    display_name = "flush_table",
+    sig_fn = "flush_signature",
+    ret = "uint64"
+)]
+pub(crate) async fn flush_table(
+    table_mutation_handler: &TableMutationHandlerRef,
+    query_ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    ensure!(
+        params.len() == 1,
+        InvalidFuncArgsSnafu {
+            err_msg: format!(
+                "The length of the args is not correct, expect 1, have: {}",
+                params.len()
+            ),
+        }
+    );
+    let ValueRef::String(table_name) = params[0] else {
+        return UnsupportedInputDataTypeSnafu {
+            function: "flush_table",
+            datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+        }
+        .fail();
+    };
+    let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
+        .map_err(BoxedError::new)
+        .context(TableMutationSnafu)?;
+    let affected_rows = table_mutation_handler
+        .flush(
+            FlushTableRequest {
+                catalog_name,
+                schema_name,
+                table_name,
+            },
+            query_ctx.clone(),
+        )
+        .await?;
+    Ok(Value::from(affected_rows as u64))
+}
+#[admin_fn(
+    name = "CompactTableFunction",
+    display_name = "compact_table",
+    sig_fn = "compact_signature",
+    ret = "uint64"
+)]
+pub(crate) async fn compact_table(
+    table_mutation_handler: &TableMutationHandlerRef,
+    query_ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    let request = parse_compact_params(params, query_ctx)?;
+    info!("Compact table request: {:?}", request);
+    let affected_rows = table_mutation_handler
+        .compact(request, query_ctx.clone())
+        .await?;
+    Ok(Value::from(affected_rows as u64))
+}
-fn signature() -> Signature {
+fn flush_signature() -> Signature {
     Signature::uniform(
         1,
         vec![ConcreteDataType::string_datatype()],
@@ -106,12 +115,98 @@ fn signature() -> Signature {
)
}
fn compact_signature() -> Signature {
Signature::variadic(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
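// Note (editor's annotation, not part of this diff): `flush_signature` stays
// uniform over exactly one string argument, while `compact_signature` is
// variadic because `compact_table` accepts one to three string arguments
// (table name, compaction type, type-specific options); the actual arity
// check happens in `parse_compact_params` below.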
/// Parses `compact_table` UDF parameters. This function accepts the following combinations:
/// - `[<table_name>]`: only the table name is provided; the default compaction type (regular) is used.
/// - `[<table_name>, <type>]`: table name and compaction type; the compaction options take their defaults.
/// - `[<table_name>, <type>, <options>]`: provides both the type and type-specific options.
/// (See the commented usage sketch after `parse_compact_type` below.)
fn parse_compact_params(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<CompactTableRequest> {
ensure!(
!params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "Args cannot be empty",
}
);
let (table_name, compact_type) = match params {
[ValueRef::String(table_name)] => (
table_name,
compact_request::Options::Regular(Default::default()),
),
[ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
let compact_type = parse_compact_type(compact_ty_str, None)?;
(table_name, compact_type)
}
[ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
{
let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
(table_name, compact_type)
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: "compact_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail()
}
};
let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
Ok(CompactTableRequest {
catalog_name,
schema_name,
table_name,
compact_options: compact_type,
})
}
fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) {
let window_seconds = option
.map(|v| {
i64::from_str(v).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!(
"Compact window is expected to be a valid number, provided: {}",
v
),
}
.build()
})
})
.transpose()?
.unwrap_or(0);
Ok(compact_request::Options::StrictWindow(StrictWindow {
window_seconds,
}))
} else {
Ok(compact_request::Options::Regular(Default::default()))
}
}
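// A minimal usage sketch (editor's addition, not part of this diff). The
// argument combinations mirror the doc comment above, and `QueryContext::arc()`
// is the same helper the tests below use:
//
//     let ctx = QueryContext::arc();
//     // Regular compaction with default options.
//     let _req = parse_compact_params(&[ValueRef::String("monitor")], &ctx)?;
//     // Strict-window compaction with a one-hour window.
//     let _req = parse_compact_params(
//         &[
//             ValueRef::String("monitor"),
//             ValueRef::String("strict_window"),
//             ValueRef::String("3600"),
//         ],
//         &ctx,
//     )?;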
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::region::compact_request::Options;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use session::context::QueryContext;
use super::*;
@@ -174,5 +269,109 @@ mod tests {
define_table_function_test!(flush_table, FlushTableFunction);
define_table_function_test!(compact_table, CompactTableFunction);
fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
for (params, expected) in cases {
let params = params
.iter()
.map(|s| ValueRef::String(s))
.collect::<Vec<_>>();
assert_eq!(
expected,
&parse_compact_params(&params, &QueryContext::arc()).unwrap()
);
}
}
#[test]
fn test_parse_compact_params() {
check_parse_compact_params(&[
(
&["table"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&[&format!(
"{}.{}.table",
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
)],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "regular"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "strict_window"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
},
),
(
&["table", "strict_window", "3600"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
},
),
(
&["table", "regular", "abcd"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
]);
assert!(parse_compact_params(
&["table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err());
assert!(parse_compact_params(
&["a.b.table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err());
}
}

View File

@@ -219,6 +219,7 @@ impl RegionServerHandler for RegionServer {
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let results = if is_parallel {

View File

@@ -12,35 +12,54 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+mod buckets;
mod picker;
+mod task;
#[cfg(test)]
mod test_util;
mod twcs;
+mod window;
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
+use api::v1::region::compact_request;
use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
pub use picker::CompactionPickerRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use table::predicate::Predicate;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::compaction::twcs::TwcsPicker;
+use crate::compaction::window::WindowedCompactionPicker;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
+    TimeRangePredicateOverflowSnafu,
};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileId, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
/// Region compaction request.
@@ -116,9 +135,11 @@ impl CompactionScheduler {
}
/// Schedules a compaction for the region.
+#[allow(clippy::too_many_arguments)]
pub(crate) fn schedule_compaction(
&mut self,
region_id: RegionId,
+compact_options: compact_request::Options,
version_control: &VersionControlRef,
access_layer: &AccessLayerRef,
file_purger: &FilePurgerRef,
@@ -147,7 +168,7 @@ impl CompactionScheduler {
self.listener.clone(),
);
self.region_status.insert(region_id, status);
-self.schedule_compaction_request(request)
+self.schedule_compaction_request(request, compact_options)
}
/// Notifies the scheduler that the compaction job is finished successfully.
@@ -159,6 +180,7 @@ impl CompactionScheduler {
let Some(status) = self.region_status.get_mut(&region_id) else {
return;
};
+// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -169,7 +191,10 @@ impl CompactionScheduler {
self.listener.clone(),
);
// Try to schedule next compaction task for this region.
-if let Err(e) = self.schedule_compaction_request(request) {
+if let Err(e) = self.schedule_compaction_request(
+    request,
+    compact_request::Options::Regular(Default::default()),
+) {
error!(e; "Failed to schedule next compaction for region {}", region_id);
}
}
@@ -210,8 +235,22 @@ impl CompactionScheduler {
/// Schedules a compaction request.
///
/// If the region has nothing to compact, it removes the region from the status map.
-    fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
-        let picker = compaction_options_to_picker(&request.current_version.options.compaction);
+    fn schedule_compaction_request(
+        &mut self,
+        request: CompactionRequest,
+        options: compact_request::Options,
+    ) -> Result<()> {
+        let picker = if let compact_request::Options::StrictWindow(window) = &options {
+            let window = if window.window_seconds == 0 {
+                None
+            } else {
+                Some(window.window_seconds)
+            };
+            Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
+        } else {
+            compaction_options_to_picker(&request.current_version.options.compaction)
+        };
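        // Note (editor's annotation, not part of this diff): `window_seconds == 0`
        // is what `parse_compact_params` produces when no window is given, so it is
        // treated as "unspecified" here and the windowed picker falls back to the
        // persisted or inferred compaction window.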
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
@@ -226,6 +265,7 @@ impl CompactionScheduler {
self.region_status.remove(&region_id);
return Ok(());
};
drop(pick_timer);
// Submit the compaction task.
@@ -334,6 +374,7 @@ impl CompactionStatus {
/// Creates a new compaction request for compaction picker.
///
/// It consumes all pending compaction waiters.
+#[allow(clippy::too_many_arguments)]
fn new_compaction_request(
&mut self,
request_sender: Sender<WorkerRequest>,
@@ -368,6 +409,127 @@ impl CompactionStatus {
}
}
#[derive(Debug)]
pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
/// Whether to remove deletion markers.
pub filter_deleted: bool,
/// Compaction output time range.
pub output_time_range: Option<TimestampRange>,
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: Option<CacheManagerRef>,
inputs: &[FileHandle],
append_mode: bool,
filter_deleted: bool,
time_range: Option<TimestampRange>,
) -> Result<BoxedBatchReader> {
let mut scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
.with_append_mode(append_mode)
.with_cache(cache)
.with_filter_deleted(filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.
if let Some(time_range) = time_range {
scan_input = scan_input.with_predicate(time_range_to_predicate(time_range, &metadata)?);
}
SeqScan::new(scan_input).build_reader().await
}
/// Converts time range to predicates so that rows outside the range will be filtered.
fn time_range_to_predicate(
range: TimestampRange,
metadata: &RegionMetadataRef,
) -> Result<Option<Predicate>> {
let ts_col = metadata.time_index_column();
// safety: time index column's type must be a valid timestamp type.
let ts_col_unit = ts_col
.column_schema
.data_type
.as_timestamp()
.unwrap()
.unit();
let exprs = match (range.start(), range.end()) {
(Some(start), Some(end)) => {
vec![
datafusion_expr::col(ts_col.column_schema.name.clone())
.gt_eq(ts_to_lit(*start, ts_col_unit)?),
datafusion_expr::col(ts_col.column_schema.name.clone())
.lt(ts_to_lit(*end, ts_col_unit)?),
]
}
(Some(start), None) => {
vec![datafusion_expr::col(ts_col.column_schema.name.clone())
.gt_eq(ts_to_lit(*start, ts_col_unit)?)]
}
(None, Some(end)) => {
vec![datafusion_expr::col(ts_col.column_schema.name.clone())
.lt(ts_to_lit(*end, ts_col_unit)?)]
}
(None, None) => {
return Ok(None);
}
};
Ok(Some(Predicate::new(exprs)))
}
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
let ts = ts
.convert_to(ts_col_unit)
.context(TimeRangePredicateOverflowSnafu {
timestamp: ts,
unit: ts_col_unit,
})?;
let val = ts.value();
let scalar_value = match ts_col_unit {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
};
Ok(datafusion_expr::lit(scalar_value))
}
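// Worked example (editor's sketch, not part of this diff): a range of
// [Timestamp::new_second(3600), Timestamp::new_second(7200)) against a
// millisecond time index is converted by `ts_to_lit` into millisecond
// literals, yielding the filters `ts >= 3600000` and `ts < 7200000`.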
/// Finds all expired SSTs across levels.
fn get_expired_ssts(
levels: &[LevelMeta],
ttl: Option<Duration>,
now: Timestamp,
) -> Vec<FileHandle> {
let Some(ttl) = ttl else {
return vec![];
};
let expire_time = match now.sub_duration(ttl) {
Ok(expire_time) => expire_time,
Err(e) => {
error!(e; "Failed to calculate region TTL expire time");
return vec![];
}
};
levels
.iter()
.flat_map(|l| l.get_expired_files(&expire_time).into_iter())
.collect()
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
@@ -397,6 +559,7 @@ mod tests {
scheduler
.schedule_compaction(
builder.region_id(),
+compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
@@ -415,6 +578,7 @@ mod tests {
scheduler
.schedule_compaction(
builder.region_id(),
+compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
@@ -477,6 +641,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
+compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
@@ -505,6 +670,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
+compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,
@@ -536,6 +702,7 @@ mod tests {
scheduler
.schedule_compaction(
region_id,
+compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
&purger,

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use crate::sst::file::FileHandle;
/// Infers the suitable time bucket duration.
/// Currently it simply finds the max and min timestamps across all SSTs in the level and fits
/// the time span into a time bucket.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
let (start, end) = f.time_range();
min_ts = min_ts.min(start);
max_ts = max_ts.max(end);
}
// safety: converting any timestamp to seconds will not overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}
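// Worked example (editor's sketch, not part of this diff): SSTs spanning
// [0s, 5400s] give a 5400-second span; the smallest predefined bucket that
// covers it is two hours, so `infer_time_bucket` returns 7200.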
pub(crate) struct TimeBuckets([i64; 7]);
impl TimeBuckets {
/// Fits a given time span into a time bucket by finding the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(&self, span_sec: i64) -> i64 {
assert!(span_sec >= 0);
match self.0.binary_search(&span_sec) {
Ok(idx) => self.0[idx],
Err(idx) => {
if idx < self.0.len() {
self.0[idx]
} else {
self.0.last().copied().unwrap()
}
}
}
}
#[cfg(test)]
fn get(&self, idx: usize) -> i64 {
self.0[idx]
}
fn max(&self) -> i64 {
self.0.last().copied().unwrap()
}
}
/// A set of predefined time buckets.
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
]);
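// A worked example (editor's sketch, not part of this diff): `fit_time_bucket`
// binary-searches the sorted buckets, so an exact hit returns that bucket and
// anything larger rounds up to the next one:
//
//     assert_eq!(2 * 60 * 60, TIME_BUCKETS.fit_time_bucket(2 * 60 * 60));
//     assert_eq!(12 * 60 * 60, TIME_BUCKETS.fit_time_bucket(2 * 60 * 60 + 1));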
#[cfg(test)]
mod tests {
use super::*;
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::FileId;
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1));
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60));
assert_eq!(
TIME_BUCKETS.get(1),
TIME_BUCKETS.fit_time_bucket(60 * 60 + 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2))
);
assert_eq!(
TIME_BUCKETS.get(3),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
);
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS.get(0),
infer_time_bucket(
[
new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
new_file_handle(FileId::random(), 1, 10_000, 0)
]
.iter()
)
);
}
}

View File

@@ -0,0 +1,318 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
use common_telemetry::{error, info};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::CompactionTask;
use crate::compaction::{build_sst_reader, CompactionOutput};
use crate::config::MitoConfig;
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionState};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileMeta, IndexType};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
const MAX_PARALLEL_COMPACTION: usize = 8;
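// Note (editor's annotation, not part of this diff): `merge_ssts` below drains
// the planned outputs into chunks of `MAX_PARALLEL_COMPACTION` spawned
// background tasks, so at most eight output files are rewritten concurrently
// per task run.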
pub(crate) struct CompactionTaskImpl {
pub engine_config: Arc<MitoConfig>,
pub region_id: RegionId,
pub metadata: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub compaction_time_window: Option<i64>,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
pub start_time: Instant,
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
/// The region is using append mode.
pub(crate) append_mode: bool,
/// Manifest context.
pub(crate) manifest_ctx: ManifestContextRef,
/// Version control to update.
pub(crate) version_control: VersionControlRef,
/// Event listener.
pub(crate) listener: WorkerListener,
}
impl Debug for CompactionTaskImpl {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsCompactionTask")
.field("region_id", &self.region_id)
.field("outputs", &self.outputs)
.field("expired_ssts", &self.expired_ssts)
.field("compaction_time_window", &self.compaction_time_window)
.field("append_mode", &self.append_mode)
.finish()
}
}
impl Drop for CompactionTaskImpl {
fn drop(&mut self) {
self.mark_files_compacting(false)
}
}
impl CompactionTaskImpl {
fn mark_files_compacting(&self, compacting: bool) {
self.outputs
.iter()
.flat_map(|o| o.inputs.iter())
.for_each(|f| f.set_compacting(compacting))
}
/// Merges all SST files.
/// Returns `(output files, input files)`.
async fn merge_ssts(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());
for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
"Compaction region {} output [{}]-> {}",
self.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = self
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
let append_mode = self.append_mode;
futs.push(async move {
let reader = build_sst_reader(
metadata.clone(),
sst_layer.clone(),
Some(cache_manager.clone()),
&output.inputs,
append_mode,
output.filter_deleted,
output.output_time_range,
)
.await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id,
metadata,
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
.await?
.map(|sst_info| FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
});
Ok(file_meta_opt)
});
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
.await
.context(error::JoinSnafu)?
.into_iter()
.collect::<error::Result<Vec<_>>>()?;
output_files.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((output_files, inputs))
}
async fn handle_compaction(&mut self) -> error::Result<()> {
self.mark_files_compacting(true);
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
let (added, mut deleted) = match self.merge_ssts().await {
Ok(v) => v,
Err(e) => {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.stop_and_discard();
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(FileHandle::meta));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.region_id,
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
merge_time,
);
self.listener.on_merge_ssts_finished(self.region_id).await;
let _manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: added,
files_to_remove: deleted,
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We might leak files if we fail to update manifest. We can add a cleanup task to
// remove them later.
self.manifest_ctx
.update_manifest(RegionState::Writable, action_list, || {
self.version_control
.apply_edit(edit, &[], self.file_purger.clone());
})
.await
}
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Notifies region worker to handle post-compaction tasks.
async fn send_to_worker(&self, request: WorkerRequest) {
if let Err(e) = self.request_sender.send(request).await {
error!(
"Failed to notify compaction job status for region {}, request: {:?}",
self.region_id, e.0
);
}
}
}
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
senders: std::mem::take(&mut self.waiters),
start_time: self.start_time,
}),
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.region_id);
let err = Arc::new(e);
// notify compaction waiters
self.on_failure(err.clone());
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.region_id,
err,
})
}
};
self.send_to_worker(WorkerRequest::Background {
region_id: self.region_id,
notify,
})
.await;
}
}

View File

@@ -15,44 +15,18 @@
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
-use common_telemetry::{debug, error, info};
+use common_telemetry::{debug, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
+use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::picker::{CompactionTask, Picker};
-use crate::compaction::CompactionRequest;
use crate::config::MitoConfig;
use crate::error::{self, CompactRegionSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::region::options::IndexOptions;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionState};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
+use crate::compaction::task::CompactionTaskImpl;
+use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest};
+use crate::sst::file::{FileHandle, FileId};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
-const MAX_PARALLEL_COMPACTION: usize = 8;
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
/// candidates.
@@ -107,6 +81,7 @@ impl TwcsPicker {
output_level: 1, // we only have two levels and always compact to l1
inputs: files_in_window.clone(),
filter_deleted,
+output_time_range: None, // we do not enforce output time range in twcs compactions.
});
} else {
debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window);
@@ -119,6 +94,7 @@ impl TwcsPicker {
output_level: 1,
inputs: files_in_window.clone(),
filter_deleted,
+output_time_range: None,
});
} else {
debug!(
@@ -147,6 +123,7 @@ impl Picker for TwcsPicker {
manifest_ctx,
version_control,
listener,
+..
} = req;
let region_metadata = current_version.metadata.clone();
@@ -188,7 +165,7 @@ impl Picker for TwcsPicker {
}
return None;
}
-let task = TwcsCompactionTask {
+let task = CompactionTaskImpl {
engine_config,
region_id,
metadata: region_metadata,
@@ -329,393 +306,6 @@ fn find_latest_window_in_seconds<'a>(
.and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size))
}
pub(crate) struct TwcsCompactionTask {
pub engine_config: Arc<MitoConfig>,
pub region_id: RegionId,
pub metadata: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub compaction_time_window: Option<i64>,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
pub start_time: Instant,
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
/// The region is using append mode.
pub(crate) append_mode: bool,
/// Manifest context.
pub(crate) manifest_ctx: ManifestContextRef,
/// Version control to update.
pub(crate) version_control: VersionControlRef,
/// Event listener.
pub(crate) listener: WorkerListener,
}
impl Debug for TwcsCompactionTask {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TwcsCompactionTask")
.field("region_id", &self.region_id)
.field("outputs", &self.outputs)
.field("expired_ssts", &self.expired_ssts)
.field("compaction_time_window", &self.compaction_time_window)
.field("append_mode", &self.append_mode)
.finish()
}
}
impl Drop for TwcsCompactionTask {
fn drop(&mut self) {
self.mark_files_compacting(false)
}
}
impl TwcsCompactionTask {
fn mark_files_compacting(&self, compacting: bool) {
self.outputs
.iter()
.flat_map(|o| o.inputs.iter())
.for_each(|f| f.set_compacting(compacting))
}
/// Merges all SST files.
/// Returns `(output files, input files)`.
async fn merge_ssts(&mut self) -> error::Result<(Vec<FileMeta>, Vec<FileMeta>)> {
let mut futs = Vec::with_capacity(self.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());
for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
info!(
"Compaction region {}. Input [{}] -> output {}",
self.region_id,
output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.collect::<Vec<_>>()
.join(","),
output.output_file_id
);
let write_opts = WriteOptions {
write_buffer_size: self.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = self
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
let append_mode = self.append_mode;
futs.push(async move {
let reader = build_sst_reader(
metadata.clone(),
sst_layer.clone(),
Some(cache_manager.clone()),
&output.inputs,
append_mode,
output.filter_deleted,
)
.await?;
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
file_id,
metadata,
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
.await?
.map(|sst_info| FileMeta {
region_id,
file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
});
Ok(file_meta_opt)
});
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION);
for _ in 0..MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_bg(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
.await
.context(error::JoinSnafu)?
.into_iter()
.collect::<error::Result<Vec<_>>>()?;
output_files.extend(metas.into_iter().flatten());
}
let inputs = compacted_inputs.into_iter().collect();
Ok((output_files, inputs))
}
async fn handle_compaction(&mut self) -> error::Result<()> {
self.mark_files_compacting(true);
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
let (added, mut deleted) = match self.merge_ssts().await {
Ok(v) => v,
Err(e) => {
error!(e; "Failed to compact region: {}", self.region_id);
merge_timer.stop_and_discard();
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(FileHandle::meta));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.region_id,
deleted,
added,
self.compaction_time_window,
self.waiters.len(),
merge_time,
);
self.listener.on_merge_ssts_finished(self.region_id).await;
let _manifest_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["write_manifest"])
.start_timer();
// Write region edit to manifest.
let edit = RegionEdit {
files_to_add: added,
files_to_remove: deleted,
compaction_time_window: self
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
// We might leak files if we fail to update manifest. We can add a cleanup task to
// remove them later.
self.manifest_ctx
.update_manifest(RegionState::Writable, action_list, || {
self.version_control
.apply_edit(edit, &[], self.file_purger.clone());
})
.await
}
/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
/// Notifies region worker to handle post-compaction tasks.
async fn send_to_worker(&self, request: WorkerRequest) {
if let Err(e) = self.request_sender.send(request).await {
error!(
"Failed to notify compaction job status for region {}, request: {:?}",
self.region_id, e.0
);
}
}
}
#[async_trait::async_trait]
impl CompactionTask for TwcsCompactionTask {
async fn run(&mut self) {
let notify = match self.handle_compaction().await {
Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
senders: std::mem::take(&mut self.waiters),
start_time: self.start_time,
}),
Err(e) => {
error!(e; "Failed to compact region, region id: {}", self.region_id);
let err = Arc::new(e);
// notify compaction waiters
self.on_failure(err.clone());
BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.region_id,
err,
})
}
};
self.send_to_worker(WorkerRequest::Background {
region_id: self.region_id,
notify,
})
.await;
}
}
/// Infers the suitable time bucket duration.
/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span
/// into time bucket.
pub(crate) fn infer_time_bucket<'a>(files: impl Iterator<Item = &'a FileHandle>) -> i64 {
let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second);
let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second);
for f in files {
let (start, end) = f.time_range();
min_ts = min_ts.min(start);
max_ts = max_ts.max(end);
}
// safety: Convert whatever timestamp into seconds will not cause overflow.
let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value();
let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value();
max_sec
.checked_sub(min_sec)
.map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow.
.unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty.
}
pub(crate) struct TimeBuckets([i64; 7]);
impl TimeBuckets {
/// Fits a given time span into time bucket by find the minimum bucket that can cover the span.
/// Returns the max bucket if no such bucket can be found.
fn fit_time_bucket(&self, span_sec: i64) -> i64 {
assert!(span_sec >= 0);
match self.0.binary_search(&span_sec) {
Ok(idx) => self.0[idx],
Err(idx) => {
if idx < self.0.len() {
self.0[idx]
} else {
self.0.last().copied().unwrap()
}
}
}
}
#[cfg(test)]
fn get(&self, idx: usize) -> i64 {
self.0[idx]
}
fn max(&self) -> i64 {
self.0.last().copied().unwrap()
}
}
/// A set of predefined time buckets.
pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([
60 * 60, // one hour
2 * 60 * 60, // two hours
12 * 60 * 60, // twelve hours
24 * 60 * 60, // one day
7 * 24 * 60 * 60, // one week
365 * 24 * 60 * 60, // one year
10 * 365 * 24 * 60 * 60, // ten years
]);
/// Finds all expired SSTs across levels.
fn get_expired_ssts(
levels: &[LevelMeta],
ttl: Option<Duration>,
now: Timestamp,
) -> Vec<FileHandle> {
let Some(ttl) = ttl else {
return vec![];
};
let expire_time = match now.sub_duration(ttl) {
Ok(expire_time) => expire_time,
Err(e) => {
error!(e; "Failed to calculate region TTL expire time");
return vec![];
}
};
levels
.iter()
.flat_map(|l| l.get_expired_files(&expire_time).into_iter())
.collect()
}
#[derive(Debug)]
pub(crate) struct CompactionOutput {
pub output_file_id: FileId,
/// Compaction output file level.
pub output_level: Level,
/// Compaction input files.
pub inputs: Vec<FileHandle>,
/// Whether to remove deletion markers.
pub filter_deleted: bool,
}
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: Option<CacheManagerRef>,
inputs: &[FileHandle],
append_mode: bool,
filter_deleted: bool,
) -> error::Result<BoxedBatchReader> {
let scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?)
.with_files(inputs.to_vec())
.with_cache(cache)
.with_append_mode(append_mode)
.with_filter_deleted(filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true);
SeqScan::new(scan_input).build_reader().await
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
@@ -1017,43 +607,5 @@ mod tests {
.check();
}
#[test]
fn test_time_bucket() {
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1));
assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60));
assert_eq!(
TIME_BUCKETS.get(1),
TIME_BUCKETS.fit_time_bucket(60 * 60 + 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1)
);
assert_eq!(
TIME_BUCKETS.get(2),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2))
);
assert_eq!(
TIME_BUCKETS.get(3),
TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1)
);
assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX));
}
#[test]
fn test_infer_time_buckets() {
assert_eq!(
TIME_BUCKETS.get(0),
infer_time_bucket(
[
new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0),
new_file_handle(FileId::random(), 1, 10_000, 0)
]
.iter()
)
);
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}

View File

@@ -0,0 +1,420 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::Debug;
use common_telemetry::info;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest};
use crate::region::version::VersionRef;
use crate::sst::file::{FileHandle, FileId};
/// Compaction picker that splits the time range of all involved files into windows, and merges
/// the file segments that intersect each window so that the output files
/// never overlap.
#[derive(Debug)]
pub struct WindowedCompactionPicker {
compaction_time_window_seconds: Option<i64>,
}
impl WindowedCompactionPicker {
pub fn new(window_seconds: Option<i64>) -> Self {
Self {
compaction_time_window_seconds: window_seconds,
}
}
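    // Usage sketch (editor's addition, not part of this diff):
    // `WindowedCompactionPicker::new(Some(3600))` enforces hour-aligned output
    // files, while `new(None)` defers to the persisted or inferred window, as
    // computed by `calculate_time_window` below.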
// Computes the compaction time window. First we respect the user-specified parameter, then
// the persisted window. If the persisted window is not present, we check the time window
// provided when the table was created. If all of those are absent, we infer the window
// from the files in level 0.
fn calculate_time_window(&self, region_id: RegionId, current_version: &VersionRef) -> i64 {
self.compaction_time_window_seconds
.or(current_version
.compaction_time_window
.map(|t| t.as_secs() as i64))
.unwrap_or_else(|| {
let levels = current_version.ssts.levels();
let inferred = infer_time_bucket(levels[0].files());
info!(
"Compaction window for region {} is not present, inferring from files: {:?}",
region_id, inferred
);
inferred
})
}
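    // Precedence example (editor's sketch, not part of this diff): with a UDF
    // window of 7200s and a persisted window of 3600s, 7200 wins; with no UDF
    // window, 3600 is used; with neither, the window is inferred from the
    // level-0 files via `infer_time_bucket`.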
fn pick_inner(
&self,
region_id: RegionId,
current_version: &VersionRef,
current_time: Timestamp,
) -> (Vec<CompactionOutput>, Vec<FileHandle>, i64) {
let time_window = self.calculate_time_window(region_id, current_version);
info!(
"Compaction window for region: {} is {} seconds",
region_id, time_window
);
let expired_ssts = get_expired_ssts(
current_version.ssts.levels(),
current_version.options.ttl,
current_time,
);
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
// here we mark expired SSTs as compacting to avoid them being picked.
expired_ssts.iter().for_each(|f| f.set_compacting(true));
}
let windows = assign_files_to_time_windows(
time_window,
current_version
.ssts
.levels()
.iter()
.flat_map(|level| level.files.values()),
);
(build_output(windows), expired_ssts, time_window)
}
}
impl Picker for WindowedCompactionPicker {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
let region_id = req.region_id();
let CompactionRequest {
engine_config,
current_version,
access_layer,
request_sender,
waiters,
file_purger,
start_time,
cache_manager,
manifest_ctx,
version_control,
listener,
} = req;
let (outputs, expired_ssts, time_window) =
self.pick_inner(region_id, &current_version, Timestamp::current_millis());
let task = CompactionTaskImpl {
engine_config: engine_config.clone(),
region_id,
metadata: current_version.metadata.clone(),
sst_layer: access_layer.clone(),
outputs,
expired_ssts,
compaction_time_window: Some(time_window),
request_sender,
waiters,
file_purger,
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
append_mode: current_version.options.append_mode,
manifest_ctx,
version_control,
listener,
};
Some(Box::new(task))
}
}
fn build_output(windows: BTreeMap<i64, (i64, Vec<FileHandle>)>) -> Vec<CompactionOutput> {
let mut outputs = Vec::with_capacity(windows.len());
for (lower_bound, (upper_bound, files)) in windows {
// safety: the upper bound must > lower bound.
let output_time_range = Some(
TimestampRange::new(
Timestamp::new_second(lower_bound),
Timestamp::new_second(upper_bound),
)
.unwrap(),
);
let output = CompactionOutput {
output_file_id: FileId::random(),
output_level: 1,
inputs: files,
filter_deleted: false,
output_time_range,
};
outputs.push(output);
}
outputs
}
/// Assigns files to time windows. If a file does not contain a time range in its metadata, it will be
/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket)
/// so that all files without timestamps can be compacted together.
fn assign_files_to_time_windows<'a>(
bucket_sec: i64,
files: impl Iterator<Item = &'a FileHandle>,
) -> BTreeMap<i64, (i64, Vec<FileHandle>)> {
let mut buckets = BTreeMap::new();
for file in files {
if file.compacting() {
continue;
}
let (start, end) = file.time_range();
let bounds = file_time_bucket_span(
// safety: converting whatever timestamp to seconds will not overflow.
start.convert_to(TimeUnit::Second).unwrap().value(),
end.convert_to(TimeUnit::Second).unwrap().value(),
bucket_sec,
);
for (lower_bound, upper_bound) in bounds {
let (_, files) = buckets
.entry(lower_bound)
.or_insert_with(|| (upper_bound, Vec::new()));
files.push(file.clone());
}
}
buckets
}
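// Worked example (editor's sketch, not part of this diff): with a one-hour
// bucket, a file covering [30min, 90min] intersects two windows and is pushed
// into both the [0h, 1h) and [1h, 2h) entries; compacting per window keeps
// the output files disjoint in time.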
/// Splits the time range between the start and end timestamps into bucket-aligned windows.
fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<(i64, i64)> {
assert!(start_sec <= end_sec);
// if the timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, it cannot
// be aligned to a valid i64 bound, so we simply use `i64::MIN` rather than underflowing.
let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN);
let end_aligned = end_sec
.align_by_bucket(bucket_sec)
.unwrap_or(start_aligned + (end_sec - start_sec));
let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize);
while start_aligned <= end_aligned {
let window_size = if start_aligned % bucket_sec == 0 {
bucket_sec
} else {
(start_aligned % bucket_sec).abs()
};
let upper_bound = start_aligned.checked_add(window_size).unwrap_or(i64::MAX);
res.push((start_aligned, upper_bound));
start_aligned = upper_bound;
}
res
}
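// Worked example (editor's sketch, not part of this diff):
// `file_time_bucket_span(-1, 3, 3)` aligns -1 down to -3 and yields
// [(-3, 0), (0, 3), (3, 6)], matching the tests at the bottom of this file.
// The `start_aligned % bucket_sec` branch only shrinks the first window in
// the `i64::MIN` edge case where alignment is impossible.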
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use store_api::storage::RegionId;
use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::version::MemtableVersion;
use crate::region::options::RegionOptions;
use crate::region::version::{Version, VersionRef};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::sst::version::SstVersion;
use crate::test_util::memtable_util::metadata_for_test;
use crate::test_util::NoopFilePurger;
fn build_version(files: &[(FileId, i64, i64, Level)], ttl: Option<Duration>) -> VersionRef {
let metadata = metadata_for_test();
let memtables = Arc::new(MemtableVersion::new(Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(PartitionTreeMemtableBuilder::new(
PartitionTreeConfig::default(),
None,
)),
0,
None,
))));
let file_purger_ref = Arc::new(NoopFilePurger);
let mut ssts = SstVersion::new();
ssts.add_files(
file_purger_ref,
files.iter().map(|(file_id, start, end, level)| FileMeta {
file_id: *file_id,
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: *level,
..Default::default()
}),
);
Arc::new(Version {
metadata,
memtables,
ssts: Arc::new(ssts),
flushed_entry_id: 0,
flushed_sequence: 0,
truncated_entry_id: None,
compaction_time_window: None,
options: RegionOptions {
ttl,
compaction: Default::default(),
storage: None,
append_mode: false,
wal_options: Default::default(),
index_options: Default::default(),
memtable: None,
},
})
}
#[test]
fn test_pick_expired() {
let picker = WindowedCompactionPicker::new(None);
let files = vec![(FileId::random(), 0, 10, 0)];
let version = build_version(&files, Some(Duration::from_millis(1)));
let (outputs, expired_ssts, _window) = picker.pick_inner(
RegionId::new(0, 0),
&version,
Timestamp::new_millisecond(12),
);
assert!(outputs.is_empty());
assert_eq!(1, expired_ssts.len());
}
const HOUR: i64 = 60 * 60 * 1000;
#[test]
fn test_infer_window() {
let picker = WindowedCompactionPicker::new(None);
let files = vec![
(FileId::random(), 0, HOUR, 0),
(FileId::random(), HOUR, HOUR * 2 - 1, 0),
];
let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64)));
let (outputs, expired_ssts, window_seconds) = picker.pick_inner(
RegionId::new(0, 0),
&version,
Timestamp::new_millisecond(HOUR * 2),
);
assert!(expired_ssts.is_empty());
assert_eq!(2 * HOUR / 1000, window_seconds);
assert_eq!(1, outputs.len());
assert_eq!(2, outputs[0].inputs.len());
}
#[test]
fn test_assign_files_to_windows() {
let picker = WindowedCompactionPicker::new(Some(HOUR / 1000));
let files = vec![
(FileId::random(), 0, 2 * HOUR - 1, 0),
(FileId::random(), HOUR, HOUR * 3 - 1, 0),
];
let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64)));
let (outputs, expired_ssts, window_seconds) = picker.pick_inner(
RegionId::new(0, 0),
&version,
Timestamp::new_millisecond(HOUR * 3),
);
assert!(expired_ssts.is_empty());
assert_eq!(HOUR / 1000, window_seconds);
assert_eq!(3, outputs.len());
assert_eq!(1, outputs[0].inputs.len());
assert_eq!(files[0].0, outputs[0].inputs[0].file_id());
assert_eq!(
TimestampRange::new(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(HOUR)
),
outputs[0].output_time_range
);
assert_eq!(2, outputs[1].inputs.len());
assert_eq!(
TimestampRange::new(
Timestamp::new_millisecond(HOUR),
Timestamp::new_millisecond(2 * HOUR)
),
outputs[1].output_time_range
);
assert_eq!(1, outputs[2].inputs.len());
assert_eq!(files[1].0, outputs[2].inputs[0].file_id());
assert_eq!(
TimestampRange::new(
Timestamp::new_millisecond(2 * HOUR),
Timestamp::new_millisecond(3 * HOUR)
),
outputs[2].output_time_range
);
}
#[test]
fn test_file_time_bucket_span() {
assert_eq!(
vec![(i64::MIN, i64::MIN + 8),],
file_time_bucket_span(i64::MIN, i64::MIN + 1, 10)
);
assert_eq!(
vec![(i64::MIN, i64::MIN + 8), (i64::MIN + 8, i64::MIN + 18)],
file_time_bucket_span(i64::MIN, i64::MIN + 8, 10)
);
assert_eq!(
vec![
(i64::MIN, i64::MIN + 8),
(i64::MIN + 8, i64::MIN + 18),
(i64::MIN + 18, i64::MIN + 28)
],
file_time_bucket_span(i64::MIN, i64::MIN + 20, 10)
);
assert_eq!(
vec![(-10, 0), (0, 10), (10, 20)],
file_time_bucket_span(-1, 11, 10)
);
assert_eq!(
vec![(-3, 0), (0, 3), (3, 6)],
file_time_bucket_span(-1, 3, 3)
);
assert_eq!(vec![(0, 10)], file_time_bucket_span(0, 9, 10));
assert_eq!(
vec![(i64::MAX - (i64::MAX % 10), i64::MAX)],
file_time_bucket_span(i64::MAX - 1, i64::MAX, 10)
);
}
}
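For reference, a minimal sketch of the bucketing behavior the assertions above pin down (a hypothetical re-derivation, not the `file_time_bucket_span` shipped in this file): spans are aligned to `bucket`-sized boundaries, and the first and last spans are clamped so neither bound overflows `i64`.

// Hedged sketch: reproduces the tested behavior under the stated assumptions.
fn file_time_bucket_span_sketch(start: i64, end: i64, bucket: i64) -> Vec<(i64, i64)> {
    assert!(bucket > 0 && start <= end);
    // Align `start` down to a bucket boundary; clamp to i64::MIN on underflow.
    let mut lower = start
        .checked_sub(start.rem_euclid(bucket))
        .unwrap_or(i64::MIN);
    let mut spans = Vec::new();
    while lower <= end {
        // Distance from `lower` to the next boundary strictly above it.
        let rem = lower.rem_euclid(bucket);
        let step = if rem == 0 { bucket } else { bucket - rem };
        // Clamp the span's upper bound to i64::MAX on overflow.
        let upper = lower.checked_add(step).unwrap_or(i64::MAX);
        spans.push((lower, upper));
        if upper == i64::MAX {
            break;
        }
        lower = upper;
    }
    spans
}

Against the cases above, `file_time_bucket_span_sketch(-1, 11, 10)` yields `[(-10, 0), (0, 10), (10, 20)]`, and the `i64::MIN` cases start at `i64::MIN` itself because the aligned lower bound would underflow.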

View File

@@ -137,7 +137,10 @@ async fn test_append_mode_compaction() {
flush_region(&engine, region_id, None).await;
let output = engine
-    .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
+    .handle_request(
+        region_id,
+        RegionRequest::Compact(RegionCompactRequest::default()),
+    )
.await
.unwrap();
assert_eq!(output.affected_rows, 0);

View File

@@ -131,7 +131,10 @@ async fn test_compaction_region() {
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
let result = engine
-    .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
+    .handle_request(
+        region_id,
+        RegionRequest::Compact(RegionCompactRequest::default()),
+    )
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
@@ -179,7 +182,10 @@ async fn test_compaction_region_with_overlapping() {
delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
let result = engine
-    .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
+    .handle_request(
+        region_id,
+        RegionRequest::Compact(RegionCompactRequest::default()),
+    )
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
@@ -227,7 +233,10 @@ async fn test_compaction_region_with_overlapping_delete_all() {
delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800
let result = engine
-    .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
+    .handle_request(
+        region_id,
+        RegionRequest::Compact(RegionCompactRequest::default()),
+    )
.await
.unwrap();
assert_eq!(result.affected_rows, 0);

View File

@@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::JoinError;
+use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
@@ -695,6 +696,18 @@ pub enum Error {
location: Location,
},
+    #[snafu(display(
+        "Time range predicate overflows, timestamp: {:?}, target unit: {}",
+        timestamp,
+        unit
+    ))]
+    TimeRangePredicateOverflow {
+        timestamp: Timestamp,
+        unit: TimeUnit,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
BuildTimeRangeFilter {
timestamp: Timestamp,
@@ -810,6 +823,7 @@ impl ErrorExt for Error {
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
+            TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
}
}

View File

@@ -665,8 +665,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
-            DdlRequest::Compact(_) => {
-                self.handle_compaction_request(ddl.region_id, ddl.sender);
+            DdlRequest::Compact(req) => {
+                self.handle_compaction_request(ddl.region_id, req, ddl.sender);
continue;
}
DdlRequest::Truncate(_) => {

View File

@@ -14,6 +14,7 @@
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
+use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::metrics::COMPACTION_REQUEST_COUNT;
@@ -25,6 +26,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn handle_compaction_request(
&mut self,
region_id: RegionId,
+        req: RegionCompactRequest,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
@@ -33,6 +35,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
COMPACTION_REQUEST_COUNT.inc();
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
+            req.options,
&region.version_control,
&region.access_layer,
&region.file_purger,

View File

@@ -16,6 +16,7 @@
use std::sync::Arc;
+use api::v1::region::compact_request;
use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
@@ -236,6 +237,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(
region.region_id,
+            compact_request::Options::Regular(Default::default()),
&region.version_control,
&region.access_layer,
&region.file_purger,

View File

@@ -109,6 +109,7 @@ impl Requester {
.map(|partition| {
RegionRequestBody::Compact(CompactRequest {
region_id: partition.id.into(),
+                    options: Some(request.compact_options.clone()),
})
})
.collect();
@@ -145,6 +146,7 @@ impl Requester {
) -> Result<AffectedRows> {
let request = RegionRequestBody::Compact(CompactRequest {
region_id: region_id.into(),
+        options: None, // todo(hl): maybe also support parameters in region compaction.
});
info!("Handle region manual compaction request: {region_id}");

View File

@@ -12,20 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use snafu::ensure;
use sql::ast::ObjectName;
-use sql::error::{InvalidSqlSnafu, Result};
+use sql::error::{InvalidSqlSnafu, PermissionDeniedSnafu, Result};
use sql::parser::ParserContext;
use crate::QueryContextRef;
-/// Parse table name into `(catalog, schema, table)` with query context.
+/// Parses a table name into `(catalog, schema, table)` with the query context and
+/// validates that the catalog matches the current catalog in the query context.
pub fn table_name_to_full_name(
    name: &str,
    query_ctx: &QueryContextRef,
) -> Result<(String, String, String)> {
    let obj_name = ParserContext::parse_table_name(name, query_ctx.sql_dialect())?;
-    table_idents_to_full_name(&obj_name, query_ctx)
+    let (catalog, schema, table) = table_idents_to_full_name(&obj_name, query_ctx)?;
+    // todo(hl): also check if schema matches when rbac is ready. https://github.com/GreptimeTeam/greptimedb/pull/3988/files#r1608687652
+    ensure!(
+        catalog == query_ctx.current_catalog(),
+        PermissionDeniedSnafu {
+            target: catalog,
+            current: query_ctx.current_catalog(),
+        }
+    );
+    Ok((catalog, schema, table))
}
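// Illustration (hypothetical names): with current catalog "greptime",
//   table_name_to_full_name("other_catalog.public.monitor", &query_ctx)
// now fails with PermissionDenied before the request reaches the region
// server, while "public.monitor" and "greptime.public.monitor" still resolve.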
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.

View File

@@ -213,6 +213,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+    #[snafu(display(
+        "Permission denied while operating on catalog {} from current catalog {}",
+        target,
+        current
+    ))]
+    PermissionDenied {
+        target: String,
+        current: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
}
impl ErrorExt for Error {
@@ -241,7 +253,8 @@ impl ErrorExt for Error {
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
-            | InvalidCast { .. } => StatusCode::InvalidArguments,
+            | InvalidCast { .. }
+            | PermissionDenied { .. } => StatusCode::InvalidArguments,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),

View File

@@ -18,9 +18,9 @@ use std::fmt;
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::region::{
-    alter_request, region_request, AlterRequest, AlterRequests, CloseRequest, CompactRequest,
-    CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest,
-    InsertRequests, OpenRequest, TruncateRequest,
+    alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest,
+    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
+    FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, Rows, SemanticType};
pub use common_base::AffectedRows;
@@ -199,9 +199,12 @@ fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest
fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = compact.region_id.into();
+    let options = compact
+        .options
+        .unwrap_or(compact_request::Options::Regular(Default::default()));
    Ok(vec![(
        region_id,
-        RegionRequest::Compact(RegionCompactRequest {}),
+        RegionRequest::Compact(RegionCompactRequest { options }),
)])
}
@@ -642,7 +645,18 @@ pub struct RegionFlushRequest {
}
#[derive(Debug)]
-pub struct RegionCompactRequest {}
+pub struct RegionCompactRequest {
+    pub options: compact_request::Options,
+}
+
+impl Default for RegionCompactRequest {
+    fn default() -> Self {
+        Self {
+            // Default to regular compaction.
+            options: compact_request::Options::Regular(Default::default()),
+        }
+    }
+}
/// Truncate region request.
#[derive(Debug)]
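For orientation, a hedged sketch of how a caller could request a strict-window compaction through this type. `StrictWindow` is imported from the proto crate; its `window_seconds` field name is an assumption inferred from the picker tests above, not confirmed by this diff.

// Hypothetical usage sketch, assuming StrictWindow exposes `window_seconds`.
use api::v1::region::{compact_request, StrictWindow};
use store_api::region_request::RegionCompactRequest;

// Ask for each 1-hour window to be compacted independently instead of the
// regular TWCS compaction chosen by RegionCompactRequest::default().
fn strict_window_request() -> RegionCompactRequest {
    RegionCompactRequest {
        options: compact_request::Options::StrictWindow(StrictWindow {
            window_seconds: 3600,
        }),
    }
}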

View File

@@ -29,6 +29,7 @@ datafusion-physical-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true
+greptime-proto.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
paste = "1.0"

View File

@@ -25,6 +25,7 @@ use common_time::range::TimestampRange;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::ColumnSchema;
+use greptime_proto::v1::region::compact_request;
use serde::{Deserialize, Serialize};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY};
use store_api::mito_engine_options::is_mito_engine_option_key;
@@ -238,11 +239,23 @@ pub struct FlushTableRequest {
pub table_name: String,
}
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, PartialEq)]
pub struct CompactTableRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
+    pub compact_options: compact_request::Options,
}
+
+impl Default for CompactTableRequest {
+    fn default() -> Self {
+        Self {
+            catalog_name: Default::default(),
+            schema_name: Default::default(),
+            table_name: Default::default(),
+            compact_options: compact_request::Options::Regular(Default::default()),
+        }
+    }
+}
/// Truncate table request