diff --git a/Cargo.lock b/Cargo.lock
index 00048a781c..8e470fbd09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4457,6 +4457,7 @@ dependencies = [
"promql-parser",
"prost 0.13.5",
"query",
+ "rand 0.9.0",
"serde",
"serde_json",
"servers",
diff --git a/config/config.md b/config/config.md
index f3230190c9..ff296b91ce 100644
--- a/config/config.md
+++ b/config/config.md
@@ -188,10 +188,11 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
-| `logging.slow_query` | -- | -- | The slow query log options. |
-| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
-| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
-| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
+| `slow_query` | -- | -- | The slow query log options. |
+| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
+| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
+| `slow_query.threshold` | String | Unset | The threshold of slow query. |
+| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -288,10 +289,12 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
-| `logging.slow_query` | -- | -- | The slow query log options. |
-| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
-| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
-| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
+| `slow_query` | -- | -- | The slow query log options. |
+| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. |
+| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
+| `slow_query.threshold` | String | Unset | The threshold of slow query. It can be a human-readable time string, for example: `10s`, `100ms`, `1s`. |
+| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
+| `slow_query.ttl` | String | Unset | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -362,10 +365,6 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
-| `logging.slow_query` | -- | -- | The slow query log options. |
-| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
-| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
-| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -529,10 +528,6 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
-| `logging.slow_query` | -- | -- | The slow query log options. |
-| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
-| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
-| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -585,9 +580,5 @@
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
-| `logging.slow_query` | -- | -- | The slow query log options. |
-| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
-| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
-| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
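
For reference, a minimal sketch of how the documented `slow_query` keys map onto the new `SlowQueryOptions` struct introduced later in this diff. It assumes the `toml` crate plus the workspace's `common_telemetry` crate and is illustrative only, not part of the change:

    use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};

    fn parse_slow_query_section() -> SlowQueryOptions {
        // The same keys documented in the tables above.
        let opts: SlowQueryOptions = toml::from_str(
            r#"
            enable = true
            record_type = "system_table"
            threshold = "5s"
            sample_ratio = 1.0
            ttl = "30d"
            "#,
        )
        .expect("a valid [slow_query] section");
        assert_eq!(opts.record_type, SlowQueriesRecordType::SystemTable);
        opts
    }
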
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 46beb51a23..6b00cf90bd 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -632,19 +632,6 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
-## The slow query log options.
-[logging.slow_query]
-## Whether to enable slow query log.
-enable = false
-
-## The threshold of slow query.
-## @toml2docs:none-default
-threshold = "10s"
-
-## The sampling ratio of slow query log. The value should be in the range of (0, 1].
-## @toml2docs:none-default
-sample_ratio = 1.0
-
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 954daff176..43c7ef1141 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -100,19 +100,6 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
-## The slow query log options.
-[logging.slow_query]
-## Whether to enable slow query log.
-enable = false
-
-## The threshold of slow query.
-## @toml2docs:none-default
-threshold = "10s"
-
-## The sampling ratio of slow query log. The value should be in the range of (0, 1].
-## @toml2docs:none-default
-sample_ratio = 1.0
-
## The tracing options. Only effect when compiled with `tokio-console` feature.
#+ [tracing]
## The tokio console address.
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 2e3ee4a69d..2a9ee2bc8c 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -223,18 +223,28 @@ max_log_files = 720
default_ratio = 1.0
## The slow query log options.
-[logging.slow_query]
+[slow_query]
## Whether to enable slow query log.
-enable = false
+enable = true
-## The threshold of slow query.
+## The record type of slow queries. It can be `system_table` or `log`.
+## If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.
+## If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`.
## @toml2docs:none-default
-threshold = "10s"
+record_type = "system_table"
-## The sampling ratio of slow query log. The value should be in the range of (0, 1].
+## The threshold of slow query. It can be a human-readable time string, for example: `10s`, `100ms`, `1s`.
+## @toml2docs:none-default
+threshold = "5s"
+
+## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
## @toml2docs:none-default
sample_ratio = 1.0
+## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
+## @toml2docs:none-default
+ttl = "30d"
+
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 0e7f9b74f0..dadb077a86 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -218,19 +218,6 @@ max_log_files = 720
[logging.tracing_sample_ratio]
default_ratio = 1.0
-## The slow query log options.
-[logging.slow_query]
-## Whether to enable slow query log.
-enable = false
-
-## The threshold of slow query.
-## @toml2docs:none-default
-threshold = "10s"
-
-## The sampling ratio of slow query log. The value should be in the range of (0, 1].
-## @toml2docs:none-default
-sample_ratio = 1.0
-
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
[export_metrics]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 0e72cfcc7e..649b570350 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -724,17 +724,21 @@ max_log_files = 720
default_ratio = 1.0
## The slow query log options.
-[logging.slow_query]
+[slow_query]
## Whether to enable slow query log.
-enable = false
+#+ enable = false
+
+## The record type of slow queries. It can be `system_table` or `log`.
+## @toml2docs:none-default
+#+ record_type = "system_table"
## The threshold of slow query.
## @toml2docs:none-default
-threshold = "10s"
+#+ threshold = "10s"
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
## @toml2docs:none-default
-sample_ratio = 1.0
+#+ sample_ratio = 1.0
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs
index a514fa0d3c..b57da1216a 100644
--- a/src/cmd/src/cli.rs
+++ b/src/cmd/src/cli.rs
@@ -76,6 +76,7 @@ impl Command {
&opts,
&TracingOptions::default(),
None,
+ None,
);
let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
diff --git a/src/cmd/src/datanode/builder.rs b/src/cmd/src/datanode/builder.rs
index 64acc29a3d..264a3b3f87 100644
--- a/src/cmd/src/datanode/builder.rs
+++ b/src/cmd/src/datanode/builder.rs
@@ -64,6 +64,7 @@ impl InstanceBuilder {
&dn_opts.logging,
&dn_opts.tracing,
dn_opts.node_id.map(|x| x.to_string()),
+ None,
);
log_versions(version(), short_version(), APP_NAME);
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index bfda82922c..896d55c8a7 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -244,6 +244,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
+ None,
);
log_versions(version(), short_version(), APP_NAME);
diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index a28f4cd8f9..192f5a4306 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -37,7 +37,6 @@ use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
-use query::stats::StatementStatistics;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
@@ -269,6 +268,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.clone(),
+ opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
@@ -368,7 +368,6 @@ impl StartCommand {
catalog_manager,
Arc::new(client),
meta_client,
- StatementStatistics::new(opts.logging.slow_query.clone()),
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)
diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs
index 87e632c660..bbe68f3729 100644
--- a/src/cmd/src/metasrv.rs
+++ b/src/cmd/src/metasrv.rs
@@ -300,6 +300,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
None,
+ None,
);
log_versions(version(), short_version(), APP_NAME);
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index dd9e7926c4..80379c3ba7 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -47,7 +47,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
-use common_telemetry::logging::{LoggingOptions, TracingOptions};
+use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
@@ -69,7 +69,6 @@ use frontend::service_config::{
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
-use query::stats::StatementStatistics;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
@@ -153,6 +152,7 @@ pub struct StandaloneOptions {
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
    pub max_in_flight_write_bytes: Option<ReadableSize>,
+    pub slow_query: Option<SlowQueryOptions>,
}
impl Default for StandaloneOptions {
@@ -184,6 +184,7 @@ impl Default for StandaloneOptions {
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
+ slow_query: Some(SlowQueryOptions::default()),
}
}
}
@@ -223,6 +224,7 @@ impl StandaloneOptions {
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
+ slow_query: cloned_opts.slow_query,
..Default::default()
}
}
@@ -447,6 +449,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
None,
+ opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
@@ -594,7 +597,6 @@ impl StartCommand {
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
- StatementStatistics::new(opts.logging.slow_query.clone()),
)
.with_plugin(plugins.clone())
.try_build()
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 07be39dddb..c14417afba 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::Configurable;
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
-use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
+use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -167,11 +167,6 @@ fn test_load_metasrv_example_config() {
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
- slow_query: SlowQueryOptions {
- enable: false,
- threshold: None,
- sample_ratio: None,
- },
..Default::default()
},
datanode: DatanodeClientOptions {
diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs
index 06b43a1d17..6ac4f343d6 100644
--- a/src/common/telemetry/src/logging.rs
+++ b/src/common/telemetry/src/logging.rs
@@ -68,24 +68,46 @@ pub struct LoggingOptions {
/// The tracing sample ratio.
    pub tracing_sample_ratio: Option<TracingSampleOptions>,
-
- /// The logging options of slow query.
- pub slow_query: SlowQueryOptions,
}
/// The options of slow query.
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
-#[serde(default)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SlowQueryOptions {
/// Whether to enable slow query log.
pub enable: bool,
+ /// The record type of slow queries.
+ pub record_type: SlowQueriesRecordType,
+
/// The threshold of slow queries.
#[serde(with = "humantime_serde")]
    pub threshold: Option<Duration>,
/// The sample ratio of slow queries.
    pub sample_ratio: Option<f64>,
+
+ /// The table TTL of `slow_queries` system table. Default is "30d".
+ /// It's used when `record_type` is `SystemTable`.
+    pub ttl: Option<String>,
+}
+
+impl Default for SlowQueryOptions {
+ fn default() -> Self {
+ Self {
+ enable: true,
+ record_type: SlowQueriesRecordType::SystemTable,
+ threshold: Some(Duration::from_secs(5)),
+ sample_ratio: Some(1.0),
+ ttl: Some("30d".to_string()),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum SlowQueriesRecordType {
+ SystemTable,
+ Log,
}
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -118,7 +140,6 @@ impl Default for LoggingOptions {
otlp_endpoint: None,
tracing_sample_ratio: None,
append_stdout: true,
- slow_query: SlowQueryOptions::default(),
// Rotation hourly, 24 files per day, keeps info log files of 30 days
max_log_files: 720,
}
@@ -158,7 +179,8 @@ pub fn init_default_ut_logging() {
"unittest",
&opts,
&TracingOptions::default(),
- None
+ None,
+ None,
));
crate::info!("logs dir = {}", dir);
@@ -176,6 +198,7 @@ pub fn init_global_logging(
opts: &LoggingOptions,
tracing_opts: &TracingOptions,
    node_id: Option<String>,
+ slow_query_opts: Option<&SlowQueryOptions>,
) -> Vec<WorkerGuard> {
static START: Once = Once::new();
let mut guards = vec![];
@@ -278,50 +301,7 @@ pub fn init_global_logging(
None
};
- let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable {
- let rolling_appender = RollingFileAppender::builder()
- .rotation(Rotation::HOURLY)
- .filename_prefix("greptimedb-slow-queries")
- .max_log_files(opts.max_log_files)
- .build(&opts.dir)
- .unwrap_or_else(|e| {
- panic!(
- "initializing rolling file appender at {} failed: {}",
- &opts.dir, e
- )
- });
- let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
- guards.push(guard);
-
- // Only logs if the field contains "slow".
- let slow_query_filter = FilterFn::new(|metadata| {
- metadata
- .fields()
- .iter()
- .any(|field| field.name().contains("slow"))
- });
-
- if opts.log_format == LogFormat::Json {
- Some(
- Layer::new()
- .json()
- .with_writer(writer)
- .with_ansi(false)
- .with_filter(slow_query_filter)
- .boxed(),
- )
- } else {
- Some(
- Layer::new()
- .with_writer(writer)
- .with_ansi(false)
- .with_filter(slow_query_filter)
- .boxed(),
- )
- }
- } else {
- None
- };
+ let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
// resolve log level settings from:
// - options from command line or config files
@@ -435,3 +415,67 @@ pub fn init_global_logging(
guards
}
+
+fn build_slow_query_logger<S>(
+    opts: &LoggingOptions,
+    slow_query_opts: Option<&SlowQueryOptions>,
+    guards: &mut Vec<WorkerGuard>,
+) -> Option<Box<dyn Layer<S> + Send + Sync + 'static>>
+where
+ S: tracing::Subscriber
+ + Send
+ + 'static
+ + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
+{
+ if let Some(slow_query_opts) = slow_query_opts {
+ if !opts.dir.is_empty()
+ && slow_query_opts.enable
+ && slow_query_opts.record_type == SlowQueriesRecordType::Log
+ {
+ let rolling_appender = RollingFileAppender::builder()
+ .rotation(Rotation::HOURLY)
+ .filename_prefix("greptimedb-slow-queries")
+ .max_log_files(opts.max_log_files)
+ .build(&opts.dir)
+ .unwrap_or_else(|e| {
+ panic!(
+ "initializing rolling file appender at {} failed: {}",
+ &opts.dir, e
+ )
+ });
+ let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
+ guards.push(guard);
+
+ // Only logs if the field contains "slow".
+ let slow_query_filter = FilterFn::new(|metadata| {
+ metadata
+ .fields()
+ .iter()
+ .any(|field| field.name().contains("slow"))
+ });
+
+ if opts.log_format == LogFormat::Json {
+ Some(
+ Layer::new()
+ .json()
+ .with_writer(writer)
+ .with_ansi(false)
+ .with_filter(slow_query_filter)
+ .boxed(),
+ )
+ } else {
+ Some(
+ Layer::new()
+ .with_writer(writer)
+ .with_ansi(false)
+ .with_filter(slow_query_filter)
+ .boxed(),
+ )
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+}
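
A sketch of how a binary entry point wires the new argument: `init_global_logging` now takes the slow query options separately instead of reading them from `LoggingOptions`. The helper name below is illustrative; only the `init_global_logging` signature comes from this diff:

    use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};

    fn init_telemetry(
        app_name: &str,
        logging: &LoggingOptions,
        slow_query: Option<&SlowQueryOptions>,
    ) {
        // Real binaries keep the returned writer guards alive for the whole process;
        // this sketch drops them when the function returns.
        let _guards = common_telemetry::logging::init_global_logging(
            app_name,
            logging,
            &TracingOptions::default(),
            None,       // node_id, e.g. the datanode id rendered as a string
            slow_query, // the rolling slow-query file logger is only built for `record_type = "log"`
        );
    }
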
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 153fbcdd83..b6ae383c32 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -56,6 +56,7 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
+rand.workspace = true
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index 5574da2169..d92bd5737a 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
-use common_telemetry::logging::{LoggingOptions, TracingOptions};
+use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
@@ -38,7 +38,7 @@ use crate::service_config::{
PromStoreOptions,
};
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct FrontendOptions {
    pub node_id: Option<String>,
@@ -61,6 +61,7 @@ pub struct FrontendOptions {
pub tracing: TracingOptions,
pub query: QueryOptions,
    pub max_in_flight_write_bytes: Option<ReadableSize>,
+    pub slow_query: Option<SlowQueryOptions>,
}
impl Default for FrontendOptions {
@@ -86,6 +87,7 @@ impl Default for FrontendOptions {
tracing: TracingOptions::default(),
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
+ slow_query: Some(SlowQueryOptions::default()),
}
}
}
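
Because `slow_query` is now an optional top-level section of `FrontendOptions`, embedders can also set it in code. A sketch assuming the workspace crates; the values are illustrative, not recommended defaults:

    use std::time::Duration;

    use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
    use frontend::frontend::FrontendOptions;

    fn frontend_opts_with_slow_query_log() -> FrontendOptions {
        FrontendOptions {
            slow_query: Some(SlowQueryOptions {
                enable: true,
                // Write to the rolling `greptimedb-slow-queries.*` files instead of
                // the `greptime_private.slow_queries` system table.
                record_type: SlowQueriesRecordType::Log,
                threshold: Some(Duration::from_secs(10)),
                sample_ratio: Some(0.1), // keep roughly 10% of slow queries
                ttl: None,               // only used when record_type is SystemTable
            }),
            ..Default::default()
        }
    }
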
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 1477a2b133..c527d27007 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -55,7 +55,6 @@ use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
-use query::stats::StatementStatistics;
use query::QueryEngineRef;
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
@@ -80,6 +79,7 @@ use crate::error::{
TableOperationSnafu,
};
use crate::limiter::LimiterRef;
+use crate::slow_query_recorder::SlowQueryRecorder;
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
@@ -94,7 +94,7 @@ pub struct Instance {
inserter: InserterRef,
deleter: DeleterRef,
table_metadata_manager: TableMetadataManagerRef,
- stats: StatementStatistics,
+    slow_query_recorder: Option<SlowQueryRecorder>,
    limiter: Option<LimiterRef>,
}
@@ -166,9 +166,11 @@ impl Instance {
        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor.as_ref();
- let _slow_query_timer = self
- .stats
- .start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
+ let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
+ recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
+ } else {
+ None
+ };
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
@@ -212,6 +214,7 @@ impl Instance {
self.statement_executor.execute_sql(stmt, query_ctx).await
}
};
+
output.context(TableOperationSnafu)
}
}
@@ -374,7 +377,11 @@ impl PrometheusHandler for Instance {
}
})?;
- let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
+ let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
+ recorder.start(stmt.clone(), query_ctx.clone())
+ } else {
+ None
+ };
let plan = self
.statement_executor
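
The `_slow_query_timer` bindings above rely on a drop guard: the timer is created before the statement runs, held in scope, and does its work when it goes out of scope. A self-contained toy version of the same idea (standard library only, not the actual `SlowQueryTimer`):

    use std::time::{Duration, Instant};

    struct Timer {
        start: Instant,
        threshold: Duration,
    }

    impl Drop for Timer {
        fn drop(&mut self) {
            // Measured when the guard is dropped, i.e. after the "query" finished.
            let elapsed = self.start.elapsed();
            if elapsed > self.threshold {
                println!(
                    "slow query: cost={}ms threshold={}ms",
                    elapsed.as_millis(),
                    self.threshold.as_millis()
                );
            }
        }
    }

    fn main() {
        let _timer = Timer {
            start: Instant::now(),
            threshold: Duration::from_millis(1),
        };
        std::thread::sleep(Duration::from_millis(5)); // stand-in for executing the statement
        // `_timer` is dropped here and reports the slow "query".
    }
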
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index ffbfeabca1..d717b75c14 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -34,7 +34,6 @@ use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::region_query::RegionQueryHandlerFactoryRef;
-use query::stats::StatementStatistics;
use query::QueryEngineFactory;
use snafu::OptionExt;
@@ -43,6 +42,7 @@ use crate::frontend::FrontendOptions;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::limiter::Limiter;
+use crate::slow_query_recorder::SlowQueryRecorder;
/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
@@ -54,7 +54,6 @@ pub struct FrontendBuilder {
node_manager: NodeManagerRef,
    plugins: Option<Plugins>,
procedure_executor: ProcedureExecutorRef,
- stats: StatementStatistics,
}
impl FrontendBuilder {
@@ -65,7 +64,6 @@ impl FrontendBuilder {
catalog_manager: CatalogManagerRef,
node_manager: NodeManagerRef,
procedure_executor: ProcedureExecutorRef,
- stats: StatementStatistics,
) -> Self {
Self {
options,
@@ -76,7 +74,6 @@ impl FrontendBuilder {
node_manager,
plugins: None,
procedure_executor,
- stats,
}
}
@@ -189,6 +186,17 @@ impl FrontendBuilder {
        plugins.insert::<StatementExecutorRef>(statement_executor.clone());
+ let slow_query_recorder = self.options.slow_query.and_then(|opts| {
+ opts.enable.then(|| {
+ SlowQueryRecorder::new(
+ opts.clone(),
+ inserter.clone(),
+ statement_executor.clone(),
+ self.catalog_manager.clone(),
+ )
+ })
+ });
+
// Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self
.options
@@ -206,7 +214,7 @@ impl FrontendBuilder {
inserter,
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
- stats: self.stats,
+ slow_query_recorder,
limiter,
})
}
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index f89ff7bb1e..7672ae26c4 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -22,3 +22,4 @@ pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;
+pub(crate) mod slow_query_recorder;
diff --git a/src/frontend/src/slow_query_recorder.rs b/src/frontend/src/slow_query_recorder.rs
new file mode 100644
index 0000000000..e9cf617183
--- /dev/null
+++ b/src/frontend/src/slow_query_recorder.rs
@@ -0,0 +1,531 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant, UNIX_EPOCH};
+
+use api::v1::value::ValueData;
+use api::v1::{
+ ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
+ RowInsertRequests, Rows, SemanticType,
+};
+use catalog::CatalogManagerRef;
+use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
+use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
+use common_telemetry::{debug, error, info, slow};
+use common_time::timestamp::{TimeUnit, Timestamp};
+use operator::insert::InserterRef;
+use operator::statement::StatementExecutorRef;
+use query::parser::QueryStatement;
+use rand::random;
+use session::context::{QueryContextBuilder, QueryContextRef};
+use snafu::ResultExt;
+use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
+use table::TableRef;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::task::JoinHandle;
+
+use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
+
+const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
+const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
+const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
+const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
+const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
+const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
+const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
+const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
+const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
+const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
+
+const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
+const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
+
+/// SlowQueryRecorder is responsible for recording slow queries.
+#[derive(Clone)]
+pub struct SlowQueryRecorder {
+    tx: Sender<SlowQueryEvent>,
+    slow_query_opts: SlowQueryOptions,
+    _handle: Arc<JoinHandle<()>>,
+}
+
+#[derive(Debug)]
+struct SlowQueryEvent {
+ cost: u64,
+ threshold: u64,
+ query: String,
+ is_promql: bool,
+ query_ctx: QueryContextRef,
+    promql_range: Option<u64>,
+    promql_step: Option<u64>,
+    promql_start: Option<i64>,
+    promql_end: Option<i64>,
+}
+
+impl SlowQueryRecorder {
+ /// Create a new SlowQueryRecorder.
+ pub fn new(
+ slow_query_opts: SlowQueryOptions,
+ inserter: InserterRef,
+ statement_executor: StatementExecutorRef,
+ catalog_manager: CatalogManagerRef,
+ ) -> Self {
+ let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
+
+ let ttl = slow_query_opts
+ .ttl
+ .clone()
+ .unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
+
+        // Create the event handler that consumes slow query events from the channel.
+ let event_handler = SlowQueryEventHandler {
+ inserter,
+ statement_executor,
+ catalog_manager,
+ rx,
+ record_type: slow_query_opts.record_type,
+ ttl,
+ };
+
+ // Start a new background task to process the slow query events.
+ let handle = tokio::spawn(async move {
+ event_handler.process_slow_query().await;
+ });
+
+ Self {
+ tx,
+ slow_query_opts,
+ _handle: Arc::new(handle),
+ }
+ }
+
+ /// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false.
+ /// The timer sets the start time when created and calculates the elapsed duration when dropped.
+ pub fn start(
+ &self,
+ stmt: QueryStatement,
+ query_ctx: QueryContextRef,
+    ) -> Option<SlowQueryTimer> {
+ if self.slow_query_opts.enable {
+ Some(SlowQueryTimer {
+ stmt,
+ query_ctx,
+ start: Instant::now(), // Set the initial start time.
+ threshold: self.slow_query_opts.threshold,
+ sample_ratio: self.slow_query_opts.sample_ratio,
+ tx: self.tx.clone(),
+ })
+ } else {
+ None
+ }
+ }
+}
+
+struct SlowQueryEventHandler {
+ inserter: InserterRef,
+ statement_executor: StatementExecutorRef,
+ catalog_manager: CatalogManagerRef,
+    rx: Receiver<SlowQueryEvent>,
+ record_type: SlowQueriesRecordType,
+ ttl: String,
+}
+
+impl SlowQueryEventHandler {
+ async fn process_slow_query(mut self) {
+ info!(
+ "Start the background handler to process slow query events and record them in {:?}.",
+ self.record_type
+ );
+ while let Some(event) = self.rx.recv().await {
+ self.record_slow_query(event).await;
+ }
+ }
+
+ async fn record_slow_query(&self, event: SlowQueryEvent) {
+ match self.record_type {
+ SlowQueriesRecordType::Log => {
+ // Record the slow query in a specific logs file.
+ slow!(
+ cost = event.cost,
+ threshold = event.threshold,
+ query = event.query,
+ is_promql = event.is_promql,
+ promql_range = event.promql_range,
+ promql_step = event.promql_step,
+ promql_start = event.promql_start,
+ promql_end = event.promql_end,
+ );
+ }
+ SlowQueriesRecordType::SystemTable => {
+ // Record the slow query in a system table that is stored in greptimedb itself.
+ if let Err(e) = self.insert_slow_query(&event).await {
+ error!(e; "Failed to insert slow query, query: {:?}", event);
+ }
+ }
+ }
+ }
+
+ async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
+ debug!("Handle the slow query event: {:?}", event);
+
+ let table = if let Some(table) = self
+ .catalog_manager
+ .table(
+ event.query_ctx.current_catalog(),
+ DEFAULT_PRIVATE_SCHEMA_NAME,
+ SLOW_QUERY_TABLE_NAME,
+ Some(&event.query_ctx),
+ )
+ .await
+ .context(CatalogSnafu)?
+ {
+ table
+ } else {
+ // Create the system table if it doesn't exist.
+ self.create_system_table(event.query_ctx.clone()).await?
+ };
+
+ let insert = RowInsertRequest {
+ table_name: SLOW_QUERY_TABLE_NAME.to_string(),
+ rows: Some(Rows {
+ schema: self.build_insert_column_schema(),
+ rows: vec![Row {
+ values: vec![
+ ValueData::U64Value(event.cost).into(),
+ ValueData::U64Value(event.threshold).into(),
+ ValueData::StringValue(event.query.to_string()).into(),
+ ValueData::BoolValue(event.is_promql).into(),
+ ValueData::TimestampNanosecondValue(
+ Timestamp::current_time(TimeUnit::Nanosecond).value(),
+ )
+ .into(),
+ ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
+ ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
+ ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
+ .into(),
+ ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
+ ],
+ }],
+ }),
+ };
+
+ let requests = RowInsertRequests {
+ inserts: vec![insert],
+ };
+
+ let table_info = table.table_info();
+ let query_ctx = QueryContextBuilder::default()
+ .current_catalog(table_info.catalog_name.to_string())
+ .current_schema(table_info.schema_name.to_string())
+ .build()
+ .into();
+
+ self.inserter
+ .handle_row_inserts(requests, query_ctx, &self.statement_executor)
+ .await
+ .context(TableOperationSnafu)?;
+
+ Ok(())
+ }
+
+    async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
+ let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
+ if let Some(table) = self
+ .catalog_manager
+ .table(
+ &create_table_expr.catalog_name,
+ &create_table_expr.schema_name,
+ &create_table_expr.table_name,
+ Some(&query_ctx),
+ )
+ .await
+ .context(CatalogSnafu)?
+ {
+ // The table is already created, so we don't need to create it again.
+ return Ok(table);
+ }
+
+ // Create the `slow_queries` system table.
+ let table = self
+ .statement_executor
+ .create_table_inner(&mut create_table_expr, None, query_ctx.clone())
+ .await
+ .context(TableOperationSnafu)?;
+
+ info!(
+            "Created the {} system table in {:?} successfully.",
+ SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
+ );
+
+ Ok(table)
+ }
+
+ fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
+ let column_defs = vec![
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::Uint64 as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The cost of the slow query in milliseconds".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::Uint64 as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment:
+ "When the query cost exceeds this value, it will be recorded as a slow query"
+ .to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::String as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The original query statement".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::Boolean as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "Whether the query is a PromQL query".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::TimestampNanosecond as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Timestamp as i32,
+ comment: "The timestamp of the slow query".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::Uint64 as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The time range of the PromQL query in milliseconds".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::Uint64 as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The step of the PromQL query in milliseconds".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::TimestampMillisecond as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ColumnDef {
+ name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
+ data_type: ColumnDataType::TimestampMillisecond as i32,
+ is_nullable: false,
+ default_constraint: vec![],
+ semantic_type: SemanticType::Field as i32,
+ comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
+ datatype_extension: None,
+ options: None,
+ },
+ ];
+
+ let table_options = HashMap::from([
+ (APPEND_MODE_KEY.to_string(), "true".to_string()),
+ (TTL_KEY.to_string(), self.ttl.to_string()),
+ ]);
+
+ CreateTableExpr {
+ catalog_name: catalog.to_string(),
+            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always store it in the `greptime_private` schema.
+ table_name: SLOW_QUERY_TABLE_NAME.to_string(),
+ desc: "GreptimeDB system table for storing slow queries".to_string(),
+ column_defs,
+ time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
+ primary_keys: vec![],
+ create_if_not_exists: true,
+ table_options,
+ table_id: None,
+ engine: default_engine().to_string(),
+ }
+ }
+
+    fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
+ vec![
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::Uint64.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::Uint64.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::String.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::Boolean.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::TimestampNanosecond.into(),
+ semantic_type: SemanticType::Timestamp.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::Uint64.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::Uint64.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::TimestampMillisecond.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::TimestampMillisecond.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ]
+ }
+}
+
+/// SlowQueryTimer is used to record a slow query when it's dropped.
+/// In drop(), it checks whether the query exceeded the threshold and, if so, sends a slow query event to the handler.
+pub struct SlowQueryTimer {
+ start: Instant,
+ stmt: QueryStatement,
+ query_ctx: QueryContextRef,
+    threshold: Option<Duration>,
+    sample_ratio: Option<f64>,
+    tx: Sender<SlowQueryEvent>,
+}
+
+impl SlowQueryTimer {
+ fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
+ let mut slow_query_event = SlowQueryEvent {
+ cost: elapsed.as_millis() as u64,
+ threshold: threshold.as_millis() as u64,
+ query: "".to_string(),
+ query_ctx: self.query_ctx.clone(),
+
+ // The following fields are only used for PromQL queries.
+ is_promql: false,
+ promql_range: None,
+ promql_step: None,
+ promql_start: None,
+ promql_end: None,
+ };
+
+ match &self.stmt {
+ QueryStatement::Promql(stmt) => {
+ slow_query_event.is_promql = true;
+ slow_query_event.query = stmt.expr.to_string();
+ slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
+
+ let start = stmt
+ .start
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_millis() as i64;
+
+ let end = stmt
+ .end
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_millis() as i64;
+
+ slow_query_event.promql_range = Some((end - start) as u64);
+ slow_query_event.promql_start = Some(start);
+ slow_query_event.promql_end = Some(end);
+ }
+ QueryStatement::Sql(stmt) => {
+ slow_query_event.query = stmt.to_string();
+ }
+ }
+
+ // Send SlowQueryEvent to the handler.
+ if let Err(e) = self.tx.try_send(slow_query_event) {
+ error!(e; "Failed to send slow query event");
+ }
+ }
+}
+
+impl Drop for SlowQueryTimer {
+ fn drop(&mut self) {
+ if let Some(threshold) = self.threshold {
+            // Calculate the elapsed duration since the timer was created.
+ let elapsed = self.start.elapsed();
+ if elapsed > threshold {
+ if let Some(ratio) = self.sample_ratio {
+ // Only capture a portion of slow queries based on sample_ratio.
+ // Generate a random number in [0, 1) and compare it with sample_ratio.
+                    if ratio >= 1.0 || random::<f64>() <= ratio {
+ self.send_slow_query_event(elapsed, threshold);
+ }
+ } else {
+ // Captures all slow queries if sample_ratio is not set.
+ self.send_slow_query_event(elapsed, threshold);
+ }
+ }
+ }
+ }
+}
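
The recorder keeps slow-query handling off the hot path: `SlowQueryTimer::send_slow_query_event` uses `try_send` on a bounded channel, so a full queue drops the event instead of stalling the query, while a spawned task drains events in the background. A stripped-down sketch of that pattern, assuming a `tokio` runtime and with the event type simplified to `String`:

    use tokio::sync::mpsc::channel;

    #[tokio::main]
    async fn main() {
        // Bounded queue, analogous to DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE above.
        let (tx, mut rx) = channel::<String>(1024);

        // Background consumer, analogous to SlowQueryEventHandler::process_slow_query.
        let handler = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                println!("recording slow query: {event}");
            }
        });

        // Producer side: never blocks; a full channel only yields an error to log.
        if let Err(e) = tx.try_send("SELECT * FROM metrics".to_string()) {
            eprintln!("failed to queue slow query event: {e}");
        }

        drop(tx); // closing all senders lets the consumer loop exit
        let _ = handler.await;
    }
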
diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs
index 428aa5cb45..9f9a901910 100644
--- a/src/query/src/lib.rs
+++ b/src/query/src/lib.rs
@@ -39,7 +39,6 @@ pub mod query_engine;
mod range_select;
pub mod region_query;
pub mod sql;
-pub mod stats;
pub(crate) mod window_sort;
#[cfg(test)]
diff --git a/src/query/src/stats.rs b/src/query/src/stats.rs
deleted file mode 100644
index 085642fbc1..0000000000
--- a/src/query/src/stats.rs
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::time::Duration;
-
-use common_telemetry::logging::SlowQueryOptions;
-use common_telemetry::slow;
-use rand::random;
-
-use crate::parser::QueryStatement;
-
-/// StatementStatistics is used to collect statistics for a statement.
-#[derive(Default, Clone, Debug)]
-pub struct StatementStatistics {
- /// slow_query is used to configure slow query log.
- pub slow_query: SlowQueryOptions,
-}
-
-impl StatementStatistics {
- pub fn new(slow_query_options: SlowQueryOptions) -> Self {
- Self {
- slow_query: slow_query_options,
- }
- }
-
-    pub fn start_slow_query_timer(&self, stmt: QueryStatement) -> Option<SlowQueryTimer> {
- if self.slow_query.enable {
- Some(SlowQueryTimer {
- start: std::time::Instant::now(),
- stmt,
- threshold: self.slow_query.threshold,
- sample_ratio: self.slow_query.sample_ratio,
- })
- } else {
- None
- }
- }
-}
-
-/// SlowQueryTimer is used to log slow query when it's dropped.
-pub struct SlowQueryTimer {
- start: std::time::Instant,
- stmt: QueryStatement,
-    threshold: Option<Duration>,
-    sample_ratio: Option<f64>,
-}
-
-impl SlowQueryTimer {
- fn log_slow_query(&self, elapsed: Duration, threshold: Duration) {
- match &self.stmt {
- QueryStatement::Sql(stmt) => {
- slow!(
- cost = elapsed.as_millis() as u64,
- threshold = threshold.as_millis() as u64,
- sql = stmt.to_string()
- );
- }
- QueryStatement::Promql(stmt) => {
- slow!(
- cost = elapsed.as_millis() as u64,
- threshold = threshold.as_millis() as u64,
- promql = stmt.to_string()
- );
- }
- }
- }
-}
-
-impl Drop for SlowQueryTimer {
- fn drop(&mut self) {
- if let Some(threshold) = self.threshold {
- let elapsed = self.start.elapsed();
- if elapsed > threshold {
- if let Some(ratio) = self.sample_ratio {
- // Generate a random number in [0, 1) and compare it with sample_ratio.
-                if ratio >= 1.0 || random::<f64>() <= ratio {
- self.log_slow_query(elapsed, threshold);
- }
- } else {
- // If sample_ratio is not set, log all slow queries.
- self.log_slow_query(elapsed, threshold);
- }
- }
- }
- }
-}
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 4693dc3e84..8e7834f643 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -54,7 +54,6 @@ use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
-use query::stats::StatementStatistics;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::heartbeat_options::HeartbeatOptions;
@@ -403,7 +402,6 @@ impl GreptimeDbClusterBuilder {
catalog_manager,
datanode_clients,
meta_client,
- StatementStatistics::default(),
)
.with_local_cache_invalidator(cache_registry)
.try_build()
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index b14ebafb3f..7f266f56bd 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -46,7 +46,6 @@ use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
-use query::stats::StatementStatistics;
use servers::grpc::GrpcOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;
@@ -238,7 +237,6 @@ impl GreptimeDbStandaloneBuilder {
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
- StatementStatistics::default(),
)
.with_plugin(plugins)
.try_build()
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 7ce500b9b1..fd37a7971a 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1093,9 +1093,6 @@ max_log_files = 720
append_stdout = true
enable_otlp_tracing = false
-[logging.slow_query]
-enable = false
-
[[region_engine]]
[region_engine.mito]
@@ -1149,7 +1146,15 @@ type = "time_series"
enable = false
write_interval = "30s"
-[tracing]"#,
+[tracing]
+
+[slow_query]
+enable = true
+record_type = "system_table"
+threshold = "5s"
+sample_ratio = 1.0
+ttl = "30d"
+"#,
)
.trim()
.to_string();