feat: add support for TWCS time window hints in insert operations (#6823)

* feat: Add support for TWCS time window hints in insert operations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: set system events table time window to 1d

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu authored on 2025-08-26 18:52:00 +08:00, committed by GitHub
parent 8894cb5406, commit 3d1a4b56a4
6 changed files with 136 additions and 7 deletions

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
 use api::v1::RowInsertRequests;
 use humantime::format_duration;
-use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
+use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY, TWCS_TIME_WINDOW};
 use crate::error::Result;
@@ -35,15 +35,23 @@ pub struct InsertOptions {
     pub ttl: Duration,
     /// Whether to use append mode for the insert.
     pub append_mode: bool,
+    /// Time window for twcs compaction.
+    pub twcs_compaction_time_window: Option<Duration>,
 }
 impl InsertOptions {
     /// Converts the insert options to a list of key-value string hints.
     pub fn to_hints(&self) -> Vec<(&'static str, String)> {
-        vec![
+        let mut hints = vec![
             (TTL_KEY, format_duration(self.ttl).to_string()),
             (APPEND_MODE_KEY, self.append_mode.to_string()),
-        ]
+        ];
+        if let Some(time_window) = self.twcs_compaction_time_window {
+            hints.push((TWCS_TIME_WINDOW, format_duration(time_window).to_string()));
+        }
+        hints
     }
 }
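
For context, a minimal sketch of how a caller might use the new field and what to_hints() then returns. The helper name below is hypothetical; only InsertOptions, to_hints, and the option constants come from the change above, and the TWCS key resolves to "compaction.twcs.time_window" as the SHOW CREATE TABLE expectation in the new test at the bottom of this commit confirms.

    use std::time::Duration;

    use client::inserter::InsertOptions;

    // Hypothetical helper: build the options used for an auto-created table and
    // inspect the hints that will be attached to the insert.
    fn example_hints() -> Vec<(&'static str, String)> {
        let opts = InsertOptions {
            ttl: Duration::from_secs(90 * 24 * 60 * 60), // 90 days
            append_mode: true,
            // New field from this commit; None means no TWCS hint is emitted.
            twcs_compaction_time_window: Some(Duration::from_secs(24 * 60 * 60)), // 1 day
        };
        // Returns the ttl and append_mode hints plus a
        // "compaction.twcs.time_window" entry (humantime renders 86400s as "1day").
        opts.to_hints()
    }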

View File

@@ -56,6 +56,8 @@ pub type EventRecorderRef = Arc<dyn EventRecorder>;
 pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
 /// The default TTL(90 days) for the events table.
 const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90);
+/// The default compaction time window for the events table.
+pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
 // The capacity of the tokio channel for transmitting events to background processor.
 const DEFAULT_CHANNEL_SIZE: usize = 2048;
 // The size of the buffer for batching events before flushing to event handler.

View File

@@ -19,7 +19,10 @@ use client::inserter::{Context, InsertOptions, Inserter};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
 use common_error::ext::BoxedError;
 use common_event_recorder::error::{InsertEventsSnafu, Result};
-use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler};
+use common_event_recorder::{
+    build_row_inserts_request, group_events_by_type, Event, EventHandler,
+    DEFAULT_COMPACTION_TIME_WINDOW,
+};
 use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE;
 use datafusion::common::HashMap;
 use operator::statement::{InserterImpl, StatementExecutorRef};
@@ -47,6 +50,7 @@ impl EventHandlerImpl {
                     Some(InsertOptions {
                         ttl: slow_query_ttl,
                         append_mode: true,
+                        twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
                     }),
                 )) as _,
             )]),
@@ -55,6 +59,7 @@ impl EventHandlerImpl {
                 Some(InsertOptions {
                     ttl: global_ttl,
                     append_mode: true,
+                    twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
                 }),
             )),
         }

View File

@@ -15,12 +15,13 @@
 use std::path::Path;
 use std::sync::atomic::AtomicBool;
 use std::sync::{Arc, Mutex, RwLock};
+use std::time::Duration;
 use client::client_manager::NodeClients;
 use client::inserter::InsertOptions;
 use common_base::Plugins;
 use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
-use common_event_recorder::{EventRecorderImpl, EventRecorderRef};
+use common_event_recorder::{EventRecorderImpl, EventRecorderRef, DEFAULT_COMPACTION_TIME_WINDOW};
 use common_grpc::channel_manager::ChannelConfig;
 use common_meta::ddl::flow_meta::FlowMetadataAllocator;
 use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
@@ -81,6 +82,9 @@ use crate::state::State;
 use crate::table_meta_alloc::MetasrvPeerAllocator;
 use crate::utils::insert_forwarder::InsertForwarder;
+/// The time window for twcs compaction of the region stats table.
+const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
 // TODO(fys): try use derive_builder macro
 pub struct MetasrvBuilder {
     options: Option<MetasrvOptions>,
@@ -205,6 +209,7 @@ impl MetasrvBuilder {
             Some(InsertOptions {
                 ttl: options.event_recorder.ttl,
                 append_mode: true,
+                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
             }),
         ));
         // Builds the event recorder to record important events and persist them as the system table.
@@ -465,6 +470,9 @@ impl MetasrvBuilder {
             Some(InsertOptions {
                 ttl: options.stats_persistence.ttl,
                 append_mode: true,
+                twcs_compaction_time_window: Some(
+                    REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
+                ),
             }),
         ));

View File

@@ -52,7 +52,9 @@ use sql::statements::insert::Insert;
 use store_api::metric_engine_consts::{
     LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
 };
-use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
+use store_api::mito_engine_options::{
+    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW,
+};
 use store_api::storage::{RegionId, TableId};
 use table::metadata::TableInfo;
 use table::requests::{
@@ -1018,6 +1020,14 @@ pub fn fill_table_options_for_create(
         if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
             table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
         }
+        if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
+            table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
+            // We need to set the compaction type explicitly.
+            table_options.insert(
+                COMPACTION_TYPE.to_string(),
+                COMPACTION_TYPE_TWCS.to_string(),
+            );
+        }
     }
     // Set append_mode to true for log table.
     // because log tables should keep rows with the same ts and tags.
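
As a rough summary of the branch added above: when the query context carries the TWCS time-window hint, the auto-created table gets both the time window and an explicit compaction type. Below is a condensed, illustrative sketch only; the helper name and plain HashMap are stand-ins for the real fill_table_options_for_create plumbing, while the literal keys match the SHOW CREATE TABLE expectation in the new test below.

    use std::collections::HashMap;

    // Illustrative only: mirrors the new branch in fill_table_options_for_create.
    fn apply_twcs_hint(table_options: &mut HashMap<String, String>, hint: Option<&str>) {
        if let Some(time_window) = hint {
            table_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            // The compaction type has to be set explicitly alongside the window.
            table_options.insert("compaction.type".to_string(), "twcs".to_string());
        }
    }

A hint value of "1d" therefore surfaces in SHOW CREATE TABLE as 'compaction.twcs.time_window' = '1d' together with 'compaction.type' = 'twcs', which is exactly what the new test asserts.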

View File

@@ -42,6 +42,7 @@ async fn query_and_expect(instance: &Instance, sql: &str, expected: &str) {
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
+    use std::sync::Arc;
     use api::v1::column::Values;
     use api::v1::column_data_type_extension::TypeExt;
@@ -60,17 +61,24 @@ mod test {
     use common_meta::rpc::router::region_distribution;
     use common_query::Output;
     use common_recordbatch::RecordBatches;
+    use common_test_util::recordbatch::check_output_stream;
     use frontend::instance::Instance;
     use query::parser::QueryLanguageParser;
     use query::query_engine::DefaultSerializer;
+    use rstest::rstest;
+    use rstest_reuse::apply;
     use servers::query_handler::grpc::GrpcQueryHandler;
-    use session::context::QueryContext;
+    use session::context::{QueryContext, QueryContextBuilder};
+    use store_api::mito_engine_options::TWCS_TIME_WINDOW;
     use store_api::storage::RegionId;
     use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
     use super::*;
     use crate::standalone::GreptimeDbStandaloneBuilder;
     use crate::tests;
+    use crate::tests::test_util::{
+        both_instances_cases, distributed, execute_sql, standalone, MockInstance,
+    };
     use crate::tests::MockDistributedInstance;
     #[tokio::test(flavor = "multi_thread")]
@@ -1159,4 +1167,92 @@ CREATE TABLE {table_name} (
 +---+------+---------------------+";
         assert_eq!(recordbatches.pretty_print().unwrap(), expected);
     }
+    #[apply(both_instances_cases)]
+    async fn test_extra_external_table_options(instance: Arc<dyn MockInstance>) {
+        common_telemetry::init_default_ut_logging();
+        let frontend = instance.frontend();
+        let instance = frontend.as_ref();
+        let insert = InsertRequest {
+            table_name: "auto_created_table".to_string(),
+            columns: vec![
+                Column {
+                    column_name: "a".to_string(),
+                    values: Some(Values {
+                        i32_values: vec![4, 6],
+                        ..Default::default()
+                    }),
+                    null_mask: vec![2],
+                    semantic_type: SemanticType::Field as i32,
+                    datatype: ColumnDataType::Int32 as i32,
+                    ..Default::default()
+                },
+                Column {
+                    column_name: "c".to_string(),
+                    values: Some(Values {
+                        string_values: vec![
+                            r#"{ "id": 1, "name": "Alice", "age": 30, "active": true }"#
+                                .to_string(),
+                            r#"{ "id": 2, "name": "Bob", "balance": 1234.56, "active": false }"#
+                                .to_string(),
+                        ],
+                        ..Default::default()
+                    }),
+                    null_mask: vec![2],
+                    semantic_type: SemanticType::Field as i32,
+                    datatype: ColumnDataType::Json as i32,
+                    ..Default::default()
+                },
+                Column {
+                    column_name: "ts".to_string(),
+                    values: Some(Values {
+                        timestamp_millisecond_values: vec![
+                            1672557975000,
+                            1672557976000,
+                            1672557977000,
+                        ],
+                        ..Default::default()
+                    }),
+                    semantic_type: SemanticType::Timestamp as i32,
+                    datatype: ColumnDataType::TimestampMillisecond as i32,
+                    ..Default::default()
+                },
+            ],
+            row_count: 3,
+        };
+        let request = Request::Inserts(InsertRequests {
+            inserts: vec![insert],
+        });
+        let ctx = Arc::new(
+            QueryContextBuilder::default()
+                .set_extension(TWCS_TIME_WINDOW.to_string(), "1d".to_string())
+                .build(),
+        );
+        let output = GrpcQueryHandler::do_query(instance, request, ctx)
+            .await
+            .unwrap();
+        assert!(matches!(output.data, OutputData::AffectedRows(3)));
+        let output = execute_sql(&frontend, "show create table auto_created_table").await;
+        let expected = r#"+--------------------+---------------------------------------------------+
+| Table | Create Table |
++--------------------+---------------------------------------------------+
+| auto_created_table | CREATE TABLE IF NOT EXISTS "auto_created_table" ( |
+| | "a" INT NULL, |
+| | "c" JSON NULL, |
+| | "ts" TIMESTAMP(3) NOT NULL, |
+| | TIME INDEX ("ts") |
+| | ) |
+| | |
+| | ENGINE=mito |
+| | WITH( |
+| | 'compaction.twcs.time_window' = '1d', |
+| | 'compaction.type' = 'twcs' |
+| | ) |
++--------------------+---------------------------------------------------+"#;
+        check_output_stream(output.data, expected).await;
+    }
 }