From 14b655ea571134ca6d23f5c7c7d6292132dfc8ec Mon Sep 17 00:00:00 2001 From: zyy17 Date: Thu, 15 May 2025 12:18:48 +0800 Subject: [PATCH] refactor: add `SlowQueryRecorder` to record slow query in system table and refactor slow query options (#6008) * refactor: add common-slow-query crate * refactor: refine the naming * chore: fix clippy * chore: fix typo * chore: sperate SlowQueryOptions From Logging * chore: fix clippy * chore: fix ci * chore: refine the code * chore: update config example * refactor: use drop() to end the slow query timer * refactor: move common-slow-query to frontend crate * chore: polish some code * refactor: code review * refactor: add promql_range/promql_step/promql_start/promql_end fields in slow_queries * refactor: add build_slow_query_logger() * refactor: turn on slow query on frontend by default --- Cargo.lock | 1 + config/config.md | 31 +- config/datanode.example.toml | 13 - config/flownode.example.toml | 13 - config/frontend.example.toml | 20 +- config/metasrv.example.toml | 13 - config/standalone.example.toml | 12 +- src/cmd/src/cli.rs | 1 + src/cmd/src/datanode/builder.rs | 1 + src/cmd/src/flownode.rs | 1 + src/cmd/src/frontend.rs | 3 +- src/cmd/src/metasrv.rs | 1 + src/cmd/src/standalone.rs | 8 +- src/cmd/tests/load_config_test.rs | 7 +- src/common/telemetry/src/logging.rs | 146 ++++--- src/frontend/Cargo.toml | 1 + src/frontend/src/frontend.rs | 6 +- src/frontend/src/instance.rs | 19 +- src/frontend/src/instance/builder.rs | 18 +- src/frontend/src/lib.rs | 1 + src/frontend/src/slow_query_recorder.rs | 531 ++++++++++++++++++++++++ src/query/src/lib.rs | 1 - src/query/src/stats.rs | 97 ----- tests-integration/src/cluster.rs | 2 - tests-integration/src/standalone.rs | 2 - tests-integration/tests/http.rs | 13 +- 26 files changed, 713 insertions(+), 249 deletions(-) create mode 100644 src/frontend/src/slow_query_recorder.rs delete mode 100644 src/query/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 00048a781c..8e470fbd09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4457,6 +4457,7 @@ dependencies = [ "promql-parser", "prost 0.13.5", "query", + "rand 0.9.0", "serde", "serde_json", "servers", diff --git a/config/config.md b/config/config.md index f3230190c9..ff296b91ce 100644 --- a/config/config.md +++ b/config/config.md @@ -188,10 +188,11 @@ | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | -| `logging.slow_query` | -- | -- | The slow query log options. | -| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. | -| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. | -| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | +| `slow_query` | -- | -- | The slow query log options. | +| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. | +| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. | +| `slow_query.threshold` | String | Unset | The threshold of slow query. | +| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | @@ -288,10 +289,12 @@ | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | -| `logging.slow_query` | -- | -- | The slow query log options. | -| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. | -| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. | -| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | +| `slow_query` | -- | -- | The slow query log options. | +| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. | +| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`.
If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.
If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. | +| `slow_query.threshold` | String | Unset | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. | +| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. | +| `slow_query.ttl` | String | Unset | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | @@ -362,10 +365,6 @@ | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | -| `logging.slow_query` | -- | -- | The slow query log options. | -| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. | -| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. | -| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | @@ -529,10 +528,6 @@ | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | -| `logging.slow_query` | -- | -- | The slow query log options. | -| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. | -| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. | -| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | | `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. | | `export_metrics.enable` | Bool | `false` | whether enable export metrics. | | `export_metrics.write_interval` | String | `30s` | The interval of export metrics. | @@ -585,9 +580,5 @@ | `logging.max_log_files` | Integer | `720` | The maximum amount of log files. | | `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.
Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
ratio > 1 are treated as 1. Fractions < 0 are treated as 0 | | `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- | -| `logging.slow_query` | -- | -- | The slow query log options. | -| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. | -| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. | -| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. | | `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. | | `tracing.tokio_console_addr` | String | Unset | The tokio console address. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 46beb51a23..6b00cf90bd 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -632,19 +632,6 @@ max_log_files = 720 [logging.tracing_sample_ratio] default_ratio = 1.0 -## The slow query log options. -[logging.slow_query] -## Whether to enable slow query log. -enable = false - -## The threshold of slow query. -## @toml2docs:none-default -threshold = "10s" - -## The sampling ratio of slow query log. The value should be in the range of (0, 1]. -## @toml2docs:none-default -sample_ratio = 1.0 - ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. [export_metrics] diff --git a/config/flownode.example.toml b/config/flownode.example.toml index 954daff176..43c7ef1141 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -100,19 +100,6 @@ max_log_files = 720 [logging.tracing_sample_ratio] default_ratio = 1.0 -## The slow query log options. -[logging.slow_query] -## Whether to enable slow query log. -enable = false - -## The threshold of slow query. -## @toml2docs:none-default -threshold = "10s" - -## The sampling ratio of slow query log. The value should be in the range of (0, 1]. -## @toml2docs:none-default -sample_ratio = 1.0 - ## The tracing options. Only effect when compiled with `tokio-console` feature. #+ [tracing] ## The tokio console address. diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 2e3ee4a69d..2a9ee2bc8c 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -223,18 +223,28 @@ max_log_files = 720 default_ratio = 1.0 ## The slow query log options. -[logging.slow_query] +[slow_query] ## Whether to enable slow query log. -enable = false +enable = true -## The threshold of slow query. +## The record type of slow queries. It can be `system_table` or `log`. +## If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`. +## If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. ## @toml2docs:none-default -threshold = "10s" +record_type = "system_table" -## The sampling ratio of slow query log. The value should be in the range of (0, 1]. +## The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. +## @toml2docs:none-default +threshold = "5s" + +## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. 
## @toml2docs:none-default sample_ratio = 1.0 +## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. +## @toml2docs:none-default +ttl = "30d" + ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. [export_metrics] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 0e7f9b74f0..dadb077a86 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -218,19 +218,6 @@ max_log_files = 720 [logging.tracing_sample_ratio] default_ratio = 1.0 -## The slow query log options. -[logging.slow_query] -## Whether to enable slow query log. -enable = false - -## The threshold of slow query. -## @toml2docs:none-default -threshold = "10s" - -## The sampling ratio of slow query log. The value should be in the range of (0, 1]. -## @toml2docs:none-default -sample_ratio = 1.0 - ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. [export_metrics] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 0e72cfcc7e..649b570350 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -724,17 +724,21 @@ max_log_files = 720 default_ratio = 1.0 ## The slow query log options. -[logging.slow_query] +[slow_query] ## Whether to enable slow query log. -enable = false +#+ enable = false + +## The record type of slow queries. It can be `system_table` or `log`. +## @toml2docs:none-default +#+ record_type = "system_table" ## The threshold of slow query. ## @toml2docs:none-default -threshold = "10s" +#+ threshold = "10s" ## The sampling ratio of slow query log. The value should be in the range of (0, 1]. ## @toml2docs:none-default -sample_ratio = 1.0 +#+ sample_ratio = 1.0 ## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API. ## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. 
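For reference (not part of this patch): a minimal sketch of the new `[slow_query]` section configured for the file-based record type instead of the default system table. The threshold and sample ratio values below are illustrative only.

[slow_query]
## Record slow queries in the rolling log file `greptimedb-slow-queries.*`
## instead of the `greptime_private.slow_queries` system table.
enable = true
record_type = "log"
threshold = "10s"
## Sample roughly half of the slow queries; `ttl` is omitted because it only
## applies when `record_type` is `system_table`.
sample_ratio = 0.5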
diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index a514fa0d3c..b57da1216a 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -76,6 +76,7 @@ impl Command { &opts, &TracingOptions::default(), None, + None, ); let tool = self.cmd.build().await.context(error::BuildCliSnafu)?; diff --git a/src/cmd/src/datanode/builder.rs b/src/cmd/src/datanode/builder.rs index 64acc29a3d..264a3b3f87 100644 --- a/src/cmd/src/datanode/builder.rs +++ b/src/cmd/src/datanode/builder.rs @@ -64,6 +64,7 @@ impl InstanceBuilder { &dn_opts.logging, &dn_opts.tracing, dn_opts.node_id.map(|x| x.to_string()), + None, ); log_versions(version(), short_version(), APP_NAME); diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index bfda82922c..896d55c8a7 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -244,6 +244,7 @@ impl StartCommand { &opts.component.logging, &opts.component.tracing, opts.component.node_id.map(|x| x.to_string()), + None, ); log_versions(version(), short_version(), APP_NAME); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index a28f4cd8f9..192f5a4306 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -37,7 +37,6 @@ use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; use meta_client::{MetaClientOptions, MetaClientType}; -use query::stats::StatementStatistics; use servers::export_metrics::ExportMetricsTask; use servers::tls::{TlsMode, TlsOption}; use snafu::{OptionExt, ResultExt}; @@ -269,6 +268,7 @@ impl StartCommand { &opts.component.logging, &opts.component.tracing, opts.component.node_id.clone(), + opts.component.slow_query.as_ref(), ); log_versions(version(), short_version(), APP_NAME); @@ -368,7 +368,6 @@ impl StartCommand { catalog_manager, Arc::new(client), meta_client, - StatementStatistics::new(opts.logging.slow_query.clone()), ) .with_plugin(plugins.clone()) .with_local_cache_invalidator(layered_cache_registry) diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 87e632c660..bbe68f3729 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -300,6 +300,7 @@ impl StartCommand { &opts.component.logging, &opts.component.tracing, None, + None, ); log_versions(version(), short_version(), APP_NAME); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index dd9e7926c4..80379c3ba7 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -47,7 +47,7 @@ use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef}; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_telemetry::info; -use common_telemetry::logging::{LoggingOptions, TracingOptions}; +use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions}; use common_time::timezone::set_default_timezone; use common_version::{short_version, version}; use common_wal::config::DatanodeWalConfig; @@ -69,7 +69,6 @@ use frontend::service_config::{ }; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use mito2::config::MitoConfig; -use query::stats::StatementStatistics; use serde::{Deserialize, Serialize}; use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask}; use servers::grpc::GrpcOptions; @@ -153,6 +152,7 @@ pub struct StandaloneOptions { pub init_regions_in_background: bool, pub init_regions_parallelism: usize, pub max_in_flight_write_bytes: Option, + pub slow_query: Option, } impl Default for StandaloneOptions { @@ 
-184,6 +184,7 @@ impl Default for StandaloneOptions { init_regions_in_background: false, init_regions_parallelism: 16, max_in_flight_write_bytes: None, + slow_query: Some(SlowQueryOptions::default()), } } } @@ -223,6 +224,7 @@ impl StandaloneOptions { // Handle the export metrics task run by standalone to frontend for execution export_metrics: cloned_opts.export_metrics, max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes, + slow_query: cloned_opts.slow_query, ..Default::default() } } @@ -447,6 +449,7 @@ impl StartCommand { &opts.component.logging, &opts.component.tracing, None, + opts.component.slow_query.as_ref(), ); log_versions(version(), short_version(), APP_NAME); @@ -594,7 +597,6 @@ impl StartCommand { catalog_manager.clone(), node_manager.clone(), ddl_task_executor.clone(), - StatementStatistics::new(opts.logging.slow_query.clone()), ) .with_plugin(plugins.clone()) .try_build() diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 07be39dddb..c14417afba 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions; use cmd::standalone::StandaloneOptions; use common_config::Configurable; use common_options::datanode::{ClientOptions, DatanodeClientOptions}; -use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT}; +use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT}; use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; @@ -167,11 +167,6 @@ fn test_load_metasrv_example_config() { level: Some("info".to_string()), otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()), tracing_sample_ratio: Some(Default::default()), - slow_query: SlowQueryOptions { - enable: false, - threshold: None, - sample_ratio: None, - }, ..Default::default() }, datanode: DatanodeClientOptions { diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 06b43a1d17..6ac4f343d6 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -68,24 +68,46 @@ pub struct LoggingOptions { /// The tracing sample ratio. pub tracing_sample_ratio: Option, - - /// The logging options of slow query. - pub slow_query: SlowQueryOptions, } /// The options of slow query. -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -#[serde(default)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct SlowQueryOptions { /// Whether to enable slow query log. pub enable: bool, + /// The record type of slow queries. + pub record_type: SlowQueriesRecordType, + /// The threshold of slow queries. #[serde(with = "humantime_serde")] pub threshold: Option, /// The sample ratio of slow queries. pub sample_ratio: Option, + + /// The table TTL of `slow_queries` system table. Default is "30d". + /// It's used when `record_type` is `SystemTable`. 
+ pub ttl: Option, +} + +impl Default for SlowQueryOptions { + fn default() -> Self { + Self { + enable: true, + record_type: SlowQueriesRecordType::SystemTable, + threshold: Some(Duration::from_secs(5)), + sample_ratio: Some(1.0), + ttl: Some("30d".to_string()), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum SlowQueriesRecordType { + SystemTable, + Log, } #[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -118,7 +140,6 @@ impl Default for LoggingOptions { otlp_endpoint: None, tracing_sample_ratio: None, append_stdout: true, - slow_query: SlowQueryOptions::default(), // Rotation hourly, 24 files per day, keeps info log files of 30 days max_log_files: 720, } @@ -158,7 +179,8 @@ pub fn init_default_ut_logging() { "unittest", &opts, &TracingOptions::default(), - None + None, + None, )); crate::info!("logs dir = {}", dir); @@ -176,6 +198,7 @@ pub fn init_global_logging( opts: &LoggingOptions, tracing_opts: &TracingOptions, node_id: Option, + slow_query_opts: Option<&SlowQueryOptions>, ) -> Vec { static START: Once = Once::new(); let mut guards = vec![]; @@ -278,50 +301,7 @@ pub fn init_global_logging( None }; - let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable { - let rolling_appender = RollingFileAppender::builder() - .rotation(Rotation::HOURLY) - .filename_prefix("greptimedb-slow-queries") - .max_log_files(opts.max_log_files) - .build(&opts.dir) - .unwrap_or_else(|e| { - panic!( - "initializing rolling file appender at {} failed: {}", - &opts.dir, e - ) - }); - let (writer, guard) = tracing_appender::non_blocking(rolling_appender); - guards.push(guard); - - // Only logs if the field contains "slow". - let slow_query_filter = FilterFn::new(|metadata| { - metadata - .fields() - .iter() - .any(|field| field.name().contains("slow")) - }); - - if opts.log_format == LogFormat::Json { - Some( - Layer::new() - .json() - .with_writer(writer) - .with_ansi(false) - .with_filter(slow_query_filter) - .boxed(), - ) - } else { - Some( - Layer::new() - .with_writer(writer) - .with_ansi(false) - .with_filter(slow_query_filter) - .boxed(), - ) - } - } else { - None - }; + let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards); // resolve log level settings from: // - options from command line or config files @@ -435,3 +415,67 @@ pub fn init_global_logging( guards } + +fn build_slow_query_logger( + opts: &LoggingOptions, + slow_query_opts: Option<&SlowQueryOptions>, + guards: &mut Vec, +) -> Option + Send + Sync + 'static>> +where + S: tracing::Subscriber + + Send + + 'static + + for<'span> tracing_subscriber::registry::LookupSpan<'span>, +{ + if let Some(slow_query_opts) = slow_query_opts { + if !opts.dir.is_empty() + && slow_query_opts.enable + && slow_query_opts.record_type == SlowQueriesRecordType::Log + { + let rolling_appender = RollingFileAppender::builder() + .rotation(Rotation::HOURLY) + .filename_prefix("greptimedb-slow-queries") + .max_log_files(opts.max_log_files) + .build(&opts.dir) + .unwrap_or_else(|e| { + panic!( + "initializing rolling file appender at {} failed: {}", + &opts.dir, e + ) + }); + let (writer, guard) = tracing_appender::non_blocking(rolling_appender); + guards.push(guard); + + // Only logs if the field contains "slow". 
+ let slow_query_filter = FilterFn::new(|metadata| { + metadata + .fields() + .iter() + .any(|field| field.name().contains("slow")) + }); + + if opts.log_format == LogFormat::Json { + Some( + Layer::new() + .json() + .with_writer(writer) + .with_ansi(false) + .with_filter(slow_query_filter) + .boxed(), + ) + } else { + Some( + Layer::new() + .with_writer(writer) + .with_ansi(false) + .with_filter(slow_query_filter) + .boxed(), + ) + } + } else { + None + } + } else { + None + } +} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 153fbcdd83..b6ae383c32 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -56,6 +56,7 @@ prometheus.workspace = true promql-parser.workspace = true prost.workspace = true query.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true servers.workspace = true diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 5574da2169..d92bd5737a 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; use common_config::config::Configurable; use common_options::datanode::DatanodeClientOptions; -use common_telemetry::logging::{LoggingOptions, TracingOptions}; +use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions}; use meta_client::MetaClientOptions; use query::options::QueryOptions; use serde::{Deserialize, Serialize}; @@ -38,7 +38,7 @@ use crate::service_config::{ PromStoreOptions, }; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct FrontendOptions { pub node_id: Option, @@ -61,6 +61,7 @@ pub struct FrontendOptions { pub tracing: TracingOptions, pub query: QueryOptions, pub max_in_flight_write_bytes: Option, + pub slow_query: Option, } impl Default for FrontendOptions { @@ -86,6 +87,7 @@ impl Default for FrontendOptions { tracing: TracingOptions::default(), query: QueryOptions::default(), max_in_flight_write_bytes: None, + slow_query: Some(SlowQueryOptions::default()), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1477a2b133..c527d27007 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -55,7 +55,6 @@ use query::metrics::OnDone; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::DescribeResult; -use query::stats::StatementStatistics; use query::QueryEngineRef; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; @@ -80,6 +79,7 @@ use crate::error::{ TableOperationSnafu, }; use crate::limiter::LimiterRef; +use crate::slow_query_recorder::SlowQueryRecorder; /// The frontend instance contains necessary components, and implements many /// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`], @@ -94,7 +94,7 @@ pub struct Instance { inserter: InserterRef, deleter: DeleterRef, table_metadata_manager: TableMetadataManagerRef, - stats: StatementStatistics, + slow_query_recorder: Option, limiter: Option, } @@ -166,9 +166,11 @@ impl Instance { let query_interceptor = self.plugins.get::>(); let query_interceptor = query_interceptor.as_ref(); - let _slow_query_timer = self - .stats - .start_slow_query_timer(QueryStatement::Sql(stmt.clone())); + let _slow_query_timer = if let Some(recorder) = 
&self.slow_query_recorder { + recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone()) + } else { + None + }; let output = match stmt { Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { @@ -212,6 +214,7 @@ impl Instance { self.statement_executor.execute_sql(stmt, query_ctx).await } }; + output.context(TableOperationSnafu) } } @@ -374,7 +377,11 @@ impl PrometheusHandler for Instance { } })?; - let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone()); + let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder { + recorder.start(stmt.clone(), query_ctx.clone()) + } else { + None + }; let plan = self .statement_executor diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index ffbfeabca1..d717b75c14 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -34,7 +34,6 @@ use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; use pipeline::pipeline_operator::PipelineOperator; use query::region_query::RegionQueryHandlerFactoryRef; -use query::stats::StatementStatistics; use query::QueryEngineFactory; use snafu::OptionExt; @@ -43,6 +42,7 @@ use crate::frontend::FrontendOptions; use crate::instance::region_query::FrontendRegionQueryHandler; use crate::instance::Instance; use crate::limiter::Limiter; +use crate::slow_query_recorder::SlowQueryRecorder; /// The frontend [`Instance`] builder. pub struct FrontendBuilder { @@ -54,7 +54,6 @@ pub struct FrontendBuilder { node_manager: NodeManagerRef, plugins: Option, procedure_executor: ProcedureExecutorRef, - stats: StatementStatistics, } impl FrontendBuilder { @@ -65,7 +64,6 @@ impl FrontendBuilder { catalog_manager: CatalogManagerRef, node_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, - stats: StatementStatistics, ) -> Self { Self { options, @@ -76,7 +74,6 @@ impl FrontendBuilder { node_manager, plugins: None, procedure_executor, - stats, } } @@ -189,6 +186,17 @@ impl FrontendBuilder { plugins.insert::(statement_executor.clone()); + let slow_query_recorder = self.options.slow_query.and_then(|opts| { + opts.enable.then(|| { + SlowQueryRecorder::new( + opts.clone(), + inserter.clone(), + statement_executor.clone(), + self.catalog_manager.clone(), + ) + }) + }); + // Create the limiter if the max_in_flight_write_bytes is set. let limiter = self .options @@ -206,7 +214,7 @@ impl FrontendBuilder { inserter, deleter, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), - stats: self.stats, + slow_query_recorder, limiter, }) } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index f89ff7bb1e..7672ae26c4 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -22,3 +22,4 @@ pub(crate) mod limiter; pub(crate) mod metrics; pub mod server; pub mod service_config; +pub(crate) mod slow_query_recorder; diff --git a/src/frontend/src/slow_query_recorder.rs b/src/frontend/src/slow_query_recorder.rs new file mode 100644 index 0000000000..e9cf617183 --- /dev/null +++ b/src/frontend/src/slow_query_recorder.rs @@ -0,0 +1,531 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant, UNIX_EPOCH}; + +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest, + RowInsertRequests, Rows, SemanticType, +}; +use catalog::CatalogManagerRef; +use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; +use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions}; +use common_telemetry::{debug, error, info, slow}; +use common_time::timestamp::{TimeUnit, Timestamp}; +use operator::insert::InserterRef; +use operator::statement::StatementExecutorRef; +use query::parser::QueryStatement; +use rand::random; +use session::context::{QueryContextBuilder, QueryContextRef}; +use snafu::ResultExt; +use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; +use table::TableRef; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::task::JoinHandle; + +use crate::error::{CatalogSnafu, Result, TableOperationSnafu}; + +const SLOW_QUERY_TABLE_NAME: &str = "slow_queries"; +const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost"; +const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold"; +const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query"; +const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; +const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql"; +const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start"; +const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end"; +const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range"; +const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step"; + +const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d"; +const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024; + +/// SlowQueryRecorder is responsible for recording slow queries. +#[derive(Clone)] +pub struct SlowQueryRecorder { + tx: Sender, + slow_query_opts: SlowQueryOptions, + _handle: Arc>, +} + +#[derive(Debug)] +struct SlowQueryEvent { + cost: u64, + threshold: u64, + query: String, + is_promql: bool, + query_ctx: QueryContextRef, + promql_range: Option, + promql_step: Option, + promql_start: Option, + promql_end: Option, +} + +impl SlowQueryRecorder { + /// Create a new SlowQueryRecorder. + pub fn new( + slow_query_opts: SlowQueryOptions, + inserter: InserterRef, + statement_executor: StatementExecutorRef, + catalog_manager: CatalogManagerRef, + ) -> Self { + let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE); + + let ttl = slow_query_opts + .ttl + .clone() + .unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string()); + + // Start a new task to process the slow query events. + let event_handler = SlowQueryEventHandler { + inserter, + statement_executor, + catalog_manager, + rx, + record_type: slow_query_opts.record_type, + ttl, + }; + + // Start a new background task to process the slow query events. + let handle = tokio::spawn(async move { + event_handler.process_slow_query().await; + }); + + Self { + tx, + slow_query_opts, + _handle: Arc::new(handle), + } + } + + /// Starts a new SlowQueryTimer. 
Returns `None` if `slow_query.enable` is false. + /// The timer sets the start time when created and calculates the elapsed duration when dropped. + pub fn start( + &self, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Option { + if self.slow_query_opts.enable { + Some(SlowQueryTimer { + stmt, + query_ctx, + start: Instant::now(), // Set the initial start time. + threshold: self.slow_query_opts.threshold, + sample_ratio: self.slow_query_opts.sample_ratio, + tx: self.tx.clone(), + }) + } else { + None + } + } +} + +struct SlowQueryEventHandler { + inserter: InserterRef, + statement_executor: StatementExecutorRef, + catalog_manager: CatalogManagerRef, + rx: Receiver, + record_type: SlowQueriesRecordType, + ttl: String, +} + +impl SlowQueryEventHandler { + async fn process_slow_query(mut self) { + info!( + "Start the background handler to process slow query events and record them in {:?}.", + self.record_type + ); + while let Some(event) = self.rx.recv().await { + self.record_slow_query(event).await; + } + } + + async fn record_slow_query(&self, event: SlowQueryEvent) { + match self.record_type { + SlowQueriesRecordType::Log => { + // Record the slow query in a specific logs file. + slow!( + cost = event.cost, + threshold = event.threshold, + query = event.query, + is_promql = event.is_promql, + promql_range = event.promql_range, + promql_step = event.promql_step, + promql_start = event.promql_start, + promql_end = event.promql_end, + ); + } + SlowQueriesRecordType::SystemTable => { + // Record the slow query in a system table that is stored in greptimedb itself. + if let Err(e) = self.insert_slow_query(&event).await { + error!(e; "Failed to insert slow query, query: {:?}", event); + } + } + } + } + + async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> { + debug!("Handle the slow query event: {:?}", event); + + let table = if let Some(table) = self + .catalog_manager + .table( + event.query_ctx.current_catalog(), + DEFAULT_PRIVATE_SCHEMA_NAME, + SLOW_QUERY_TABLE_NAME, + Some(&event.query_ctx), + ) + .await + .context(CatalogSnafu)? + { + table + } else { + // Create the system table if it doesn't exist. + self.create_system_table(event.query_ctx.clone()).await? 
+ }; + + let insert = RowInsertRequest { + table_name: SLOW_QUERY_TABLE_NAME.to_string(), + rows: Some(Rows { + schema: self.build_insert_column_schema(), + rows: vec![Row { + values: vec![ + ValueData::U64Value(event.cost).into(), + ValueData::U64Value(event.threshold).into(), + ValueData::StringValue(event.query.to_string()).into(), + ValueData::BoolValue(event.is_promql).into(), + ValueData::TimestampNanosecondValue( + Timestamp::current_time(TimeUnit::Nanosecond).value(), + ) + .into(), + ValueData::U64Value(event.promql_range.unwrap_or(0)).into(), + ValueData::U64Value(event.promql_step.unwrap_or(0)).into(), + ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0)) + .into(), + ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(), + ], + }], + }), + }; + + let requests = RowInsertRequests { + inserts: vec![insert], + }; + + let table_info = table.table_info(); + let query_ctx = QueryContextBuilder::default() + .current_catalog(table_info.catalog_name.to_string()) + .current_schema(table_info.schema_name.to_string()) + .build() + .into(); + + self.inserter + .handle_row_inserts(requests, query_ctx, &self.statement_executor) + .await + .context(TableOperationSnafu)?; + + Ok(()) + } + + async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result { + let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog()); + if let Some(table) = self + .catalog_manager + .table( + &create_table_expr.catalog_name, + &create_table_expr.schema_name, + &create_table_expr.table_name, + Some(&query_ctx), + ) + .await + .context(CatalogSnafu)? + { + // The table is already created, so we don't need to create it again. + return Ok(table); + } + + // Create the `slow_queries` system table. + let table = self + .statement_executor + .create_table_inner(&mut create_table_expr, None, query_ctx.clone()) + .await + .context(TableOperationSnafu)?; + + info!( + "Create the {} system table in {:?} successfully.", + SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME + ); + + Ok(table) + } + + fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr { + let column_defs = vec![ + ColumnDef { + name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(), + data_type: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The cost of the slow query in milliseconds".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(), + data_type: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: + "When the query cost exceeds this value, it will be recorded as a slow query" + .to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(), + data_type: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The original query statement".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(), + data_type: ColumnDataType::Boolean as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "Whether the query is a PromQL query".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: 
SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), + data_type: ColumnDataType::TimestampNanosecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, + comment: "The timestamp of the slow query".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(), + data_type: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The time range of the PromQL query in milliseconds".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(), + data_type: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The step of the PromQL query in milliseconds".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(), + data_type: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The start timestamp of the PromQL query in milliseconds".to_string(), + datatype_extension: None, + options: None, + }, + ColumnDef { + name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(), + data_type: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + semantic_type: SemanticType::Field as i32, + comment: "The end timestamp of the PromQL query in milliseconds".to_string(), + datatype_extension: None, + options: None, + }, + ]; + + let table_options = HashMap::from([ + (APPEND_MODE_KEY.to_string(), "true".to_string()), + (TTL_KEY.to_string(), self.ttl.to_string()), + ]); + + CreateTableExpr { + catalog_name: catalog.to_string(), + schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema. 
+ table_name: SLOW_QUERY_TABLE_NAME.to_string(), + desc: "GreptimeDB system table for storing slow queries".to_string(), + column_defs, + time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), + primary_keys: vec![], + create_if_not_exists: true, + table_options, + table_id: None, + engine: default_engine().to_string(), + } + } + + fn build_insert_column_schema(&self) -> Vec { + vec![ + ColumnSchema { + column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Boolean.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampMillisecond.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ] + } +} + +/// SlowQueryTimer is used to log slow query when it's dropped. +/// In drop(), it will check if the query is slow and send the slow query event to the handler. +pub struct SlowQueryTimer { + start: Instant, + stmt: QueryStatement, + query_ctx: QueryContextRef, + threshold: Option, + sample_ratio: Option, + tx: Sender, +} + +impl SlowQueryTimer { + fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) { + let mut slow_query_event = SlowQueryEvent { + cost: elapsed.as_millis() as u64, + threshold: threshold.as_millis() as u64, + query: "".to_string(), + query_ctx: self.query_ctx.clone(), + + // The following fields are only used for PromQL queries. 
+ is_promql: false, + promql_range: None, + promql_step: None, + promql_start: None, + promql_end: None, + }; + + match &self.stmt { + QueryStatement::Promql(stmt) => { + slow_query_event.is_promql = true; + slow_query_event.query = stmt.expr.to_string(); + slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64); + + let start = stmt + .start + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + let end = stmt + .end + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + slow_query_event.promql_range = Some((end - start) as u64); + slow_query_event.promql_start = Some(start); + slow_query_event.promql_end = Some(end); + } + QueryStatement::Sql(stmt) => { + slow_query_event.query = stmt.to_string(); + } + } + + // Send SlowQueryEvent to the handler. + if let Err(e) = self.tx.try_send(slow_query_event) { + error!(e; "Failed to send slow query event"); + } + } +} + +impl Drop for SlowQueryTimer { + fn drop(&mut self) { + if let Some(threshold) = self.threshold { + // Calculate the elaspsed duration since the timer is created. + let elapsed = self.start.elapsed(); + if elapsed > threshold { + if let Some(ratio) = self.sample_ratio { + // Only capture a portion of slow queries based on sample_ratio. + // Generate a random number in [0, 1) and compare it with sample_ratio. + if ratio >= 1.0 || random::() <= ratio { + self.send_slow_query_event(elapsed, threshold); + } + } else { + // Captures all slow queries if sample_ratio is not set. + self.send_slow_query_event(elapsed, threshold); + } + } + } + } +} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 428aa5cb45..9f9a901910 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -39,7 +39,6 @@ pub mod query_engine; mod range_select; pub mod region_query; pub mod sql; -pub mod stats; pub(crate) mod window_sort; #[cfg(test)] diff --git a/src/query/src/stats.rs b/src/query/src/stats.rs deleted file mode 100644 index 085642fbc1..0000000000 --- a/src/query/src/stats.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use common_telemetry::logging::SlowQueryOptions; -use common_telemetry::slow; -use rand::random; - -use crate::parser::QueryStatement; - -/// StatementStatistics is used to collect statistics for a statement. -#[derive(Default, Clone, Debug)] -pub struct StatementStatistics { - /// slow_query is used to configure slow query log. 
- pub slow_query: SlowQueryOptions, -} - -impl StatementStatistics { - pub fn new(slow_query_options: SlowQueryOptions) -> Self { - Self { - slow_query: slow_query_options, - } - } - - pub fn start_slow_query_timer(&self, stmt: QueryStatement) -> Option { - if self.slow_query.enable { - Some(SlowQueryTimer { - start: std::time::Instant::now(), - stmt, - threshold: self.slow_query.threshold, - sample_ratio: self.slow_query.sample_ratio, - }) - } else { - None - } - } -} - -/// SlowQueryTimer is used to log slow query when it's dropped. -pub struct SlowQueryTimer { - start: std::time::Instant, - stmt: QueryStatement, - threshold: Option, - sample_ratio: Option, -} - -impl SlowQueryTimer { - fn log_slow_query(&self, elapsed: Duration, threshold: Duration) { - match &self.stmt { - QueryStatement::Sql(stmt) => { - slow!( - cost = elapsed.as_millis() as u64, - threshold = threshold.as_millis() as u64, - sql = stmt.to_string() - ); - } - QueryStatement::Promql(stmt) => { - slow!( - cost = elapsed.as_millis() as u64, - threshold = threshold.as_millis() as u64, - promql = stmt.to_string() - ); - } - } - } -} - -impl Drop for SlowQueryTimer { - fn drop(&mut self) { - if let Some(threshold) = self.threshold { - let elapsed = self.start.elapsed(); - if elapsed > threshold { - if let Some(ratio) = self.sample_ratio { - // Generate a random number in [0, 1) and compare it with sample_ratio. - if ratio >= 1.0 || random::() <= ratio { - self.log_slow_query(elapsed, threshold); - } - } else { - // If sample_ratio is not set, log all slow queries. - self.log_slow_query(elapsed, threshold); - } - } - } - } -} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4693dc3e84..8e7834f643 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -54,7 +54,6 @@ use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; -use query::stats::StatementStatistics; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; use servers::heartbeat_options::HeartbeatOptions; @@ -403,7 +402,6 @@ impl GreptimeDbClusterBuilder { catalog_manager, datanode_clients, meta_client, - StatementStatistics::default(), ) .with_local_cache_invalidator(cache_registry) .try_build() diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b14ebafb3f..7f266f56bd 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -46,7 +46,6 @@ use frontend::frontend::Frontend; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{Instance, StandaloneDatanodeManager}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; -use query::stats::StatementStatistics; use servers::grpc::GrpcOptions; use servers::server::ServerHandlers; use snafu::ResultExt; @@ -238,7 +237,6 @@ impl GreptimeDbStandaloneBuilder { catalog_manager.clone(), node_manager.clone(), ddl_task_executor.clone(), - StatementStatistics::default(), ) .with_plugin(plugins) .try_build() diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7ce500b9b1..fd37a7971a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1093,9 +1093,6 @@ max_log_files = 720 append_stdout = true enable_otlp_tracing = false -[logging.slow_query] -enable = false - [[region_engine]] [region_engine.mito] @@ -1149,7 +1146,15 @@ type = 
"time_series" enable = false write_interval = "30s" -[tracing]"#, +[tracing] + +[slow_query] +enable = true +record_type = "system_table" +threshold = "5s" +sample_ratio = 1.0 +ttl = "30d" +"#, ) .trim() .to_string();