mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 03:40:37 +00:00
fix(compaction): unify behavior of database compaction options with TTL (#7402)
* fix: fix dynamic compactiom option,unify behavior of database compaction options with TTL option Signed-off-by: AntiTopQuark <AntiTopQuark1350@outlook.com> * fix unit test Signed-off-by: AntiTopQuark <AntiTopQuark1350@outlook.com> * add debug log Signed-off-by: AntiTopQuark <AntiTopQuark1350@outlook.com> --------- Signed-off-by: AntiTopQuark <AntiTopQuark1350@outlook.com>
This commit is contained in:
@@ -62,7 +62,7 @@ use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::region::options::{MergeMode, RegionOptions};
|
||||
use crate::region::version::VersionControlRef;
|
||||
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
|
||||
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
|
||||
@@ -311,9 +311,24 @@ impl CompactionScheduler {
|
||||
request: CompactionRequest,
|
||||
options: compact_request::Options,
|
||||
) -> Result<()> {
|
||||
let region_id = request.region_id();
|
||||
let (dynamic_compaction_opts, ttl) = find_dynamic_options(
|
||||
region_id.table_id(),
|
||||
&request.current_version.options,
|
||||
&request.schema_metadata_manager,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to find dynamic options for region: {}", region_id);
|
||||
(
|
||||
request.current_version.options.compaction.clone(),
|
||||
request.current_version.options.ttl.unwrap_or_default(),
|
||||
)
|
||||
});
|
||||
|
||||
let picker = new_picker(
|
||||
&options,
|
||||
&request.current_version.options.compaction,
|
||||
&dynamic_compaction_opts,
|
||||
request.current_version.options.append_mode,
|
||||
Some(self.engine_config.max_background_compactions),
|
||||
);
|
||||
@@ -328,21 +343,10 @@ impl CompactionScheduler {
|
||||
cache_manager,
|
||||
manifest_ctx,
|
||||
listener,
|
||||
schema_metadata_manager,
|
||||
schema_metadata_manager: _,
|
||||
max_parallelism,
|
||||
} = request;
|
||||
|
||||
let ttl = find_ttl(
|
||||
region_id.table_id(),
|
||||
current_version.options.ttl,
|
||||
&schema_metadata_manager,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to get ttl for region: {}", region_id);
|
||||
TimeToLive::default()
|
||||
});
|
||||
|
||||
debug!(
|
||||
"Pick compaction strategy {:?} for region: {}, ttl: {:?}",
|
||||
picker, region_id, ttl
|
||||
@@ -351,7 +355,10 @@ impl CompactionScheduler {
|
||||
let compaction_region = CompactionRegion {
|
||||
region_id,
|
||||
current_version: current_version.clone(),
|
||||
region_options: current_version.options.clone(),
|
||||
region_options: RegionOptions {
|
||||
compaction: dynamic_compaction_opts.clone(),
|
||||
..current_version.options.clone()
|
||||
},
|
||||
engine_config: engine_config.clone(),
|
||||
region_metadata: current_version.metadata.clone(),
|
||||
cache_manager: cache_manager.clone(),
|
||||
@@ -382,7 +389,7 @@ impl CompactionScheduler {
|
||||
|
||||
// If specified to run compaction remotely, we schedule the compaction job remotely.
|
||||
// It will fall back to local compaction if there is no remote job scheduler.
|
||||
let waiters = if current_version.options.compaction.remote_compaction() {
|
||||
let waiters = if dynamic_compaction_opts.remote_compaction() {
|
||||
if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
|
||||
let remote_compaction_job = CompactionJob {
|
||||
compaction_region: compaction_region.clone(),
|
||||
@@ -411,7 +418,7 @@ impl CompactionScheduler {
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => {
|
||||
if !current_version.options.compaction.fallback_to_local() {
|
||||
if !dynamic_compaction_opts.fallback_to_local() {
|
||||
error!(e; "Failed to schedule remote compaction job for region {}", region_id);
|
||||
return RemoteCompactionSnafu {
|
||||
region_id,
|
||||
@@ -494,29 +501,88 @@ impl Drop for CompactionScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds TTL of table by first examine table options then database options.
|
||||
async fn find_ttl(
|
||||
/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
|
||||
async fn find_dynamic_options(
|
||||
table_id: TableId,
|
||||
table_ttl: Option<TimeToLive>,
|
||||
region_options: &crate::region::options::RegionOptions,
|
||||
schema_metadata_manager: &SchemaMetadataManagerRef,
|
||||
) -> Result<TimeToLive> {
|
||||
// If table TTL is set, we use it.
|
||||
if let Some(table_ttl) = table_ttl {
|
||||
return Ok(table_ttl);
|
||||
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
|
||||
if region_options.compaction_override && region_options.ttl.is_some() {
|
||||
debug!(
|
||||
"Use region options directly for table {}: compaction={:?}, ttl={:?}",
|
||||
table_id, region_options.compaction, region_options.ttl
|
||||
);
|
||||
return Ok((
|
||||
region_options.compaction.clone(),
|
||||
region_options.ttl.unwrap(),
|
||||
));
|
||||
}
|
||||
|
||||
let ttl = tokio::time::timeout(
|
||||
let db_options = tokio::time::timeout(
|
||||
crate::config::FETCH_OPTION_TIMEOUT,
|
||||
schema_metadata_manager.get_schema_options_by_table_id(table_id),
|
||||
)
|
||||
.await
|
||||
.context(TimeoutSnafu)?
|
||||
.context(GetSchemaMetadataSnafu)?
|
||||
.and_then(|options| options.ttl)
|
||||
.unwrap_or_default()
|
||||
.into();
|
||||
.context(GetSchemaMetadataSnafu)?;
|
||||
|
||||
Ok(ttl)
|
||||
let ttl = if region_options.ttl.is_some() {
|
||||
debug!(
|
||||
"Use region TTL directly for table {}: ttl={:?}",
|
||||
table_id, region_options.ttl
|
||||
);
|
||||
region_options.ttl.unwrap()
|
||||
} else {
|
||||
db_options
|
||||
.as_ref()
|
||||
.and_then(|options| options.ttl)
|
||||
.unwrap_or_default()
|
||||
.into()
|
||||
};
|
||||
|
||||
let compaction = if !region_options.compaction_override {
|
||||
if let Some(schema_opts) = db_options {
|
||||
let map: HashMap<String, String> = schema_opts
|
||||
.extra_options
|
||||
.iter()
|
||||
.filter_map(|(k, v)| {
|
||||
if k.starts_with("compaction.") {
|
||||
Some((k.clone(), v.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if map.is_empty() {
|
||||
region_options.compaction.clone()
|
||||
} else {
|
||||
crate::region::options::RegionOptions::try_from(&map)
|
||||
.map(|o| o.compaction)
|
||||
.unwrap_or_else(|e| {
|
||||
error!(e; "Failed to create RegionOptions from map");
|
||||
region_options.compaction.clone()
|
||||
})
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"DB options is None for table {}, use region compaction: compaction={:?}",
|
||||
table_id, region_options.compaction
|
||||
);
|
||||
region_options.compaction.clone()
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
"No schema options for table {}, use region compaction: compaction={:?}",
|
||||
table_id, region_options.compaction
|
||||
);
|
||||
region_options.compaction.clone()
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
|
||||
table_id, compaction, ttl
|
||||
);
|
||||
Ok((compaction, ttl))
|
||||
}
|
||||
|
||||
/// Status of running and pending region compaction tasks.
|
||||
@@ -805,8 +871,12 @@ struct PendingCompaction {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::region::StrictWindow;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_meta::key::schema_name::SchemaNameValue;
|
||||
use common_time::DatabaseTimeToLive;
|
||||
use tokio::sync::{Barrier, oneshot};
|
||||
|
||||
use super::*;
|
||||
@@ -818,6 +888,163 @@ mod tests {
|
||||
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
|
||||
use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_compaction_options_db_level() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
let builder = VersionControlBuilder::new();
|
||||
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
|
||||
let region_id = builder.region_id();
|
||||
let table_id = region_id.table_id();
|
||||
// Register table without ttl but with db-level compaction options
|
||||
let mut schema_value = SchemaNameValue {
|
||||
ttl: Some(DatabaseTimeToLive::default()),
|
||||
..Default::default()
|
||||
};
|
||||
schema_value
|
||||
.extra_options
|
||||
.insert("compaction.type".to_string(), "twcs".to_string());
|
||||
schema_value
|
||||
.extra_options
|
||||
.insert("compaction.twcs.time_window".to_string(), "2h".to_string());
|
||||
schema_metadata_manager
|
||||
.register_region_table_info(
|
||||
table_id,
|
||||
"t",
|
||||
"c",
|
||||
"s",
|
||||
Some(schema_value),
|
||||
kv_backend.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let version_control = Arc::new(builder.build());
|
||||
let region_opts = version_control.current().version.options.clone();
|
||||
let (opts, _) = find_dynamic_options(table_id, ®ion_opts, &schema_metadata_manager)
|
||||
.await
|
||||
.unwrap();
|
||||
match opts {
|
||||
crate::region::options::CompactionOptions::Twcs(t) => {
|
||||
assert_eq!(t.time_window_seconds(), Some(2 * 3600));
|
||||
}
|
||||
}
|
||||
let manifest_ctx = env
|
||||
.mock_manifest_context(version_control.current().version.metadata.clone())
|
||||
.await;
|
||||
let (tx, _rx) = mpsc::channel(4);
|
||||
let mut scheduler = env.mock_compaction_scheduler(tx);
|
||||
let (otx, _orx) = oneshot::channel();
|
||||
let request = scheduler
|
||||
.region_status
|
||||
.entry(region_id)
|
||||
.or_insert_with(|| {
|
||||
crate::compaction::CompactionStatus::new(
|
||||
region_id,
|
||||
version_control.clone(),
|
||||
env.access_layer.clone(),
|
||||
)
|
||||
})
|
||||
.new_compaction_request(
|
||||
scheduler.request_sender.clone(),
|
||||
OptionOutputTx::new(Some(OutputTx::new(otx))),
|
||||
scheduler.engine_config.clone(),
|
||||
scheduler.cache_manager.clone(),
|
||||
&manifest_ctx,
|
||||
scheduler.listener.clone(),
|
||||
schema_metadata_manager.clone(),
|
||||
1,
|
||||
);
|
||||
scheduler
|
||||
.schedule_compaction_request(
|
||||
request,
|
||||
compact_request::Options::Regular(Default::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_find_compaction_options_priority() {
|
||||
fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
|
||||
let mut schema_value = SchemaNameValue {
|
||||
ttl: Some(DatabaseTimeToLive::default()),
|
||||
..Default::default()
|
||||
};
|
||||
schema_value
|
||||
.extra_options
|
||||
.insert("compaction.type".to_string(), "twcs".to_string());
|
||||
schema_value.extra_options.insert(
|
||||
"compaction.twcs.time_window".to_string(),
|
||||
time_window.to_string(),
|
||||
);
|
||||
schema_value
|
||||
}
|
||||
|
||||
let cases = [
|
||||
(
|
||||
"db options set and table override set",
|
||||
Some(schema_value_with_twcs("2h")),
|
||||
true,
|
||||
Some(Duration::from_secs(5 * 3600)),
|
||||
Some(5 * 3600),
|
||||
),
|
||||
(
|
||||
"db options set and table override not set",
|
||||
Some(schema_value_with_twcs("2h")),
|
||||
false,
|
||||
None,
|
||||
Some(2 * 3600),
|
||||
),
|
||||
(
|
||||
"db options not set and table override set",
|
||||
None,
|
||||
true,
|
||||
Some(Duration::from_secs(4 * 3600)),
|
||||
Some(4 * 3600),
|
||||
),
|
||||
(
|
||||
"db options not set and table override not set",
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
),
|
||||
];
|
||||
|
||||
for (case_name, schema_value, override_set, table_window, expected_window) in cases {
|
||||
let builder = VersionControlBuilder::new();
|
||||
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
|
||||
let table_id = builder.region_id().table_id();
|
||||
schema_metadata_manager
|
||||
.register_region_table_info(
|
||||
table_id,
|
||||
"t",
|
||||
"c",
|
||||
"s",
|
||||
schema_value,
|
||||
kv_backend.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let version_control = Arc::new(builder.build());
|
||||
let mut region_opts = version_control.current().version.options.clone();
|
||||
region_opts.compaction_override = override_set;
|
||||
if let Some(window) = table_window {
|
||||
let crate::region::options::CompactionOptions::Twcs(twcs) =
|
||||
&mut region_opts.compaction;
|
||||
twcs.time_window = Some(window);
|
||||
}
|
||||
|
||||
let (opts, _) = find_dynamic_options(table_id, ®ion_opts, &schema_metadata_manager)
|
||||
.await
|
||||
.unwrap();
|
||||
match opts {
|
||||
crate::region::options::CompactionOptions::Twcs(t) => {
|
||||
assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_empty() {
|
||||
let env = SchedulerEnv::new().await;
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::access_layer::{
|
||||
};
|
||||
use crate::cache::{CacheManager, CacheManagerRef};
|
||||
use crate::compaction::picker::{PickerOutput, new_picker};
|
||||
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
|
||||
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{
|
||||
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
|
||||
@@ -203,16 +203,22 @@ pub async fn open_compaction_region(
|
||||
// Use the specified ttl.
|
||||
Either::Left(ttl) => ttl,
|
||||
// Get the ttl from the schema metadata manager.
|
||||
Either::Right(schema_metadata_manager) => find_ttl(
|
||||
req.region_id.table_id(),
|
||||
current_version.options.ttl,
|
||||
&schema_metadata_manager,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
|
||||
TimeToLive::default()
|
||||
}),
|
||||
Either::Right(schema_metadata_manager) => {
|
||||
let (_, ttl) = find_dynamic_options(
|
||||
req.region_id.table_id(),
|
||||
&req.region_options,
|
||||
&schema_metadata_manager,
|
||||
)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
|
||||
(
|
||||
crate::region::options::CompactionOptions::default(),
|
||||
TimeToLive::default(),
|
||||
)
|
||||
});
|
||||
ttl
|
||||
}
|
||||
};
|
||||
|
||||
Ok(CompactionRegion {
|
||||
|
||||
@@ -244,6 +244,7 @@ mod tests {
|
||||
options: RegionOptions {
|
||||
ttl: ttl.map(|t| t.into()),
|
||||
compaction: Default::default(),
|
||||
compaction_override: false,
|
||||
storage: None,
|
||||
append_mode: false,
|
||||
wal_options: Default::default(),
|
||||
|
||||
@@ -29,6 +29,7 @@ use serde_json::Value;
|
||||
use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
|
||||
use snafu::{ResultExt, ensure};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::mito_engine_options::COMPACTION_OVERRIDE;
|
||||
use store_api::storage::ColumnId;
|
||||
use strum::EnumString;
|
||||
|
||||
@@ -62,6 +63,7 @@ pub struct RegionOptions {
|
||||
pub ttl: Option<TimeToLive>,
|
||||
/// Compaction options.
|
||||
pub compaction: CompactionOptions,
|
||||
pub compaction_override: bool,
|
||||
/// Custom storage. Uses default storage if it is `None`.
|
||||
pub storage: Option<String>,
|
||||
/// If append mode is enabled, the region keeps duplicate rows.
|
||||
@@ -125,7 +127,8 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
|
||||
// See https://github.com/serde-rs/serde/issues/1626
|
||||
let options: RegionOptionsWithoutEnum =
|
||||
serde_json::from_str(&json).context(JsonOptionsSnafu)?;
|
||||
let compaction = if validate_enum_options(options_map, "compaction.type")? {
|
||||
let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
|
||||
let compaction = if has_compaction_type {
|
||||
serde_json::from_str(&json).context(JsonOptionsSnafu)?
|
||||
} else {
|
||||
CompactionOptions::default()
|
||||
@@ -146,9 +149,16 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
|
||||
None
|
||||
};
|
||||
|
||||
let compaction_override_flag = options_map
|
||||
.get(COMPACTION_OVERRIDE)
|
||||
.map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
|
||||
.unwrap_or(false);
|
||||
let compaction_override = has_compaction_type || compaction_override_flag;
|
||||
|
||||
let opts = RegionOptions {
|
||||
ttl: options.ttl,
|
||||
compaction,
|
||||
compaction_override,
|
||||
storage: options.storage,
|
||||
append_mode: options.append_mode,
|
||||
wal_options,
|
||||
@@ -517,6 +527,7 @@ mod tests {
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
..Default::default()
|
||||
}),
|
||||
compaction_override: true,
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(expect, options);
|
||||
@@ -644,6 +655,7 @@ mod tests {
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
compaction_override: true,
|
||||
storage: Some("S3".to_string()),
|
||||
append_mode: false,
|
||||
wal_options,
|
||||
@@ -676,6 +688,7 @@ mod tests {
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
compaction_override: false,
|
||||
storage: Some("S3".to_string()),
|
||||
append_mode: false,
|
||||
wal_options: WalOptions::Kafka(KafkaWalOptions {
|
||||
@@ -740,6 +753,7 @@ mod tests {
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}),
|
||||
compaction_override: false,
|
||||
storage: Some("S3".to_string()),
|
||||
append_mode: false,
|
||||
wal_options: WalOptions::Kafka(KafkaWalOptions {
|
||||
|
||||
@@ -68,6 +68,7 @@ use sql::statements::{
|
||||
sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
|
||||
};
|
||||
use sql::util::extract_tables_from_query;
|
||||
use store_api::mito_engine_options::{COMPACTION_OVERRIDE, COMPACTION_TYPE};
|
||||
use table::requests::{FILE_TABLE_META_KEY, TableOptions};
|
||||
use table::table_reference::TableReference;
|
||||
#[cfg(feature = "enterprise")]
|
||||
@@ -216,6 +217,11 @@ pub fn create_to_expr(
|
||||
.context(UnrecognizedTableOptionSnafu)?,
|
||||
);
|
||||
|
||||
let mut table_options = table_options;
|
||||
if table_options.contains_key(COMPACTION_TYPE) {
|
||||
table_options.insert(COMPACTION_OVERRIDE.to_string(), "true".to_string());
|
||||
}
|
||||
|
||||
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
|
||||
|
||||
let expr = CreateTableExpr {
|
||||
|
||||
@@ -117,10 +117,13 @@ impl StatementExecutor {
|
||||
.map(|v| v.into_inner());
|
||||
|
||||
let create_expr = &mut expr_helper::create_to_expr(&stmt, &ctx)?;
|
||||
// We don't put ttl into the table options
|
||||
// Because it will be used directly while compaction.
|
||||
// Don't inherit schema-level TTL/compaction options into table options:
|
||||
// TTL is applied during compaction, and `compaction.*` is handled separately.
|
||||
if let Some(schema_options) = schema_options {
|
||||
for (key, value) in schema_options.extra_options.iter() {
|
||||
if key.starts_with("compaction.") {
|
||||
continue;
|
||||
}
|
||||
create_expr
|
||||
.table_options
|
||||
.entry(key.clone())
|
||||
|
||||
@@ -56,11 +56,13 @@ fn create_sql_options(table_meta: &TableMeta, schema_options: Option<SchemaOptio
|
||||
if let Some(ttl) = table_opts.ttl.map(|t| t.to_string()) {
|
||||
options.insert(TTL_KEY.to_string(), ttl);
|
||||
} else if let Some(database_ttl) = schema_options
|
||||
.as_ref()
|
||||
.and_then(|o| o.ttl)
|
||||
.map(|ttl| ttl.to_string())
|
||||
{
|
||||
options.insert(TTL_KEY.to_string(), database_ttl);
|
||||
};
|
||||
|
||||
for (k, v) in table_opts
|
||||
.extra_options
|
||||
.iter()
|
||||
|
||||
@@ -27,6 +27,8 @@ pub const TTL_KEY: &str = "ttl";
|
||||
pub const SNAPSHOT_READ: &str = "snapshot_read";
|
||||
/// Option key for compaction type.
|
||||
pub const COMPACTION_TYPE: &str = "compaction.type";
|
||||
/// Option key for forcing compaction options override.
|
||||
pub const COMPACTION_OVERRIDE: &str = "compaction.override";
|
||||
/// TWCS compaction strategy.
|
||||
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
|
||||
/// Option key for twcs min file num to trigger a compaction.
|
||||
@@ -61,6 +63,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
|
||||
[
|
||||
"ttl",
|
||||
COMPACTION_TYPE,
|
||||
COMPACTION_OVERRIDE,
|
||||
TWCS_TRIGGER_FILE_NUM,
|
||||
TWCS_MAX_OUTPUT_FILE_SIZE,
|
||||
TWCS_TIME_WINDOW,
|
||||
@@ -90,6 +93,7 @@ mod tests {
|
||||
fn test_is_mito_engine_option_key() {
|
||||
assert!(is_mito_engine_option_key("ttl"));
|
||||
assert!(is_mito_engine_option_key("compaction.type"));
|
||||
assert!(is_mito_engine_option_key("compaction.override"));
|
||||
assert!(is_mito_engine_option_key(
|
||||
"compaction.twcs.trigger_file_num"
|
||||
));
|
||||
|
||||
@@ -110,27 +110,25 @@ Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test1;
|
||||
|
||||
+-------+-----------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+-----------------------------------------+
|
||||
| test1 | CREATE TABLE IF NOT EXISTS "test1" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | append_mode = 'false', |
|
||||
| | 'compaction.twcs.time_window' = '1h', |
|
||||
| | 'compaction.type' = 'twcs', |
|
||||
| | 'memtable.type' = 'partition_tree', |
|
||||
| | merge_mode = 'last_non_null', |
|
||||
| | skip_wal = 'true', |
|
||||
| | ttl = '1h' |
|
||||
| | ) |
|
||||
+-------+-----------------------------------------+
|
||||
+-------+---------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+---------------------------------------+
|
||||
| test1 | CREATE TABLE IF NOT EXISTS "test1" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | append_mode = 'false', |
|
||||
| | 'memtable.type' = 'partition_tree', |
|
||||
| | merge_mode = 'last_non_null', |
|
||||
| | skip_wal = 'true', |
|
||||
| | ttl = '1h' |
|
||||
| | ) |
|
||||
+-------+---------------------------------------+
|
||||
|
||||
CREATE TABLE test2(host STRING, cpu DOUBLE, ts TIMESTAMP TIME INDEX) WITH (
|
||||
'append_mode'='true',
|
||||
@@ -141,27 +139,25 @@ Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test2;
|
||||
|
||||
+-------+-----------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+-----------------------------------------+
|
||||
| test2 | CREATE TABLE IF NOT EXISTS "test2" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | append_mode = 'true', |
|
||||
| | 'compaction.twcs.time_window' = '1h', |
|
||||
| | 'compaction.type' = 'twcs', |
|
||||
| | 'memtable.type' = 'partition_tree', |
|
||||
| | merge_mode = '', |
|
||||
| | skip_wal = 'false', |
|
||||
| | ttl = '1h' |
|
||||
| | ) |
|
||||
+-------+-----------------------------------------+
|
||||
+-------+---------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------+---------------------------------------+
|
||||
| test2 | CREATE TABLE IF NOT EXISTS "test2" ( |
|
||||
| | "host" STRING NULL, |
|
||||
| | "cpu" DOUBLE NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | append_mode = 'true', |
|
||||
| | 'memtable.type' = 'partition_tree', |
|
||||
| | merge_mode = '', |
|
||||
| | skip_wal = 'false', |
|
||||
| | ttl = '1h' |
|
||||
| | ) |
|
||||
+-------+---------------------------------------+
|
||||
|
||||
INSERT INTO test2 VALUES('host1', 1.0, '2023-10-01 00:00:00');
|
||||
|
||||
@@ -183,6 +179,166 @@ DROP DATABASE mydb;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
--- test compaction options----
|
||||
CREATE DATABASE test_compaction_opt;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
USE test_compaction_opt;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE DATABASE test_compaction_opt;
|
||||
|
||||
+---------------------+---------------------------------------------------+
|
||||
| Database | Create Database |
|
||||
+---------------------+---------------------------------------------------+
|
||||
| test_compaction_opt | CREATE DATABASE IF NOT EXISTS test_compaction_opt |
|
||||
+---------------------+---------------------------------------------------+
|
||||
|
||||
CREATE TABLE test_table(ts TIMESTAMP TIME INDEX, val INT);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
+------------+-------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------+-------------------------------------------+
|
||||
| test_table | CREATE TABLE IF NOT EXISTS "test_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" INT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+------------+-------------------------------------------+
|
||||
|
||||
ALTER DATABASE test_compaction_opt SET 'compaction.type' = 'twcs';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
ALTER DATABASE test_compaction_opt SET 'compaction.twcs.time_window' = '2h';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE DATABASE test_compaction_opt;
|
||||
|
||||
+---------------------+---------------------------------------------------+
|
||||
| Database | Create Database |
|
||||
+---------------------+---------------------------------------------------+
|
||||
| test_compaction_opt | CREATE DATABASE IF NOT EXISTS test_compaction_opt |
|
||||
| | WITH( |
|
||||
| | 'compaction.twcs.time_window' = '2h', |
|
||||
| | 'compaction.type' = 'twcs' |
|
||||
| | ) |
|
||||
+---------------------+---------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
+------------+-------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------+-------------------------------------------+
|
||||
| test_table | CREATE TABLE IF NOT EXISTS "test_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" INT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+------------+-------------------------------------------+
|
||||
|
||||
CREATE TABLE test_table2(ts TIMESTAMP TIME INDEX, val INT);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test_table2;
|
||||
|
||||
+-------------+--------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+-------------+--------------------------------------------+
|
||||
| test_table2 | CREATE TABLE IF NOT EXISTS "test_table2" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "val" INT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+-------------+--------------------------------------------+
|
||||
|
||||
USE public;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP DATABASE test_compaction_opt;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE DATABASE test_compaction_opt2;
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
USE test_compaction_opt2;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE test_table(ts TIMESTAMP TIME INDEX, v INT) WITH ('compaction.type'='twcs','compaction.twcs.time_window'='1h');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
+------------+-------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------+-------------------------------------------+
|
||||
| test_table | CREATE TABLE IF NOT EXISTS "test_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "v" INT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | 'compaction.override' = 'true', |
|
||||
| | 'compaction.twcs.time_window' = '1h', |
|
||||
| | 'compaction.type' = 'twcs' |
|
||||
| | ) |
|
||||
+------------+-------------------------------------------+
|
||||
|
||||
ALTER DATABASE test_compaction_opt2 SET 'compaction.twcs.time_window' = '3h';
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
+------------+-------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+------------+-------------------------------------------+
|
||||
| test_table | CREATE TABLE IF NOT EXISTS "test_table" ( |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL, |
|
||||
| | "v" INT NULL, |
|
||||
| | TIME INDEX ("ts") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | 'compaction.override' = 'true', |
|
||||
| | 'compaction.twcs.time_window' = '1h', |
|
||||
| | 'compaction.type' = 'twcs' |
|
||||
| | ) |
|
||||
+------------+-------------------------------------------+
|
||||
|
||||
USE public;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP DATABASE test_compaction_opt2;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SHOW DATABASES;
|
||||
|
||||
+--------------------+
|
||||
|
||||
@@ -49,5 +49,48 @@ USE public;
|
||||
|
||||
DROP DATABASE mydb;
|
||||
|
||||
--- test compaction options----
|
||||
|
||||
CREATE DATABASE test_compaction_opt;
|
||||
|
||||
USE test_compaction_opt;
|
||||
|
||||
SHOW CREATE DATABASE test_compaction_opt;
|
||||
|
||||
CREATE TABLE test_table(ts TIMESTAMP TIME INDEX, val INT);
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
ALTER DATABASE test_compaction_opt SET 'compaction.type' = 'twcs';
|
||||
|
||||
ALTER DATABASE test_compaction_opt SET 'compaction.twcs.time_window' = '2h';
|
||||
|
||||
SHOW CREATE DATABASE test_compaction_opt;
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
CREATE TABLE test_table2(ts TIMESTAMP TIME INDEX, val INT);
|
||||
|
||||
SHOW CREATE TABLE test_table2;
|
||||
|
||||
USE public;
|
||||
|
||||
DROP DATABASE test_compaction_opt;
|
||||
|
||||
CREATE DATABASE test_compaction_opt2;
|
||||
|
||||
USE test_compaction_opt2;
|
||||
|
||||
CREATE TABLE test_table(ts TIMESTAMP TIME INDEX, v INT) WITH ('compaction.type'='twcs','compaction.twcs.time_window'='1h');
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
ALTER DATABASE test_compaction_opt2 SET 'compaction.twcs.time_window' = '3h';
|
||||
|
||||
SHOW CREATE TABLE test_table;
|
||||
|
||||
USE public;
|
||||
|
||||
DROP DATABASE test_compaction_opt2;
|
||||
|
||||
SHOW DATABASES;
|
||||
|
||||
Reference in New Issue
Block a user