diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 135b6a82e4..2247ebf5df 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -46,8 +46,8 @@ enable = true [prometheus_options] enable = true -# PromQL protocol options, see `standalone.example.toml`. -[promql_options] +# Prometheus protocol options, see `standalone.example.toml`. +[prom_options] addr = "127.0.0.1:4004" # Metasrv client options, see `datanode.example.toml`. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 69e58780c9..e05190dc91 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -71,9 +71,9 @@ enable = true # Whether to enable Prometheus remote write and read in HTTP API, true by default. enable = true -# PromQL protocol options. -[promql_options] -# PromQL server address, "127.0.0.1:4004" by default. +# Prom protocol options. +[prom_options] +# Prometheus API server address, "127.0.0.1:4004" by default. addr = "127.0.0.1:4004" # WAL options. diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 6c9ce4c4d6..2c54a1e896 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -23,6 +23,7 @@ use frontend::instance::Instance; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; +use frontend::prom::PromOptions; use meta_client::MetaClientOptions; use servers::auth::UserProviderRef; use servers::http::HttpOptions; @@ -67,6 +68,8 @@ pub struct StartCommand { #[clap(long)] mysql_addr: Option, #[clap(long)] + prom_addr: Option, + #[clap(long)] postgres_addr: Option, #[clap(long)] opentsdb_addr: Option, @@ -141,6 +144,9 @@ impl TryFrom for FrontendOptions { ..Default::default() }); } + if let Some(addr) = cmd.prom_addr { + opts.prom_options = Some(PromOptions { addr }); + } if let Some(addr) = cmd.postgres_addr { opts.postgres_options = Some(PostgresOptions { addr, @@ -186,6 +192,7 @@ mod tests { let command = StartCommand { http_addr: Some("127.0.0.1:1234".to_string()), grpc_addr: None, + prom_addr: Some("127.0.0.1:4444".to_string()), mysql_addr: Some("127.0.0.1:5678".to_string()), postgres_addr: Some("127.0.0.1:5432".to_string()), opentsdb_addr: Some("127.0.0.1:4321".to_string()), @@ -209,6 +216,7 @@ mod tests { opts.opentsdb_options.as_ref().unwrap().addr, "127.0.0.1:4321" ); + assert_eq!(opts.prom_options.as_ref().unwrap().addr, "127.0.0.1:4444"); let default_opts = FrontendOptions::default(); assert_eq!( @@ -247,6 +255,7 @@ mod tests { http_addr: None, grpc_addr: None, mysql_addr: None, + prom_addr: None, postgres_addr: None, opentsdb_addr: None, influxdb_enable: None, @@ -276,6 +285,7 @@ mod tests { http_addr: None, grpc_addr: None, mysql_addr: None, + prom_addr: None, postgres_addr: None, opentsdb_addr: None, influxdb_enable: None, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 77fef7481b..4675f704a1 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -28,8 +28,8 @@ use frontend::instance::Instance as FeInstance; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; +use frontend::prom::PromOptions; use frontend::prometheus::PrometheusOptions; -use frontend::promql::PromqlOptions; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; @@ -77,7 +77,7 @@ pub struct StandaloneOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, - pub promql_options: Option, + pub prom_options: Option, pub wal: WalConfig, pub storage: ObjectStoreConfig, pub compaction: CompactionConfig, @@ -96,7 +96,7 @@ impl Default for StandaloneOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), - promql_options: Some(PromqlOptions::default()), + prom_options: Some(PromOptions::default()), wal: WalConfig::default(), storage: ObjectStoreConfig::default(), compaction: CompactionConfig::default(), @@ -116,7 +116,7 @@ impl StandaloneOptions { opentsdb_options: self.opentsdb_options, influxdb_options: self.influxdb_options, prometheus_options: self.prometheus_options, - promql_options: self.promql_options, + prom_options: self.prom_options, meta_client_options: None, } } @@ -142,6 +142,8 @@ struct StartCommand { #[clap(long)] mysql_addr: Option, #[clap(long)] + prom_addr: Option, + #[clap(long)] postgres_addr: Option, #[clap(long)] opentsdb_addr: Option, @@ -254,6 +256,11 @@ impl TryFrom for FrontendOptions { ..Default::default() }) } + + if let Some(addr) = cmd.prom_addr { + opts.prom_options = Some(PromOptions { addr }) + } + if let Some(addr) = cmd.postgres_addr { opts.postgres_options = Some(PostgresOptions { addr, @@ -302,6 +309,7 @@ mod tests { http_addr: None, rpc_addr: None, mysql_addr: None, + prom_addr: None, postgres_addr: None, opentsdb_addr: None, config_file: Some(format!( @@ -347,6 +355,7 @@ mod tests { let command = StartCommand { http_addr: None, rpc_addr: None, + prom_addr: None, mysql_addr: None, postgres_addr: None, opentsdb_addr: None, diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 943c746dc8..3fe5b1ada6 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -24,7 +24,7 @@ use datatypes::schema::Schema; use futures::StreamExt; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use servers::error as server_error; -use servers::promql::PromqlHandler; +use servers::prom::PromHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; @@ -366,7 +366,7 @@ impl SqlQueryHandler for Instance { } #[async_trait] -impl PromqlHandler for Instance { +impl PromHandler for Instance { async fn do_query(&self, query: &PromQuery) -> server_error::Result { let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 2dced030e1..da862511fa 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -28,8 +28,8 @@ use crate::instance::FrontendInstance; use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; +use crate::prom::PromOptions; use crate::prometheus::PrometheusOptions; -use crate::promql::PromqlOptions; use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -43,7 +43,7 @@ pub struct FrontendOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, - pub promql_options: Option, + pub prom_options: Option, pub meta_client_options: Option, } @@ -58,7 +58,7 @@ impl Default for FrontendOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), - promql_options: Some(PromqlOptions::default()), + prom_options: Some(PromOptions::default()), meta_client_options: None, } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f5ea3a89a4..2bc57a1ff6 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -49,7 +49,7 @@ use query::parser::PromQuery; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; -use servers::promql::{PromqlHandler, PromqlHandlerRef}; +use servers::prom::{PromHandler, PromHandlerRef}; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef}; use servers::query_handler::{ @@ -81,7 +81,7 @@ pub trait FrontendInstance: + InfluxdbLineProtocolHandler + PrometheusProtocolHandler + ScriptHandler - + PromqlHandler + + PromHandler + Send + Sync + 'static @@ -99,7 +99,7 @@ pub struct Instance { script_handler: Option, sql_handler: SqlQueryHandlerRef, grpc_query_handler: GrpcQueryHandlerRef, - promql_handler: Option, + promql_handler: Option, create_expr_factory: CreateExprFactoryRef, @@ -539,7 +539,7 @@ impl ScriptHandler for Instance { } #[async_trait] -impl PromqlHandler for Instance { +impl PromHandler for Instance { async fn do_query(&self, query: &PromQuery) -> server_error::Result { if let Some(promql_handler) = &self.promql_handler { promql_handler.do_query(query).await diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 4c33577f2c..b5d82f93b7 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -25,8 +25,8 @@ pub mod instance; pub mod mysql; pub mod opentsdb; pub mod postgres; +pub mod prom; pub mod prometheus; -pub mod promql; mod server; mod sql; mod table; diff --git a/src/frontend/src/promql.rs b/src/frontend/src/prom.rs similarity index 87% rename from src/frontend/src/promql.rs rename to src/frontend/src/prom.rs index a2e18a4922..d617d95f39 100644 --- a/src/frontend/src/promql.rs +++ b/src/frontend/src/prom.rs @@ -15,11 +15,11 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct PromqlOptions { +pub struct PromOptions { pub addr: String, } -impl Default for PromqlOptions { +impl Default for PromOptions { fn default() -> Self { Self { addr: "127.0.0.1:4004".to_string(), @@ -29,11 +29,11 @@ impl Default for PromqlOptions { #[cfg(test)] mod tests { - use super::PromqlOptions; + use super::PromOptions; #[test] fn test_prometheus_options() { - let default = PromqlOptions::default(); + let default = PromOptions::default(); assert_eq!(default.addr, "127.0.0.1:4004".to_string()); } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index a6ef49bddf..c6d609e3f8 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -25,7 +25,7 @@ use servers::http::HttpServer; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; -use servers::promql::PromqlServer; +use servers::prom::PromServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -183,15 +183,15 @@ impl Services { None }; - let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options { - let promql_addr = parse_addr(&promql_options.addr)?; + let prom_server_and_addr = if let Some(prom_options) = &opts.prom_options { + let prom_addr = parse_addr(&prom_options.addr)?; - let mut promql_server = PromqlServer::create_server(instance.clone()); + let mut prom_server = PromServer::create_server(instance.clone()); if let Some(user_provider) = user_provider { - promql_server.set_user_provider(user_provider); + prom_server.set_user_provider(user_provider); } - Some((promql_server as _, promql_addr)) + Some((prom_server as _, prom_addr)) } else { None }; @@ -202,7 +202,7 @@ impl Services { start_server(mysql_server_and_addr), start_server(postgres_server_and_addr), start_server(opentsdb_server_and_addr), - start_server(promql_server_and_addr), + start_server(prom_server_and_addr), ) .context(error::StartServerSnafu)?; Ok(()) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index d759a57bb6..dc47fbfba9 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -28,8 +28,8 @@ pub mod line_writer; pub mod mysql; pub mod opentsdb; pub mod postgres; +pub mod prom; pub mod prometheus; -pub mod promql; pub mod query_handler; pub mod server; mod shutdown; diff --git a/src/servers/src/promql.rs b/src/servers/src/prom.rs similarity index 89% rename from src/servers/src/promql.rs rename to src/servers/src/prom.rs index 44d52c4af1..6382f37200 100644 --- a/src/servers/src/promql.rs +++ b/src/servers/src/prom.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! prom supply the prometheus HTTP API Server compliance use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -52,24 +53,25 @@ use crate::error::{ use crate::http::authorize::HttpAuth; use crate::server::Server; -pub const PROMQL_API_VERSION: &str = "v1"; +pub const PROM_API_VERSION: &str = "v1"; -pub type PromqlHandlerRef = Arc; +pub type PromHandlerRef = Arc; #[async_trait] -pub trait PromqlHandler { +pub trait PromHandler { async fn do_query(&self, query: &PromQuery) -> Result; } -pub struct PromqlServer { - query_handler: PromqlHandlerRef, +/// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API +pub struct PromServer { + query_handler: PromHandlerRef, shutdown_tx: Mutex>>, user_provider: Option, } -impl PromqlServer { - pub fn create_server(query_handler: PromqlHandlerRef) -> Box { - Box::new(PromqlServer { +impl PromServer { + pub fn create_server(query_handler: PromHandlerRef) -> Box { + Box::new(PromServer { query_handler, shutdown_tx: Mutex::new(None), user_provider: None, @@ -90,7 +92,7 @@ impl PromqlServer { .with_state(self.query_handler.clone()); Router::new() - .nest(&format!("/api/{PROMQL_API_VERSION}"), router) + .nest(&format!("/api/{PROM_API_VERSION}"), router) // middlewares .layer( ServiceBuilder::new() @@ -105,15 +107,15 @@ impl PromqlServer { } #[async_trait] -impl Server for PromqlServer { +impl Server for PromServer { async fn shutdown(&self) -> Result<()> { let mut shutdown_tx = self.shutdown_tx.lock().await; if let Some(tx) = shutdown_tx.take() { if tx.send(()).is_err() { - info!("Receiver dropped, the PromQl server has already existed"); + info!("Receiver dropped, the Prometheus API server has already existed"); } } - info!("Shutdown PromQL server"); + info!("Shutdown Prometheus API server"); Ok(()) } @@ -124,7 +126,9 @@ impl Server for PromqlServer { let mut shutdown_tx = self.shutdown_tx.lock().await; ensure!( shutdown_tx.is_none(), - AlreadyStartedSnafu { server: "PromQL" } + AlreadyStartedSnafu { + server: "Prometheus" + } ); let app = self.make_app(); @@ -135,7 +139,7 @@ impl Server for PromqlServer { server }; let listening = server.local_addr(); - info!("PromQL server is bound to {}", listening); + info!("Prometheus API server is bound to {}", listening); let graceful = server.with_graceful_shutdown(rx.map(drop)); graceful.await.context(StartHttpSnafu)?; @@ -145,22 +149,22 @@ impl Server for PromqlServer { } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct PromqlSeries { +pub struct PromSeries { metric: HashMap, values: Vec<(f64, String)>, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct PromqlData { +pub struct PromData { #[serde(rename = "resultType")] result_type: String, - result: Vec, + result: Vec, } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct PromqlJsonResponse { +pub struct PromJsonResponse { status: String, - data: PromqlData, + data: PromData, #[serde(skip_serializing_if = "Option::is_none")] error: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -170,23 +174,23 @@ pub struct PromqlJsonResponse { warnings: Option>, } -impl PromqlJsonResponse { +impl PromJsonResponse { pub fn error(error_type: S1, reason: S2) -> Json where S1: Into, S2: Into, { - Json(PromqlJsonResponse { + Json(PromJsonResponse { status: "error".to_string(), - data: PromqlData::default(), + data: PromData::default(), error: Some(reason.into()), error_type: Some(error_type.into()), warnings: None, }) } - pub fn success(data: PromqlData) -> Json { - Json(PromqlJsonResponse { + pub fn success(data: PromData) -> Json { + Json(PromJsonResponse { status: "success".to_string(), data, error: None, @@ -224,7 +228,7 @@ impl PromqlJsonResponse { if err.status_code() == StatusCode::TableNotFound || err.status_code() == StatusCode::TableColumnNotFound { - Self::success(PromqlData { + Self::success(PromData { result_type: "matrix".to_string(), ..Default::default() }) @@ -235,7 +239,7 @@ impl PromqlJsonResponse { } } - fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { + fn record_batches_to_data(batches: RecordBatches, metric_name: String) -> Result { // infer semantic type of each column from schema. // TODO(ruihang): wish there is a better way to do this. let mut timestamp_column_index = None; @@ -322,13 +326,13 @@ impl PromqlJsonResponse { let result = buffer .into_iter() - .map(|(tags, values)| PromqlSeries { + .map(|(tags, values)| PromSeries { metric: tags.into_iter().collect(), values, }) .collect(); - let data = PromqlData { + let data = PromData { result_type: "matrix".to_string(), result, }; @@ -346,10 +350,10 @@ pub struct InstantQuery { #[axum_macros::debug_handler] pub async fn instant_query( - State(_handler): State, + State(_handler): State, Query(_params): Query, -) -> Json { - PromqlJsonResponse::error( +) -> Json { + PromJsonResponse::error( "not implemented", "instant query api `/query` is not implemented. Use `/query_range` instead.", ) @@ -366,10 +370,10 @@ pub struct RangeQuery { #[axum_macros::debug_handler] pub async fn range_query( - State(handler): State, + State(handler): State, Query(params): Query, Form(form_params): Form, -) -> Json { +) -> Json { let prom_query = PromQuery { query: params.query.or(form_params.query).unwrap_or_default(), start: params.start.or(form_params.start).unwrap_or_default(), @@ -378,7 +382,7 @@ pub async fn range_query( }; let result = handler.do_query(&prom_query).await; let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); - PromqlJsonResponse::from_query_result(result, metric_name).await + PromJsonResponse::from_query_result(result, metric_name).await } fn retrieve_metric_name(promql: &str) -> Option { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 6c90b28121..5a96ccb697 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -13,6 +13,7 @@ // limitations under the License. //! prometheus protocol supportings +//! handles prometheus remote_write, remote_read logic use std::cmp::Ordering; use std::collections::BTreeMap; use std::hash::{Hash, Hasher}; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 11ddb3b7b0..79cdc7173d 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -38,7 +38,7 @@ use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServer}; -use servers::promql::PromqlServer; +use servers::prom::PromServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -305,7 +305,7 @@ pub async fn setup_test_http_app_with_frontend( (app, guard) } -pub async fn setup_test_promql_app_with_frontend( +pub async fn setup_test_prom_app_with_frontend( store_type: StorageType, name: &str, ) -> (Router, TestGuard) { @@ -320,8 +320,8 @@ pub async fn setup_test_promql_app_with_frontend( ) .await .unwrap(); - let promql_server = PromqlServer::create_server(Arc::new(frontend) as _); - let app = promql_server.make_app(); + let prom_server = PromServer::create_server(Arc::new(frontend) as _); + let app = prom_server.make_app(); (app, guard) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1199f85f5e..ad6b498dc9 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -19,7 +19,7 @@ use serde_json::json; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; use tests_integration::test_util::{ - setup_test_http_app, setup_test_http_app_with_frontend, setup_test_promql_app_with_frontend, + setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend, StorageType, }; @@ -54,7 +54,7 @@ macro_rules! http_tests { test_sql_api, test_prometheus_promql_api, - test_promql_http_api, + test_prom_http_api, test_metrics_api, test_scripts_api, test_health_api, @@ -284,9 +284,9 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_promql_http_api(store_type: StorageType) { +pub async fn test_prom_http_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_promql_app_with_frontend(store_type, "promql_api").await; + let (app, mut guard) = setup_test_prom_app_with_frontend(store_type, "promql_api").await; let client = TestClient::new(app); // instant query