diff --git a/Cargo.lock b/Cargo.lock index 29bdceace7..af62adce23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4136,6 +4136,7 @@ dependencies = [ "regex", "serde", "serde_json", + "servers", "snafu", "table", "tokio", diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 27492982f8..f7ae79911a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use clap::Parser; use common_telemetry::logging; use datanode::datanode::{ @@ -86,6 +88,10 @@ struct StartCommand { wal_dir: Option, #[clap(long)] procedure_dir: Option, + #[clap(long)] + http_addr: Option, + #[clap(long)] + http_timeout: Option, } impl StartCommand { @@ -155,6 +161,12 @@ impl TryFrom for DatanodeOptions { if let Some(procedure_dir) = cmd.procedure_dir { opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir)); } + if let Some(http_addr) = cmd.http_addr { + opts.http_opts.addr = http_addr + } + if let Some(http_timeout) = cmd.http_timeout { + opts.http_opts.timeout = Duration::from_secs(http_timeout) + } Ok(opts) } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 7dc801a597..d9170066bc 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use clap::Parser; use common_telemetry::{info, logging, warn}; use meta_srv::bootstrap::MetaSrvInstance; @@ -80,6 +82,10 @@ struct StartCommand { selector: Option, #[clap(long)] use_memory_store: bool, + #[clap(long)] + http_addr: Option, + #[clap(long)] + http_timeout: Option, } impl StartCommand { @@ -128,6 +134,13 @@ impl TryFrom for MetaSrvOptions { opts.use_memory_store = true; } + if let Some(http_addr) = cmd.http_addr { + opts.http_opts.addr = http_addr; + } + if let Some(http_timeout) = cmd.http_timeout { + opts.http_opts.timeout = Duration::from_secs(http_timeout); + } + Ok(opts) } } @@ -150,6 +163,8 @@ mod tests { config_file: None, selector: Some("LoadBased".to_string()), use_memory_store: false, + http_addr: None, + http_timeout: None, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); @@ -178,6 +193,8 @@ mod tests { selector: None, config_file: Some(file.path().to_str().unwrap().to_string()), use_memory_store: false, + http_addr: None, + http_timeout: None, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 2a5b3a6b65..06eb1139d2 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -14,7 +14,7 @@ pub mod error; mod global; -pub mod metric; +mod metrics; mod repeated_task; pub mod runtime; diff --git a/src/common/runtime/src/metric.rs b/src/common/runtime/src/metrics.rs similarity index 100% rename from src/common/runtime/src/metric.rs rename to src/common/runtime/src/metrics.rs diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs index a16c4c4c3c..2caabc327b 100644 --- a/src/common/runtime/src/runtime.rs +++ b/src/common/runtime/src/runtime.rs @@ -24,7 +24,7 @@ use tokio::sync::oneshot; pub use tokio::task::{JoinError, JoinHandle}; use crate::error::*; -use crate::metric::*; +use crate::metrics::*; /// A runtime to run future tasks #[derive(Clone, Debug)] diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index c139580dc5..29dc94d4b2 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -12,7 +12,7 @@ deadlock_detection = ["parking_lot"] backtrace = "0.3" common-error = { path = "../error" } console-subscriber = { version = "0.1", optional = true } -metrics = "0.20" +metrics = "0.20.1" metrics-exporter-prometheus = { version = "0.11", default-features = false } once_cell = "1.10" opentelemetry = { version = "0.17", default-features = false, features = [ diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 17f1998265..482ec882c5 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -19,6 +19,7 @@ use common_base::readable_size::ReadableSize; use common_telemetry::info; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; +use servers::http::HttpOptions; use servers::Mode; use storage::config::EngineConfig as StorageEngineConfig; use storage::scheduler::SchedulerConfig; @@ -224,6 +225,7 @@ pub struct DatanodeOptions { pub rpc_runtime_size: usize, pub mysql_addr: String, pub mysql_runtime_size: usize, + pub http_opts: HttpOptions, pub meta_client_options: Option, pub wal: WalConfig, pub storage: StorageConfig, @@ -241,6 +243,7 @@ impl Default for DatanodeOptions { rpc_runtime_size: 8, mysql_addr: "127.0.0.1:4406".to_string(), mysql_runtime_size: 2, + http_opts: HttpOptions::default(), meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), @@ -252,14 +255,17 @@ impl Default for DatanodeOptions { /// Datanode service. pub struct Datanode { opts: DatanodeOptions, - services: Services, + services: Option, instance: InstanceRef, } impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { let instance = Arc::new(Instance::new(&opts).await?); - let services = Services::try_new(instance.clone(), &opts).await?; + let services = match opts.mode { + Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?), + Mode::Standalone => None, + }; Ok(Self { opts, services, @@ -280,7 +286,11 @@ impl Datanode { /// Start services of datanode. This method call will block until services are shutdown. pub async fn start_services(&mut self) -> Result<()> { - self.services.start(&self.opts).await + if let Some(service) = self.services.as_mut() { + service.start(&self.opts).await + } else { + Ok(()) + } } pub fn get_instance(&self) -> InstanceRef { @@ -292,7 +302,11 @@ impl Datanode { } async fn shutdown_services(&self) -> Result<()> { - self.services.shutdown().await + if let Some(service) = self.services.as_ref() { + service.shutdown().await + } else { + Ok(()) + } } pub async fn shutdown(&self) -> Result<()> { diff --git a/src/datanode/src/instance/script.rs b/src/datanode/src/instance/script.rs index fc7757a365..d3eb5cb29f 100644 --- a/src/datanode/src/instance/script.rs +++ b/src/datanode/src/instance/script.rs @@ -20,7 +20,7 @@ use common_telemetry::timer; use servers::query_handler::ScriptHandler; use crate::instance::Instance; -use crate::metric; +use crate::metrics; #[async_trait] impl ScriptHandler for Instance { @@ -30,7 +30,7 @@ impl ScriptHandler for Instance { name: &str, script: &str, ) -> servers::error::Result<()> { - let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED); self.script_executor .insert_script(schema, name, script) .await @@ -42,7 +42,7 @@ impl ScriptHandler for Instance { name: &str, params: HashMap, ) -> servers::error::Result { - let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED); + let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED); self.script_executor .execute_script(schema, name, params) .await diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 7b615c86b4..1cfa061064 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -37,7 +37,7 @@ use crate::error::{ TableIdProviderNotFoundSnafu, }; use crate::instance::Instance; -use crate::metric; +use crate::metrics; use crate::sql::{SqlHandler, SqlRequest}; impl Instance { @@ -190,7 +190,7 @@ impl Instance { promql: &PromQuery, query_ctx: QueryContextRef, ) -> Result { - let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED); let stmt = QueryLanguageParser::parse_promql(promql).context(ExecuteSqlSnafu)?; @@ -294,7 +294,7 @@ impl StatementHandler for Instance { #[async_trait] impl PromHandler for Instance { async fn do_query(&self, query: &PromQuery) -> server_error::Result { - let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED); self.execute_promql(query, QueryContext::arc()) .await diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 6acd1fd94a..862a8c8478 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -19,7 +19,7 @@ pub mod datanode; pub mod error; mod heartbeat; pub mod instance; -pub mod metric; +pub mod metrics; mod mock; mod script; pub mod server; diff --git a/src/datanode/src/metric.rs b/src/datanode/src/metrics.rs similarity index 100% rename from src/datanode/src/metric.rs rename to src/datanode/src/metrics.rs diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 2417625b0e..533f9daa11 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -18,9 +18,12 @@ use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; use servers::grpc::GrpcServer; +use servers::http::{HttpServer, HttpServerBuilder}; +use servers::metrics_handler::MetricsHandler; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::server::Server; use snafu::ResultExt; +use tokio::select; use crate::datanode::DatanodeOptions; use crate::error::{ @@ -33,6 +36,7 @@ pub mod grpc; /// All rpc services. pub struct Services { grpc_server: GrpcServer, + http_server: HttpServer, } impl Services { @@ -51,6 +55,9 @@ impl Services { None, grpc_runtime, ), + http_server: HttpServerBuilder::new(opts.http_opts.clone()) + .with_metrics_handler(MetricsHandler) + .build(), }) } @@ -58,10 +65,15 @@ impl Services { let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &opts.rpc_addr, })?; - self.grpc_server - .start(grpc_addr) - .await - .context(StartServerSnafu)?; + let http_addr = opts.http_opts.addr.parse().context(ParseAddrSnafu { + addr: &opts.http_opts.addr, + })?; + let grpc = self.grpc_server.start(grpc_addr); + let http = self.http_server.start(http_addr); + select!( + v = grpc => v.context(StartServerSnafu)?, + v = http => v.context(StartServerSnafu)?, + ); Ok(()) } @@ -69,6 +81,11 @@ impl Services { self.grpc_server .shutdown() .await - .context(ShutdownServerSnafu) + .context(ShutdownServerSnafu)?; + self.http_server + .shutdown() + .await + .context(ShutdownServerSnafu)?; + Ok(()) } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a37a5b9dcb..5dcfd0a5c7 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -40,7 +40,7 @@ use common_telemetry::timer; use datafusion::sql::sqlparser::ast::ObjectName; use datanode::instance::sql::table_idents_to_full_name; use datanode::instance::InstanceRef as DnInstanceRef; -use datanode::metric; +use datanode::metrics; use datatypes::schema::Schema; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; @@ -532,7 +532,7 @@ impl SqlQueryHandler for Instance { type Error = Error; async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); + let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED); let query_interceptor = self.plugins.get::>(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d4ab41dee8..d7d7af1d1d 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -22,7 +22,7 @@ use common_telemetry::info; use servers::auth::UserProviderRef; use servers::error::Error::InternalIo; use servers::grpc::GrpcServer; -use servers::http::HttpServer; +use servers::http::HttpServerBuilder; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; @@ -150,33 +150,33 @@ impl Services { if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; - let mut http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(instance.clone()), - ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), - http_options.clone(), - ); + let mut http_server_builder = HttpServerBuilder::new(http_options.clone()); + http_server_builder + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())); + if let Some(user_provider) = user_provider.clone() { - http_server.set_user_provider(user_provider); + http_server_builder.with_user_provider(user_provider); } if set_opentsdb_handler { - http_server.set_opentsdb_handler(instance.clone()); + http_server_builder.with_opentsdb_handler(instance.clone()); } if matches!( opts.influxdb_options, Some(InfluxdbOptions { enable: true }) ) { - http_server.set_influxdb_handler(instance.clone()); + http_server_builder.with_influxdb_handler(instance.clone()); } if matches!( opts.prometheus_options, Some(PrometheusOptions { enable: true }) ) { - http_server.set_prom_handler(instance.clone()); + http_server_builder.with_prom_handler(instance.clone()); } - http_server.set_script_handler(instance.clone()); - + http_server_builder.with_script_handler(instance.clone()); + let http_server = http_server_builder.build(); result.push((Box::new(http_server), http_addr)); } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index db451b8fae..a9bcbb7d84 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -41,6 +41,7 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic.workspace = true tower = "0.4" url = "2.3" +servers = { path = "../servers" } [dev-dependencies] tracing = "0.1" diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 77b89961f0..06d1167fec 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,8 +20,12 @@ use api::v1::meta::lock_server::LockServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; use etcd_client::Client; +use servers::http::{HttpServer, HttpServerBuilder}; +use servers::metrics_handler::MetricsHandler; +use servers::server::Server; use snafu::ResultExt; use tokio::net::TcpListener; +use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::server::Router; @@ -44,6 +48,8 @@ use crate::{error, Result}; pub struct MetaSrvInstance { meta_srv: MetaSrv, + http_srv: Arc, + opts: MetaSrvOptions, signal_sender: Option>, @@ -52,9 +58,14 @@ pub struct MetaSrvInstance { impl MetaSrvInstance { pub async fn new(opts: MetaSrvOptions) -> Result { let meta_srv = build_meta_srv(&opts).await?; - + let http_srv = Arc::new( + HttpServerBuilder::new(opts.http_opts.clone()) + .with_metrics_handler(MetricsHandler) + .build(), + ); Ok(MetaSrvInstance { meta_srv, + http_srv, opts, signal_sender: None, }) @@ -67,12 +78,24 @@ impl MetaSrvInstance { self.signal_sender = Some(tx); - bootstrap_meta_srv_with_router( + let meta_srv = bootstrap_meta_srv_with_router( &self.opts.bind_addr, router(self.meta_srv.clone()), &mut rx, - ) - .await?; + ); + let addr = self + .opts + .http_opts + .addr + .parse() + .context(error::ParseAddrSnafu { + addr: &self.opts.http_opts.addr, + })?; + let http_srv = self.http_srv.start(addr); + select! { + v = meta_srv => v?, + v = http_srv => v.map(|_| ()).context(error::StartMetricsExportSnafu)?, + } Ok(()) } @@ -86,7 +109,12 @@ impl MetaSrvInstance { } self.meta_srv.shutdown(); - + self.http_srv + .shutdown() + .await + .context(error::ShutdownServerSnafu { + server: self.http_srv.name(), + })?; Ok(()) } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index d311e5bb8f..4ab353842d 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -25,6 +25,13 @@ pub enum Error { #[snafu(display("Failed to send shutdown signal"))] SendShutdownSignal { source: SendError<()> }, + #[snafu(display("Failed to shutdown {} server, source: {}", server, source))] + ShutdownServer { + #[snafu(backtrace)] + source: servers::error::Error, + server: String, + }, + #[snafu(display("Error stream request next is None"))] StreamNone { backtrace: Backtrace }, @@ -55,7 +62,16 @@ pub enum Error { source: tonic::transport::Error, backtrace: Backtrace, }, - + #[snafu(display("Failed to start gRPC server, source: {}", source))] + StartMetricsExport { + #[snafu(backtrace)] + source: servers::error::Error, + }, + #[snafu(display("Failed to parse address {}, source: {}", addr, source))] + ParseAddr { + addr: String, + source: std::net::AddrParseError, + }, #[snafu(display("Empty table name"))] EmptyTableName { backtrace: Backtrace }, @@ -323,6 +339,7 @@ impl ErrorExt for Error { | Error::LockNotConfig { .. } | Error::ExceededRetryLimit { .. } | Error::SendShutdownSignal { .. } + | Error::ParseAddr { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } @@ -348,6 +365,9 @@ impl ErrorExt for Error { Error::InvalidCatalogValue { source, .. } => source.status_code(), Error::MetaInternal { source } => source.status_code(), Error::RecoverProcedure { source } => source.status_code(), + Error::ShutdownServer { source, .. } | Error::StartMetricsExport { source } => { + source.status_code() + } } } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 951dd393d2..d6a095042d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -21,6 +21,7 @@ use api::v1::meta::Peer; use common_procedure::ProcedureManagerRef; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; +use servers::http::HttpOptions; use snafu::ResultExt; use tokio::sync::broadcast::error::RecvError; @@ -44,6 +45,7 @@ pub struct MetaSrvOptions { pub datanode_lease_secs: i64, pub selector: SelectorType, pub use_memory_store: bool, + pub http_opts: HttpOptions, } impl Default for MetaSrvOptions { @@ -55,6 +57,7 @@ impl Default for MetaSrvOptions { datanode_lease_secs: 15, selector: SelectorType::default(), use_memory_store: false, + http_opts: HttpOptions::default(), } } } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 30cde4e20f..8e147a26b7 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -58,7 +58,7 @@ use crate::physical_planner::PhysicalPlanner; use crate::plan::LogicalPlan; use crate::planner::{DfLogicalPlanner, LogicalPlanner}; use crate::query_engine::{QueryEngineContext, QueryEngineState}; -use crate::{metric, QueryEngine}; +use crate::{metrics, QueryEngine}; pub struct DatafusionQueryEngine { state: Arc, @@ -254,7 +254,7 @@ impl QueryEngine for DatafusionQueryEngine { impl LogicalOptimizer for DatafusionQueryEngine { fn optimize(&self, plan: &LogicalPlan) -> Result { - let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED); + let _timer = timer!(metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED); match plan { LogicalPlan::DfPlan(df_plan) => { let optimized_plan = self @@ -280,7 +280,7 @@ impl PhysicalPlanner for DatafusionQueryEngine { ctx: &mut QueryEngineContext, logical_plan: &LogicalPlan, ) -> Result> { - let _timer = timer!(metric::METRIC_CREATE_PHYSICAL_ELAPSED); + let _timer = timer!(metrics::METRIC_CREATE_PHYSICAL_ELAPSED); match logical_plan { LogicalPlan::DfPlan(df_plan) => { let state = ctx.state(); @@ -315,7 +315,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine { ctx: &mut QueryEngineContext, plan: Arc, ) -> Result> { - let _timer = timer!(metric::METRIC_OPTIMIZE_PHYSICAL_ELAPSED); + let _timer = timer!(metrics::METRIC_OPTIMIZE_PHYSICAL_ELAPSED); let mut new_plan = plan .as_any() @@ -342,7 +342,7 @@ impl QueryExecutor for DatafusionQueryEngine { ctx: &QueryEngineContext, plan: &Arc, ) -> Result { - let _timer = timer!(metric::METRIC_EXEC_PLAN_ELAPSED); + let _timer = timer!(metrics::METRIC_EXEC_PLAN_ELAPSED); match plan.output_partitioning().partition_count() { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), 1 => Ok(plan diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 6d7775614f..878b54dec3 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -16,7 +16,7 @@ pub mod datafusion; pub mod error; pub mod executor; pub mod logical_optimizer; -mod metric; +mod metrics; mod optimizer; pub mod parser; pub mod physical_optimizer; diff --git a/src/query/src/metric.rs b/src/query/src/metrics.rs similarity index 97% rename from src/query/src/metric.rs rename to src/query/src/metrics.rs index ae306ff768..489e7fb62a 100644 --- a/src/query/src/metric.rs +++ b/src/query/src/metrics.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! query engine metrics - pub static METRIC_PARSE_SQL_ELAPSED: &str = "query.parse_sql_elapsed"; pub static METRIC_PARSE_PROMQL_ELAPSED: &str = "query.parse_promql_elapsed"; pub static METRIC_OPTIMIZE_LOGICAL_ELAPSED: &str = "query.optimize_logicalplan_elapsed"; diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index e7559f03e7..799dd6997b 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -28,7 +28,7 @@ use sql::statements::statement::Statement; use crate::error::{ MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result, }; -use crate::metric::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; +use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 8d5b053262..aae858b59a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -60,6 +60,7 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write}; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; use crate::http::admin::flush; +use crate::metrics_handler::MetricsHandler; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ @@ -99,9 +100,10 @@ pub const HTTP_API_PREFIX: &str = "/v1/"; // TODO(fys): This is a temporary workaround, it will be improved later pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"]; +#[derive(Default)] pub struct HttpServer { - sql_handler: ServerSqlQueryHandlerRef, - grpc_handler: ServerGrpcQueryHandlerRef, + sql_handler: Option, + grpc_handler: Option, options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, @@ -109,9 +111,11 @@ pub struct HttpServer { script_handler: Option, shutdown_tx: Mutex>>, user_provider: Option, + metrics_handler: Option, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] pub struct HttpOptions { pub addr: String, #[serde(with = "humantime_serde")] @@ -354,65 +358,74 @@ pub struct ApiState { pub script_handler: Option, } -impl HttpServer { - pub fn new( - sql_handler: ServerSqlQueryHandlerRef, - grpc_handler: ServerGrpcQueryHandlerRef, - options: HttpOptions, - ) -> Self { +#[derive(Default)] +pub struct HttpServerBuilder { + inner: HttpServer, +} + +impl HttpServerBuilder { + pub fn new(options: HttpOptions) -> Self { Self { - sql_handler, - grpc_handler, - options, - opentsdb_handler: None, - influxdb_handler: None, - prom_handler: None, - user_provider: None, - script_handler: None, - shutdown_tx: Mutex::new(None), + inner: HttpServer { + sql_handler: None, + grpc_handler: None, + options, + opentsdb_handler: None, + influxdb_handler: None, + prom_handler: None, + user_provider: None, + script_handler: None, + metrics_handler: None, + shutdown_tx: Mutex::new(None), + }, } } - pub fn set_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) { - debug_assert!( - self.opentsdb_handler.is_none(), - "OpenTSDB handler can be set only once!" - ); - self.opentsdb_handler.get_or_insert(handler); + pub fn with_sql_handler(&mut self, handler: ServerSqlQueryHandlerRef) -> &mut Self { + self.inner.sql_handler.get_or_insert(handler); + self } - pub fn set_script_handler(&mut self, handler: ScriptHandlerRef) { - debug_assert!( - self.script_handler.is_none(), - "Script handler can be set only once!" - ); - self.script_handler.get_or_insert(handler); + pub fn with_grpc_handler(&mut self, handler: ServerGrpcQueryHandlerRef) -> &mut Self { + self.inner.grpc_handler.get_or_insert(handler); + self } - pub fn set_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) { - debug_assert!( - self.influxdb_handler.is_none(), - "Influxdb line protocol handler can be set only once!" - ); - self.influxdb_handler.get_or_insert(handler); + pub fn with_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) -> &mut Self { + self.inner.opentsdb_handler.get_or_insert(handler); + self } - pub fn set_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) { - debug_assert!( - self.prom_handler.is_none(), - "Prometheus protocol handler can be set only once!" - ); - self.prom_handler.get_or_insert(handler); + pub fn with_script_handler(&mut self, handler: ScriptHandlerRef) -> &mut Self { + self.inner.script_handler.get_or_insert(handler); + self } - pub fn set_user_provider(&mut self, user_provider: UserProviderRef) { - debug_assert!( - self.user_provider.is_none(), - "User provider can be set only once!" - ); - self.user_provider.get_or_insert(user_provider); + pub fn with_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) -> &mut Self { + self.inner.influxdb_handler.get_or_insert(handler); + self } + pub fn with_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) -> &mut Self { + self.inner.prom_handler.get_or_insert(handler); + self + } + + pub fn with_user_provider(&mut self, user_provider: UserProviderRef) -> &mut Self { + self.inner.user_provider.get_or_insert(user_provider); + self + } + + pub fn with_metrics_handler(&mut self, handler: MetricsHandler) -> &mut Self { + self.inner.metrics_handler.get_or_insert(handler); + self + } + pub fn build(&mut self) -> HttpServer { + std::mem::take(self).inner + } +} + +impl HttpServer { pub fn make_app(&self) -> Router { let mut api = OpenApi { info: Info { @@ -428,19 +441,25 @@ impl HttpServer { ..OpenApi::default() }; - let sql_router = self - .route_sql(ApiState { - sql_handler: self.sql_handler.clone(), - script_handler: self.script_handler.clone(), - }) - .finish_api(&mut api) - .layer(Extension(api)); + let mut router = Router::new(); - let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router); - router = router.nest( - &format!("/{HTTP_API_VERSION}/admin"), - self.route_admin(self.grpc_handler.clone()), - ); + if let Some(sql_handler) = self.sql_handler.clone() { + let sql_router = self + .route_sql(ApiState { + sql_handler, + script_handler: self.script_handler.clone(), + }) + .finish_api(&mut api) + .layer(Extension(api)); + router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router); + } + + if let Some(grpc_handler) = self.grpc_handler.clone() { + router = router.nest( + &format!("/{HTTP_API_VERSION}/admin"), + self.route_admin(grpc_handler.clone()), + ); + } if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { router = router.nest( @@ -472,7 +491,9 @@ impl HttpServer { ); } - router = router.route("/metrics", routing::get(handler::metrics)); + if let Some(metrics_handler) = self.metrics_handler { + router = router.nest("", self.route_metrics(metrics_handler)); + } router = router.route( "/health", @@ -498,6 +519,12 @@ impl HttpServer { ) } + fn route_metrics(&self, metrics_handler: MetricsHandler) -> Router { + Router::new() + .route("/metrics", routing::get(handler::metrics)) + .with_state(metrics_handler) + } + fn route_sql(&self, api_state: ApiState) -> ApiRouter { ApiRouter::new() .api_route( @@ -680,8 +707,10 @@ mod test { let instance = Arc::new(DummyInstance { _tx: tx }); let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone()); let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance); - - let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default()); + let server = HttpServerBuilder::new(HttpOptions::default()) + .with_sql_handler(sql_instance) + .with_grpc_handler(grpc_instance) + .build(); server.make_app().route( "/test/timeout", get(forever.layer( diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 8f7ede1c91..606c06e23b 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -19,13 +19,13 @@ use aide::transform::TransformOperation; use axum::extract::{Json, Query, State}; use axum::{Extension, Form}; use common_error::status_code::StatusCode; -use common_telemetry::metric; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session::context::UserInfo; use crate::http::{ApiState, JsonResponse}; +use crate::metrics_handler::MetricsHandler; #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct SqlQuery { @@ -114,12 +114,11 @@ pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation { /// Handler to export metrics #[axum_macros::debug_handler] -pub async fn metrics(Query(_params): Query>) -> String { - if let Some(handle) = metric::try_handle() { - handle.render() - } else { - "Prometheus handle not initialized.".to_owned() - } +pub async fn metrics( + State(state): State, + Query(_params): Query>, +) -> String { + state.render() } #[derive(Debug, Serialize, Deserialize, JsonSchema)] diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 8260da6f27..6341123464 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -25,6 +25,7 @@ pub mod http; pub mod influxdb; pub mod interceptor; pub mod line_writer; +pub mod metrics_handler; pub mod mysql; pub mod opentsdb; pub mod postgres; diff --git a/src/servers/src/metrics_handler.rs b/src/servers/src/metrics_handler.rs new file mode 100644 index 0000000000..89970cf308 --- /dev/null +++ b/src/servers/src/metrics_handler.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::metric; + +/// a server that serves metrics +/// only start when datanode starts in distributed mode +#[derive(Copy, Clone)] +pub struct MetricsHandler; + +impl MetricsHandler { + pub fn render(&self) -> String { + if let Some(handle) = metric::try_handle() { + handle.render() + } else { + "Prometheus handle not initialized.".to_owned() + } + } +} diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 6ad18a9ac8..4d3698c150 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -20,6 +20,7 @@ use axum::Form; use common_telemetry::metric; use metrics::counter; use servers::http::{handler as http_handler, script as script_handler, ApiState, JsonOutput}; +use servers::metrics_handler::MetricsHandler; use session::context::UserInfo; use table::test_util::MemTable; @@ -146,8 +147,8 @@ async fn test_metrics() { metric::init_default_metrics_recorder(); counter!("test_metrics", 1); - - let text = http_handler::metrics(Query(HashMap::default())).await; + let stats = MetricsHandler; + let text = http_handler::metrics(axum::extract::State(stats), Query(HashMap::default())).await; assert!(text.contains("test_metrics counter")); } diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index c6398022b0..b6b3a9fa13 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -14,17 +14,20 @@ use axum::Router; use axum_test_helper::TestClient; -use servers::http::{HttpOptions, HttpServer}; +use servers::http::{HttpOptions, HttpServerBuilder}; use table::test_util::MemTable; use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; fn make_test_app() -> Router { - let server = HttpServer::new( - create_testing_sql_query_handler(MemTable::default_numbers_table()), - create_testing_grpc_query_handler(MemTable::default_numbers_table()), - HttpOptions::default(), - ); + let server = HttpServerBuilder::new(HttpOptions::default()) + .with_sql_handler(create_testing_sql_query_handler( + MemTable::default_numbers_table(), + )) + .with_grpc_handler(create_testing_grpc_query_handler( + MemTable::default_numbers_table(), + )) + .build(); server.make_app() } diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 086c6403c5..da5d9d9a1f 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -23,7 +23,7 @@ use common_query::Output; use datatypes::schema::Schema; use query::parser::PromQuery; use servers::error::{Error, Result}; -use servers::http::{HttpOptions, HttpServer}; +use servers::http::{HttpOptions, HttpServerBuilder}; use servers::influxdb::InfluxdbRequest; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; @@ -94,7 +94,9 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); + let mut server_builder = HttpServerBuilder::new(HttpOptions::default()); + server_builder.with_sql_handler(instance.clone()); + server_builder.with_grpc_handler(instance.clone()); let mut user_provider = MockUserProvider::default(); if let Some(name) = db_name { user_provider.set_authorization_info(DatabaseAuthInfo { @@ -103,9 +105,10 @@ fn make_test_app(tx: Arc>, db_name: Option<&str>) username: "greptime", }) } - server.set_user_provider(Arc::new(user_provider)); + server_builder.with_user_provider(Arc::new(user_provider)); - server.set_influxdb_handler(instance); + server_builder.with_influxdb_handler(instance); + let server = server_builder.build(); server.make_app() } diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 694751635e..8a0ed59986 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -22,7 +22,7 @@ use common_query::Output; use datatypes::schema::Schema; use query::parser::PromQuery; use servers::error::{self, Result}; -use servers::http::{HttpOptions, HttpServer}; +use servers::http::{HttpOptions, HttpServerBuilder}; use servers::opentsdb::codec::DataPoint; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; @@ -92,8 +92,11 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); - server.set_opentsdb_handler(instance); + let server = HttpServerBuilder::new(HttpOptions::default()) + .with_grpc_handler(instance.clone()) + .with_sql_handler(instance.clone()) + .with_opentsdb_handler(instance) + .build(); server.make_app() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 173382b1e4..69d4fb8046 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -26,7 +26,7 @@ use datatypes::schema::Schema; use prost::Message; use query::parser::PromQuery; use servers::error::{Error, Result}; -use servers::http::{HttpOptions, HttpServer}; +use servers::http::{HttpOptions, HttpServerBuilder}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; use servers::query_handler::grpc::GrpcQueryHandler; @@ -117,8 +117,11 @@ impl SqlQueryHandler for DummyInstance { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); - let mut server = HttpServer::new(instance.clone(), instance.clone(), HttpOptions::default()); - server.set_prom_handler(instance); + let server = HttpServerBuilder::new(HttpOptions::default()) + .with_grpc_handler(instance.clone()) + .with_sql_handler(instance.clone()) + .with_prom_handler(instance) + .build(); server.make_app() } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 090311bd56..69b0999b23 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -38,7 +38,8 @@ use object_store::ObjectStore; use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; -use servers::http::{HttpOptions, HttpServer}; +use servers::http::{HttpOptions, HttpServerBuilder}; +use servers::metrics_handler::MetricsHandler; use servers::prom::PromServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; @@ -271,11 +272,13 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ) .await .unwrap(); - let http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(Arc::new(build_frontend_instance(instance.clone()))), - ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), - HttpOptions::default(), - ); + let http_server = HttpServerBuilder::new(HttpOptions::default()) + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new( + build_frontend_instance(instance.clone()), + ))) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())) + .with_metrics_handler(MetricsHandler) + .build(); (http_server.make_app(), guard) } @@ -295,12 +298,11 @@ pub async fn setup_test_http_app_with_frontend( .await .unwrap(); let frontend_ref = Arc::new(frontend); - let mut http_server = HttpServer::new( - ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()), - ServerGrpcQueryHandlerAdaptor::arc(frontend_ref), - HttpOptions::default(), - ); - http_server.set_script_handler(instance.clone()); + let http_server = HttpServerBuilder::new(HttpOptions::default()) + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref)) + .with_script_handler(instance.clone()) + .build(); let app = http_server.make_app(); (app, guard) } diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 3a819b8d3a..e3d96c2786 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -161,10 +161,16 @@ impl Env { "datanode" | "standalone" => { args.push("-c".to_string()); args.push(Self::generate_config_file(subcommand, db_ctx)); + args.push("--http-addr=0.0.0.0:5001".to_string()); + } + "frontend" => { + args.push("--metasrv-addr=0.0.0.0:3002".to_string()); + args.push("--http-addr=0.0.0.0:5000".to_string()); + } + "metasrv" => { + args.push("--use-memory-store".to_string()); + args.push("--http-addr=0.0.0.0:5002".to_string()); } - "frontend" => args.push("--metasrv-addr=0.0.0.0:3002".to_string()), - "metasrv" => args.push("--use-memory-store".to_string()), - _ => panic!("Unexpected subcommand: {subcommand}"), }