From 3d1a4b56a4920c294277632e18586a48268474a5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 26 Aug 2025 18:52:00 +0800 Subject: [PATCH] feat: add support for TWCS time window hints in insert operations (#6823) * feat: Add support for TWCS time window hints in insert operations Signed-off-by: WenyXu * feat: set system events table time window to 1d Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/client/src/inserter.rs | 14 +++- src/common/event-recorder/src/recorder.rs | 2 + src/frontend/src/events.rs | 7 +- src/meta-srv/src/metasrv/builder.rs | 10 ++- src/operator/src/insert.rs | 12 ++- tests-integration/src/grpc.rs | 98 ++++++++++++++++++++++- 6 files changed, 136 insertions(+), 7 deletions(-) diff --git a/src/client/src/inserter.rs b/src/client/src/inserter.rs index 38c0590c8e..41f17f6029 100644 --- a/src/client/src/inserter.rs +++ b/src/client/src/inserter.rs @@ -16,7 +16,7 @@ use std::time::Duration; use api::v1::RowInsertRequests; use humantime::format_duration; -use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; +use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY, TWCS_TIME_WINDOW}; use crate::error::Result; @@ -35,15 +35,23 @@ pub struct InsertOptions { pub ttl: Duration, /// Whether to use append mode for the insert. pub append_mode: bool, + /// Time window for twcs compaction. + pub twcs_compaction_time_window: Option, } impl InsertOptions { /// Converts the insert options to a list of key-value string hints. pub fn to_hints(&self) -> Vec<(&'static str, String)> { - vec![ + let mut hints = vec![ (TTL_KEY, format_duration(self.ttl).to_string()), (APPEND_MODE_KEY, self.append_mode.to_string()), - ] + ]; + + if let Some(time_window) = self.twcs_compaction_time_window { + hints.push((TWCS_TIME_WINDOW, format_duration(time_window).to_string())); + } + + hints } } diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs index b3575587aa..62d0733387 100644 --- a/src/common/event-recorder/src/recorder.rs +++ b/src/common/event-recorder/src/recorder.rs @@ -56,6 +56,8 @@ pub type EventRecorderRef = Arc; pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5); /// The default TTL(90 days) for the events table. const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90); +/// The default compaction time window for the events table. +pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1); // The capacity of the tokio channel for transmitting events to background processor. const DEFAULT_CHANNEL_SIZE: usize = 2048; // The size of the buffer for batching events before flushing to event handler. diff --git a/src/frontend/src/events.rs b/src/frontend/src/events.rs index 23f4f71767..958a27a2e8 100644 --- a/src/frontend/src/events.rs +++ b/src/frontend/src/events.rs @@ -19,7 +19,10 @@ use client::inserter::{Context, InsertOptions, Inserter}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_event_recorder::error::{InsertEventsSnafu, Result}; -use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler}; +use common_event_recorder::{ + build_row_inserts_request, group_events_by_type, Event, EventHandler, + DEFAULT_COMPACTION_TIME_WINDOW, +}; use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE; use datafusion::common::HashMap; use operator::statement::{InserterImpl, StatementExecutorRef}; @@ -47,6 +50,7 @@ impl EventHandlerImpl { Some(InsertOptions { ttl: slow_query_ttl, append_mode: true, + twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW), }), )) as _, )]), @@ -55,6 +59,7 @@ impl EventHandlerImpl { Some(InsertOptions { ttl: global_ttl, append_mode: true, + twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW), }), )), } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index efb7a5ad4d..7e490ef827 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -15,12 +15,13 @@ use std::path::Path; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; use client::client_manager::NodeClients; use client::inserter::InsertOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; -use common_event_recorder::{EventRecorderImpl, EventRecorderRef}; +use common_event_recorder::{EventRecorderImpl, EventRecorderRef, DEFAULT_COMPACTION_TIME_WINDOW}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; @@ -81,6 +82,9 @@ use crate::state::State; use crate::table_meta_alloc::MetasrvPeerAllocator; use crate::utils::insert_forwarder::InsertForwarder; +/// The time window for twcs compaction of the region stats table. +const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1); + // TODO(fys): try use derive_builder macro pub struct MetasrvBuilder { options: Option, @@ -205,6 +209,7 @@ impl MetasrvBuilder { Some(InsertOptions { ttl: options.event_recorder.ttl, append_mode: true, + twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW), }), )); // Builds the event recorder to record important events and persist them as the system table. @@ -465,6 +470,9 @@ impl MetasrvBuilder { Some(InsertOptions { ttl: options.stats_persistence.ttl, append_mode: true, + twcs_compaction_time_window: Some( + REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW, + ), }), )); diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index de27ee3673..8d6a11bdaf 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -52,7 +52,9 @@ use sql::statements::insert::Insert; use store_api::metric_engine_consts::{ LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, }; -use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; +use store_api::mito_engine_options::{ + APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW, +}; use store_api::storage::{RegionId, TableId}; use table::metadata::TableInfo; use table::requests::{ @@ -1018,6 +1020,14 @@ pub fn fill_table_options_for_create( if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) { table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string()); } + if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) { + table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string()); + // We need to set the compaction type explicitly. + table_options.insert( + COMPACTION_TYPE.to_string(), + COMPACTION_TYPE_TWCS.to_string(), + ); + } } // Set append_mode to true for log table. // because log tables should keep rows with the same ts and tags. diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 5de87ea6eb..31d9e34bb6 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -42,6 +42,7 @@ async fn query_and_expect(instance: &Instance, sql: &str, expected: &str) { #[cfg(test)] mod test { use std::collections::HashMap; + use std::sync::Arc; use api::v1::column::Values; use api::v1::column_data_type_extension::TypeExt; @@ -60,17 +61,24 @@ mod test { use common_meta::rpc::router::region_distribution; use common_query::Output; use common_recordbatch::RecordBatches; + use common_test_util::recordbatch::check_output_stream; use frontend::instance::Instance; use query::parser::QueryLanguageParser; use query::query_engine::DefaultSerializer; + use rstest::rstest; + use rstest_reuse::apply; use servers::query_handler::grpc::GrpcQueryHandler; - use session::context::QueryContext; + use session::context::{QueryContext, QueryContextBuilder}; + use store_api::mito_engine_options::TWCS_TIME_WINDOW; use store_api::storage::RegionId; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use super::*; use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; + use crate::tests::test_util::{ + both_instances_cases, distributed, execute_sql, standalone, MockInstance, + }; use crate::tests::MockDistributedInstance; #[tokio::test(flavor = "multi_thread")] @@ -1159,4 +1167,92 @@ CREATE TABLE {table_name} ( +---+------+---------------------+"; assert_eq!(recordbatches.pretty_print().unwrap(), expected); } + + #[apply(both_instances_cases)] + async fn test_extra_external_table_options(instance: Arc) { + common_telemetry::init_default_ut_logging(); + let frontend = instance.frontend(); + let instance = frontend.as_ref(); + + let insert = InsertRequest { + table_name: "auto_created_table".to_string(), + columns: vec![ + Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![4, 6], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "c".to_string(), + values: Some(Values { + string_values: vec![ + r#"{ "id": 1, "name": "Alice", "age": 30, "active": true }"# + .to_string(), + r#"{ "id": 2, "name": "Bob", "balance": 1234.56, "active": false }"# + .to_string(), + ], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Json as i32, + ..Default::default() + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + timestamp_millisecond_values: vec![ + 1672557975000, + 1672557976000, + 1672557977000, + ], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + }; + let request = Request::Inserts(InsertRequests { + inserts: vec![insert], + }); + + let ctx = Arc::new( + QueryContextBuilder::default() + .set_extension(TWCS_TIME_WINDOW.to_string(), "1d".to_string()) + .build(), + ); + let output = GrpcQueryHandler::do_query(instance, request, ctx) + .await + .unwrap(); + assert!(matches!(output.data, OutputData::AffectedRows(3))); + + let output = execute_sql(&frontend, "show create table auto_created_table").await; + + let expected = r#"+--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| auto_created_table | CREATE TABLE IF NOT EXISTS "auto_created_table" ( | +| | "a" INT NULL, | +| | "c" JSON NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'compaction.twcs.time_window' = '1d', | +| | 'compaction.type' = 'twcs' | +| | ) | ++--------------------+---------------------------------------------------+"#; + check_output_stream(output.data, expected).await; + } }