diff --git a/Cargo.lock b/Cargo.lock index fd17338f79..5ea9ae144c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1690,6 +1690,8 @@ dependencies = [ "partition", "paste", "prometheus", + "promql-parser", + "rand 0.9.0", "rustc-hash 2.0.0", "serde_json", "session", @@ -2364,6 +2366,7 @@ dependencies = [ "common-meta", "greptime-proto", "meta-client", + "session", "snafu 0.8.5", "tokio", "tonic 0.12.3", @@ -13115,6 +13118,7 @@ dependencies = [ "tonic 0.12.3", "tower 0.5.2", "url", + "urlencoding", "uuid", "yaml-rust", "zstd 0.13.2", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 4f749f49b2..602320696c 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -44,6 +44,8 @@ moka = { workspace = true, features = ["future", "sync"] } partition.workspace = true paste.workspace = true prometheus.workspace = true +promql-parser.workspace = true +rand.workspace = true rustc-hash.workspace = true serde_json.workspace = true session.workspace = true diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs index 9ee6744323..03f75fe2bd 100644 --- a/src/catalog/src/process_manager.rs +++ b/src/catalog/src/process_manager.rs @@ -14,17 +14,24 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant, UNIX_EPOCH}; use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo}; use common_base::cancellation::CancellationHandle; use common_frontend::selector::{FrontendSelector, MetaClientSelector}; -use common_telemetry::{debug, info, warn}; +use common_frontend::slow_query_event::SlowQueryEvent; +use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use meta_client::MetaClientRef; +use promql_parser::parser::EvalStmt; +use rand::random; +use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; +use sql::statements::statement::Statement; +use tokio::sync::mpsc::Sender; use crate::error; use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT}; @@ -44,6 +51,23 @@ pub struct ProcessManager { frontend_selector: Option<MetaClientSelector>, } +/// Represents a parsed query statement, functionally equivalent to [query::parser::QueryStatement]. +/// This enum is defined here to avoid cyclic dependencies with the query parser module. +#[derive(Debug, Clone)] +pub enum QueryStatement { + Sql(Statement), + Promql(EvalStmt), +} + +impl Display for QueryStatement { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + QueryStatement::Sql(stmt) => write!(f, "{}", stmt), + QueryStatement::Promql(eval_stmt) => write!(f, "{}", eval_stmt), + } + } +} + impl ProcessManager { /// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self { @@ -67,6 +91,7 @@ impl ProcessManager { query: String, client: String, query_id: Option<ProcessId>, + _slow_query_timer: Option<SlowQueryTimer>, ) -> Ticket { let id = query_id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed)); let process = ProcessInfo { @@ -93,6 +118,7 @@ impl ProcessManager { manager: self.clone(), id, cancellation_handle, + _slow_query_timer, } } @@ -223,6 +249,7 @@ pub struct Ticket { pub(crate) manager: ProcessManagerRef, pub(crate) id: ProcessId, pub cancellation_handle: Arc<CancellationHandle>, + _slow_query_timer: Option<SlowQueryTimer>, } impl Drop for Ticket { @@ -263,6 +290,107 @@ impl Debug for CancellableProcess { } } +/// SlowQueryTimer is used to log a slow query when it is dropped. +/// In drop(), it will check whether the query is slow and send the slow query event to the handler. +pub struct SlowQueryTimer { + start: Instant, + stmt: QueryStatement, + query_ctx: QueryContextRef, + threshold: Option<Duration>, + sample_ratio: Option<f64>, + tx: Sender<SlowQueryEvent>, +} + +impl SlowQueryTimer { + pub fn new( + stmt: QueryStatement, + query_ctx: QueryContextRef, + threshold: Option<Duration>, + sample_ratio: Option<f64>, + tx: Sender<SlowQueryEvent>, + ) -> Self { + Self { + start: Instant::now(), + stmt, + query_ctx, + threshold, + sample_ratio, + tx, + } + } +} + +impl SlowQueryTimer { + fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) { + let mut slow_query_event = SlowQueryEvent { + cost: elapsed.as_millis() as u64, + threshold: threshold.as_millis() as u64, + query: "".to_string(), + query_ctx: self.query_ctx.clone(), + + // The following fields are only used for PromQL queries. + is_promql: false, + promql_range: None, + promql_step: None, + promql_start: None, + promql_end: None, + }; + + match &self.stmt { + QueryStatement::Promql(stmt) => { + slow_query_event.is_promql = true; + slow_query_event.query = stmt.expr.to_string(); + slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64); + + let start = stmt + .start + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + let end = stmt + .end + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64; + + slow_query_event.promql_range = Some((end - start) as u64); + slow_query_event.promql_start = Some(start); + slow_query_event.promql_end = Some(end); + } + QueryStatement::Sql(stmt) => { + slow_query_event.query = stmt.to_string(); + } + } + + // Send SlowQueryEvent to the handler. + if let Err(e) = self.tx.try_send(slow_query_event) { + error!(e; "Failed to send slow query event"); + } + } +} + +impl Drop for SlowQueryTimer { + fn drop(&mut self) { + if let Some(threshold) = self.threshold { + // Calculate the elapsed duration since the timer was created. + let elapsed = self.start.elapsed(); + if elapsed > threshold { + if let Some(ratio) = self.sample_ratio { + // Only capture a portion of slow queries based on sample_ratio. + // Generate a random number in [0, 1) and compare it with sample_ratio. + if ratio >= 1.0 || random::<f64>() <= ratio { + self.send_slow_query_event(elapsed, threshold); + } + } else { + // Capture all slow queries if sample_ratio is not set.
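The drop-based design above keeps the query hot path free of bookkeeping: nothing is recorded while the query runs, and the threshold check plus the sampling decision happen only when the timer goes out of scope. Below is a minimal, self-contained sketch of the same pattern; the names are illustrative and only the `rand` crate is assumed, not the GreptimeDB types:

```rust
use std::time::{Duration, Instant};

/// Illustrative RAII timer: on drop it reports the elapsed time if the
/// threshold was exceeded and the event survives probabilistic sampling.
struct DropTimer {
    start: Instant,
    threshold: Duration,
    sample_ratio: Option<f64>,
}

impl Drop for DropTimer {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        if elapsed <= self.threshold {
            return;
        }
        // None keeps every slow event; Some(r) keeps roughly a fraction r,
        // since rand::random::<f64>() is uniform in [0, 1).
        let keep = self
            .sample_ratio
            .map(|r| r >= 1.0 || rand::random::<f64>() <= r)
            .unwrap_or(true);
        if keep {
            eprintln!("slow: {elapsed:?} (threshold {:?})", self.threshold);
        }
    }
}

fn main() {
    let _timer = DropTimer {
        start: Instant::now(),
        threshold: Duration::from_millis(10),
        sample_ratio: Some(0.5),
    };
    std::thread::sleep(Duration::from_millis(20));
    // `_timer` drops here; roughly half of such runs print a report.
}
```

Note the `ratio >= 1.0` short circuit mirrored from the code above: it skips the RNG call entirely when sampling is effectively disabled.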
+ self.send_slow_query_event(elapsed, threshold); + } + } + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -278,6 +406,7 @@ mod tests { "SELECT * FROM table".to_string(), "".to_string(), None, + None, ); let running_processes = process_manager.local_processes(None).unwrap(); @@ -301,6 +430,7 @@ mod tests { "SELECT * FROM table".to_string(), "client1".to_string(), Some(custom_id), + None, ); assert_eq!(ticket.id, custom_id); @@ -321,6 +451,7 @@ mod tests { "SELECT * FROM table1".to_string(), "client1".to_string(), None, + None, ); let ticket2 = process_manager.clone().register_query( @@ -329,6 +460,7 @@ mod tests { "SELECT * FROM table2".to_string(), "client2".to_string(), None, + None, ); let running_processes = process_manager.local_processes(Some("public")).unwrap(); @@ -350,6 +482,7 @@ mod tests { "SELECT * FROM table1".to_string(), "client1".to_string(), None, + None, ); let _ticket2 = process_manager.clone().register_query( @@ -358,6 +491,7 @@ mod tests { "SELECT * FROM table2".to_string(), "client2".to_string(), None, + None, ); // Test listing processes for specific catalog @@ -384,6 +518,7 @@ mod tests { "SELECT * FROM table".to_string(), "client1".to_string(), None, + None, ); assert_eq!(process_manager.local_processes(None).unwrap().len(), 1); process_manager.deregister_query("public".to_string(), ticket.id); @@ -400,6 +535,7 @@ mod tests { "SELECT * FROM table".to_string(), "client1".to_string(), None, + None, ); assert!(!ticket.cancellation_handle.is_cancelled()); @@ -417,6 +553,7 @@ mod tests { "SELECT * FROM table".to_string(), "client1".to_string(), None, + None, ); assert!(!ticket.cancellation_handle.is_cancelled()); let killed = process_manager @@ -462,6 +599,7 @@ mod tests { "SELECT COUNT(*) FROM users WHERE age > 18".to_string(), "test_client".to_string(), Some(42), + None, ); let processes = process_manager.local_processes(None).unwrap(); @@ -488,6 +626,7 @@ mod tests { "SELECT * FROM table".to_string(), "client1".to_string(), None, + None, ); // Process should be registered diff --git a/src/common/frontend/Cargo.toml b/src/common/frontend/Cargo.toml index 5058fcc668..235bd6b0f8 100644 --- a/src/common/frontend/Cargo.toml +++ b/src/common/frontend/Cargo.toml @@ -12,6 +12,7 @@ common-macro.workspace = true common-meta.workspace = true greptime-proto.workspace = true meta-client.workspace = true +session.workspace = true snafu.workspace = true tonic.workspace = true diff --git a/src/common/frontend/src/lib.rs b/src/common/frontend/src/lib.rs index cfd485e25b..dacffd1996 100644 --- a/src/common/frontend/src/lib.rs +++ b/src/common/frontend/src/lib.rs @@ -19,6 +19,7 @@ use snafu::OptionExt; pub mod error; pub mod selector; +pub mod slow_query_event; #[derive(Debug, Clone, Eq, PartialEq)] pub struct DisplayProcessId { diff --git a/src/common/frontend/src/slow_query_event.rs b/src/common/frontend/src/slow_query_event.rs new file mode 100644 index 0000000000..4240b897c6 --- /dev/null +++ b/src/common/frontend/src/slow_query_event.rs @@ -0,0 +1,28 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use session::context::QueryContextRef; + +#[derive(Debug)] +pub struct SlowQueryEvent { + pub cost: u64, + pub threshold: u64, + pub query: String, + pub is_promql: bool, + pub query_ctx: QueryContextRef, + pub promql_range: Option<u64>, + pub promql_step: Option<u64>, + pub promql_start: Option<i64>, + pub promql_end: Option<i64>, +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 570c384922..ebfa43e2ec 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -32,7 +32,7 @@ use std::time::{Duration, SystemTime}; use async_stream::stream; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; -use catalog::process_manager::ProcessManagerRef; +use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement}; use catalog::CatalogManagerRef; use client::OutputData; use common_base::cancellation::CancellableFuture; @@ -73,7 +73,7 @@ use query::query_engine::DescribeResult; use query::QueryEngineRef; use servers::error::{ self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu, - OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, + OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu, }; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; @@ -221,36 +221,40 @@ impl Instance { let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>(); let query_interceptor = query_interceptor.as_ref(); - let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder { - recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone()) + if should_capture_statement(Some(&stmt)) { + let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| { + recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone()) + }); + + let ticket = self.process_manager.register_query( + query_ctx.current_catalog().to_string(), + vec![query_ctx.current_schema()], + stmt.to_string(), + query_ctx.conn_info().to_string(), + Some(query_ctx.process_id()), + slow_query_timer, + ); + + let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor); + + CancellableFuture::new(query_fut, ticket.cancellation_handle.clone()) + .await + .map_err(|_| error::CancelledSnafu.build())? + .map(|output| { + let Output { meta, data } = output; + + let data = match data { + OutputData::Stream(stream) => OutputData::Stream(Box::pin( + CancellableStreamWrapper::new(stream, ticket), + )), + other => other, + }; + Output { data, meta } + }) } else { - None - }; - - let ticket = self.process_manager.register_query( - query_ctx.current_catalog().to_string(), - vec![query_ctx.current_schema()], - stmt.to_string(), - query_ctx.conn_info().to_string(), - Some(query_ctx.process_id()), - ); - - let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor); - - CancellableFuture::new(query_fut, ticket.cancellation_handle.clone()) - .await - .map_err(|_| error::CancelledSnafu.build())?
- .map(|output| { - let Output { meta, data } = output; - - let data = match data { - OutputData::Stream(stream) => { - OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket))) - } - other => other, - }; - Output { data, meta } - }) + self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor) + .await + } } async fn exec_statement_with_timeout( @@ -572,13 +576,54 @@ impl SqlQueryHandler for Instance { } } - async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> { - // plan should be prepared before exec - // we'll do check there - self.query_engine - .execute(plan.clone(), query_ctx) - .await - .context(ExecLogicalPlanSnafu) + async fn do_exec_plan( + &self, + stmt: Option<Statement>, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result<Output> { + if should_capture_statement(stmt.as_ref()) { + // It's safe to unwrap here because we've already checked the type. + let stmt = stmt.unwrap(); + let query = stmt.to_string(); + let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| { + recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone()) + }); + + let ticket = self.process_manager.register_query( + query_ctx.current_catalog().to_string(), + vec![query_ctx.current_schema()], + query, + query_ctx.conn_info().to_string(), + Some(query_ctx.process_id()), + slow_query_timer, + ); + + let query_fut = self.query_engine.execute(plan.clone(), query_ctx); + + CancellableFuture::new(query_fut, ticket.cancellation_handle.clone()) + .await + .map_err(|_| error::CancelledSnafu.build())? + .map(|output| { + let Output { meta, data } = output; + + let data = match data { + OutputData::Stream(stream) => OutputData::Stream(Box::pin( + CancellableStreamWrapper::new(stream, ticket), + )), + other => other, + }; + Output { data, meta } + }) + .context(ExecLogicalPlanSnafu) + } else { + // plan should be prepared before exec + // we'll do check there + self.query_engine + .execute(plan.clone(), query_ctx) + .await + .context(ExecLogicalPlanSnafu) + } } #[tracing::instrument(skip_all)] @@ -672,12 +717,6 @@ impl PrometheusHandler for Instance { } })?; - let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder { - recorder.start(stmt.clone(), query_ctx.clone()) - } else { - None - }; - let plan = self .statement_executor .plan(&stmt, query_ctx.clone()) @@ -687,10 +726,47 @@ interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?; - let output = self - .statement_executor - .exec_plan(plan, query_ctx.clone()) + // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement. + let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt { + CatalogQueryStatement::Promql(eval_stmt) + } else { + // This should not happen since the query was already parsed successfully.
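Every tracked path in this change follows the same shape: register a ticket, race the execution future against the ticket's cancellation handle, and re-wrap streaming output so cancellation still applies while rows are being consumed. A simplified sketch of that race, with `run_cancellable` as an illustrative stand-in for the assumed semantics of `CancellableFuture`, not the actual `common_base` API:

```rust
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

/// Race a query future against a cancellation signal: whichever completes
/// first decides the outcome.
async fn run_cancellable<F: Future>(
    fut: F,
    cancel: Arc<Notify>,
) -> Result<F::Output, &'static str> {
    tokio::select! {
        out = fut => Ok(out),
        _ = cancel.notified() => Err("query cancelled"),
    }
}

#[tokio::main]
async fn main() {
    let cancel = Arc::new(Notify::new());
    let trigger = cancel.clone();
    // Simulate a kill request arriving while the "query" is still running.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        trigger.notify_one();
    });
    let slow_query = async {
        tokio::time::sleep(Duration::from_secs(5)).await;
        42
    };
    assert_eq!(run_cancellable(slow_query, cancel).await, Err("query cancelled"));
}
```

The stream re-wrapping in the real code matters because the future can resolve to a stream long before the client finishes reading it; without it, a kill would stop plan execution but not an in-flight result stream.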
+ return UnexpectedResultSnafu { + reason: "The query should always be promql.".to_string(), + } + .fail(); + }; + let query = query_statement.to_string(); + + let slow_query_timer = self + .slow_query_recorder + .as_ref() + .and_then(|recorder| recorder.start(query_statement, query_ctx.clone())); + + let ticket = self.process_manager.register_query( + query_ctx.current_catalog().to_string(), + vec![query_ctx.current_schema()], + query, + query_ctx.conn_info().to_string(), + Some(query_ctx.process_id()), + slow_query_timer, + ); + + let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone()); + + let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone()) .await + .map_err(|_| servers::error::CancelledSnafu.build())? + .map(|output| { + let Output { meta, data } = output; + let data = match data { + OutputData::Stream(stream) => { + OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket))) + } + other => other, + }; + Output { data, meta } + }) .map_err(BoxedError::new) .context(ExecuteQuerySnafu)?; @@ -914,6 +990,15 @@ fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<( .context(SqlExecInterceptedSnafu) } +// Create a query ticket and slow query timer if the statement is a query or readonly statement. +fn should_capture_statement(stmt: Option<&Statement>) -> bool { + if let Some(stmt) = stmt { + matches!(stmt, Statement::Query(_)) || stmt.is_readonly() + } else { + false + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index eaf97a2e2b..a511febaaf 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -121,7 +121,8 @@ impl GrpcQueryHandler for Instance { .await .context(SubstraitDecodeLogicalPlanSnafu)?; let output = - SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?; + SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone()) + .await?; attach_timer(output, timer) } @@ -402,7 +403,7 @@ impl Instance { .context(common_query::error::GeneralDataFusionSnafu) .context(SubstraitDecodeLogicalPlanSnafu)?; - let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?; + let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?; Ok(attach_timer(output, timer)) } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 9fcfe4e870..17f9f27661 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -22,5 +22,5 @@ pub(crate) mod limiter; pub(crate) mod metrics; pub mod server; pub mod service_config; -pub(crate) mod slow_query_recorder; +pub mod slow_query_recorder; mod stream_wrapper; diff --git a/src/frontend/src/slow_query_recorder.rs b/src/frontend/src/slow_query_recorder.rs index c87a25f4e3..8f79108d75 100644 --- a/src/frontend/src/slow_query_recorder.rs +++ b/src/frontend/src/slow_query_recorder.rs @@ -14,22 +14,21 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant, UNIX_EPOCH}; use api::v1::value::ValueData; use api::v1::{ ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, }; +use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer}; use catalog::CatalogManagerRef; use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; +use common_frontend::slow_query_event::SlowQueryEvent; use 
common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions}; use common_telemetry::{debug, error, info, slow}; use common_time::timestamp::{TimeUnit, Timestamp}; use operator::insert::InserterRef; use operator::statement::StatementExecutorRef; -use query::parser::QueryStatement; -use rand::random; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::ResultExt; use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; @@ -39,16 +38,16 @@ use tokio::task::JoinHandle; use crate::error::{CatalogSnafu, Result, TableOperationSnafu}; -const SLOW_QUERY_TABLE_NAME: &str = "slow_queries"; -const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost"; -const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold"; -const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query"; -const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; -const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql"; -const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start"; -const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end"; -const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range"; -const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step"; +pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries"; +pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost"; +pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold"; +pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query"; +pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; +pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql"; +pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start"; +pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end"; +pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range"; +pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step"; const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d"; const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024; @@ -61,19 +60,6 @@ pub struct SlowQueryRecorder { _handle: Arc<JoinHandle<()>>, } -#[derive(Debug)] -struct SlowQueryEvent { - cost: u64, - threshold: u64, - query: String, - is_promql: bool, - query_ctx: QueryContextRef, - promql_range: Option<u64>, - promql_step: Option<u64>, - promql_start: Option<i64>, - promql_end: Option<i64>, -} - impl SlowQueryRecorder { /// Create a new SlowQueryRecorder. pub fn new( @@ -115,18 +101,17 @@ impl SlowQueryRecorder { /// The timer sets the start time when created and calculates the elapsed duration when dropped. pub fn start( &self, - stmt: QueryStatement, + stmt: CatalogQueryStatement, query_ctx: QueryContextRef, ) -> Option<SlowQueryTimer> { if self.slow_query_opts.enable { - Some(SlowQueryTimer { + Some(SlowQueryTimer::new( stmt, query_ctx, - start: Instant::now(), // Set the initial start time. - threshold: self.slow_query_opts.threshold, - sample_ratio: self.slow_query_opts.sample_ratio, - tx: self.tx.clone(), - }) + self.slow_query_opts.threshold, + self.slow_query_opts.sample_ratio, + self.tx.clone(), + )) } else { None } @@ -447,85 +432,3 @@ impl SlowQueryEventHandler { ] } } - -/// SlowQueryTimer is used to log slow query when it's dropped. -/// In drop(), it will check if the query is slow and send the slow query event to the handler.
-pub struct SlowQueryTimer { - start: Instant, - stmt: QueryStatement, - query_ctx: QueryContextRef, - threshold: Option<Duration>, - sample_ratio: Option<f64>, - tx: Sender<SlowQueryEvent>, -} - -impl SlowQueryTimer { - fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) { - let mut slow_query_event = SlowQueryEvent { - cost: elapsed.as_millis() as u64, - threshold: threshold.as_millis() as u64, - query: "".to_string(), - query_ctx: self.query_ctx.clone(), - - // The following fields are only used for PromQL queries. - is_promql: false, - promql_range: None, - promql_step: None, - promql_start: None, - promql_end: None, - }; - - match &self.stmt { - QueryStatement::Promql(stmt) => { - slow_query_event.is_promql = true; - slow_query_event.query = stmt.expr.to_string(); - slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64); - - let start = stmt - .start - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as i64; - - let end = stmt - .end - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as i64; - - slow_query_event.promql_range = Some((end - start) as u64); - slow_query_event.promql_start = Some(start); - slow_query_event.promql_end = Some(end); - } - QueryStatement::Sql(stmt) => { - slow_query_event.query = stmt.to_string(); - } - } - - // Send SlowQueryEvent to the handler. - if let Err(e) = self.tx.try_send(slow_query_event) { - error!(e; "Failed to send slow query event"); - } - } -} - -impl Drop for SlowQueryTimer { - fn drop(&mut self) { - if let Some(threshold) = self.threshold { - // Calculate the elaspsed duration since the timer is created. - let elapsed = self.start.elapsed(); - if elapsed > threshold { - if let Some(ratio) = self.sample_ratio { - // Only capture a portion of slow queries based on sample_ratio. - // Generate a random number in [0, 1) and compare it with sample_ratio. - if ratio >= 1.0 || random::<f64>() <= ratio { - self.send_slow_query_event(elapsed, threshold); - } - } else { - // Captures all slow queries if sample_ratio is not set.
- self.send_slow_query_event(elapsed, threshold); - } - } - } -} diff --git a/src/frontend/src/stream_wrapper.rs b/src/frontend/src/stream_wrapper.rs index 2c1f4519b4..7ac27a1be2 100644 --- a/src/frontend/src/stream_wrapper.rs +++ b/src/frontend/src/stream_wrapper.rs @@ -187,6 +187,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket); @@ -210,6 +211,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); // Cancel before creating the wrapper @@ -234,6 +236,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let cancellation_handle = ticket.cancellation_handle.clone(); @@ -261,6 +264,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let cancellation_handle = ticket.cancellation_handle.clone(); @@ -289,6 +293,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket); @@ -319,6 +324,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket); @@ -347,6 +353,7 @@ mod tests { "query".to_string(), "client".to_string(), None, + None, ); let cancellation_handle = ticket.cancellation_handle.clone(); diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 39130b9c7e..9e9f380986 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -640,6 +640,12 @@ pub enum Error { #[snafu(display("Unknown hint: {}", hint))] UnknownHint { hint: String }, + + #[snafu(display("Query has been cancelled"))] + Cancelled { + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -765,6 +771,8 @@ impl ErrorExt for Error { DurationOverflow { .. } => StatusCode::InvalidArguments, HandleOtelArrowRequest { .. } => StatusCode::Internal, + + Cancelled { .. } => StatusCode::Cancelled, } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 74a375139a..85c598efc4 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1268,6 +1268,7 @@ mod test { use query::parser::PromQuery; use query::query_engine::DescribeResult; use session::context::QueryContextRef; + use sql::statements::statement::Statement; use tokio::sync::mpsc; use tokio::time::Instant; @@ -1298,6 +1299,7 @@ mod test { async fn do_exec_plan( &self, + _stmt: Option<Statement>, _plan: LogicalPlan, _query_ctx: QueryContextRef, ) -> std::result::Result<Output, Self::Error> { diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 969ac56954..10c474c499 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -20,6 +20,7 @@ use datafusion_expr::LogicalPlan; use datatypes::schema::Schema; +use sql::statements::statement::Statement; pub mod addrs; pub mod configurator; @@ -55,6 +56,8 @@ pub mod tls; #[derive(Clone)] pub struct SqlPlan { query: String, + // Store the parsed statement to determine if it is a query and whether to track it.
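The `statement: Option<Statement>` field introduced just below is what lets the prepared-statement paths make the same tracking decision at execute time without re-parsing the SQL text. A hypothetical, simplified mirror of that flow (`Stmt` and `PlanEntry` are illustrative stand-ins, not the servers-crate types):

```rust
/// Illustrative stand-ins for sqlparser's Statement and the cached plan entry.
#[derive(Clone)]
#[allow(dead_code)]
enum Stmt {
    Query(String),
    Insert(String),
}

impl Stmt {
    fn is_readonly(&self) -> bool {
        matches!(self, Stmt::Query(_))
    }
}

struct PlanEntry {
    query: String,
    // Cached at parse time so execute time can gate process tracking and
    // slow-query timing without re-parsing the SQL text.
    statement: Option<Stmt>,
}

fn execute(entry: &PlanEntry) {
    let track = entry
        .statement
        .as_ref()
        .map(Stmt::is_readonly)
        .unwrap_or(false);
    if track {
        println!("register ticket + slow-query timer for: {}", entry.query);
    }
    // ... execute the cached plan either way ...
}

fn main() {
    let entry = PlanEntry {
        query: "SELECT 1".to_string(),
        statement: Some(Stmt::Query("SELECT 1".to_string())),
    };
    execute(&entry);
}
```

Caching the statement next to the plan is cheaper than re-parsing on every EXECUTE, and a `None` statement (for example, for fixture queries) simply opts the query out of tracking.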
+ statement: Option<Statement>, plan: Option<LogicalPlan>, schema: Option<Schema>, } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 937deb0823..0a5e7909a0 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -136,6 +136,7 @@ impl MysqlInstanceShim { async fn do_exec_plan( &self, query: &str, + stmt: Option<Statement>, plan: LogicalPlan, query_ctx: QueryContextRef, ) -> Result<Output> { @@ -144,7 +145,7 @@ { Ok(output) } else { - self.query_handler.do_exec_plan(plan, query_ctx).await + self.query_handler.do_exec_plan(stmt, plan, query_ctx).await } } @@ -231,6 +232,7 @@ self.save_plan( SqlPlan { query: query.to_string(), + statement: Some(statement), plan: None, schema: None, }, @@ -240,6 +242,7 @@ self.save_plan( SqlPlan { query: query.to_string(), + statement: Some(statement), plan, schema, }, @@ -291,8 +294,13 @@ debug!("Mysql execute prepared plan: {}", plan.display_indent()); vec![ - self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone()) - .await, + self.do_exec_plan( + &sql_plan.query, + sql_plan.statement.clone(), + plan, + query_ctx.clone(), + ) + .await, ] } None => { diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index fd0eb223a4..76d409a87f 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -209,6 +209,7 @@ impl QueryParser for DefaultQueryParser { if sql.is_empty() || fixtures::matches(sql) { return Ok(SqlPlan { query: sql.to_owned(), + statement: None, plan: None, schema: None, }); } @@ -229,7 +230,7 @@ let describe_result = self .query_handler - .do_describe(stmt.clone(), query_ctx) + .do_describe(stmt.clone(), query_ctx) .await .map_err(convert_err)?; @@ -245,6 +246,7 @@ Ok(SqlPlan { query: sql.to_owned(), + statement: Some(stmt), plan, schema, }) @@ -300,7 +302,7 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner { .context(DataFusionSnafu) .map_err(convert_err)?; self.query_handler - .do_exec_plan(plan, query_ctx.clone()) + .do_exec_plan(sql_plan.statement.clone(), plan, query_ctx.clone()) .await } else { // manually replace variables in prepared statement when no diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index eef443bbec..aebb566f80 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -41,6 +41,7 @@ pub trait SqlQueryHandler { async fn do_exec_plan( &self, + stmt: Option<Statement>, plan: LogicalPlan, query_ctx: QueryContextRef, ) -> std::result::Result<Output, Self::Error>; @@ -88,9 +89,14 @@ where .collect() } - async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> { + async fn do_exec_plan( + &self, + stmt: Option<Statement>, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result<Output> { self.0 - .do_exec_plan(plan, query_ctx) + .do_exec_plan(stmt, plan, query_ctx) .await .map_err(BoxedError::new) .context(error::ExecutePlanSnafu) diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 93932252fb..e3822059a5 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -31,6 +31,7 @@ use servers::influxdb::InfluxdbRequest; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; +use sql::statements::statement::Statement; use
tokio::sync::mpsc; struct DummyInstance { @@ -59,6 +60,7 @@ impl SqlQueryHandler for DummyInstance { async fn do_exec_plan( &self, + _stmt: Option<Statement>, _plan: LogicalPlan, _query_ctx: QueryContextRef, ) -> std::result::Result<Output, Self::Error> { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 6ac835e72d..6bb0b79666 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -28,6 +28,7 @@ use servers::opentsdb::codec::DataPoint; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; +use sql::statements::statement::Statement; use tokio::sync::mpsc; struct DummyInstance { @@ -59,6 +60,7 @@ impl SqlQueryHandler for DummyInstance { async fn do_exec_plan( &self, + _stmt: Option<Statement>, _plan: LogicalPlan, _query_ctx: QueryContextRef, ) -> std::result::Result<Output, Self::Error> { diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index f87697cf4d..e0bd688a15 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -35,6 +35,7 @@ use servers::prom_store::{snappy_compress, Metrics}; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse}; use session::context::QueryContextRef; +use sql::statements::statement::Statement; use tokio::sync::mpsc; struct DummyInstance { @@ -87,6 +88,7 @@ impl SqlQueryHandler for DummyInstance { async fn do_exec_plan( &self, + _stmt: Option<Statement>, _plan: LogicalPlan, _query_ctx: QueryContextRef, ) -> std::result::Result<Output, Self::Error> { diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 41454c8ddc..461c448a84 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -68,7 +68,12 @@ impl SqlQueryHandler for DummyInstance { vec![Ok(output)] } - async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> { + async fn do_exec_plan( + &self, + _stmt: Option<Statement>, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> Result<Output> { Ok(self.query_engine.execute(plan, query_ctx).await.unwrap()) } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 9b91a4ffd9..4806d4554a 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -104,4 +104,5 @@ session = { workspace = true, features = ["testing"] } store-api.workspace = true tokio-postgres = { workspace = true } url = "2.3" +urlencoding = "2.1" yaml-rust = "0.4" diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b30076c27d..89a7203817 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -13,6 +13,7 @@ // limitations under the License.
use std::sync::Arc; +use std::time::Duration; use cache::{ build_datanode_cache_registry, build_fundamental_cache_registry, @@ -41,6 +42,7 @@ use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::build_wal_options_allocator; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; +use common_telemetry::logging::SlowQueryOptions; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::DatanodeBuilder; use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError}; @@ -327,6 +329,13 @@ impl GreptimeDbStandaloneBuilder { metadata_store: kv_backend_config, wal: self.metasrv_wal_config.clone().into(), grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"), + // Enable slow query log with 1s threshold to run the slow query test. + slow_query: Some(SlowQueryOptions { + enable: true, + // Set the threshold to 1s to run the slow query test. + threshold: Some(Duration::from_secs(1)), + ..Default::default() + }), ..StandaloneOptions::default() }; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8b0fb92f43..ef072b477b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::io::Write; use std::str::FromStr; +use std::time::Duration; use api::prom_store::remote::label_matcher::Type as MatcherType; use api::prom_store::remote::{ @@ -23,10 +24,13 @@ use api::prom_store::remote::{ use auth::user_provider_from_option; use axum::http::{HeaderName, HeaderValue, StatusCode}; use chrono::Utc; -use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME}; +use common_catalog::consts::{ + trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, +}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME}; use log_query::{Context, Limit, LogQuery, TimeFilter}; use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter}; use loki_proto::prost_types::Timestamp; @@ -55,6 +59,7 @@ use tests_integration::test_util::{ setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, StorageType, }; +use urlencoding::encode; use yaml_rust::YamlLoader; #[macro_export] @@ -88,6 +93,7 @@ macro_rules! http_tests { test_http_auth, test_sql_api, + test_http_sql_slow_query, test_prometheus_promql_api, test_prom_http_api, test_metrics_api, @@ -542,6 +548,29 @@ pub async fn test_sql_api(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_http_sql_slow_query(store_type: StorageType) { + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; + let client = TestClient::new(app).await; + + let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte"; + let encoded_slow_query = encode(slow_query); + + let query_params = format!("/v1/sql?sql={encoded_slow_query}"); + let res = client.get(&query_params).send().await; + assert_eq!(res.status(), StatusCode::OK); + + // Wait for the slow query to be recorded. 
+ tokio::time::sleep(Duration::from_secs(5)).await; + + let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME); + let query = format!("SELECT {} FROM {table}", SLOW_QUERY_TABLE_QUERY_COLUMN_NAME); + + let expected = format!(r#"[["{}"]]"#, slow_query); + validate_data("test_http_sql_slow_query", &client, &query, &expected).await; + + guard.remove_all().await; +} + pub async fn test_prometheus_promql_api(store_type: StorageType) { let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "promql_api").await; let client = TestClient::new(app).await; @@ -1305,7 +1334,7 @@ write_interval = "30s" [slow_query] enable = true record_type = "system_table" -threshold = "30s" +threshold = "1s" sample_ratio = 1.0 ttl = "30d" diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index e975882bbc..cc738b79d6 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -16,6 +16,12 @@ use std::collections::HashMap; use auth::user_provider_from_option; use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc}; +use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME; +use frontend::slow_query_recorder::{ + SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME, + SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME, + SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME, +}; use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions}; use sqlx::postgres::{PgDatabaseError, PgPoolOptions}; use sqlx::{Connection, Executor, Row}; @@ -64,10 +70,12 @@ macro_rules! sql_tests { test_mysql_crud, test_mysql_timezone, test_mysql_async_timestamp, + test_mysql_slow_query, test_postgres_auth, test_postgres_crud, test_postgres_timezone, test_postgres_bytea, + test_postgres_slow_query, test_postgres_datestyle, test_postgres_parameter_inference, test_postgres_array_types, @@ -580,6 +588,56 @@ pub async fn test_postgres_crud(store_type: StorageType) { let _ = fe_pg_server.shutdown().await; guard.remove_all().await; } + +pub async fn test_mysql_slow_query(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + + let (mut guard, fe_mysql_server) = + setup_mysql_server(store_type, "test_mysql_slow_query").await; + let addr = fe_mysql_server.bind_addr().unwrap().to_string(); + + let pool = MySqlPoolOptions::new() + .max_connections(2) + .connect(&format!("mysql://{addr}/public")) + .await + .unwrap(); + + // The slow query will run at least longer than 1s. + let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte"; + + // Simulate a slow query. + sqlx::query(slow_query).fetch_all(&pool).await.unwrap(); + + // Wait for the slow query to be recorded. + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME); + let query = format!( + "SELECT {}, {}, {}, {} FROM {table}", + SLOW_QUERY_TABLE_COST_COLUMN_NAME, + SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME, + SLOW_QUERY_TABLE_QUERY_COLUMN_NAME, + SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME + ); + + let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap(); + assert_eq!(rows.len(), 1); + + // Check the results. 
+ let row = &rows[0]; + let cost: u64 = row.get(0); + let threshold: u64 = row.get(1); + let query: String = row.get(2); + let is_promql: bool = row.get(3); + + assert!(cost > 0 && threshold > 0 && cost > threshold); + assert_eq!(query, slow_query); + assert!(!is_promql); + + let _ = fe_mysql_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_postgres_bytea(store_type: StorageType) { let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await; let addr = fe_pg_server.bind_addr().unwrap().to_string(); @@ -650,6 +708,46 @@ pub async fn test_postgres_bytea(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_postgres_slow_query(store_type: StorageType) { + let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_slow_query").await; + let addr = fe_pg_server.bind_addr().unwrap().to_string(); + + let pool = PgPoolOptions::new() + .max_connections(2) + .connect(&format!("postgres://{addr}/public")) + .await + .unwrap(); + + let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte"; + let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap(); + + // Wait for the slow query to be recorded. + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME); + let query = format!( + "SELECT {}, {}, {}, {} FROM {table}", + SLOW_QUERY_TABLE_COST_COLUMN_NAME, + SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME, + SLOW_QUERY_TABLE_QUERY_COLUMN_NAME, + SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME + ); + let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap(); + assert_eq!(rows.len(), 1); + let row = &rows[0]; + let cost: i64 = row.get(0); + let threshold: i64 = row.get(1); + let query: String = row.get(2); + let is_promql: bool = row.get(3); + + assert!(cost > 0 && threshold > 0 && cost > threshold); + assert_eq!(query, slow_query); + assert!(!is_promql); + + let _ = fe_pg_server.shutdown().await; + guard.remove_all().await; +} + pub async fn test_postgres_datestyle(store_type: StorageType) { let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await; let addr = fe_pg_server.bind_addr().unwrap().to_string();
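Taken end to end, the feature is a three-stage pipeline: an RAII timer on the frontend detects that a query was slow, a bounded mpsc channel decouples detection from persistence, and a background handler records the event to `greptime_private.slow_queries` or to the logs, depending on `record_type`. A minimal sketch of the consumer half of that channel; the `Event` type and its handling are illustrative, not the actual `SlowQueryEventHandler`:

```rust
use tokio::sync::mpsc;

/// Illustrative stand-in for common_frontend's SlowQueryEvent.
#[derive(Debug)]
struct Event {
    cost_ms: u64,
    threshold_ms: u64,
    query: String,
}

#[tokio::main]
async fn main() {
    // Bounded channel: producers use try_send, so a slow consumer causes
    // events to be dropped (and an error logged) rather than ever stalling
    // query execution.
    let (tx, mut rx) = mpsc::channel::<Event>(1024);

    let consumer = tokio::spawn(async move {
        while let Some(ev) = rx.recv().await {
            // A real handler would branch on record_type here: system_table
            // inserts a row into the slow_queries table, log emits a
            // slow-query log line.
            println!(
                "slow query ({} ms > {} ms): {}",
                ev.cost_ms, ev.threshold_ms, ev.query
            );
        }
    });

    let _ = tx.try_send(Event {
        cost_ms: 1500,
        threshold_ms: 1000,
        query: "SELECT count(*) FROM slow_cte".to_string(),
    });

    drop(tx); // closing the last sender ends the consumer loop
    consumer.await.unwrap();
}
```

Pairing a bounded channel with `try_send` on the producer side is what guarantees a slow consumer can never back-pressure query execution; at worst an event is dropped and an error is logged.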