mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 18:00:41 +00:00
refactor: add SlowQueryRecorder to record slow query in system table and refactor slow query options (#6008)
* refactor: add common-slow-query crate * refactor: refine the naming * chore: fix clippy * chore: fix typo * chore: separate SlowQueryOptions From Logging * chore: fix clippy * chore: fix ci * chore: refine the code * chore: update config example * refactor: use drop() to end the slow query timer * refactor: move common-slow-query to frontend crate * chore: polish some code * refactor: code review * refactor: add promql_range/promql_step/promql_start/promql_end fields in slow_queries * refactor: add build_slow_query_logger() * refactor: turn on slow query on frontend by default
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4457,6 +4457,7 @@ dependencies = [
|
||||
"promql-parser",
|
||||
"prost 0.13.5",
|
||||
"query",
|
||||
"rand 0.9.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"servers",
|
||||
|
||||
@@ -188,10 +188,11 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of traces that will be sampled and exported.<br/>Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1. Ratios < 0 are treated as 0. |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | Whether to enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -288,10 +289,12 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of traces that will be sampled and exported.<br/>Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1. Ratios < 0 are treated as 0. |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `slow_query` | -- | -- | The slow query log options. |
|
||||
| `slow_query.enable` | Bool | `true` | Whether to enable slow query log. |
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | Unset | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | Whether to enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -362,10 +365,6 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of traces that will be sampled and exported.<br/>Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1. Ratios < 0 are treated as 0. |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | Whether to enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -529,10 +528,6 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of traces that will be sampled and exported.<br/>Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1. Ratios < 0 are treated as 0. |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | Whether to enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
@@ -585,9 +580,5 @@
|
||||
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
|
||||
| `logging.tracing_sample_ratio` | -- | -- | The percentage of traces that will be sampled and exported.<br/>Valid range is `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1. Ratios < 0 are treated as 0. |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `logging.slow_query` | -- | -- | The slow query log options. |
|
||||
| `logging.slow_query.enable` | Bool | `false` | Whether to enable slow query log. |
|
||||
| `logging.slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `logging.slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `tracing` | -- | -- | The tracing options. Only takes effect when compiled with the `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
|
||||
@@ -632,19 +632,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -100,19 +100,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The tracing options. Only takes effect when compiled with the `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -223,18 +223,28 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
enable = true
|
||||
|
||||
## The threshold of slow query.
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.
|
||||
## If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
record_type = "system_table"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`.
|
||||
## @toml2docs:none-default
|
||||
threshold = "5s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
|
||||
## @toml2docs:none-default
|
||||
ttl = "30d"
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -218,19 +218,6 @@ max_log_files = 720
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
|
||||
@@ -724,17 +724,21 @@ max_log_files = 720
|
||||
default_ratio = 1.0
|
||||
|
||||
## The slow query log options.
|
||||
[logging.slow_query]
|
||||
[slow_query]
|
||||
## Whether to enable slow query log.
|
||||
enable = false
|
||||
#+ enable = false
|
||||
|
||||
## The record type of slow queries. It can be `system_table` or `log`.
|
||||
## @toml2docs:none-default
|
||||
#+ record_type = "system_table"
|
||||
|
||||
## The threshold of slow query.
|
||||
## @toml2docs:none-default
|
||||
threshold = "10s"
|
||||
#+ threshold = "10s"
|
||||
|
||||
## The sampling ratio of slow query log. The value should be in the range of (0, 1].
|
||||
## @toml2docs:none-default
|
||||
sample_ratio = 1.0
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
|
||||
@@ -76,6 +76,7 @@ impl Command {
|
||||
&opts,
|
||||
&TracingOptions::default(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
|
||||
|
||||
@@ -64,6 +64,7 @@ impl InstanceBuilder {
|
||||
&dn_opts.logging,
|
||||
&dn_opts.tracing,
|
||||
dn_opts.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -244,6 +244,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.map(|x| x.to_string()),
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
|
||||
@@ -37,7 +37,6 @@ use frontend::heartbeat::HeartbeatTask;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -269,6 +268,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
opts.component.node_id.clone(),
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -368,7 +368,6 @@ impl StartCommand {
|
||||
catalog_manager,
|
||||
Arc::new(client),
|
||||
meta_client,
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.with_local_cache_invalidator(layered_cache_registry)
|
||||
|
||||
@@ -300,6 +300,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
|
||||
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
|
||||
use common_time::timezone::set_default_timezone;
|
||||
use common_version::{short_version, version};
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
@@ -69,7 +69,6 @@ use frontend::service_config::{
|
||||
};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use mito2::config::MitoConfig;
|
||||
use query::stats::StatementStatistics;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
@@ -153,6 +152,7 @@ pub struct StandaloneOptions {
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
pub slow_query: Option<SlowQueryOptions>,
|
||||
}
|
||||
|
||||
impl Default for StandaloneOptions {
|
||||
@@ -184,6 +184,7 @@ impl Default for StandaloneOptions {
|
||||
init_regions_in_background: false,
|
||||
init_regions_parallelism: 16,
|
||||
max_in_flight_write_bytes: None,
|
||||
slow_query: Some(SlowQueryOptions::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,6 +224,7 @@ impl StandaloneOptions {
|
||||
// Handle the export metrics task run by standalone to frontend for execution
|
||||
export_metrics: cloned_opts.export_metrics,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
slow_query: cloned_opts.slow_query,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -447,6 +449,7 @@ impl StartCommand {
|
||||
&opts.component.logging,
|
||||
&opts.component.tracing,
|
||||
None,
|
||||
opts.component.slow_query.as_ref(),
|
||||
);
|
||||
log_versions(version(), short_version(), APP_NAME);
|
||||
|
||||
@@ -594,7 +597,6 @@ impl StartCommand {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
StatementStatistics::new(opts.logging.slow_query.clone()),
|
||||
)
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
|
||||
@@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions;
|
||||
use cmd::standalone::StandaloneOptions;
|
||||
use common_config::Configurable;
|
||||
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_telemetry::logging::{LoggingOptions, DEFAULT_OTLP_ENDPOINT};
|
||||
use common_wal::config::raft_engine::RaftEngineConfig;
|
||||
use common_wal::config::DatanodeWalConfig;
|
||||
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
|
||||
@@ -167,11 +167,6 @@ fn test_load_metasrv_example_config() {
|
||||
level: Some("info".to_string()),
|
||||
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
slow_query: SlowQueryOptions {
|
||||
enable: false,
|
||||
threshold: None,
|
||||
sample_ratio: None,
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
datanode: DatanodeClientOptions {
|
||||
|
||||
@@ -68,24 +68,46 @@ pub struct LoggingOptions {
|
||||
|
||||
/// The tracing sample ratio.
|
||||
pub tracing_sample_ratio: Option<TracingSampleOptions>,
|
||||
|
||||
/// The logging options of slow query.
|
||||
pub slow_query: SlowQueryOptions,
|
||||
}
|
||||
|
||||
/// The options of slow query.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||
#[serde(default)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct SlowQueryOptions {
|
||||
/// Whether to enable slow query log.
|
||||
pub enable: bool,
|
||||
|
||||
/// The record type of slow queries.
|
||||
pub record_type: SlowQueriesRecordType,
|
||||
|
||||
/// The threshold of slow queries.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub threshold: Option<Duration>,
|
||||
|
||||
/// The sample ratio of slow queries.
|
||||
pub sample_ratio: Option<f64>,
|
||||
|
||||
/// The table TTL of `slow_queries` system table. Default is "30d".
|
||||
/// It's used when `record_type` is `SystemTable`.
|
||||
pub ttl: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for SlowQueryOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable: true,
|
||||
record_type: SlowQueriesRecordType::SystemTable,
|
||||
threshold: Some(Duration::from_secs(5)),
|
||||
sample_ratio: Some(1.0),
|
||||
ttl: Some("30d".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum SlowQueriesRecordType {
|
||||
SystemTable,
|
||||
Log,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -118,7 +140,6 @@ impl Default for LoggingOptions {
|
||||
otlp_endpoint: None,
|
||||
tracing_sample_ratio: None,
|
||||
append_stdout: true,
|
||||
slow_query: SlowQueryOptions::default(),
|
||||
// Rotation hourly, 24 files per day, keeps info log files of 30 days
|
||||
max_log_files: 720,
|
||||
}
|
||||
@@ -158,7 +179,8 @@ pub fn init_default_ut_logging() {
|
||||
"unittest",
|
||||
&opts,
|
||||
&TracingOptions::default(),
|
||||
None
|
||||
None,
|
||||
None,
|
||||
));
|
||||
|
||||
crate::info!("logs dir = {}", dir);
|
||||
@@ -176,6 +198,7 @@ pub fn init_global_logging(
|
||||
opts: &LoggingOptions,
|
||||
tracing_opts: &TracingOptions,
|
||||
node_id: Option<String>,
|
||||
slow_query_opts: Option<&SlowQueryOptions>,
|
||||
) -> Vec<WorkerGuard> {
|
||||
static START: Once = Once::new();
|
||||
let mut guards = vec![];
|
||||
@@ -278,50 +301,7 @@ pub fn init_global_logging(
|
||||
None
|
||||
};
|
||||
|
||||
let slow_query_logging_layer = if !opts.dir.is_empty() && opts.slow_query.enable {
|
||||
let rolling_appender = RollingFileAppender::builder()
|
||||
.rotation(Rotation::HOURLY)
|
||||
.filename_prefix("greptimedb-slow-queries")
|
||||
.max_log_files(opts.max_log_files)
|
||||
.build(&opts.dir)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"initializing rolling file appender at {} failed: {}",
|
||||
&opts.dir, e
|
||||
)
|
||||
});
|
||||
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
|
||||
guards.push(guard);
|
||||
|
||||
// Only logs if the field contains "slow".
|
||||
let slow_query_filter = FilterFn::new(|metadata| {
|
||||
metadata
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name().contains("slow"))
|
||||
});
|
||||
|
||||
if opts.log_format == LogFormat::Json {
|
||||
Some(
|
||||
Layer::new()
|
||||
.json()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
} else {
|
||||
Some(
|
||||
Layer::new()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let slow_query_logging_layer = build_slow_query_logger(opts, slow_query_opts, &mut guards);
|
||||
|
||||
// resolve log level settings from:
|
||||
// - options from command line or config files
|
||||
@@ -435,3 +415,67 @@ pub fn init_global_logging(
|
||||
|
||||
guards
|
||||
}
|
||||
|
||||
fn build_slow_query_logger<S>(
|
||||
opts: &LoggingOptions,
|
||||
slow_query_opts: Option<&SlowQueryOptions>,
|
||||
guards: &mut Vec<WorkerGuard>,
|
||||
) -> Option<Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>>
|
||||
where
|
||||
S: tracing::Subscriber
|
||||
+ Send
|
||||
+ 'static
|
||||
+ for<'span> tracing_subscriber::registry::LookupSpan<'span>,
|
||||
{
|
||||
if let Some(slow_query_opts) = slow_query_opts {
|
||||
if !opts.dir.is_empty()
|
||||
&& slow_query_opts.enable
|
||||
&& slow_query_opts.record_type == SlowQueriesRecordType::Log
|
||||
{
|
||||
let rolling_appender = RollingFileAppender::builder()
|
||||
.rotation(Rotation::HOURLY)
|
||||
.filename_prefix("greptimedb-slow-queries")
|
||||
.max_log_files(opts.max_log_files)
|
||||
.build(&opts.dir)
|
||||
.unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"initializing rolling file appender at {} failed: {}",
|
||||
&opts.dir, e
|
||||
)
|
||||
});
|
||||
let (writer, guard) = tracing_appender::non_blocking(rolling_appender);
|
||||
guards.push(guard);
|
||||
|
||||
// Only logs if the field contains "slow".
|
||||
let slow_query_filter = FilterFn::new(|metadata| {
|
||||
metadata
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name().contains("slow"))
|
||||
});
|
||||
|
||||
if opts.log_format == LogFormat::Json {
|
||||
Some(
|
||||
Layer::new()
|
||||
.json()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
} else {
|
||||
Some(
|
||||
Layer::new()
|
||||
.with_writer(writer)
|
||||
.with_ansi(false)
|
||||
.with_filter(slow_query_filter)
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ prometheus.workspace = true
|
||||
promql-parser.workspace = true
|
||||
prost.workspace = true
|
||||
query.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::sync::Arc;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::config::Configurable;
|
||||
use common_options::datanode::DatanodeClientOptions;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
|
||||
use meta_client::MetaClientOptions;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -38,7 +38,7 @@ use crate::service_config::{
|
||||
PromStoreOptions,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct FrontendOptions {
|
||||
pub node_id: Option<String>,
|
||||
@@ -61,6 +61,7 @@ pub struct FrontendOptions {
|
||||
pub tracing: TracingOptions,
|
||||
pub query: QueryOptions,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
pub slow_query: Option<SlowQueryOptions>,
|
||||
}
|
||||
|
||||
impl Default for FrontendOptions {
|
||||
@@ -86,6 +87,7 @@ impl Default for FrontendOptions {
|
||||
tracing: TracingOptions::default(),
|
||||
query: QueryOptions::default(),
|
||||
max_in_flight_write_bytes: None,
|
||||
slow_query: Some(SlowQueryOptions::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,7 +55,6 @@ use query::metrics::OnDone;
|
||||
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
|
||||
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
|
||||
use query::query_engine::DescribeResult;
|
||||
use query::stats::StatementStatistics;
|
||||
use query::QueryEngineRef;
|
||||
use servers::error as server_error;
|
||||
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
|
||||
@@ -80,6 +79,7 @@ use crate::error::{
|
||||
TableOperationSnafu,
|
||||
};
|
||||
use crate::limiter::LimiterRef;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
|
||||
/// The frontend instance contains necessary components, and implements many
|
||||
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
|
||||
@@ -94,7 +94,7 @@ pub struct Instance {
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
stats: StatementStatistics,
|
||||
slow_query_recorder: Option<SlowQueryRecorder>,
|
||||
limiter: Option<LimiterRef>,
|
||||
}
|
||||
|
||||
@@ -166,9 +166,11 @@ impl Instance {
|
||||
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
|
||||
let query_interceptor = query_interceptor.as_ref();
|
||||
|
||||
let _slow_query_timer = self
|
||||
.stats
|
||||
.start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
|
||||
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
|
||||
recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let output = match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
@@ -212,6 +214,7 @@ impl Instance {
|
||||
self.statement_executor.execute_sql(stmt, query_ctx).await
|
||||
}
|
||||
};
|
||||
|
||||
output.context(TableOperationSnafu)
|
||||
}
|
||||
}
|
||||
@@ -374,7 +377,11 @@ impl PrometheusHandler for Instance {
|
||||
}
|
||||
})?;
|
||||
|
||||
let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
|
||||
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
|
||||
recorder.start(stmt.clone(), query_ctx.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let plan = self
|
||||
.statement_executor
|
||||
|
||||
@@ -34,7 +34,6 @@ use operator::table::TableMutationOperator;
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use pipeline::pipeline_operator::PipelineOperator;
|
||||
use query::region_query::RegionQueryHandlerFactoryRef;
|
||||
use query::stats::StatementStatistics;
|
||||
use query::QueryEngineFactory;
|
||||
use snafu::OptionExt;
|
||||
|
||||
@@ -43,6 +42,7 @@ use crate::frontend::FrontendOptions;
|
||||
use crate::instance::region_query::FrontendRegionQueryHandler;
|
||||
use crate::instance::Instance;
|
||||
use crate::limiter::Limiter;
|
||||
use crate::slow_query_recorder::SlowQueryRecorder;
|
||||
|
||||
/// The frontend [`Instance`] builder.
|
||||
pub struct FrontendBuilder {
|
||||
@@ -54,7 +54,6 @@ pub struct FrontendBuilder {
|
||||
node_manager: NodeManagerRef,
|
||||
plugins: Option<Plugins>,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
stats: StatementStatistics,
|
||||
}
|
||||
|
||||
impl FrontendBuilder {
|
||||
@@ -65,7 +64,6 @@ impl FrontendBuilder {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
procedure_executor: ProcedureExecutorRef,
|
||||
stats: StatementStatistics,
|
||||
) -> Self {
|
||||
Self {
|
||||
options,
|
||||
@@ -76,7 +74,6 @@ impl FrontendBuilder {
|
||||
node_manager,
|
||||
plugins: None,
|
||||
procedure_executor,
|
||||
stats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,6 +186,17 @@ impl FrontendBuilder {
|
||||
|
||||
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
|
||||
|
||||
let slow_query_recorder = self.options.slow_query.and_then(|opts| {
|
||||
opts.enable.then(|| {
|
||||
SlowQueryRecorder::new(
|
||||
opts.clone(),
|
||||
inserter.clone(),
|
||||
statement_executor.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)
|
||||
})
|
||||
});
|
||||
|
||||
// Create the limiter if the max_in_flight_write_bytes is set.
|
||||
let limiter = self
|
||||
.options
|
||||
@@ -206,7 +214,7 @@ impl FrontendBuilder {
|
||||
inserter,
|
||||
deleter,
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
stats: self.stats,
|
||||
slow_query_recorder,
|
||||
limiter,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -22,3 +22,4 @@ pub(crate) mod limiter;
|
||||
pub(crate) mod metrics;
|
||||
pub mod server;
|
||||
pub mod service_config;
|
||||
pub(crate) mod slow_query_recorder;
|
||||
|
||||
531
src/frontend/src/slow_query_recorder.rs
Normal file
531
src/frontend/src/slow_query_recorder.rs
Normal file
@@ -0,0 +1,531 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, UNIX_EPOCH};
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{
|
||||
ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
|
||||
RowInsertRequests, Rows, SemanticType,
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
|
||||
use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
|
||||
use common_telemetry::{debug, error, info, slow};
|
||||
use common_time::timestamp::{TimeUnit, Timestamp};
|
||||
use operator::insert::InserterRef;
|
||||
use operator::statement::StatementExecutorRef;
|
||||
use query::parser::QueryStatement;
|
||||
use rand::random;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::ResultExt;
|
||||
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
|
||||
use table::TableRef;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
|
||||
|
||||
// Name of the system table that stores slow query records.
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
|
||||
// Column names of the `slow_queries` table; shared by the create-table expression
// and the insert schema so that both stay in sync.
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
|
||||
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
|
||||
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
|
||||
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
|
||||
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
|
||||
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
|
||||
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
|
||||
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
|
||||
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
|
||||
|
||||
// TTL applied to the `slow_queries` table when the options do not set one.
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
|
||||
// Capacity of the channel between the timers and the background event handler.
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
|
||||
|
||||
/// SlowQueryRecorder is responsible for recording slow queries.
///
/// Constructing a recorder spawns a background task that receives slow query events
/// over a channel; every timer returned by `start()` holds a clone of the sender.
|
||||
#[derive(Clone)]
|
||||
pub struct SlowQueryRecorder {
|
||||
/// Sending half of the channel consumed by the background event handler.
tx: Sender<SlowQueryEvent>,
|
||||
/// Options whose `enable`, `threshold` and `sample_ratio` are copied into each timer.
slow_query_opts: SlowQueryOptions,
|
||||
/// Join handle of the spawned handler task, wrapped in an `Arc`.
_handle: Arc<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
/// A single slow query observation sent from a `SlowQueryTimer` to the event handler.
#[derive(Debug)]
|
||||
struct SlowQueryEvent {
|
||||
/// Elapsed time of the query in milliseconds.
cost: u64,
|
||||
/// The slow query threshold in milliseconds that the query exceeded.
threshold: u64,
|
||||
/// Text of the statement (SQL statement or PromQL expression).
query: String,
|
||||
/// Whether the statement is a PromQL query.
is_promql: bool,
|
||||
/// Context of the query; its current catalog selects where the record is stored.
query_ctx: QueryContextRef,
|
||||
/// PromQL only: `end - start` in milliseconds.
promql_range: Option<u64>,
|
||||
/// PromQL only: evaluation interval in milliseconds.
promql_step: Option<u64>,
|
||||
/// PromQL only: start time in milliseconds since the Unix epoch.
promql_start: Option<i64>,
|
||||
/// PromQL only: end time in milliseconds since the Unix epoch.
promql_end: Option<i64>,
|
||||
}
|
||||
|
||||
impl SlowQueryRecorder {
|
||||
/// Create a new SlowQueryRecorder.
|
||||
pub fn new(
|
||||
slow_query_opts: SlowQueryOptions,
|
||||
inserter: InserterRef,
|
||||
statement_executor: StatementExecutorRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
) -> Self {
|
||||
let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
|
||||
|
||||
let ttl = slow_query_opts
|
||||
.ttl
|
||||
.clone()
|
||||
.unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
|
||||
|
||||
// Start a new task to process the slow query events.
|
||||
let event_handler = SlowQueryEventHandler {
|
||||
inserter,
|
||||
statement_executor,
|
||||
catalog_manager,
|
||||
rx,
|
||||
record_type: slow_query_opts.record_type,
|
||||
ttl,
|
||||
};
|
||||
|
||||
// Start a new background task to process the slow query events.
|
||||
let handle = tokio::spawn(async move {
|
||||
event_handler.process_slow_query().await;
|
||||
});
|
||||
|
||||
Self {
|
||||
tx,
|
||||
slow_query_opts,
|
||||
_handle: Arc::new(handle),
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false.
|
||||
/// The timer sets the start time when created and calculates the elapsed duration when dropped.
|
||||
pub fn start(
|
||||
&self,
|
||||
stmt: QueryStatement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Option<SlowQueryTimer> {
|
||||
if self.slow_query_opts.enable {
|
||||
Some(SlowQueryTimer {
|
||||
stmt,
|
||||
query_ctx,
|
||||
start: Instant::now(), // Set the initial start time.
|
||||
threshold: self.slow_query_opts.threshold,
|
||||
sample_ratio: self.slow_query_opts.sample_ratio,
|
||||
tx: self.tx.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background consumer of `SlowQueryEvent`s: records each event either in the slow
/// query log or in the `slow_queries` system table, depending on `record_type`.
struct SlowQueryEventHandler {
|
||||
/// Used to insert rows into the `slow_queries` table.
inserter: InserterRef,
|
||||
/// Used to create the `slow_queries` table and passed along with row inserts.
statement_executor: StatementExecutorRef,
|
||||
/// Used to look up whether the `slow_queries` table already exists.
catalog_manager: CatalogManagerRef,
|
||||
/// Receiving half of the event channel.
rx: Receiver<SlowQueryEvent>,
|
||||
/// Where events are recorded (log or system table).
record_type: SlowQueriesRecordType,
|
||||
/// TTL table option applied when the `slow_queries` table is created.
ttl: String,
|
||||
}
|
||||
|
||||
impl SlowQueryEventHandler {
|
||||
async fn process_slow_query(mut self) {
|
||||
info!(
|
||||
"Start the background handler to process slow query events and record them in {:?}.",
|
||||
self.record_type
|
||||
);
|
||||
while let Some(event) = self.rx.recv().await {
|
||||
self.record_slow_query(event).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_slow_query(&self, event: SlowQueryEvent) {
|
||||
match self.record_type {
|
||||
SlowQueriesRecordType::Log => {
|
||||
// Record the slow query in a specific logs file.
|
||||
slow!(
|
||||
cost = event.cost,
|
||||
threshold = event.threshold,
|
||||
query = event.query,
|
||||
is_promql = event.is_promql,
|
||||
promql_range = event.promql_range,
|
||||
promql_step = event.promql_step,
|
||||
promql_start = event.promql_start,
|
||||
promql_end = event.promql_end,
|
||||
);
|
||||
}
|
||||
SlowQueriesRecordType::SystemTable => {
|
||||
// Record the slow query in a system table that is stored in greptimedb itself.
|
||||
if let Err(e) = self.insert_slow_query(&event).await {
|
||||
error!(e; "Failed to insert slow query, query: {:?}", event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
|
||||
debug!("Handle the slow query event: {:?}", event);
|
||||
|
||||
let table = if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
event.query_ctx.current_catalog(),
|
||||
DEFAULT_PRIVATE_SCHEMA_NAME,
|
||||
SLOW_QUERY_TABLE_NAME,
|
||||
Some(&event.query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
table
|
||||
} else {
|
||||
// Create the system table if it doesn't exist.
|
||||
self.create_system_table(event.query_ctx.clone()).await?
|
||||
};
|
||||
|
||||
let insert = RowInsertRequest {
|
||||
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
|
||||
rows: Some(Rows {
|
||||
schema: self.build_insert_column_schema(),
|
||||
rows: vec![Row {
|
||||
values: vec![
|
||||
ValueData::U64Value(event.cost).into(),
|
||||
ValueData::U64Value(event.threshold).into(),
|
||||
ValueData::StringValue(event.query.to_string()).into(),
|
||||
ValueData::BoolValue(event.is_promql).into(),
|
||||
ValueData::TimestampNanosecondValue(
|
||||
Timestamp::current_time(TimeUnit::Nanosecond).value(),
|
||||
)
|
||||
.into(),
|
||||
ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
|
||||
ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
|
||||
ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
|
||||
.into(),
|
||||
ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
|
||||
],
|
||||
}],
|
||||
}),
|
||||
};
|
||||
|
||||
let requests = RowInsertRequests {
|
||||
inserts: vec![insert],
|
||||
};
|
||||
|
||||
let table_info = table.table_info();
|
||||
let query_ctx = QueryContextBuilder::default()
|
||||
.current_catalog(table_info.catalog_name.to_string())
|
||||
.current_schema(table_info.schema_name.to_string())
|
||||
.build()
|
||||
.into();
|
||||
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, query_ctx, &self.statement_executor)
|
||||
.await
|
||||
.context(TableOperationSnafu)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
|
||||
let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
|
||||
if let Some(table) = self
|
||||
.catalog_manager
|
||||
.table(
|
||||
&create_table_expr.catalog_name,
|
||||
&create_table_expr.schema_name,
|
||||
&create_table_expr.table_name,
|
||||
Some(&query_ctx),
|
||||
)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
// The table is already created, so we don't need to create it again.
|
||||
return Ok(table);
|
||||
}
|
||||
|
||||
// Create the `slow_queries` system table.
|
||||
let table = self
|
||||
.statement_executor
|
||||
.create_table_inner(&mut create_table_expr, None, query_ctx.clone())
|
||||
.await
|
||||
.context(TableOperationSnafu)?;
|
||||
|
||||
info!(
|
||||
"Create the {} system table in {:?} successfully.",
|
||||
SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
|
||||
);
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
|
||||
let column_defs = vec![
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The cost of the slow query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment:
|
||||
"When the query cost exceeds this value, it will be recorded as a slow query"
|
||||
.to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::String as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The original query statement".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Boolean as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "Whether the query is a PromQL query".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampNanosecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
comment: "The timestamp of the slow query".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The time range of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::Uint64 as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The step of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampMillisecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnDef {
|
||||
name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
|
||||
data_type: ColumnDataType::TimestampMillisecond as i32,
|
||||
is_nullable: false,
|
||||
default_constraint: vec![],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
|
||||
let table_options = HashMap::from([
|
||||
(APPEND_MODE_KEY.to_string(), "true".to_string()),
|
||||
(TTL_KEY.to_string(), self.ttl.to_string()),
|
||||
]);
|
||||
|
||||
CreateTableExpr {
|
||||
catalog_name: catalog.to_string(),
|
||||
schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema.
|
||||
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
|
||||
desc: "GreptimeDB system table for storing slow queries".to_string(),
|
||||
column_defs,
|
||||
time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
primary_keys: vec![],
|
||||
create_if_not_exists: true,
|
||||
table_options,
|
||||
table_id: None,
|
||||
engine: default_engine().to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
|
||||
vec![
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::String.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Boolean.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::Uint64.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
..Default::default()
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
/// SlowQueryTimer is used to log slow query when it's dropped.
|
||||
/// In drop(), it will check if the query is slow and send the slow query event to the handler.
|
||||
pub struct SlowQueryTimer {
|
||||
/// Instant at which the timer was created.
start: Instant,
|
||||
/// The statement being timed; its text is recorded if the query is slow.
stmt: QueryStatement,
|
||||
/// Context of the query, forwarded with the event.
query_ctx: QueryContextRef,
|
||||
/// Elapsed time above which the query counts as slow; `None` means never report.
threshold: Option<Duration>,
|
||||
/// Fraction of slow queries to report; `None` means report all of them.
sample_ratio: Option<f64>,
|
||||
/// Channel to the background event handler.
tx: Sender<SlowQueryEvent>,
|
||||
}
|
||||
|
||||
impl SlowQueryTimer {
|
||||
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
|
||||
let mut slow_query_event = SlowQueryEvent {
|
||||
cost: elapsed.as_millis() as u64,
|
||||
threshold: threshold.as_millis() as u64,
|
||||
query: "".to_string(),
|
||||
query_ctx: self.query_ctx.clone(),
|
||||
|
||||
// The following fields are only used for PromQL queries.
|
||||
is_promql: false,
|
||||
promql_range: None,
|
||||
promql_step: None,
|
||||
promql_start: None,
|
||||
promql_end: None,
|
||||
};
|
||||
|
||||
match &self.stmt {
|
||||
QueryStatement::Promql(stmt) => {
|
||||
slow_query_event.is_promql = true;
|
||||
slow_query_event.query = stmt.expr.to_string();
|
||||
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
|
||||
|
||||
let start = stmt
|
||||
.start
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64;
|
||||
|
||||
let end = stmt
|
||||
.end
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64;
|
||||
|
||||
slow_query_event.promql_range = Some((end - start) as u64);
|
||||
slow_query_event.promql_start = Some(start);
|
||||
slow_query_event.promql_end = Some(end);
|
||||
}
|
||||
QueryStatement::Sql(stmt) => {
|
||||
slow_query_event.query = stmt.to_string();
|
||||
}
|
||||
}
|
||||
|
||||
// Send SlowQueryEvent to the handler.
|
||||
if let Err(e) = self.tx.try_send(slow_query_event) {
|
||||
error!(e; "Failed to send slow query event");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SlowQueryTimer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(threshold) = self.threshold {
|
||||
// Calculate the elaspsed duration since the timer is created.
|
||||
let elapsed = self.start.elapsed();
|
||||
if elapsed > threshold {
|
||||
if let Some(ratio) = self.sample_ratio {
|
||||
// Only capture a portion of slow queries based on sample_ratio.
|
||||
// Generate a random number in [0, 1) and compare it with sample_ratio.
|
||||
if ratio >= 1.0 || random::<f64>() <= ratio {
|
||||
self.send_slow_query_event(elapsed, threshold);
|
||||
}
|
||||
} else {
|
||||
// Captures all slow queries if sample_ratio is not set.
|
||||
self.send_slow_query_event(elapsed, threshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -39,7 +39,6 @@ pub mod query_engine;
|
||||
mod range_select;
|
||||
pub mod region_query;
|
||||
pub mod sql;
|
||||
pub mod stats;
|
||||
pub(crate) mod window_sort;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_telemetry::logging::SlowQueryOptions;
|
||||
use common_telemetry::slow;
|
||||
use rand::random;
|
||||
|
||||
use crate::parser::QueryStatement;
|
||||
|
||||
/// StatementStatistics is used to collect statistics for a statement.
///
/// Currently it only carries the slow query options used to create slow query timers.
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub struct StatementStatistics {
|
||||
/// slow_query is used to configure slow query log.
|
||||
pub slow_query: SlowQueryOptions,
|
||||
}
|
||||
|
||||
impl StatementStatistics {
|
||||
pub fn new(slow_query_options: SlowQueryOptions) -> Self {
|
||||
Self {
|
||||
slow_query: slow_query_options,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_slow_query_timer(&self, stmt: QueryStatement) -> Option<SlowQueryTimer> {
|
||||
if self.slow_query.enable {
|
||||
Some(SlowQueryTimer {
|
||||
start: std::time::Instant::now(),
|
||||
stmt,
|
||||
threshold: self.slow_query.threshold,
|
||||
sample_ratio: self.slow_query.sample_ratio,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// SlowQueryTimer is used to log slow query when it's dropped.
|
||||
pub struct SlowQueryTimer {
|
||||
/// Instant at which the timer was created.
start: std::time::Instant,
|
||||
/// The statement being timed; logged if the query is slow.
stmt: QueryStatement,
|
||||
/// Elapsed time above which the query counts as slow; `None` means never log.
threshold: Option<Duration>,
|
||||
/// Fraction of slow queries to log; `None` means log all of them.
sample_ratio: Option<f64>,
|
||||
}
|
||||
|
||||
impl SlowQueryTimer {
|
||||
fn log_slow_query(&self, elapsed: Duration, threshold: Duration) {
|
||||
match &self.stmt {
|
||||
QueryStatement::Sql(stmt) => {
|
||||
slow!(
|
||||
cost = elapsed.as_millis() as u64,
|
||||
threshold = threshold.as_millis() as u64,
|
||||
sql = stmt.to_string()
|
||||
);
|
||||
}
|
||||
QueryStatement::Promql(stmt) => {
|
||||
slow!(
|
||||
cost = elapsed.as_millis() as u64,
|
||||
threshold = threshold.as_millis() as u64,
|
||||
promql = stmt.to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SlowQueryTimer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(threshold) = self.threshold {
|
||||
let elapsed = self.start.elapsed();
|
||||
if elapsed > threshold {
|
||||
if let Some(ratio) = self.sample_ratio {
|
||||
// Generate a random number in [0, 1) and compare it with sample_ratio.
|
||||
if ratio >= 1.0 || random::<f64>() <= ratio {
|
||||
self.log_slow_query(elapsed, threshold);
|
||||
}
|
||||
} else {
|
||||
// If sample_ratio is not set, log all slow queries.
|
||||
self.log_slow_query(elapsed, threshold);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,6 @@ use meta_client::client::MetaClientBuilder;
|
||||
use meta_srv::cluster::MetaPeerClientRef;
|
||||
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
|
||||
use meta_srv::mocks::MockInfo;
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::grpc::flight::FlightCraftWrapper;
|
||||
use servers::grpc::region_server::RegionServerRequestHandler;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
@@ -403,7 +402,6 @@ impl GreptimeDbClusterBuilder {
|
||||
catalog_manager,
|
||||
datanode_clients,
|
||||
meta_client,
|
||||
StatementStatistics::default(),
|
||||
)
|
||||
.with_local_cache_invalidator(cache_registry)
|
||||
.try_build()
|
||||
|
||||
@@ -46,7 +46,6 @@ use frontend::frontend::Frontend;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::instance::{Instance, StandaloneDatanodeManager};
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use query::stats::StatementStatistics;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::server::ServerHandlers;
|
||||
use snafu::ResultExt;
|
||||
@@ -238,7 +237,6 @@ impl GreptimeDbStandaloneBuilder {
|
||||
catalog_manager.clone(),
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
StatementStatistics::default(),
|
||||
)
|
||||
.with_plugin(plugins)
|
||||
.try_build()
|
||||
|
||||
@@ -1093,9 +1093,6 @@ max_log_files = 720
|
||||
append_stdout = true
|
||||
enable_otlp_tracing = false
|
||||
|
||||
[logging.slow_query]
|
||||
enable = false
|
||||
|
||||
[[region_engine]]
|
||||
|
||||
[region_engine.mito]
|
||||
@@ -1149,7 +1146,15 @@ type = "time_series"
|
||||
enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
[tracing]"#,
|
||||
[tracing]
|
||||
|
||||
[slow_query]
|
||||
enable = true
|
||||
record_type = "system_table"
|
||||
threshold = "5s"
|
||||
sample_ratio = 1.0
|
||||
ttl = "30d"
|
||||
"#,
|
||||
)
|
||||
.trim()
|
||||
.to_string();
|
||||
|
||||
Reference in New Issue
Block a user