fix: unable to record slow query (#6590)

* refactor: add process manager for prometheus query

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: modify `register_query()` API to accept a parsed statement (`catalog::process_manager::QueryStatement`)

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add the slow query timer to the `Ticket` of `ProcessManager`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: add integration tests

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add process manager in `do_exec_plan()`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* tests: add `test_postgres_slow_query` integration test

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: create a query ticket and slow query timer in `query_statement()` when the statement is a query

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: sqlness errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Author: zyy17
Date: 2025-08-05 20:35:12 -07:00
Committed by: GitHub
Parent: cc35bab5e4
Commit: 3a9f0220b5
25 changed files with 525 additions and 177 deletions

Cargo.lock (generated)

@@ -1690,6 +1690,8 @@ dependencies = [
"partition",
"paste",
"prometheus",
"promql-parser",
"rand 0.9.0",
"rustc-hash 2.0.0",
"serde_json",
"session",
@@ -2364,6 +2366,7 @@ dependencies = [
"common-meta",
"greptime-proto",
"meta-client",
"session",
"snafu 0.8.5",
"tokio",
"tonic 0.12.3",
@@ -13115,6 +13118,7 @@ dependencies = [
"tonic 0.12.3",
"tower 0.5.2",
"url",
"urlencoding",
"uuid",
"yaml-rust",
"zstd 0.13.2",


@@ -44,6 +44,8 @@ moka = { workspace = true, features = ["future", "sync"] }
partition.workspace = true
paste.workspace = true
prometheus.workspace = true
promql-parser.workspace = true
rand.workspace = true
rustc-hash.workspace = true
serde_json.workspace = true
session.workspace = true


@@ -14,17 +14,24 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info, warn};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use promql_parser::parser::EvalStmt;
use rand::random;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use tokio::sync::mpsc::Sender;
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -44,6 +51,23 @@ pub struct ProcessManager {
frontend_selector: Option<MetaClientSelector>,
}
/// Represents a parsed query statement, functionally equivalent to [query::parser::QueryStatement].
/// This enum is defined here to avoid cyclic dependencies with the query parser module.
#[derive(Debug, Clone)]
pub enum QueryStatement {
Sql(Statement),
Promql(EvalStmt),
}
impl Display for QueryStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
QueryStatement::Sql(stmt) => write!(f, "{}", stmt),
QueryStatement::Promql(eval_stmt) => write!(f, "{}", eval_stmt),
}
}
}
impl ProcessManager {
/// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
@@ -67,6 +91,7 @@ impl ProcessManager {
query: String,
client: String,
query_id: Option<ProcessId>,
_slow_query_timer: Option<SlowQueryTimer>,
) -> Ticket {
let id = query_id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
@@ -93,6 +118,7 @@ impl ProcessManager {
manager: self.clone(),
id,
cancellation_handle,
_slow_query_timer,
}
}
@@ -223,6 +249,7 @@ pub struct Ticket {
pub(crate) manager: ProcessManagerRef,
pub(crate) id: ProcessId,
pub cancellation_handle: Arc<CancellationHandle>,
_slow_query_timer: Option<SlowQueryTimer>,
}
impl Drop for Ticket {
@@ -263,6 +290,107 @@ impl Debug for CancellableProcess {
}
}
/// SlowQueryTimer is used to log slow queries when it is dropped.
/// In drop(), it checks whether the query exceeded the slow query threshold and, if so, sends a slow query event to the handler.
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
}
impl SlowQueryTimer {
pub fn new(
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
) -> Self {
Self {
start: Instant::now(),
stmt,
query_ctx,
threshold,
sample_ratio,
tx,
}
}
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
promql_range: None,
promql_step: None,
promql_start: None,
promql_end: None,
};
match &self.stmt {
QueryStatement::Promql(stmt) => {
slow_query_event.is_promql = true;
slow_query_event.query = stmt.expr.to_string();
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
let start = stmt
.start
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let end = stmt
.end
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
slow_query_event.promql_range = Some((end - start) as u64);
slow_query_event.promql_start = Some(start);
slow_query_event.promql_end = Some(end);
}
QueryStatement::Sql(stmt) => {
slow_query_event.query = stmt.to_string();
}
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
if let Some(threshold) = self.threshold {
// Calculate the elapsed duration since the timer was created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Capture all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -278,6 +406,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"".to_string(),
None,
None,
);
let running_processes = process_manager.local_processes(None).unwrap();
@@ -301,6 +430,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
Some(custom_id),
None,
);
assert_eq!(ticket.id, custom_id);
@@ -321,6 +451,7 @@ mod tests {
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
None,
);
let ticket2 = process_manager.clone().register_query(
@@ -329,6 +460,7 @@ mod tests {
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
None,
);
let running_processes = process_manager.local_processes(Some("public")).unwrap();
@@ -350,6 +482,7 @@ mod tests {
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
None,
);
let _ticket2 = process_manager.clone().register_query(
@@ -358,6 +491,7 @@ mod tests {
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
None,
);
// Test listing processes for specific catalog
@@ -384,6 +518,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
process_manager.deregister_query("public".to_string(), ticket.id);
@@ -400,6 +535,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
@@ -417,6 +553,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
let killed = process_manager
@@ -462,6 +599,7 @@ mod tests {
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
"test_client".to_string(),
Some(42),
None,
);
let processes = process_manager.local_processes(None).unwrap();
@@ -488,6 +626,7 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
// Process should be registered
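The fix hinges on this RAII shape: the slow query timer now lives inside the `Ticket`, so it is dropped exactly when the tracked query finishes, errors, or is cancelled, and the threshold check in `Drop` runs on every exit path. A minimal standalone sketch of the pattern, with simplified stand-ins and std's `sync_channel` in place of the tokio sender used above:

use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::sleep;
use std::time::{Duration, Instant};

// Simplified stand-in for common_frontend::slow_query_event::SlowQueryEvent.
#[derive(Debug)]
struct SlowQueryEvent {
    cost_ms: u64,
    threshold_ms: u64,
    query: String,
}

// RAII timer: records the start on creation, checks the threshold in Drop.
struct SlowQueryTimer {
    start: Instant,
    query: String,
    threshold: Duration,
    tx: SyncSender<SlowQueryEvent>,
}

impl Drop for SlowQueryTimer {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        if elapsed > self.threshold {
            // try_send never blocks: if the channel is full, the event is
            // dropped instead of stalling the query path.
            let _ = self.tx.try_send(SlowQueryEvent {
                cost_ms: elapsed.as_millis() as u64,
                threshold_ms: self.threshold.as_millis() as u64,
                query: self.query.clone(),
            });
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel(16);
    {
        let _timer = SlowQueryTimer {
            start: Instant::now(),
            query: "SELECT 1".to_string(),
            threshold: Duration::from_millis(10),
            tx,
        };
        sleep(Duration::from_millis(50)); // simulate a slow query
    } // _timer is dropped here; the event fires because 50ms > 10ms
    println!("{:?}", rx.recv().unwrap());
}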


@@ -12,6 +12,7 @@ common-macro.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
meta-client.workspace = true
session.workspace = true
snafu.workspace = true
tonic.workspace = true


@@ -19,6 +19,7 @@ use snafu::OptionExt;
pub mod error;
pub mod selector;
pub mod slow_query_event;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DisplayProcessId {


@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use session::context::QueryContextRef;
#[derive(Debug)]
pub struct SlowQueryEvent {
pub cost: u64,
pub threshold: u64,
pub query: String,
pub is_promql: bool,
pub query_ctx: QueryContextRef,
pub promql_range: Option<u64>,
pub promql_step: Option<u64>,
pub promql_start: Option<i64>,
pub promql_end: Option<i64>,
}
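
For PromQL queries, the `promql_*` fields are derived from the evaluation window: start and end are converted to milliseconds since the Unix epoch, the range is their difference, and the step comes from the evaluation interval, mirroring `send_slow_query_event` above. A std-only sketch of that conversion with hypothetical values:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    // Hypothetical evaluation window: [now - 1h, now] with a 30s step.
    let end_time = SystemTime::now();
    let start_time = end_time - Duration::from_secs(3600);
    let interval = Duration::from_secs(30);

    // Same conversion as send_slow_query_event: milliseconds since the
    // epoch, with duration_since errors collapsed to 0 by unwrap_or_default.
    let start = start_time
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64;
    let end = end_time
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64;

    let promql_range = (end - start) as u64; // 3_600_000 ms
    let promql_step = interval.as_millis() as u64; // 30_000 ms
    println!("start={start} end={end} range={promql_range} step={promql_step}");
}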


@@ -32,7 +32,7 @@ use std::time::{Duration, SystemTime};
use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::ProcessManagerRef;
use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement};
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::cancellation::CancellableFuture;
@@ -73,7 +73,7 @@ use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::error::{
self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu,
OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
@@ -221,36 +221,40 @@ impl Instance {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor.as_ref();
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
if should_capture_statement(Some(&stmt)) {
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone())
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => OutputData::Stream(Box::pin(
CancellableStreamWrapper::new(stream, ticket),
)),
other => other,
};
Output { data, meta }
})
} else {
None
};
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
);
let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Output { data, meta }
})
self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
.await
}
}
async fn exec_statement_with_timeout(
@@ -572,13 +576,54 @@ impl SqlQueryHandler for Instance {
}
}
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
// The plan should have been prepared before execution;
// the check is done there.
self.query_engine
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
if should_capture_statement(stmt.as_ref()) {
// It's safe to unwrap here because should_capture_statement() only returns true when the statement is Some.
let stmt = stmt.unwrap();
let query = stmt.to_string();
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone())
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => OutputData::Stream(Box::pin(
CancellableStreamWrapper::new(stream, ticket),
)),
other => other,
};
Output { data, meta }
})
.context(ExecLogicalPlanSnafu)
} else {
// The plan should have been prepared before execution;
// the check is done there.
self.query_engine
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
}
}
#[tracing::instrument(skip_all)]
@@ -672,12 +717,6 @@ impl PrometheusHandler for Instance {
}
})?;
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
recorder.start(stmt.clone(), query_ctx.clone())
} else {
None
};
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
@@ -687,10 +726,47 @@ impl PrometheusHandler for Instance {
interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
let output = self
.statement_executor
.exec_plan(plan, query_ctx.clone())
// Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
CatalogQueryStatement::Promql(eval_stmt)
} else {
// This should not happen since the query has already been parsed successfully.
return UnexpectedResultSnafu {
reason: "The query should always be promql.".to_string(),
}
.fail();
};
let query = query_statement.to_string();
let slow_query_timer = self
.slow_query_recorder
.as_ref()
.and_then(|recorder| recorder.start(query_statement, query_ctx.clone()));
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| servers::error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Output { data, meta }
})
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;
@@ -914,6 +990,15 @@ fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<(
.context(SqlExecInterceptedSnafu)
}
// Returns true when a query ticket and slow query timer should be created, i.e. the statement is a query or a read-only statement.
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
if let Some(stmt) = stmt {
matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
} else {
false
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
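
`should_capture_statement` gates both the process-list ticket and the slow query timer: only plain queries and statements that report themselves read-only are tracked, and prepared plans that arrive without a statement (the gRPC plan path passes `None`) are skipped. A toy illustration of the same predicate over a hypothetical simplified statement type:

// Hypothetical stand-in for sql::statements::statement::Statement.
#[derive(Debug)]
enum Statement {
    Query(String),
    ShowTables,
    Insert(String),
}

impl Statement {
    // Mirrors Statement::is_readonly() for this toy type.
    fn is_readonly(&self) -> bool {
        matches!(self, Statement::ShowTables)
    }
}

fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

fn main() {
    assert!(should_capture_statement(Some(&Statement::Query("SELECT 1".into()))));
    assert!(should_capture_statement(Some(&Statement::ShowTables)));
    assert!(!should_capture_statement(Some(&Statement::Insert("INSERT ...".into()))));
    assert!(!should_capture_statement(None)); // plan without a statement
}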


@@ -121,7 +121,8 @@ impl GrpcQueryHandler for Instance {
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output =
SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
.await?;
attach_timer(output, timer)
}
@@ -402,7 +403,7 @@ impl Instance {
.context(common_query::error::GeneralDataFusionSnafu)
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;
let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
Ok(attach_timer(output, timer))
}


@@ -22,5 +22,5 @@ pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;
pub(crate) mod slow_query_recorder;
pub mod slow_query_recorder;
mod stream_wrapper;


@@ -14,22 +14,21 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer};
use catalog::CatalogManagerRef;
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
use common_telemetry::{debug, error, info, slow};
use common_time::timestamp::{TimeUnit, Timestamp};
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::parser::QueryStatement;
use rand::random;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::ResultExt;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
@@ -39,16 +38,16 @@ use tokio::task::JoinHandle;
use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
@@ -61,19 +60,6 @@ pub struct SlowQueryRecorder {
_handle: Arc<JoinHandle<()>>,
}
#[derive(Debug)]
struct SlowQueryEvent {
cost: u64,
threshold: u64,
query: String,
is_promql: bool,
query_ctx: QueryContextRef,
promql_range: Option<u64>,
promql_step: Option<u64>,
promql_start: Option<i64>,
promql_end: Option<i64>,
}
impl SlowQueryRecorder {
/// Create a new SlowQueryRecorder.
pub fn new(
@@ -115,18 +101,17 @@ impl SlowQueryRecorder {
/// The timer sets the start time when created and calculates the elapsed duration when dropped.
pub fn start(
&self,
stmt: QueryStatement,
stmt: CatalogQueryStatement,
query_ctx: QueryContextRef,
) -> Option<SlowQueryTimer> {
if self.slow_query_opts.enable {
Some(SlowQueryTimer {
Some(SlowQueryTimer::new(
stmt,
query_ctx,
start: Instant::now(), // Set the initial start time.
threshold: self.slow_query_opts.threshold,
sample_ratio: self.slow_query_opts.sample_ratio,
tx: self.tx.clone(),
})
self.slow_query_opts.threshold,
self.slow_query_opts.sample_ratio,
self.tx.clone(),
))
} else {
None
}
@@ -447,85 +432,3 @@ impl SlowQueryEventHandler {
]
}
}
/// SlowQueryTimer is used to log slow queries when it is dropped.
/// In drop(), it checks whether the query exceeded the slow query threshold and, if so, sends a slow query event to the handler.
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
promql_range: None,
promql_step: None,
promql_start: None,
promql_end: None,
};
match &self.stmt {
QueryStatement::Promql(stmt) => {
slow_query_event.is_promql = true;
slow_query_event.query = stmt.expr.to_string();
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
let start = stmt
.start
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let end = stmt
.end
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
slow_query_event.promql_range = Some((end - start) as u64);
slow_query_event.promql_start = Some(start);
slow_query_event.promql_end = Some(end);
}
QueryStatement::Sql(stmt) => {
slow_query_event.query = stmt.to_string();
}
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
if let Some(threshold) = self.threshold {
// Calculate the elapsed duration since the timer was created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Capture all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
}
}
}
}
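
The sampling rule is unchanged by the move: each slow query is kept with probability `sample_ratio` via a single uniform draw, and a ratio of 1.0 or more short-circuits to always record. A sketch of that decision in isolation, assuming the `rand` crate as in both versions of the timer:

use rand::random;

// Returns true when this slow query should be recorded.
// None means sampling is disabled and every slow query is kept.
fn should_record(sample_ratio: Option<f64>) -> bool {
    match sample_ratio {
        // random::<f64>() is uniform in [0, 1), so the comparison keeps
        // roughly `ratio` of all slow queries.
        Some(ratio) => ratio >= 1.0 || random::<f64>() <= ratio,
        None => true,
    }
}

fn main() {
    assert!(should_record(None)); // no ratio: capture everything
    assert!(should_record(Some(1.0))); // >= 1.0 always records
    // With ratio 0.1, roughly 10% of 10_000 draws pass.
    let kept = (0..10_000).filter(|_| should_record(Some(0.1))).count();
    println!("kept {kept} of 10000");
}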


@@ -187,6 +187,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -210,6 +211,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
// Cancel before creating the wrapper
@@ -234,6 +236,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
@@ -261,6 +264,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
@@ -289,6 +293,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -319,6 +324,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -347,6 +353,7 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();


@@ -640,6 +640,12 @@ pub enum Error {
#[snafu(display("Unknown hint: {}", hint))]
UnknownHint { hint: String },
#[snafu(display("Query has been cancelled"))]
Cancelled {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -765,6 +771,8 @@ impl ErrorExt for Error {
DurationOverflow { .. } => StatusCode::InvalidArguments,
HandleOtelArrowRequest { .. } => StatusCode::Internal,
Cancelled { .. } => StatusCode::Cancelled,
}
}


@@ -1268,6 +1268,7 @@ mod test {
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
use tokio::time::Instant;
@@ -1298,6 +1299,7 @@ mod test {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {


@@ -20,6 +20,7 @@
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
use sql::statements::statement::Statement;
pub mod addrs;
pub mod configurator;
@@ -55,6 +56,8 @@ pub mod tls;
#[derive(Clone)]
pub struct SqlPlan {
query: String,
// Store the parsed statement to determine whether it is a query that should be tracked.
statement: Option<Statement>,
plan: Option<LogicalPlan>,
schema: Option<Schema>,
}


@@ -136,6 +136,7 @@ impl MysqlInstanceShim {
async fn do_exec_plan(
&self,
query: &str,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
@@ -144,7 +145,7 @@ impl MysqlInstanceShim {
{
Ok(output)
} else {
self.query_handler.do_exec_plan(plan, query_ctx).await
self.query_handler.do_exec_plan(stmt, plan, query_ctx).await
}
}
@@ -231,6 +232,7 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan: None,
schema: None,
},
@@ -240,6 +242,7 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan,
schema,
},
@@ -291,8 +294,13 @@ impl MysqlInstanceShim {
debug!("Mysql execute prepared plan: {}", plan.display_indent());
vec![
self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone())
.await,
self.do_exec_plan(
&sql_plan.query,
sql_plan.statement.clone(),
plan,
query_ctx.clone(),
)
.await,
]
}
None => {


@@ -209,6 +209,7 @@ impl QueryParser for DefaultQueryParser {
if sql.is_empty() || fixtures::matches(sql) {
return Ok(SqlPlan {
query: sql.to_owned(),
statement: None,
plan: None,
schema: None,
});
@@ -229,7 +230,7 @@ impl QueryParser for DefaultQueryParser {
let describe_result = self
.query_handler
.do_describe(stmt, query_ctx)
.do_describe(stmt.clone(), query_ctx)
.await
.map_err(convert_err)?;
@@ -245,6 +246,7 @@ impl QueryParser for DefaultQueryParser {
Ok(SqlPlan {
query: sql.to_owned(),
statement: Some(stmt),
plan,
schema,
})
@@ -300,7 +302,7 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
.context(DataFusionSnafu)
.map_err(convert_err)?;
self.query_handler
.do_exec_plan(plan, query_ctx.clone())
.do_exec_plan(sql_plan.statement.clone(), plan, query_ctx.clone())
.await
} else {
// manually replace variables in prepared statement when no


@@ -41,6 +41,7 @@ pub trait SqlQueryHandler {
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
@@ -88,9 +89,14 @@ where
.collect()
}
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
self.0
.do_exec_plan(plan, query_ctx)
.do_exec_plan(stmt, plan, query_ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecutePlanSnafu)


@@ -31,6 +31,7 @@ use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -59,6 +60,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {


@@ -28,6 +28,7 @@ use servers::opentsdb::codec::DataPoint;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -59,6 +60,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {


@@ -35,6 +35,7 @@ use servers::prom_store::{snappy_compress, Metrics};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -87,6 +88,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {


@@ -68,7 +68,12 @@ impl SqlQueryHandler for DummyInstance {
vec![Ok(output)]
}
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
Ok(self.query_engine.execute(plan, query_ctx).await.unwrap())
}


@@ -104,4 +104,5 @@ session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"
urlencoding = "2.1"
yaml-rust = "0.4"


@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use cache::{
build_datanode_cache_registry, build_fundamental_cache_registry,
@@ -41,6 +42,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::SlowQueryOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
@@ -327,6 +329,13 @@ impl GreptimeDbStandaloneBuilder {
metadata_store: kv_backend_config,
wal: self.metasrv_wal_config.clone().into(),
grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
// Enable the slow query log so the test below can trigger it.
slow_query: Some(SlowQueryOptions {
enable: true,
// Set the threshold to 1s to run the slow query test.
threshold: Some(Duration::from_secs(1)),
..Default::default()
}),
..StandaloneOptions::default()
};


@@ -15,6 +15,7 @@
use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;
use std::time::Duration;
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
@@ -23,10 +24,13 @@ use api::prom_store::remote::{
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
use common_catalog::consts::{
trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME,
};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME};
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
use loki_proto::prost_types::Timestamp;
@@ -55,6 +59,7 @@ use tests_integration::test_util::{
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
StorageType,
};
use urlencoding::encode;
use yaml_rust::YamlLoader;
#[macro_export]
@@ -88,6 +93,7 @@ macro_rules! http_tests {
test_http_auth,
test_sql_api,
test_http_sql_slow_query,
test_prometheus_promql_api,
test_prom_http_api,
test_metrics_api,
@@ -542,6 +548,29 @@ pub async fn test_sql_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_http_sql_slow_query(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app).await;
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let encoded_slow_query = encode(slow_query);
let query_params = format!("/v1/sql?sql={encoded_slow_query}");
let res = client.get(&query_params).send().await;
assert_eq!(res.status(), StatusCode::OK);
// Wait for the slow query to be recorded.
tokio::time::sleep(Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!("SELECT {} FROM {table}", SLOW_QUERY_TABLE_QUERY_COLUMN_NAME);
let expected = format!(r#"[["{}"]]"#, slow_query);
validate_data("test_http_sql_slow_query", &client, &query, &expected).await;
guard.remove_all().await;
}
pub async fn test_prometheus_promql_api(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "promql_api").await;
let client = TestClient::new(app).await;
@@ -1305,7 +1334,7 @@ write_interval = "30s"
[slow_query]
enable = true
record_type = "system_table"
threshold = "30s"
threshold = "1s"
sample_ratio = 1.0
ttl = "30d"


@@ -16,6 +16,12 @@ use std::collections::HashMap;
use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use frontend::slow_query_recorder::{
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
};
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::{Connection, Executor, Row};
@@ -64,10 +70,12 @@ macro_rules! sql_tests {
test_mysql_crud,
test_mysql_timezone,
test_mysql_async_timestamp,
test_mysql_slow_query,
test_postgres_auth,
test_postgres_crud,
test_postgres_timezone,
test_postgres_bytea,
test_postgres_slow_query,
test_postgres_datestyle,
test_postgres_parameter_inference,
test_postgres_array_types,
@@ -580,6 +588,56 @@ pub async fn test_postgres_crud(store_type: StorageType) {
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_slow_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) =
setup_mysql_server(store_type, "test_mysql_slow_query").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
// The slow query is expected to run for longer than 1s.
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
// Simulate a slow query.
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);
let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);
// Check the results.
let row = &rows[0];
let cost: u64 = row.get(0);
let threshold: u64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_bytea(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
@@ -650,6 +708,46 @@ pub async fn test_postgres_bytea(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_postgres_slow_query(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_slow_query").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);
let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);
let row = &rows[0];
let cost: i64 = row.get(0);
let threshold: i64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_datestyle(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();