diff --git a/Cargo.lock b/Cargo.lock
index a03d3d19d0..be714e8d92 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1666,6 +1666,7 @@ dependencies = [
  "common-base",
  "common-catalog",
  "common-error",
+ "common-event-recorder",
  "common-frontend",
  "common-macro",
  "common-meta",
@@ -2347,6 +2348,9 @@ dependencies = [
  "common-macro",
  "common-telemetry",
  "common-time",
+ "humantime",
+ "humantime-serde",
+ "itertools 0.14.0",
  "serde",
  "serde_json",
  "snafu 0.8.5",
@@ -2359,13 +2363,18 @@ dependencies = [
 name = "common-frontend"
 version = "0.17.0"
 dependencies = [
+ "api",
  "async-trait",
  "common-error",
+ "common-event-recorder",
  "common-grpc",
  "common-macro",
  "common-meta",
+ "common-time",
  "greptime-proto",
+ "humantime",
  "meta-client",
+ "serde",
  "session",
  "snafu 0.8.5",
  "tokio",
@@ -4878,6 +4887,7 @@ dependencies = [
  "common-config",
  "common-datasource",
  "common-error",
+ "common-event-recorder",
  "common-frontend",
  "common-function",
  "common-grpc",
@@ -4898,6 +4908,7 @@ dependencies = [
  "datanode",
  "datatypes",
  "futures",
+ "humantime",
  "humantime-serde",
  "lazy_static",
  "log-query",
@@ -13069,6 +13080,7 @@ dependencies = [
  "common-config",
  "common-error",
  "common-event-recorder",
+ "common-frontend",
  "common-grpc",
  "common-meta",
  "common-procedure",
diff --git a/config/config.md b/config/config.md
index 209bc48079..8f17a6cf61 100644
--- a/config/config.md
+++ b/config/config.md
@@ -305,7 +305,7 @@
 | `slow_query.record_type` | String | `system_table` | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
 | `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
 | `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
-| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
+| `slow_query.ttl` | String | `90d` | The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`. |
 | `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
 | `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
 | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -316,6 +316,8 @@
 | `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
 | `memory` | -- | -- | The memory options. |
 | `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup.<br/>When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable<br/>is set to "prof:true,prof_active:false". The official image adds this env variable.<br/>Default is true. |
+| `event_recorder` | -- | -- | Configuration options for the event recorder. |
+| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
 
 ### Metasrv
 
@@ -382,7 +384,7 @@
 | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
 | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
 | `event_recorder` | -- | -- | Configuration options for the event recorder. |
-| `event_recorder.ttl` | String | `30d` | TTL for the events table that will be used to store the events. |
+| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
 | `logging` | -- | -- | The logging options. |
 | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
 | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index e71f5248fa..621eddc8a7 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -257,8 +257,8 @@ threshold = "30s"
 ## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
 sample_ratio = 1.0
 
-## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
-ttl = "30d"
+## The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`.
+ttl = "90d"
 
 ## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
 ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
@@ -288,3 +288,8 @@ headers = { }
 ## is set to "prof:true,prof_active:false". The official image adds this env variable.
 ## Default is true.
 enable_heap_profiling = true
+
+## Configuration options for the event recorder.
+[event_recorder]
+## TTL for the events table that will be used to store the events. Default is `90d`.
+ttl = "90d"
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 10f07c138d..ff3568da0b 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -242,8 +242,8 @@ create_topic_timeout = "30s"
 
 ## Configuration options for the event recorder.
 [event_recorder]
-## TTL for the events table that will be used to store the events.
-ttl = "30d"
+## TTL for the events table that will be used to store the events. Default is `90d`.
+ttl = "90d"
 
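Both example files accept humantime strings for `ttl`. A minimal, self-contained sketch (not code from this PR) of how such a value deserializes into a `Duration`-backed options struct, assuming the `serde` (with `derive`), `humantime-serde`, and `toml` crates:

```rust
// Sketch only: mirrors how a `ttl = "90d"` TOML value can map onto a
// Duration-backed field via humantime-serde.
use std::time::Duration;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct EventRecorderOptions {
    #[serde(with = "humantime_serde")]
    ttl: Duration,
}

fn main() {
    let opts: EventRecorderOptions = toml::from_str(r#"ttl = "90d""#).unwrap();
    // "90d" parses as 90 whole days.
    assert_eq!(opts.ttl, Duration::from_secs(90 * 24 * 60 * 60));
}
```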
 ## The logging options.
 [logging]
diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index 602320696c..b09fce127d 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -21,6 +21,7 @@ bytes.workspace = true
 common-base.workspace = true
 common-catalog.workspace = true
 common-error.workspace = true
+common-event-recorder.workspace = true
 common-frontend.workspace = true
 common-macro.workspace = true
 common-meta.workspace = true
diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs
index 03f75fe2bd..4fe89b59e8 100644
--- a/src/catalog/src/process_manager.rs
+++ b/src/catalog/src/process_manager.rs
@@ -21,17 +21,17 @@ use std::time::{Duration, Instant, UNIX_EPOCH};
 
 use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
 use common_base::cancellation::CancellationHandle;
+use common_event_recorder::EventRecorderRef;
 use common_frontend::selector::{FrontendSelector, MetaClientSelector};
 use common_frontend::slow_query_event::SlowQueryEvent;
-use common_telemetry::{debug, error, info, warn};
+use common_telemetry::logging::SlowQueriesRecordType;
+use common_telemetry::{debug, info, slow, warn};
 use common_time::util::current_time_millis;
 use meta_client::MetaClientRef;
 use promql_parser::parser::EvalStmt;
 use rand::random;
-use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use sql::statements::statement::Statement;
-use tokio::sync::mpsc::Sender;
 
 use crate::error;
 use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -249,6 +249,8 @@ pub struct Ticket {
     pub(crate) manager: ProcessManagerRef,
     pub(crate) id: ProcessId,
     pub cancellation_handle: Arc<CancellationHandle>,
+
+    // Keep the handle of the slow query timer to ensure it triggers the event recording when dropped.
     _slow_query_timer: Option<SlowQueryTimer>,
 }
 
@@ -295,38 +297,37 @@ impl Debug for CancellableProcess {
 pub struct SlowQueryTimer {
     start: Instant,
     stmt: QueryStatement,
-    query_ctx: QueryContextRef,
-    threshold: Option<Duration>,
-    sample_ratio: Option<f64>,
-    tx: Sender<SlowQueryEvent>,
+    threshold: Duration,
+    sample_ratio: f64,
+    record_type: SlowQueriesRecordType,
+    recorder: EventRecorderRef,
 }
 
 impl SlowQueryTimer {
     pub fn new(
         stmt: QueryStatement,
-        query_ctx: QueryContextRef,
-        threshold: Option<Duration>,
-        sample_ratio: Option<f64>,
-        tx: Sender<SlowQueryEvent>,
+        threshold: Duration,
+        sample_ratio: f64,
+        record_type: SlowQueriesRecordType,
+        recorder: EventRecorderRef,
     ) -> Self {
         Self {
             start: Instant::now(),
             stmt,
-            query_ctx,
             threshold,
             sample_ratio,
-            tx,
+            record_type,
+            recorder,
         }
     }
 }
 
 impl SlowQueryTimer {
-    fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
+    fn send_slow_query_event(&self, elapsed: Duration) {
         let mut slow_query_event = SlowQueryEvent {
             cost: elapsed.as_millis() as u64,
-            threshold: threshold.as_millis() as u64,
+            threshold: self.threshold.as_millis() as u64,
             query: "".to_string(),
-            query_ctx: self.query_ctx.clone(),
 
             // The following fields are only used for PromQL queries.
             is_promql: false,
@@ -363,29 +364,37 @@
             }
         }
 
-        // Send SlowQueryEvent to the handler.
-        if let Err(e) = self.tx.try_send(slow_query_event) {
-            error!(e; "Failed to send slow query event");
+        match self.record_type {
+            // Send the slow query event to the event recorder to persist it in the system table.
+            SlowQueriesRecordType::SystemTable => {
+                self.recorder.record(Box::new(slow_query_event));
+            }
+            // Record the slow query in a dedicated log file.
+            SlowQueriesRecordType::Log => {
+                slow!(
+                    cost = slow_query_event.cost,
+                    threshold = slow_query_event.threshold,
+                    query = slow_query_event.query,
+                    is_promql = slow_query_event.is_promql,
+                    promql_range = slow_query_event.promql_range,
+                    promql_step = slow_query_event.promql_step,
+                    promql_start = slow_query_event.promql_start,
+                    promql_end = slow_query_event.promql_end,
+                );
+            }
         }
     }
 }
 
 impl Drop for SlowQueryTimer {
     fn drop(&mut self) {
-        if let Some(threshold) = self.threshold {
-            // Calculate the elaspsed duration since the timer is created.
-            let elapsed = self.start.elapsed();
-            if elapsed > threshold {
-                if let Some(ratio) = self.sample_ratio {
-                    // Only capture a portion of slow queries based on sample_ratio.
-                    // Generate a random number in [0, 1) and compare it with sample_ratio.
-                    if ratio >= 1.0 || random::<f64>() <= ratio {
-                        self.send_slow_query_event(elapsed, threshold);
-                    }
-                } else {
-                    // Captures all slow queries if sample_ratio is not set.
-                    self.send_slow_query_event(elapsed, threshold);
-                }
+        // Calculate the elapsed duration since the timer was created.
+        let elapsed = self.start.elapsed();
+        if elapsed > self.threshold {
+            // Only capture a portion of slow queries based on sample_ratio.
+            // Generate a random number in [0, 1) and compare it with sample_ratio.
+            if self.sample_ratio >= 1.0 || random::<f64>() <= self.sample_ratio {
+                self.send_slow_query_event(elapsed);
             }
         }
     }
 }
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index 2c85f6a0bf..4ba504f37c 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -279,7 +279,7 @@ impl StartCommand {
             &opts.component.logging,
             &opts.component.tracing,
             opts.component.node_id.clone(),
-            opts.component.slow_query.as_ref(),
+            Some(&opts.component.slow_query),
         );
 
         log_versions(verbose_version(), short_version(), APP_NAME);
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index b5abcd186e..f073ed9707 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -157,7 +157,7 @@ pub struct StandaloneOptions {
     pub init_regions_in_background: bool,
     pub init_regions_parallelism: usize,
     pub max_in_flight_write_bytes: Option<ReadableSize>,
-    pub slow_query: Option<SlowQueryOptions>,
+    pub slow_query: SlowQueryOptions,
     pub query: QueryOptions,
     pub memory: MemoryOptions,
 }
@@ -191,7 +191,7 @@ impl Default for StandaloneOptions {
             init_regions_in_background: false,
             init_regions_parallelism: 16,
             max_in_flight_write_bytes: None,
-            slow_query: Some(SlowQueryOptions::default()),
+            slow_query: SlowQueryOptions::default(),
             query: QueryOptions::default(),
             memory: MemoryOptions::default(),
         }
@@ -486,7 +486,7 @@ impl StartCommand {
             &opts.component.logging,
             &opts.component.tracing,
             None,
-            opts.component.slow_query.as_ref(),
+            Some(&opts.component.slow_query),
         );
 
         log_versions(verbose_version(), short_version(), APP_NAME);
diff --git a/src/common/event-recorder/Cargo.toml b/src/common/event-recorder/Cargo.toml
index 28cc9b54de..f9ca25c40c 100644
--- a/src/common/event-recorder/Cargo.toml
+++ b/src/common/event-recorder/Cargo.toml
@@ -12,6 +12,9 @@ common-error.workspace = true
 common-macro.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
+humantime.workspace = true
+humantime-serde.workspace = true
+itertools.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 snafu.workspace = true
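The `#![feature(duration_constructors)]` additions in the next two files exist because `Duration::from_days` is a nightly-only constructor at this point. A standalone sketch of the nightly call next to its stable equivalent:

```rust
// Nightly-only sketch: `Duration::from_days` is gated behind the
// `duration_constructors` feature that this PR enables at the crate roots.
#![feature(duration_constructors)]

use std::time::Duration;

fn main() {
    let ttl = Duration::from_days(90);
    // Equivalent spelling on stable Rust.
    assert_eq!(ttl, Duration::from_secs(90 * 24 * 60 * 60));
}
```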
diff --git a/src/common/event-recorder/src/lib.rs b/src/common/event-recorder/src/lib.rs
index 4f764291a3..292f3fee8c 100644
--- a/src/common/event-recorder/src/lib.rs
+++ b/src/common/event-recorder/src/lib.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#![feature(duration_constructors)]
+
 pub mod error;
 pub mod recorder;
diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs
index 6acad469bb..3aeebb2377 100644
--- a/src/common/event-recorder/src/recorder.rs
+++ b/src/common/event-recorder/src/recorder.rs
@@ -15,11 +15,10 @@
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
-use std::sync::{Arc, OnceLock};
+use std::sync::Arc;
 use std::time::Duration;
 
 use api::v1::column_data_type_extension::TypeExt;
-use api::v1::helper::{tag_column_schema, time_index_column_schema};
 use api::v1::value::ValueData;
 use api::v1::{
     ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
@@ -29,6 +28,8 @@
 use async_trait::async_trait;
 use backon::{BackoffBuilder, ExponentialBuilder};
 use common_telemetry::{debug, error, info, warn};
 use common_time::timestamp::{TimeUnit, Timestamp};
+use humantime::format_duration;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
 use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -51,12 +52,10 @@
 /// EventRecorderRef is the reference to the event recorder.
 pub type EventRecorderRef = Arc<dyn EventRecorder>;
 
-static EVENTS_TABLE_TTL: OnceLock<String> = OnceLock::new();
-
 /// The time interval for flushing batched events to the event handler.
 pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
-// The default TTL for the events table.
-const DEFAULT_EVENTS_TABLE_TTL: &str = "30d";
+/// The default TTL (90 days) for the events table.
+const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90);
 // The capacity of the tokio channel for transmitting events to background processor.
 const DEFAULT_CHANNEL_SIZE: usize = 2048;
 // The size of the buffer for batching events before flushing to event handler.
@@ -73,6 +72,11 @@
 ///
 /// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods.
 pub trait Event: Send + Sync + Debug {
+    /// Returns the table name of the event.
+    fn table_name(&self) -> &str {
+        DEFAULT_EVENTS_TABLE_NAME
+    }
+
     /// Returns the type of the event.
     fn event_type(&self) -> &str;
 
@@ -108,81 +112,68 @@ pub trait Eventable: Send + Sync + Debug {
     }
 }
 
-/// Returns the hints for the insert operation.
-pub fn insert_hints() -> Vec<(&'static str, &'static str)> {
-    vec![
-        (
-            TTL_KEY,
-            EVENTS_TABLE_TTL
-                .get()
-                .map(|s| s.as_str())
-                .unwrap_or(DEFAULT_EVENTS_TABLE_TTL),
-        ),
-        (APPEND_MODE_KEY, "true"),
-    ]
+/// Groups events by their `event_type`.
+#[allow(clippy::borrowed_box)]
+pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
+    events
+        .iter()
+        .into_grouping_map_by(|event| event.event_type())
+        .collect()
 }
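`group_events_by_type` leans on itertools' `into_grouping_map_by` to bucket events by key. A standalone illustration of the same technique with plain strings (not project code):

```rust
// Bucket items by a key derived from each item, the same shape as
// group_events_by_type above (assumes the itertools crate).
use std::collections::HashMap;

use itertools::Itertools;

fn main() {
    let event_types = vec!["slow_query", "region_migration", "slow_query"];
    let groups: HashMap<&str, Vec<&str>> = event_types
        .into_iter()
        .into_grouping_map_by(|t| *t)
        .collect();
    assert_eq!(groups["slow_query"].len(), 2);
    assert_eq!(groups.len(), 2);
}
```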
 
-/// Builds the row inserts request for the events that will be persisted to the events table.
-pub fn build_row_inserts_request(events: &[Box<dyn Event>]) -> Result<RowInsertRequests> {
-    // Aggregate the events by the event type.
-    let mut event_groups: HashMap<&str, Vec<&Box<dyn Event>>> = HashMap::new();
+/// Builds the row inserts request for the events that will be persisted to the events table. All `events` must have the same event type; otherwise an error is returned.
+#[allow(clippy::borrowed_box)]
+pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
+    // Ensure all the events are of the same type.
+    validate_events(events)?;
+
+    // We already validated the events, so it's safe to take the first event to build the schema for the RowInsertRequest.
+    let event = &events[0];
+    let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
+    schema.extend(vec![
+        ColumnSchema {
+            column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
+            datatype: ColumnDataType::String.into(),
+            semantic_type: SemanticType::Tag.into(),
+            ..Default::default()
+        },
+        ColumnSchema {
+            column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
+            datatype: ColumnDataType::Binary as i32,
+            semantic_type: SemanticType::Field as i32,
+            datatype_extension: Some(ColumnDataTypeExtension {
+                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
+            }),
+            ..Default::default()
+        },
+        ColumnSchema {
+            column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
+            datatype: ColumnDataType::TimestampNanosecond.into(),
+            semantic_type: SemanticType::Timestamp.into(),
+            ..Default::default()
+        },
+    ]);
+    schema.extend(event.extra_schema());
+
+    let mut rows: Vec<Row> = Vec::with_capacity(events.len());
     for event in events {
-        event_groups
-            .entry(event.event_type())
-            .or_default()
-            .push(event);
+        let extra_row = event.extra_row()?;
+        let mut values = Vec::with_capacity(3 + extra_row.values.len());
+        values.extend([
+            ValueData::StringValue(event.event_type().to_string()).into(),
+            ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
+            ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
+        ]);
+        values.extend(extra_row.values);
+        rows.push(Row { values });
     }
 
-    let mut row_insert_requests = RowInsertRequests {
-        inserts: Vec::with_capacity(event_groups.len()),
-    };
-
-    for (_, events) in event_groups {
-        validate_events(&events)?;
-
-        // We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
-        let event = &events[0];
-        let mut schema = vec![
-            tag_column_schema(EVENTS_TABLE_TYPE_COLUMN_NAME, ColumnDataType::String),
-            ColumnSchema {
-                column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
-                datatype: ColumnDataType::Binary as i32,
-                semantic_type: SemanticType::Field as i32,
-                datatype_extension: Some(ColumnDataTypeExtension {
-                    type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
-                }),
-                ..Default::default()
-            },
-            time_index_column_schema(
-                EVENTS_TABLE_TIMESTAMP_COLUMN_NAME,
-                ColumnDataType::TimestampNanosecond,
-            ),
-        ];
-        schema.extend(event.extra_schema());
-
-        let rows = events
-            .iter()
-            .map(|event| {
-                let mut row = Row {
-                    values: vec![
-                        ValueData::StringValue(event.event_type().to_string()).into(),
-                        ValueData::BinaryValue(event.json_payload()?.as_bytes().to_vec()).into(),
-                        ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
-                    ],
-                };
-                row.values.extend(event.extra_row()?.values);
-                Ok(row)
-            })
-            .collect::<Result<Vec<Row>>>()?;
-
-        row_insert_requests.inserts.push(RowInsertRequest {
-            table_name: DEFAULT_EVENTS_TABLE_NAME.to_string(),
+    Ok(RowInsertRequests {
+        inserts: vec![RowInsertRequest {
+            table_name: event.table_name().to_string(),
             rows: Some(Rows { schema, rows }),
-        });
-    }
-
-    Ok(row_insert_requests)
+        }],
+    })
 }
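Each row starts with three fixed values; the payload column carries the event's JSON serialization as binary. A minimal sketch of that payload path with `serde_json`; `RegionMigrationPayload` here is a hypothetical payload type, not something from this PR:

```rust
// Hypothetical payload type for illustration; the only point is the
// serialize-to-string, then store-as-bytes flow used for the payload column.
use serde::Serialize;

#[derive(Serialize)]
struct RegionMigrationPayload {
    region_id: u64,
    from_peer: u64,
    to_peer: u64,
}

fn main() -> serde_json::Result<()> {
    let payload = RegionMigrationPayload { region_id: 42, from_peer: 1, to_peer: 2 };
    let json = serde_json::to_string(&payload)?; // what a json_payload() impl returns
    let bytes = json.into_bytes(); // what BinaryValue receives
    assert!(!bytes.is_empty());
    Ok(())
}
```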
 // Ensure the events with the same event type have the same extra schema.
@@ -211,25 +202,59 @@ pub trait EventRecorder: Send + Sync + Debug + 'static {
     fn close(&self);
 }
 
+/// EventHandlerOptions is the options for the event handler.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct EventHandlerOptions {
+    /// TTL for the events table that will be used to store the events.
+    pub ttl: Duration,
+    /// Append mode for the events table that will be used to store the events.
+    pub append_mode: bool,
+}
+
+impl Default for EventHandlerOptions {
+    fn default() -> Self {
+        Self {
+            ttl: DEFAULT_EVENTS_TABLE_TTL,
+            append_mode: true,
+        }
+    }
+}
+
+impl EventHandlerOptions {
+    /// Converts the options to the hints for the insert operation.
+    pub fn to_hints(&self) -> Vec<(&str, String)> {
+        vec![
+            (TTL_KEY, format_duration(self.ttl).to_string()),
+            (APPEND_MODE_KEY, self.append_mode.to_string()),
+        ]
+    }
+}
+
 /// EventHandler trait defines the interface for how to handle the event.
 #[async_trait]
 pub trait EventHandler: Send + Sync + 'static {
     /// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
     /// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
     async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
+
+    /// Returns the handler options for the event type. We can use different options for different event types.
+    fn options(&self, _event_type: &str) -> EventHandlerOptions {
+        EventHandlerOptions::default()
+    }
 }
 
 /// Configuration options for the event recorder.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct EventRecorderOptions {
     /// TTL for the events table that will be used to store the events.
-    pub ttl: String,
+    #[serde(with = "humantime_serde")]
+    pub ttl: Duration,
 }
 
 impl Default for EventRecorderOptions {
     fn default() -> Self {
         Self {
-            ttl: DEFAULT_EVENTS_TABLE_TTL.to_string(),
+            ttl: DEFAULT_EVENTS_TABLE_TTL,
         }
     }
 }
@@ -246,9 +271,7 @@ pub struct EventRecorderImpl {
 }
 
 impl EventRecorderImpl {
-    pub fn new(event_handler: Box<dyn EventHandler>, opts: EventRecorderOptions) -> Self {
-        info!("Creating event recorder with options: {:?}", opts);
-
+    pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
         let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
 
         let cancel_token = CancellationToken::new();
@@ -273,14 +296,6 @@
 
         recorder.handle = Some(handle);
 
-        // It only sets the ttl once, so it's safe to skip the error.
-        if EVENTS_TABLE_TTL.set(opts.ttl.clone()).is_err() {
-            info!(
-                "Events table ttl already set to {}, skip setting it",
-                opts.ttl
-            );
-        }
-
         recorder
     }
 }
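`to_hints` renders the TTL with `humantime::format_duration`, which normalizes large day counts into month/day units; this is why the integration-test expectation near the end of this diff becomes `ttl = "2months 29days 2h 52m 48s"` for a 90-day TTL. A small sketch assuming only the `humantime` crate:

```rust
// Render a Duration the way to_hints() does before it is passed along
// as a table option string.
use std::time::Duration;

use humantime::format_duration;

fn main() {
    let ttl = Duration::from_secs(90 * 24 * 60 * 60);
    let hint = ("ttl", format_duration(ttl).to_string());
    // humantime uses 30.44-day months, so 90 days prints as
    // "2months 29days 2h 52m 48s" rather than "90d".
    println!("{} = {}", hint.0, hint.1);
}
```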
@@ -465,10 +480,7 @@
 
     #[tokio::test]
     async fn test_event_recorder() {
-        let mut event_recorder = EventRecorderImpl::new(
-            Box::new(TestEventHandlerImpl {}),
-            EventRecorderOptions::default(),
-        );
+        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
 
         event_recorder.record(Box::new(TestEvent {}));
 
         // Sleep for a while to let the event be sent to the event handler.
@@ -509,10 +521,8 @@
 
     #[tokio::test]
     async fn test_event_recorder_should_panic() {
-        let mut event_recorder = EventRecorderImpl::new(
-            Box::new(TestEventHandlerImplShouldPanic {}),
-            EventRecorderOptions::default(),
-        );
+        let mut event_recorder =
+            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));
 
         event_recorder.record(Box::new(TestEvent {}));
 
@@ -529,4 +539,135 @@
             assert!(handle.await.unwrap_err().is_panic());
         }
     }
+
+    #[derive(Debug)]
+    struct TestEventA {}
+
+    impl Event for TestEventA {
+        fn event_type(&self) -> &str {
+            "A"
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[derive(Debug)]
+    struct TestEventB {}
+
+    impl Event for TestEventB {
+        fn table_name(&self) -> &str {
+            "table_B"
+        }
+
+        fn event_type(&self) -> &str {
+            "B"
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[derive(Debug)]
+    struct TestEventC {}
+
+    impl Event for TestEventC {
+        fn table_name(&self) -> &str {
+            "table_C"
+        }
+
+        fn event_type(&self) -> &str {
+            "C"
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[test]
+    fn test_group_events_by_type() {
+        let events: Vec<Box<dyn Event>> = vec![
+            Box::new(TestEventA {}),
+            Box::new(TestEventB {}),
+            Box::new(TestEventA {}),
+            Box::new(TestEventC {}),
+            Box::new(TestEventB {}),
+            Box::new(TestEventC {}),
+            Box::new(TestEventA {}),
+        ];
+
+        let event_groups = group_events_by_type(&events);
+        assert_eq!(event_groups.len(), 3);
+        assert_eq!(event_groups.get("A").unwrap().len(), 3);
+        assert_eq!(event_groups.get("B").unwrap().len(), 2);
+        assert_eq!(event_groups.get("C").unwrap().len(), 2);
+    }
+
+    #[test]
+    fn test_build_row_inserts_request() {
+        let events: Vec<Box<dyn Event>> = vec![
+            Box::new(TestEventA {}),
+            Box::new(TestEventB {}),
+            Box::new(TestEventA {}),
+            Box::new(TestEventC {}),
+            Box::new(TestEventB {}),
+            Box::new(TestEventC {}),
+            Box::new(TestEventA {}),
+        ];
+
+        let event_groups = group_events_by_type(&events);
+        assert_eq!(event_groups.len(), 3);
+        assert_eq!(event_groups.get("A").unwrap().len(), 3);
+        assert_eq!(event_groups.get("B").unwrap().len(), 2);
+        assert_eq!(event_groups.get("C").unwrap().len(), 2);
+
+        for (event_type, events) in event_groups {
+            let row_inserts_request = build_row_inserts_request(&events).unwrap();
+            if event_type == "A" {
+                assert_eq!(row_inserts_request.inserts.len(), 1);
+                assert_eq!(
+                    row_inserts_request.inserts[0].table_name,
+                    DEFAULT_EVENTS_TABLE_NAME
+                );
+                assert_eq!(
+                    row_inserts_request.inserts[0]
+                        .rows
+                        .as_ref()
+                        .unwrap()
+                        .rows
+                        .len(),
+                    3
+                );
+            } else if event_type == "B" {
+                assert_eq!(row_inserts_request.inserts.len(), 1);
+                assert_eq!(row_inserts_request.inserts[0].table_name, "table_B");
+                assert_eq!(
+                    row_inserts_request.inserts[0]
+                        .rows
+                        .as_ref()
+                        .unwrap()
+                        .rows
+                        .len(),
+                    2
+                );
+            } else if event_type == "C" {
+                assert_eq!(row_inserts_request.inserts.len(), 1);
+                assert_eq!(row_inserts_request.inserts[0].table_name, "table_C");
+                assert_eq!(
+                    row_inserts_request.inserts[0]
+                        .rows
+                        .as_ref()
+                        .unwrap()
+                        .rows
+                        .len(),
+                    2
+                );
+            } else {
+                panic!("Unexpected event type: {}", event_type);
+            }
+        }
+    }
 }
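The new tests above exercise `table_name` dispatch: events land in the shared events table unless they override `table_name`. A self-contained sketch of that default-method pattern with simplified stand-in types (not the crate's actual trait):

```rust
// Simplified stand-in for the Event trait: the default table_name() routes
// events to the shared table unless a concrete event overrides it.
use std::collections::HashMap;

trait Event {
    fn table_name(&self) -> &str {
        "events" // plays the role of DEFAULT_EVENTS_TABLE_NAME
    }
    fn event_type(&self) -> &str;
}

struct SlowQuery;
impl Event for SlowQuery {
    fn table_name(&self) -> &str {
        "slow_queries" // overridden, like SlowQueryEvent in this PR
    }
    fn event_type(&self) -> &str {
        "slow_query"
    }
}

struct Generic;
impl Event for Generic {
    fn event_type(&self) -> &str {
        "generic"
    }
}

fn main() {
    let events: Vec<Box<dyn Event>> = vec![Box::new(SlowQuery), Box::new(Generic)];
    let mut per_table: HashMap<&str, usize> = HashMap::new();
    for event in &events {
        *per_table.entry(event.table_name()).or_default() += 1;
    }
    assert_eq!(per_table["slow_queries"], 1);
    assert_eq!(per_table["events"], 1);
}
```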
diff --git a/src/common/frontend/Cargo.toml b/src/common/frontend/Cargo.toml
index 235bd6b0f8..de2224f36b 100644
--- a/src/common/frontend/Cargo.toml
+++ b/src/common/frontend/Cargo.toml
@@ -5,13 +5,18 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+api.workspace = true
 async-trait.workspace = true
 common-error.workspace = true
+common-event-recorder.workspace = true
 common-grpc.workspace = true
 common-macro.workspace = true
 common-meta.workspace = true
+common-time.workspace = true
 greptime-proto.workspace = true
+humantime.workspace = true
 meta-client.workspace = true
+serde.workspace = true
 session.workspace = true
 snafu.workspace = true
 tonic.workspace = true
diff --git a/src/common/frontend/src/slow_query_event.rs b/src/common/frontend/src/slow_query_event.rs
index 4240b897c6..3845d59414 100644
--- a/src/common/frontend/src/slow_query_event.rs
+++ b/src/common/frontend/src/slow_query_event.rs
@@ -12,17 +12,121 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use session::context::QueryContextRef;
+use std::any::Any;
 
-#[derive(Debug)]
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
+use common_event_recorder::error::Result;
+use common_event_recorder::Event;
+use serde::Serialize;
+
+pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
+pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
+pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
+pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
+pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
+pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
+pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
+pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
+pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
+pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
+pub const SLOW_QUERY_EVENT_TYPE: &str = "slow_query";
+
+/// SlowQueryEvent is the event of a slow query.
+#[derive(Debug, Serialize)]
 pub struct SlowQueryEvent {
     pub cost: u64,
     pub threshold: u64,
     pub query: String,
     pub is_promql: bool,
-    pub query_ctx: QueryContextRef,
     pub promql_range: Option<u64>,
     pub promql_step: Option<u64>,
     pub promql_start: Option<i64>,
     pub promql_end: Option<i64>,
 }
+
+impl Event for SlowQueryEvent {
+    fn table_name(&self) -> &str {
+        SLOW_QUERY_TABLE_NAME
+    }
+
+    fn event_type(&self) -> &str {
+        SLOW_QUERY_EVENT_TYPE
+    }
+
+    fn extra_schema(&self) -> Vec<ColumnSchema> {
+        vec![
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::Uint64.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::Uint64.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::Boolean.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::Uint64.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::Uint64.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::TimestampMillisecond.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::TimestampMillisecond.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+        ]
+    }
+
+    fn extra_row(&self) -> Result<Row> {
+        Ok(Row {
+            values: vec![
+                ValueData::U64Value(self.cost).into(),
+                ValueData::U64Value(self.threshold).into(),
+                ValueData::StringValue(self.query.to_string()).into(),
+                ValueData::BoolValue(self.is_promql).into(),
+                ValueData::U64Value(self.promql_range.unwrap_or(0)).into(),
+                ValueData::U64Value(self.promql_step.unwrap_or(0)).into(),
+                ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(),
+                ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(),
+            ],
+        })
+    }
+
+    fn json_payload(&self) -> Result<String> {
+        Ok("".to_string())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
diff --git a/src/common/telemetry/src/lib.rs b/src/common/telemetry/src/lib.rs
index e63d6e8af4..2f3866d49b 100644
--- a/src/common/telemetry/src/lib.rs
+++ b/src/common/telemetry/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #![feature(let_chains)]
+#![feature(duration_constructors)]
 
 pub mod logging;
 mod macros;
diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs
index bcc7bc4078..72e994ad91 100644
--- a/src/common/telemetry/src/logging.rs
+++ b/src/common/telemetry/src/logging.rs
@@ -103,14 +103,15 @@ pub struct SlowQueryOptions {
 
     /// The threshold of slow queries.
     #[serde(with = "humantime_serde")]
-    pub threshold: Option<Duration>,
+    pub threshold: Duration,
 
     /// The sample ratio of slow queries.
-    pub sample_ratio: Option<f64>,
+    pub sample_ratio: f64,
 
-    /// The table TTL of the `slow_queries` system table. Default is "30d".
+    /// The table TTL of the `slow_queries` system table. Default is "90d".
     /// It's used when `record_type` is `SystemTable`.
-    pub ttl: Option<String>,
+    #[serde(with = "humantime_serde")]
+    pub ttl: Duration,
 }
 
 impl Default for SlowQueryOptions {
@@ -118,9 +119,9 @@
         Self {
             enable: true,
             record_type: SlowQueriesRecordType::SystemTable,
-            threshold: Some(Duration::from_secs(30)),
-            sample_ratio: Some(1.0),
-            ttl: Some("30d".to_string()),
+            threshold: Duration::from_secs(30),
+            sample_ratio: 1.0,
+            ttl: Duration::from_days(90),
         }
     }
 }
@@ -128,7 +129,9 @@
 #[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
 #[serde(rename_all = "snake_case")]
 pub enum SlowQueriesRecordType {
+    /// Record the slow query in the system table.
     SystemTable,
+    /// Record the slow query in a dedicated log file.
Log, } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index c1fc08fa33..69aad70a93 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -26,6 +26,7 @@ common-catalog.workspace = true common-config.workspace = true common-datasource.workspace = true common-error.workspace = true +common-event-recorder.workspace = true common-frontend.workspace = true common-function.workspace = true common-grpc.workspace = true @@ -45,6 +46,7 @@ datafusion-expr.workspace = true datanode.workspace = true datatypes.workspace = true futures.workspace = true +humantime.workspace = true humantime-serde.workspace = true lazy_static.workspace = true log-query.workspace = true diff --git a/src/frontend/src/events.rs b/src/frontend/src/events.rs new file mode 100644 index 0000000000..ade5045f50 --- /dev/null +++ b/src/frontend/src/events.rs @@ -0,0 +1,100 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use async_trait::async_trait; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME}; +use common_error::ext::BoxedError; +use common_event_recorder::error::{InsertEventsSnafu, Result}; +use common_event_recorder::{ + build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions, +}; +use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE; +use humantime::format_duration; +use operator::insert::InserterRef; +use operator::statement::StatementExecutorRef; +use session::context::QueryContextBuilder; +use snafu::ResultExt; +use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; + +/// EventHandlerImpl is the default event handler implementation in frontend. +pub struct EventHandlerImpl { + inserter: InserterRef, + statement_executor: StatementExecutorRef, + slow_query_ttl: Duration, + global_ttl: Duration, +} + +impl EventHandlerImpl { + /// Create a new EventHandlerImpl. 
+    pub fn new(
+        inserter: InserterRef,
+        statement_executor: StatementExecutorRef,
+        slow_query_ttl: Duration,
+        global_ttl: Duration,
+    ) -> Self {
+        Self {
+            inserter,
+            statement_executor,
+            slow_query_ttl,
+            global_ttl,
+        }
+    }
+}
+
+#[async_trait]
+impl EventHandler for EventHandlerImpl {
+    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
+        let event_groups = group_events_by_type(events);
+
+        for (event_type, events) in event_groups {
+            let opts = self.options(event_type);
+            let query_ctx = QueryContextBuilder::default()
+                .current_catalog(DEFAULT_CATALOG_NAME.to_string())
+                .current_schema(DEFAULT_PRIVATE_SCHEMA_NAME.to_string())
+                .set_extension(TTL_KEY.to_string(), format_duration(opts.ttl).to_string())
+                .set_extension(APPEND_MODE_KEY.to_string(), opts.append_mode.to_string())
+                .build()
+                .into();
+
+            self.inserter
+                .handle_row_inserts(
+                    build_row_inserts_request(&events)?,
+                    query_ctx,
+                    &self.statement_executor,
+                    false,
+                    false,
+                )
+                .await
+                .map_err(BoxedError::new)
+                .context(InsertEventsSnafu)?;
+        }
+
+        Ok(())
+    }
+
+    fn options(&self, event_type: &str) -> EventHandlerOptions {
+        match event_type {
+            SLOW_QUERY_EVENT_TYPE => EventHandlerOptions {
+                ttl: self.slow_query_ttl,
+                append_mode: true,
+            },
+            _ => EventHandlerOptions {
+                ttl: self.global_ttl,
+                append_mode: true,
+            },
+        }
+    }
+}
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index e4051965be..23b355f60c 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -16,6 +16,7 @@
 use std::sync::Arc;
 
 use common_base::readable_size::ReadableSize;
 use common_config::config::Configurable;
+use common_event_recorder::EventRecorderOptions;
 use common_options::datanode::DatanodeClientOptions;
 use common_options::memory::MemoryOptions;
 use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
@@ -62,8 +63,10 @@
     pub tracing: TracingOptions,
     pub query: QueryOptions,
     pub max_in_flight_write_bytes: Option<ReadableSize>,
-    pub slow_query: Option<SlowQueryOptions>,
+    pub slow_query: SlowQueryOptions,
     pub memory: MemoryOptions,
+    /// The event recorder options.
+    pub event_recorder: EventRecorderOptions,
 }
 
 impl Default for FrontendOptions {
@@ -89,8 +92,9 @@
             tracing: TracingOptions::default(),
             query: QueryOptions::default(),
             max_in_flight_write_bytes: None,
-            slow_query: Some(SlowQueryOptions::default()),
+            slow_query: SlowQueryOptions::default(),
             memory: MemoryOptions::default(),
+            event_recorder: EventRecorderOptions::default(),
         }
     }
 }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index ebfa43e2ec..973829375e 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -32,13 +32,16 @@
 use async_stream::stream;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
-use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement};
+use catalog::process_manager::{
+    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
+};
 use catalog::CatalogManagerRef;
 use client::OutputData;
 use common_base::cancellation::CancellableFuture;
 use common_base::Plugins;
 use common_config::KvBackendConfig;
 use common_error::ext::{BoxedError, ErrorExt};
+use common_event_recorder::EventRecorderRef;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::key::runtime_switch::RuntimeSwitchManager;
 use common_meta::key::table_name::TableNameKey;
@@ -53,6 +56,7 @@
 use common_procedure::ProcedureManagerRef;
 use common_query::Output;
 use common_recordbatch::error::StreamTimeoutSnafu;
 use common_recordbatch::RecordBatchStreamWrapper;
+use common_telemetry::logging::SlowQueryOptions;
 use common_telemetry::{debug, error, info, tracing};
 use dashmap::DashMap;
 use datafusion_expr::LogicalPlan;
@@ -99,7 +103,6 @@
     StatementTimeoutSnafu, TableOperationSnafu,
 };
 use crate::limiter::LimiterRef;
-use crate::slow_query_recorder::SlowQueryRecorder;
 use crate::stream_wrapper::CancellableStreamWrapper;
 
 lazy_static! {
@@ -119,9 +122,10 @@ pub struct Instance {
     inserter: InserterRef,
     deleter: DeleterRef,
     table_metadata_manager: TableMetadataManagerRef,
-    slow_query_recorder: Option<SlowQueryRecorder>,
+    event_recorder: Option<EventRecorderRef>,
     limiter: Option<LimiterRef>,
     process_manager: ProcessManagerRef,
+    slow_query_options: SlowQueryOptions,
 
     // cache for otlp metrics
     // first layer key: db-string
@@ -222,9 +226,20 @@
         let query_interceptor = query_interceptor.as_ref();
 
         if should_capture_statement(Some(&stmt)) {
-            let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
-                recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone())
-            });
+            let slow_query_timer = self
+                .slow_query_options
+                .enable
+                .then(|| self.event_recorder.clone())
+                .flatten()
+                .map(|event_recorder| {
+                    SlowQueryTimer::new(
+                        CatalogQueryStatement::Sql(stmt.clone()),
+                        self.slow_query_options.threshold,
+                        self.slow_query_options.sample_ratio,
+                        self.slow_query_options.record_type,
+                        event_recorder,
+                    )
+                });
 
             let ticket = self.process_manager.register_query(
                 query_ctx.current_catalog().to_string(),
@@ -586,9 +601,20 @@
             // It's safe to unwrap here because we've already checked the type.
             let stmt = stmt.unwrap();
             let query = stmt.to_string();
-            let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
-                recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone())
-            });
+            let slow_query_timer = self
+                .slow_query_options
+                .enable
+                .then(|| self.event_recorder.clone())
+                .flatten()
+                .map(|event_recorder| {
+                    SlowQueryTimer::new(
+                        CatalogQueryStatement::Sql(stmt.clone()),
+                        self.slow_query_options.threshold,
+                        self.slow_query_options.sample_ratio,
+                        self.slow_query_options.record_type,
+                        event_recorder,
+                    )
+                });
 
             let ticket = self.process_manager.register_query(
                 query_ctx.current_catalog().to_string(),
@@ -739,9 +765,19 @@
         let query = query_statement.to_string();
 
         let slow_query_timer = self
-            .slow_query_recorder
-            .as_ref()
-            .and_then(|recorder| recorder.start(query_statement, query_ctx.clone()));
+            .slow_query_options
+            .enable
+            .then(|| self.event_recorder.clone())
+            .flatten()
+            .map(|event_recorder| {
+                SlowQueryTimer::new(
+                    query_statement,
+                    self.slow_query_options.threshold,
+                    self.slow_query_options.sample_ratio,
+                    self.slow_query_options.record_type,
+                    event_recorder,
+                )
+            });
 
         let ticket = self.process_manager.register_query(
             query_ctx.current_catalog().to_string(),
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index ad67a7a607..021a40d8be 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -18,6 +18,7 @@
 use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
 use catalog::process_manager::ProcessManagerRef;
 use catalog::CatalogManagerRef;
 use common_base::Plugins;
+use common_event_recorder::EventRecorderImpl;
 use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
 use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
 use common_meta::key::flow::FlowMetadataManager;
@@ -40,11 +41,11 @@
 use query::QueryEngineFactory;
 use snafu::OptionExt;
 
 use crate::error::{self, Result};
+use crate::events::EventHandlerImpl;
 use crate::frontend::FrontendOptions;
 use crate::instance::region_query::FrontendRegionQueryHandler;
 use crate::instance::Instance;
 use crate::limiter::Limiter;
-use crate::slow_query_recorder::SlowQueryRecorder;
 
 /// The frontend [`Instance`] builder.
 pub struct FrontendBuilder {
@@ -194,16 +195,12 @@
 
         plugins.insert::<StatementExecutorRef>(statement_executor.clone());
 
-        let slow_query_recorder = self.options.slow_query.and_then(|opts| {
-            opts.enable.then(|| {
-                SlowQueryRecorder::new(
-                    opts.clone(),
-                    inserter.clone(),
-                    statement_executor.clone(),
-                    self.catalog_manager.clone(),
-                )
-            })
-        });
+        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
+            inserter.clone(),
+            statement_executor.clone(),
+            self.options.slow_query.ttl,
+            self.options.event_recorder.ttl,
+        )))));
 
         // Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self @@ -222,10 +219,11 @@ impl FrontendBuilder { inserter, deleter, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), - slow_query_recorder, + event_recorder: Some(event_recorder), limiter, process_manager, otlp_metrics_table_legacy_cache: DashMap::new(), + slow_query_options: self.options.slow_query.clone(), }) } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 17f9f27661..a77799f405 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -15,6 +15,7 @@ #![feature(assert_matches)] pub mod error; +pub mod events; pub mod frontend; pub mod heartbeat; pub mod instance; @@ -22,5 +23,4 @@ pub(crate) mod limiter; pub(crate) mod metrics; pub mod server; pub mod service_config; -pub mod slow_query_recorder; mod stream_wrapper; diff --git a/src/frontend/src/slow_query_recorder.rs b/src/frontend/src/slow_query_recorder.rs deleted file mode 100644 index 8f79108d75..0000000000 --- a/src/frontend/src/slow_query_recorder.rs +++ /dev/null @@ -1,434 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::sync::Arc; - -use api::v1::value::ValueData; -use api::v1::{ - ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest, - RowInsertRequests, Rows, SemanticType, -}; -use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer}; -use catalog::CatalogManagerRef; -use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; -use common_frontend::slow_query_event::SlowQueryEvent; -use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions}; -use common_telemetry::{debug, error, info, slow}; -use common_time::timestamp::{TimeUnit, Timestamp}; -use operator::insert::InserterRef; -use operator::statement::StatementExecutorRef; -use session::context::{QueryContextBuilder, QueryContextRef}; -use snafu::ResultExt; -use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; -use table::TableRef; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::task::JoinHandle; - -use crate::error::{CatalogSnafu, Result, TableOperationSnafu}; - -pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries"; -pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost"; -pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold"; -pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query"; -pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; -pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql"; -pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start"; -pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end"; -pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range"; -pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step"; - -const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d"; -const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024; - -/// SlowQueryRecorder is responsible 
for recording slow queries. -#[derive(Clone)] -pub struct SlowQueryRecorder { - tx: Sender, - slow_query_opts: SlowQueryOptions, - _handle: Arc>, -} - -impl SlowQueryRecorder { - /// Create a new SlowQueryRecorder. - pub fn new( - slow_query_opts: SlowQueryOptions, - inserter: InserterRef, - statement_executor: StatementExecutorRef, - catalog_manager: CatalogManagerRef, - ) -> Self { - let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE); - - let ttl = slow_query_opts - .ttl - .clone() - .unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string()); - - // Start a new task to process the slow query events. - let event_handler = SlowQueryEventHandler { - inserter, - statement_executor, - catalog_manager, - rx, - record_type: slow_query_opts.record_type, - ttl, - }; - - // Start a new background task to process the slow query events. - let handle = tokio::spawn(async move { - event_handler.process_slow_query().await; - }); - - Self { - tx, - slow_query_opts, - _handle: Arc::new(handle), - } - } - - /// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false. - /// The timer sets the start time when created and calculates the elapsed duration when dropped. - pub fn start( - &self, - stmt: CatalogQueryStatement, - query_ctx: QueryContextRef, - ) -> Option { - if self.slow_query_opts.enable { - Some(SlowQueryTimer::new( - stmt, - query_ctx, - self.slow_query_opts.threshold, - self.slow_query_opts.sample_ratio, - self.tx.clone(), - )) - } else { - None - } - } -} - -struct SlowQueryEventHandler { - inserter: InserterRef, - statement_executor: StatementExecutorRef, - catalog_manager: CatalogManagerRef, - rx: Receiver, - record_type: SlowQueriesRecordType, - ttl: String, -} - -impl SlowQueryEventHandler { - async fn process_slow_query(mut self) { - info!( - "Start the background handler to process slow query events and record them in {:?}.", - self.record_type - ); - while let Some(event) = self.rx.recv().await { - self.record_slow_query(event).await; - } - } - - async fn record_slow_query(&self, event: SlowQueryEvent) { - match self.record_type { - SlowQueriesRecordType::Log => { - // Record the slow query in a specific logs file. - slow!( - cost = event.cost, - threshold = event.threshold, - query = event.query, - is_promql = event.is_promql, - promql_range = event.promql_range, - promql_step = event.promql_step, - promql_start = event.promql_start, - promql_end = event.promql_end, - ); - } - SlowQueriesRecordType::SystemTable => { - // Record the slow query in a system table that is stored in greptimedb itself. - if let Err(e) = self.insert_slow_query(&event).await { - error!(e; "Failed to insert slow query, query: {:?}", event); - } - } - } - } - - async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> { - debug!("Handle the slow query event: {:?}", event); - - let table = if let Some(table) = self - .catalog_manager - .table( - event.query_ctx.current_catalog(), - DEFAULT_PRIVATE_SCHEMA_NAME, - SLOW_QUERY_TABLE_NAME, - Some(&event.query_ctx), - ) - .await - .context(CatalogSnafu)? - { - table - } else { - // Create the system table if it doesn't exist. - self.create_system_table(event.query_ctx.clone()).await? 
- }; - - let insert = RowInsertRequest { - table_name: SLOW_QUERY_TABLE_NAME.to_string(), - rows: Some(Rows { - schema: self.build_insert_column_schema(), - rows: vec![Row { - values: vec![ - ValueData::U64Value(event.cost).into(), - ValueData::U64Value(event.threshold).into(), - ValueData::StringValue(event.query.to_string()).into(), - ValueData::BoolValue(event.is_promql).into(), - ValueData::TimestampNanosecondValue( - Timestamp::current_time(TimeUnit::Nanosecond).value(), - ) - .into(), - ValueData::U64Value(event.promql_range.unwrap_or(0)).into(), - ValueData::U64Value(event.promql_step.unwrap_or(0)).into(), - ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0)) - .into(), - ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(), - ], - }], - }), - }; - - let requests = RowInsertRequests { - inserts: vec![insert], - }; - - let table_info = table.table_info(); - let query_ctx = QueryContextBuilder::default() - .current_catalog(table_info.catalog_name.to_string()) - .current_schema(table_info.schema_name.to_string()) - .build() - .into(); - - self.inserter - .handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false) - .await - .context(TableOperationSnafu)?; - - Ok(()) - } - - async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result { - let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog()); - if let Some(table) = self - .catalog_manager - .table( - &create_table_expr.catalog_name, - &create_table_expr.schema_name, - &create_table_expr.table_name, - Some(&query_ctx), - ) - .await - .context(CatalogSnafu)? - { - // The table is already created, so we don't need to create it again. - return Ok(table); - } - - // Create the `slow_queries` system table. 
- let table = self - .statement_executor - .create_table_inner(&mut create_table_expr, None, query_ctx.clone()) - .await - .context(TableOperationSnafu)?; - - info!( - "Create the {} system table in {:?} successfully.", - SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME - ); - - Ok(table) - } - - fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr { - let column_defs = vec![ - ColumnDef { - name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The cost of the slow query in milliseconds".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: - "When the query cost exceeds this value, it will be recorded as a slow query" - .to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(), - data_type: ColumnDataType::String as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The original query statement".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(), - data_type: ColumnDataType::Boolean as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "Whether the query is a PromQL query".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), - data_type: ColumnDataType::TimestampNanosecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Timestamp as i32, - comment: "The timestamp of the slow query".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The time range of the PromQL query in milliseconds".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(), - data_type: ColumnDataType::Uint64 as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The step of the PromQL query in milliseconds".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The start timestamp of the PromQL query in milliseconds".to_string(), - datatype_extension: None, - options: None, - }, - ColumnDef { - name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(), - data_type: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - semantic_type: SemanticType::Field as i32, - comment: "The end timestamp of the PromQL query in milliseconds".to_string(), - datatype_extension: None, - options: None, - }, - ]; - - let table_options = HashMap::from([ - 
(APPEND_MODE_KEY.to_string(), "true".to_string()), - (TTL_KEY.to_string(), self.ttl.to_string()), - ]); - - CreateTableExpr { - catalog_name: catalog.to_string(), - schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema. - table_name: SLOW_QUERY_TABLE_NAME.to_string(), - desc: "GreptimeDB system table for storing slow queries".to_string(), - column_defs, - time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), - primary_keys: vec![], - create_if_not_exists: true, - table_options, - table_id: None, - engine: default_engine().to_string(), - } - } - - fn build_insert_column_schema(&self) -> Vec { - vec![ - ColumnSchema { - column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Boolean.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), - datatype: ColumnDataType::TimestampNanosecond.into(), - semantic_type: SemanticType::Timestamp.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(), - datatype: ColumnDataType::Uint64.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(), - datatype: ColumnDataType::TimestampMillisecond.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ColumnSchema { - column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(), - datatype: ColumnDataType::TimestampMillisecond.into(), - semantic_type: SemanticType::Field.into(), - ..Default::default() - }, - ] - } -} diff --git a/src/meta-srv/src/events.rs b/src/meta-srv/src/events.rs index 666432d5f3..cede03f12a 100644 --- a/src/meta-srv/src/events.rs +++ b/src/meta-srv/src/events.rs @@ -13,6 +13,7 @@ // limitations under the License. 
diff --git a/src/meta-srv/src/events.rs b/src/meta-srv/src/events.rs
index 666432d5f3..cede03f12a 100644
--- a/src/meta-srv/src/events.rs
+++ b/src/meta-srv/src/events.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use client::{Client, Database};
@@ -21,7 +22,9 @@ use common_error::ext::BoxedError;
 use common_event_recorder::error::{
     InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result,
 };
-use common_event_recorder::{build_row_inserts_request, insert_hints, Event, EventHandler};
+use common_event_recorder::{
+    build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions,
+};
 use common_grpc::channel_manager::ChannelManager;
 use common_meta::peer::PeerLookupServiceRef;
 use common_telemetry::debug;
@@ -36,13 +39,15 @@ pub mod region_migration_event;
 
 pub struct EventHandlerImpl {
     peer_lookup_service: PeerLookupServiceRef,
     channel_manager: ChannelManager,
+    ttl: Duration,
 }
 
 impl EventHandlerImpl {
-    pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
+    pub fn new(meta_peer_client: MetaPeerClientRef, ttl: Duration) -> Self {
         Self {
             peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)),
             channel_manager: ChannelManager::new(),
+            ttl,
         }
     }
 }
@@ -50,15 +55,35 @@
 #[async_trait]
 impl EventHandler for EventHandlerImpl {
     async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
-        self.build_database_client()
-            .await?
-            .row_inserts_with_hints(build_row_inserts_request(events)?, &insert_hints())
-            .await
-            .map_err(BoxedError::new)
-            .context(InsertEventsSnafu)?;
+        let event_groups = group_events_by_type(events);
+
+        for (event_type, events) in event_groups {
+            let opts = self.options(event_type);
+            let hints = opts.to_hints();
+
+            self.build_database_client()
+                .await?
+                .row_inserts_with_hints(
+                    build_row_inserts_request(&events)?,
+                    &hints
+                        .iter()
+                        .map(|(k, v)| (*k, v.as_str()))
+                        .collect::<Vec<_>>(),
+                )
+                .await
+                .map_err(BoxedError::new)
+                .context(InsertEventsSnafu)?;
+        }
 
         Ok(())
     }
+
+    fn options(&self, _event_type: &str) -> EventHandlerOptions {
+        EventHandlerOptions {
+            ttl: self.ttl,
+            append_mode: true,
+        }
+    }
 }
 
 impl EventHandlerImpl {
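The reworked handler above groups events by type and turns per-type `EventHandlerOptions` into insert hints. `to_hints` itself is not shown in this diff; here is a minimal sketch of the shape the loop consumes, assuming `ttl`/`append_mode` hint keys and humantime-style duration formatting (all assumptions, not verified API of the common-event-recorder crate):

```rust
use std::time::Duration;

// Sketch only: the field names match the diff, but the hint key names and
// the humantime formatting are guesses.
pub struct EventHandlerOptions {
    pub ttl: Duration,
    pub append_mode: bool,
}

impl EventHandlerOptions {
    // Produces (key, value) pairs; the handler above adapts them into
    // (&str, &str) hints for `row_inserts_with_hints`.
    pub fn to_hints(&self) -> Vec<(&'static str, String)> {
        vec![
            ("ttl", humantime::format_duration(self.ttl).to_string()),
            ("append_mode", self.append_mode.to_string()),
        ]
    }
}
```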
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 3da28f4dab..29456317c5 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -195,10 +195,10 @@ impl MetasrvBuilder {
             .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
 
         // Builds the event recorder to record important events and persist them as the system table.
-        let event_recorder = Arc::new(EventRecorderImpl::new(
-            Box::new(EventHandlerImpl::new(meta_peer_client.clone())),
-            options.event_recorder.clone(),
-        ));
+        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
+            meta_peer_client.clone(),
+            options.event_recorder.ttl,
+        )))));
 
         let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
         let pushers = Pushers::default();
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 4806d4554a..a0d9459286 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -28,6 +28,7 @@ common-catalog.workspace = true
 common-config.workspace = true
 common-error.workspace = true
 common-event-recorder.workspace = true
+common-frontend.workspace = true
 common-grpc.workspace = true
 common-meta = { workspace = true, features = ["testing"] }
 common-procedure.workspace = true
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 89a7203817..6bf144d320 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -330,12 +330,12 @@ impl GreptimeDbStandaloneBuilder {
             wal: self.metasrv_wal_config.clone().into(),
             grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
             // Enable slow query log with 1s threshold to run the slow query test.
-            slow_query: Some(SlowQueryOptions {
+            slow_query: SlowQueryOptions {
                 enable: true,
                 // Set the threshold to 1s to run the slow query test.
-                threshold: Some(Duration::from_secs(1)),
+                threshold: Duration::from_secs(1),
                 ..Default::default()
-            }),
+            },
             ..StandaloneOptions::default()
         };
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index aefebe6939..1ad29525e2 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -28,9 +28,11 @@ use common_catalog::consts::{
     trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME,
 };
 use common_error::status_code::StatusCode as ErrorCode;
+use common_frontend::slow_query_event::{
+    SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
+};
 use flate2::write::GzEncoder;
 use flate2::Compression;
-use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME};
 use log_query::{Context, Limit, LogQuery, TimeFilter};
 use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
 use loki_proto::prost_types::Timestamp;
@@ -1383,7 +1385,7 @@ enable = true
 record_type = "system_table"
 threshold = "1s"
 sample_ratio = 1.0
-ttl = "30d"
+ttl = "2months 29days 2h 52m 48s"
 
 [query]
 parallelism = 0
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index cc738b79d6..a5e256a300 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -17,7 +17,7 @@ use std::collections::HashMap;
 use auth::user_provider_from_option;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
 use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
-use frontend::slow_query_recorder::{
+use common_frontend::slow_query_event::{
     SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
     SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME, SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
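The surprising `ttl = "2months 29days 2h 52m 48s"` expectation in the http.rs hunk above is consistent with the new 90-day default being round-tripped through humantime, whose formatter splits durations into ~30.44-day months. A small self-contained check (the `main` wrapper is illustrative; only the humantime crate is assumed):

```rust
use std::time::Duration;

fn main() {
    // 90 days, matching the new `ttl = "90d"` defaults in the config files.
    let ttl = humantime::parse_duration("90d").unwrap();
    assert_eq!(ttl, Duration::from_secs(90 * 24 * 60 * 60));

    // humantime's Display splits 7_776_000s into 2_630_016s (~30.44-day)
    // months, which yields exactly the string the test config asserts.
    assert_eq!(
        humantime::format_duration(ttl).to_string(),
        "2months 29days 2h 52m 48s"
    );
}
```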