From 674bfd85c7a30d84ab27fd4a467e0347c3923d1e Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Wed, 12 Jul 2023 07:47:09 +0100
Subject: [PATCH] chore(prom)!: rename prometheus(remote storage) to prom-store
 and promql(HTTP server) to prometheus (#1931)

* chore(prom): rename prometheus(remote storage) to prom-store and promql(HTTP server) to prometheus

* chore: apply clippy suggestions

* chore: adjust format according to rustfmt
---
 config/frontend.example.toml                  |    6 +-
 config/standalone.example.toml                |    8 +-
 src/api/src/lib.rs                            |    2 +-
 src/cmd/src/frontend.rs                       |    9 +-
 src/cmd/src/standalone.rs                     |   10 +-
 src/frontend/src/error.rs                     |    4 +-
 src/frontend/src/frontend.rs                  |    6 +-
 src/frontend/src/instance.rs                  |   14 +-
 .../instance/{prometheus.rs => prom_store.rs} |   30 +-
 src/frontend/src/metrics.rs                   |    2 +-
 src/frontend/src/server.rs                    |   14 +-
 src/frontend/src/service_config.rs            |    4 +-
 .../service_config/{prom.rs => prom_store.rs} |   18 +-
 src/frontend/src/service_config/prometheus.rs |    8 +-
 src/servers/src/grpc.rs                       |   16 +-
 src/servers/src/grpc/prom_query_gateway.rs    |   19 +-
 src/servers/src/http.rs                       |   14 +-
 .../src/http/{prometheus.rs => prom_store.rs} |   18 +-
 src/servers/src/lib.rs                        |    2 +-
 src/servers/src/metrics.rs                    |    4 +-
 src/servers/src/prom.rs                       |  921 ----------
 src/servers/src/prom_store.rs                 |  757 ++++++++
 src/servers/src/prometheus.rs                 | 1580 +++++++++--------
 src/servers/src/query_handler.rs              |   12 +-
 src/servers/tests/http/mod.rs                 |    2 +-
 ...{prometheus_test.rs => prom_store_test.rs} |   22 +-
 tests-integration/src/lib.rs                  |    2 +-
 .../src/{prometheus.rs => prom_store.rs}      |   34 +-
 tests-integration/src/test_util.rs            |    4 +-
 tests-integration/tests/grpc.rs               |   21 +-
 tests-integration/tests/http.rs               |   22 +-
 31 files changed, 1803 insertions(+), 1782 deletions(-)
 rename src/frontend/src/instance/{prometheus.rs => prom_store.rs} (86%)
 rename src/frontend/src/service_config/{prom.rs => prom_store.rs} (70%)
 rename src/servers/src/http/{prometheus.rs => prom_store.rs} (88%)
 delete mode 100644 src/servers/src/prom.rs
 create mode 100644 src/servers/src/prom_store.rs
 rename src/servers/tests/http/{prometheus_test.rs => prom_store_test.rs} (92%)
 rename tests-integration/src/{prometheus.rs => prom_store.rs} (84%)

diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 830e2509c4..62fe78edef 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -47,12 +47,12 @@ runtime_size = 2
 [influxdb_options]
 enable = true
 
-# Prometheus protocol options, see `standalone.example.toml`.
-[prometheus_options]
+# Prometheus remote storage options, see `standalone.example.toml`.
+[prom_store_options]
 enable = true
 
 # Prometheus protocol options, see `standalone.example.toml`.
-[prom_options]
+[prometheus_options]
 addr = "127.0.0.1:4004"
 
 # Metasrv client options, see `datanode.example.toml`.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 25223bf555..5e846908a7 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -69,13 +69,13 @@ runtime_size = 2
 # Whether to enable InfluxDB protocol in HTTP API, true by default.
 enable = true
 
-# Prometheus protocol options.
-[prometheus_options]
+# Prometheus remote storage options
+[prom_store_options]
 # Whether to enable Prometheus remote write and read in HTTP API, true by default.
 enable = true
 
-# Prom protocol options
-[prom_options]
+# Prometheus protocol options
+[prometheus_options]
 # Prometheus API server address, "127.0.0.1:4004" by default.
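# Upgrade note (illustrative, mirroring the renames in this patch): a config
# written for an older release, such as
#
#   [prometheus_options]        # remote storage (write/read)
#   enable = true
#
#   [prom_options]              # PromQL HTTP API server
#   addr = "127.0.0.1:4004"
#
# must be renamed on upgrade to
#
#   [prom_store_options]
#   enable = true
#
#   [prometheus_options]
#   addr = "127.0.0.1:4004"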
addr = "127.0.0.1:4004" diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 406f96df77..96c7f6a517 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -15,7 +15,7 @@ pub mod error; pub mod helper; -pub mod prometheus { +pub mod prom_store { pub mod remote { pub use greptime_proto::prometheus::remote::*; } diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index e8d5e2abbe..4d25c17247 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,7 +19,7 @@ use common_base::Plugins; use common_telemetry::logging; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; -use frontend::service_config::{InfluxdbOptions, PromOptions}; +use frontend::service_config::{InfluxdbOptions, PrometheusOptions}; use meta_client::MetaClientOptions; use servers::auth::UserProviderRef; use servers::tls::{TlsMode, TlsOption}; @@ -172,7 +172,7 @@ impl StartCommand { } if let Some(addr) = &self.prom_addr { - opts.prom_options = Some(PromOptions { addr: addr.clone() }); + opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() }); } if let Some(addr) = &self.postgres_addr { @@ -274,7 +274,10 @@ mod tests { opts.opentsdb_options.as_ref().unwrap().addr, "127.0.0.1:4321" ); - assert_eq!(opts.prom_options.as_ref().unwrap().addr, "127.0.0.1:4444"); + assert_eq!( + opts.prometheus_options.as_ref().unwrap().addr, + "127.0.0.1:4444" + ); let default_opts = FrontendOptions::default(); assert_eq!( diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index e13470d5bf..f607a3b796 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,7 +23,7 @@ use datanode::instance::InstanceRef; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::service_config::{ - GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions, + GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, PrometheusOptions, }; use serde::{Deserialize, Serialize}; @@ -89,8 +89,8 @@ pub struct StandaloneOptions { pub postgres_options: Option, pub opentsdb_options: Option, pub influxdb_options: Option, + pub prom_store_options: Option, pub prometheus_options: Option, - pub prom_options: Option, pub wal: WalConfig, pub storage: StorageConfig, pub procedure: ProcedureConfig, @@ -108,8 +108,8 @@ impl Default for StandaloneOptions { postgres_options: Some(PostgresOptions::default()), opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), + prom_store_options: Some(PromStoreOptions::default()), prometheus_options: Some(PrometheusOptions::default()), - prom_options: Some(PromOptions::default()), wal: WalConfig::default(), storage: StorageConfig::default(), procedure: ProcedureConfig::default(), @@ -128,8 +128,8 @@ impl StandaloneOptions { postgres_options: self.postgres_options, opentsdb_options: self.opentsdb_options, influxdb_options: self.influxdb_options, + prom_store_options: self.prom_store_options, prometheus_options: self.prometheus_options, - prom_options: self.prom_options, meta_client_options: None, logging: self.logging, ..Default::default() @@ -269,7 +269,7 @@ impl StartCommand { } if let Some(addr) = &self.prom_addr { - opts.prom_options = Some(PromOptions { addr: addr.clone() }) + opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() }) } if let Some(addr) = &self.postgres_addr { diff --git 
a/src/frontend/src/error.rs b/src/frontend/src/error.rs index c942d6fcd4..a8d579b74a 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -394,7 +394,7 @@ pub enum Error { "Failed to create logical plan for prometheus query, source: {}", source ))] - PrometheusRemoteQueryPlan { + PromStoreRemoteQueryPlan { #[snafu(backtrace)] source: servers::error::Error, }, @@ -611,7 +611,7 @@ impl ErrorExt for Error { Error::HandleHeartbeatResponse { source, .. } => source.status_code(), Error::RuntimeResource { source, .. } => source.status_code(), - Error::PrometheusRemoteQueryPlan { source, .. } + Error::PromStoreRemoteQueryPlan { source, .. } | Error::ExecutePromql { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index a91e033ee8..c333d83f93 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -19,7 +19,7 @@ use servers::http::HttpOptions; use servers::Mode; use crate::service_config::{ - GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions, + GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, PrometheusOptions, }; @@ -35,8 +35,8 @@ pub struct FrontendOptions { pub postgres_options: Option, pub opentsdb_options: Option, pub influxdb_options: Option, + pub prom_store_options: Option, pub prometheus_options: Option, - pub prom_options: Option, pub meta_client_options: Option, pub logging: LoggingOptions, } @@ -53,8 +53,8 @@ impl Default for FrontendOptions { postgres_options: Some(PostgresOptions::default()), opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), + prom_store_options: Some(PromStoreOptions::default()), prometheus_options: Some(PrometheusOptions::default()), - prom_options: Some(PromOptions::default()), meta_client_options: None, logging: LoggingOptions::default(), } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7a67445832..07b6d414f9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -16,7 +16,7 @@ pub mod distributed; mod grpc; mod influxdb; mod opentsdb; -mod prometheus; +mod prom_store; mod script; mod standalone; @@ -60,11 +60,11 @@ use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu}; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; -use servers::prom::PromHandler; +use servers::prometheus::PrometheusHandler; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, + InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, }; use session::context::QueryContextRef; use snafu::prelude::*; @@ -95,9 +95,9 @@ pub trait FrontendInstance: + SqlQueryHandler + OpentsdbProtocolHandler + InfluxdbLineProtocolHandler - + PrometheusProtocolHandler + + PromStoreProtocolHandler + ScriptHandler - + PromHandler + + PrometheusHandler + Send + Sync + 'static @@ -524,7 +524,7 @@ impl SqlQueryHandler for Instance { query: &PromQuery, query_ctx: QueryContextRef, ) -> Vec> { - let result = PromHandler::do_query(self, query, query_ctx) + let result = PrometheusHandler::do_query(self, query, query_ctx) .await .with_context(|_| 
ExecutePromqlSnafu { query: format!("{query:?}"), @@ -566,7 +566,7 @@ impl SqlQueryHandler for Instance { } #[async_trait] -impl PromHandler for Instance { +impl PrometheusHandler for Instance { async fn do_query( &self, query: &PromQuery, diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prom_store.rs similarity index 86% rename from src/frontend/src/instance/prometheus.rs rename to src/frontend/src/instance/prom_store.rs index fe30ec3c93..149a8bdf80 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::prometheus::remote::read_request::ResponseType; -use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; +use api::prom_store::remote::read_request::ResponseType; +use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; use async_trait::async_trait; use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; @@ -23,17 +23,17 @@ use common_telemetry::logging; use metrics::counter; use prost::Message; use servers::error::{self, Result as ServerResult}; -use servers::prometheus::{self, Metrics}; -use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; +use servers::prom_store::{self, Metrics}; +use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use crate::error::{ - CatalogSnafu, ExecLogicalPlanSnafu, PrometheusRemoteQueryPlanSnafu, ReadTableSnafu, Result, + CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result, TableNotFoundSnafu, }; use crate::instance::Instance; -use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES; +use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; @@ -72,7 +72,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult ServerResult<()> { - let (requests, samples) = prometheus::to_grpc_insert_requests(request)?; + let (requests, samples) = prom_store::to_grpc_insert_requests(request)?; let _ = self .handle_inserts(requests, ctx) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu)?; - counter!(PROMETHEUS_REMOTE_WRITE_SAMPLES, samples as u64); + counter!(PROM_STORE_REMOTE_WRITE_SAMPLES, samples as u64); Ok(()) } @@ -161,7 +161,7 @@ impl PrometheusProtocolHandler for Instance { &self, request: ReadRequest, ctx: QueryContextRef, - ) -> ServerResult { + ) -> ServerResult { let response_type = negotiate_response_type(&request.accepted_response_types)?; // TODO(dennis): use read_hints to speedup query if possible @@ -179,10 +179,10 @@ impl PrometheusProtocolHandler for Instance { }; // TODO(dennis): may consume too much memory, adds flow control - Ok(PrometheusResponse { + Ok(PromStoreResponse { content_type: "application/x-protobuf".to_string(), content_encoding: "snappy".to_string(), - body: prometheus::snappy_compress(&response.encode_to_vec())?, + body: prom_store::snappy_compress(&response.encode_to_vec())?, }) } ResponseType::StreamedXorChunks => error::NotSupportedSnafu { diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 1cd3d290ca..2ba922fa24 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -23,4 +23,4 @@ pub const DIST_CREATE_TABLE: &str = 
"frontend.dist.create_table"; pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; /// The samples count of Prometheus remote write. -pub const PROMETHEUS_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; +pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index c5f7677fde..f30e20c149 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -28,7 +28,7 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; -use servers::prom::PromServer; +use servers::prometheus::PrometheusServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -38,7 +38,7 @@ use crate::error::Error::StartServer; use crate::error::{self, Result}; use crate::frontend::FrontendOptions; use crate::instance::FrontendInstance; -use crate::service_config::{InfluxdbOptions, PrometheusOptions}; +use crate::service_config::{InfluxdbOptions, PromStoreOptions}; pub(crate) struct Services; @@ -172,8 +172,8 @@ impl Services { } if matches!( - opts.prometheus_options, - Some(PrometheusOptions { enable: true }) + opts.prom_store_options, + Some(PromStoreOptions { enable: true }) ) { let _ = http_server_builder.with_prom_handler(instance.clone()); } @@ -187,10 +187,10 @@ impl Services { result.push((Box::new(http_server), http_addr)); } - if let Some(prom_options) = &opts.prom_options { - let prom_addr = parse_addr(&prom_options.addr)?; + if let Some(prometheus_options) = &opts.prometheus_options { + let prom_addr = parse_addr(&prometheus_options.addr)?; - let mut prom_server = PromServer::create_server(instance); + let mut prom_server = PrometheusServer::create_server(instance); if let Some(user_provider) = user_provider { prom_server.set_user_provider(user_provider); } diff --git a/src/frontend/src/service_config.rs b/src/frontend/src/service_config.rs index b4435207bd..1c4a6f840f 100644 --- a/src/frontend/src/service_config.rs +++ b/src/frontend/src/service_config.rs @@ -17,7 +17,7 @@ pub mod influxdb; pub mod mysql; pub mod opentsdb; pub mod postgres; -pub mod prom; +pub mod prom_store; pub mod prometheus; pub use grpc::GrpcOptions; @@ -25,5 +25,5 @@ pub use influxdb::InfluxdbOptions; pub use mysql::MysqlOptions; pub use opentsdb::OpentsdbOptions; pub use postgres::PostgresOptions; -pub use prom::PromOptions; +pub use prom_store::PromStoreOptions; pub use prometheus::PrometheusOptions; diff --git a/src/frontend/src/service_config/prom.rs b/src/frontend/src/service_config/prom_store.rs similarity index 70% rename from src/frontend/src/service_config/prom.rs rename to src/frontend/src/service_config/prom_store.rs index d617d95f39..dfc36fe72f 100644 --- a/src/frontend/src/service_config/prom.rs +++ b/src/frontend/src/service_config/prom_store.rs @@ -15,25 +15,23 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct PromOptions { - pub addr: String, +pub struct PromStoreOptions { + pub enable: bool, } -impl Default for PromOptions { +impl Default for PromStoreOptions { fn default() -> Self { - Self { - addr: "127.0.0.1:4004".to_string(), - } + Self { enable: true } } } #[cfg(test)] mod tests { - use super::PromOptions; + use super::PromStoreOptions; #[test] - fn 
test_prometheus_options() { - let default = PromOptions::default(); - assert_eq!(default.addr, "127.0.0.1:4004".to_string()); + fn test_prom_store_options() { + let default = PromStoreOptions::default(); + assert!(default.enable); } } diff --git a/src/frontend/src/service_config/prometheus.rs b/src/frontend/src/service_config/prometheus.rs index 6dac05419a..7ae477abf2 100644 --- a/src/frontend/src/service_config/prometheus.rs +++ b/src/frontend/src/service_config/prometheus.rs @@ -16,12 +16,14 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PrometheusOptions { - pub enable: bool, + pub addr: String, } impl Default for PrometheusOptions { fn default() -> Self { - Self { enable: true } + Self { + addr: "127.0.0.1:4004".to_string(), + } } } @@ -32,6 +34,6 @@ mod tests { #[test] fn test_prometheus_options() { let default = PrometheusOptions::default(); - assert!(default.enable); + assert_eq!(default.addr, "127.0.0.1:4004".to_string()); } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index f9d6e64902..82a3af083a 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -46,7 +46,7 @@ use crate::error::{ use crate::grpc::database::DatabaseService; use crate::grpc::flight::FlightHandler; use crate::grpc::handler::GreptimeRequestHandler; -use crate::prom::PromHandlerRef; +use crate::prometheus::PrometheusHandlerRef; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; @@ -56,7 +56,7 @@ pub struct GrpcServer { shutdown_tx: Mutex>>, request_handler: Arc, /// Handler for Prometheus-compatible PromQL queries. Only present for frontend server. - promql_handler: Option, + prometheus_handler: Option, /// gRPC serving state receiver. Only present if the gRPC server is started. /// Used to wait for the server to stop, performing the old blocking fashion. 
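// Rename map applied by this patch (old -> new), for the reader's reference:
//   servers::prom::PromHandler(Ref)              -> servers::prometheus::PrometheusHandler(Ref)
//   servers::prom::PromServer                    -> servers::prometheus::PrometheusServer
//   query_handler::PrometheusProtocolHandler     -> query_handler::PromStoreProtocolHandler
//   query_handler::PrometheusResponse            -> query_handler::PromStoreResponse
//   service_config::PromOptions { addr }         -> service_config::PrometheusOptions { addr }
//   service_config::PrometheusOptions { enable } -> service_config::PromStoreOptions { enable }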
@@ -66,7 +66,7 @@ pub struct GrpcServer { impl GrpcServer { pub fn new( query_handler: ServerGrpcQueryHandlerRef, - promql_handler: Option, + prometheus_handler: Option, user_provider: Option, runtime: Arc, ) -> Self { @@ -78,7 +78,7 @@ impl GrpcServer { Self { shutdown_tx: Mutex::new(None), request_handler, - promql_handler, + prometheus_handler, serve_state: Mutex::new(None), } } @@ -97,7 +97,7 @@ impl GrpcServer { pub fn create_prom_query_gateway_service( &self, - handler: PromHandlerRef, + handler: PrometheusHandlerRef, ) -> PrometheusGatewayServer { PrometheusGatewayServer::new(PrometheusGatewayService::new(handler)) } @@ -178,9 +178,9 @@ impl Server for GrpcServer { .add_service(self.create_flight_service()) .add_service(self.create_database_service()) .add_service(self.create_healthcheck_service()); - if let Some(promql_handler) = &self.promql_handler { - builder = - builder.add_service(self.create_prom_query_gateway_service(promql_handler.clone())) + if let Some(prometheus_handler) = &self.prometheus_handler { + builder = builder + .add_service(self.create_prom_query_gateway_service(prometheus_handler.clone())) } let builder = builder.add_service(reflection_service); diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 14b20edb22..3649deae2d 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -33,10 +33,12 @@ use tonic::{Request, Response}; use crate::error::InvalidQuerySnafu; use crate::grpc::handler::create_query_context; use crate::grpc::TonicResult; -use crate::prom::{retrieve_metric_name_and_result_type, PromHandlerRef, PromJsonResponse}; +use crate::prometheus::{ + retrieve_metric_name_and_result_type, PrometheusHandlerRef, PrometheusJsonResponse, +}; pub struct PrometheusGatewayService { - handler: PromHandlerRef, + handler: PrometheusHandlerRef, } #[async_trait] @@ -86,7 +88,7 @@ impl PrometheusGateway for PrometheusGatewayService { } impl PrometheusGatewayService { - pub fn new(handler: PromHandlerRef) -> Self { + pub fn new(handler: PrometheusHandlerRef) -> Self { Self { handler } } @@ -95,7 +97,7 @@ impl PrometheusGatewayService { query: PromQuery, ctx: Arc, is_range_query: bool, - ) -> PromJsonResponse { + ) -> PrometheusJsonResponse { let _timer = timer!( crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER, &[(crate::metrics::METRIC_DB_LABEL, ctx.get_db_string())] @@ -106,8 +108,11 @@ impl PrometheusGatewayService { match retrieve_metric_name_and_result_type(&query.query) { Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type), Err(err) => { - return PromJsonResponse::error(err.status_code().to_string(), err.to_string()) - .0 + return PrometheusJsonResponse::error( + err.status_code().to_string(), + err.to_string(), + ) + .0 } }; // range query only returns matrix @@ -115,7 +120,7 @@ impl PrometheusGatewayService { result_type = ValueType::Matrix; }; - PromJsonResponse::from_query_result(result, metric_name, result_type) + PrometheusJsonResponse::from_query_result(result, metric_name, result_type) .await .0 } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 19cf2bd066..db4b233716 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -19,7 +19,7 @@ pub mod influxdb; pub mod mem_prof; pub mod opentsdb; mod pprof; -pub mod prometheus; +pub mod prom_store; pub mod script; #[cfg(feature = "dashboard")] @@ -73,7 +73,7 @@ use crate::metrics_handler::MetricsHandler; use 
crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, + InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, ScriptHandlerRef, }; use crate::server::Server; @@ -118,7 +118,7 @@ pub struct HttpServer { options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, - prom_handler: Option, + prom_handler: Option, script_handler: Option, shutdown_tx: Mutex>>, user_provider: Option, @@ -434,7 +434,7 @@ impl HttpServerBuilder { self } - pub fn with_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) -> &mut Self { + pub fn with_prom_handler(&mut self, handler: PromStoreProtocolHandlerRef) -> &mut Self { let _ = self.inner.prom_handler.get_or_insert(handler); self } @@ -625,10 +625,10 @@ impl HttpServer { .with_state(api_state) } - fn route_prom(&self, prom_handler: PrometheusProtocolHandlerRef) -> Router { + fn route_prom(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router { Router::new() - .route("/write", routing::post(prometheus::remote_write)) - .route("/read", routing::post(prometheus::remote_read)) + .route("/write", routing::post(prom_store::remote_write)) + .route("/read", routing::post(prom_store::remote_read)) .with_state(prom_handler) } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prom_store.rs similarity index 88% rename from src/servers/src/http/prometheus.rs rename to src/servers/src/http/prom_store.rs index d5b7a75029..c050124b10 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prom_store.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::prometheus::remote::{ReadRequest, WriteRequest}; +use api::prom_store::remote::{ReadRequest, WriteRequest}; use axum::extract::{Query, RawBody, State}; use axum::http::{header, StatusCode}; use axum::response::IntoResponse; @@ -29,8 +29,8 @@ use snafu::prelude::*; use crate::error::{self, Result}; use crate::parse_catalog_and_schema_from_client_database_name; -use crate::prometheus::snappy_decompress; -use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse}; +use crate::prom_store::snappy_decompress; +use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse}; #[derive(Debug, Serialize, Deserialize, JsonSchema)] pub struct DatabaseQuery { @@ -47,14 +47,14 @@ impl Default for DatabaseQuery { #[axum_macros::debug_handler] pub async fn remote_write( - State(handler): State, + State(handler): State, Query(params): Query, RawBody(body): RawBody, ) -> Result<(StatusCode, ())> { let request = decode_remote_write_request(body).await?; let _timer = timer!( - crate::metrics::METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED, + crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED, &[( crate::metrics::METRIC_DB_LABEL, params.db.clone().unwrap_or_default() @@ -72,7 +72,7 @@ pub async fn remote_write( Ok((StatusCode::NO_CONTENT, ())) } -impl IntoResponse for PrometheusResponse { +impl IntoResponse for PromStoreResponse { fn into_response(self) -> axum::response::Response { ( [ @@ -87,14 +87,14 @@ impl IntoResponse for PrometheusResponse { #[axum_macros::debug_handler] pub async fn remote_read( - State(handler): State, + State(handler): State, Query(params): Query, RawBody(body): RawBody, -) -> Result { +) -> Result { let request = decode_remote_read_request(body).await?; let _timer = timer!( - crate::metrics::METRIC_HTTP_PROMETHEUS_READ_ELAPSED, 
+ crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED, &[( crate::metrics::METRIC_DB_LABEL, params.db.clone().unwrap_or_default() diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 3546a70e55..661b8f73a0 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -33,7 +33,7 @@ pub mod metrics_handler; pub mod mysql; pub mod opentsdb; pub mod postgres; -pub mod prom; +pub mod prom_store; pub mod prometheus; pub mod query_handler; pub mod server; diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 848871bf34..6795cde38f 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -40,9 +40,9 @@ pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed"; pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed"; pub(crate) const METRIC_AUTH_FAILURE: &str = "servers.auth_failure_count"; pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influxdb_write_elapsed"; -pub(crate) const METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED: &str = +pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str = "servers.http_prometheus_write_elapsed"; -pub(crate) const METRIC_HTTP_PROMETHEUS_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed"; +pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed"; pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str = "servers.opentsdb_line_write_elapsed"; diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs deleted file mode 100644 index 4b5b4da5df..0000000000 --- a/src/servers/src/prom.rs +++ /dev/null @@ -1,921 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! 
prom supply the prometheus HTTP API Server compliance -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::net::SocketAddr; -use std::sync::Arc; - -use async_trait::async_trait; -use axum::body::BoxBody; -use axum::extract::{Path, Query, State}; -use axum::{middleware, routing, Form, Json, Router}; -use common_catalog::consts::DEFAULT_SCHEMA_NAME; -use common_error::prelude::ErrorExt; -use common_error::status_code::StatusCode; -use common_query::Output; -use common_recordbatch::RecordBatches; -use common_telemetry::info; -use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; -use datatypes::prelude::ConcreteDataType; -use datatypes::scalars::ScalarVector; -use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; -use futures::FutureExt; -use promql_parser::label::METRIC_NAME; -use promql_parser::parser::{ - AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, - UnaryExpr, ValueType, VectorSelector, -}; -use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; -use schemars::JsonSchema; -use serde::de::{self, MapAccess, Visitor}; -use serde::{Deserialize, Serialize}; -use session::context::{QueryContext, QueryContextRef}; -use snafu::{ensure, Location, OptionExt, ResultExt}; -use tokio::sync::oneshot::Sender; -use tokio::sync::{oneshot, Mutex}; -use tower::ServiceBuilder; -use tower_http::auth::AsyncRequireAuthorizationLayer; -use tower_http::compression::CompressionLayer; -use tower_http::trace::TraceLayer; - -use crate::auth::UserProviderRef; -use crate::error::{ - AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, - StartHttpSnafu, UnexpectedResultSnafu, -}; -use crate::http::authorize::HttpAuth; -use crate::http::track_metrics; -use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; -use crate::server::Server; - -pub const PROM_API_VERSION: &str = "v1"; - -pub type PromHandlerRef = Arc; - -#[async_trait] -pub trait PromHandler { - async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result; -} - -/// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API -pub struct PromServer { - query_handler: PromHandlerRef, - shutdown_tx: Mutex>>, - user_provider: Option, -} - -impl PromServer { - pub fn create_server(query_handler: PromHandlerRef) -> Box { - Box::new(PromServer { - query_handler, - shutdown_tx: Mutex::new(None), - user_provider: None, - }) - } - - pub fn set_user_provider(&mut self, user_provider: UserProviderRef) { - debug_assert!(self.user_provider.is_none()); - self.user_provider = Some(user_provider); - } - - pub fn make_app(&self) -> Router { - // TODO(ruihang): implement format_query, series, values, query_examplars and targets methods - - let router = Router::new() - .route("/query", routing::post(instant_query).get(instant_query)) - .route("/query_range", routing::post(range_query).get(range_query)) - .route("/labels", routing::post(labels_query).get(labels_query)) - .route("/series", routing::post(series_query).get(series_query)) - .route( - "/label/:label_name/values", - routing::get(label_values_query), - ) - .with_state(self.query_handler.clone()); - - Router::new() - .nest(&format!("/api/{PROM_API_VERSION}"), router) - // middlewares - .layer( - ServiceBuilder::new() - .layer(TraceLayer::new_for_http()) - .layer(CompressionLayer::new()) - // custom layer - .layer(AsyncRequireAuthorizationLayer::new( - HttpAuth::::new(self.user_provider.clone()), - )), - ) - // We 
need to register the metrics layer again since start a new http server - // for the PromServer. - .route_layer(middleware::from_fn(track_metrics)) - } -} - -pub const PROM_SERVER: &str = "PROM_SERVER"; - -#[async_trait] -impl Server for PromServer { - async fn shutdown(&self) -> Result<()> { - let mut shutdown_tx = self.shutdown_tx.lock().await; - if let Some(tx) = shutdown_tx.take() { - if tx.send(()).is_err() { - info!("Receiver dropped, the Prometheus API server has already existed"); - } - } - info!("Shutdown Prometheus API server"); - - Ok(()) - } - - async fn start(&self, listening: SocketAddr) -> Result { - let (tx, rx) = oneshot::channel(); - let server = { - let mut shutdown_tx = self.shutdown_tx.lock().await; - ensure!( - shutdown_tx.is_none(), - AlreadyStartedSnafu { - server: "Prometheus" - } - ); - - let app = self.make_app(); - let server = axum::Server::bind(&listening).serve(app.into_make_service()); - - *shutdown_tx = Some(tx); - - server - }; - let listening = server.local_addr(); - info!("Prometheus API server is bound to {}", listening); - - let graceful = server.with_graceful_shutdown(rx.map(drop)); - graceful.await.context(StartHttpSnafu)?; - - Ok(listening) - } - - fn name(&self) -> &str { - PROM_SERVER - } -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] -pub struct PromSeries { - pub metric: HashMap, - /// For [ValueType::Matrix] result type - pub values: Vec<(f64, String)>, - /// For [ValueType::Vector] result type - #[serde(skip_serializing_if = "Option::is_none")] - pub value: Option<(f64, String)>, -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] -pub struct PromData { - #[serde(rename = "resultType")] - pub result_type: String, - pub result: Vec, -} - -#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)] -#[serde(untagged)] -pub enum PromResponse { - PromData(PromData), - Labels(Vec), - Series(Vec>), - LabelValues(Vec), -} - -impl Default for PromResponse { - fn default() -> Self { - PromResponse::PromData(Default::default()) - } -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] -pub struct PromJsonResponse { - pub status: String, - pub data: PromResponse, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "errorType")] - pub error_type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub warnings: Option>, -} - -impl PromJsonResponse { - pub fn error(error_type: S1, reason: S2) -> Json - where - S1: Into, - S2: Into, - { - Json(PromJsonResponse { - status: "error".to_string(), - data: PromResponse::default(), - error: Some(reason.into()), - error_type: Some(error_type.into()), - warnings: None, - }) - } - - pub fn success(data: PromResponse) -> Json { - Json(PromJsonResponse { - status: "success".to_string(), - data, - error: None, - error_type: None, - warnings: None, - }) - } - - /// Convert from `Result` - pub async fn from_query_result( - result: Result, - metric_name: String, - result_type: ValueType, - ) -> Json { - let response: Result> = try { - let json = match result? { - Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data( - batches, - metric_name, - result_type, - )?), - Output::Stream(stream) => { - let record_batches = RecordBatches::try_collect(stream) - .await - .context(CollectRecordbatchSnafu)?; - Self::success(Self::record_batches_to_data( - record_batches, - metric_name, - result_type, - )?) 
- } - Output::AffectedRows(_) => { - Self::error("Unexpected", "expected data result, but got affected rows") - } - }; - - json - }; - - let result_type_string = result_type.to_string(); - - match response { - Ok(resp) => resp, - Err(err) => { - // Prometheus won't report error if querying nonexist label and metric - if err.status_code() == StatusCode::TableNotFound - || err.status_code() == StatusCode::TableColumnNotFound - { - Self::success(PromResponse::PromData(PromData { - result_type: result_type_string, - ..Default::default() - })) - } else { - Self::error(err.status_code().to_string(), err.to_string()) - } - } - } - } - - /// Convert [RecordBatches] to [PromData] - fn record_batches_to_data( - batches: RecordBatches, - metric_name: String, - result_type: ValueType, - ) -> Result { - // infer semantic type of each column from schema. - // TODO(ruihang): wish there is a better way to do this. - let mut timestamp_column_index = None; - let mut tag_column_indices = Vec::new(); - let mut first_field_column_index = None; - - for (i, column) in batches.schema().column_schemas().iter().enumerate() { - match column.data_type { - ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => { - if timestamp_column_index.is_none() { - timestamp_column_index = Some(i); - } - } - ConcreteDataType::Float64(_) => { - if first_field_column_index.is_none() { - first_field_column_index = Some(i); - } - } - ConcreteDataType::String(_) => { - tag_column_indices.push(i); - } - _ => {} - } - } - - let timestamp_column_index = timestamp_column_index.context(InternalSnafu { - err_msg: "no timestamp column found".to_string(), - })?; - let first_field_column_index = first_field_column_index.context(InternalSnafu { - err_msg: "no value column found".to_string(), - })?; - - let metric_name = (METRIC_NAME.to_string(), metric_name); - let mut buffer = BTreeMap::, Vec<(f64, String)>>::new(); - - for batch in batches.iter() { - // prepare things... 
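// Annotation: the lines below downcast each needed column once per batch
// (tags to StringVector, timestamps to TimestampMillisecondVector, the value
// column to Float64Vector), so the row loop that follows can read values
// without repeated type dispatch.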
- let tag_columns = tag_column_indices - .iter() - .map(|i| { - batch - .column(*i) - .as_any() - .downcast_ref::() - .unwrap() - }) - .collect::>(); - let tag_names = tag_column_indices - .iter() - .map(|c| batches.schema().column_name_by_index(*c).to_string()) - .collect::>(); - let timestamp_column = batch - .column(timestamp_column_index) - .as_any() - .downcast_ref::() - .unwrap(); - let field_column = batch - .column(first_field_column_index) - .as_any() - .downcast_ref::() - .unwrap(); - - // assemble rows - for row_index in 0..batch.num_rows() { - // retrieve tags - // TODO(ruihang): push table name `__metric__` - let mut tags = vec![metric_name.clone()]; - for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { - // TODO(ruihang): add test for NULL tag - if let Some(tag_value) = tag_column.get_data(row_index) { - tags.push((tag_name.to_string(), tag_value.to_string())); - } - } - - // retrieve timestamp - let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into(); - let timestamp = timestamp_millis as f64 / 1000.0; - - // retrieve value - if let Some(v) = field_column.get_data(row_index) { - buffer - .entry(tags) - .or_default() - .push((timestamp, Into::::into(v).to_string())); - }; - } - } - - let result = buffer - .into_iter() - .map(|(tags, mut values)| { - let metric = tags.into_iter().collect(); - match result_type { - ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries { - metric, - value: values.pop(), - ..Default::default() - }), - ValueType::Matrix => Ok(PromSeries { - metric, - values, - ..Default::default() - }), - } - }) - .collect::>>()?; - - let result_type_string = result_type.to_string(); - let data = PromResponse::PromData(PromData { - result_type: result_type_string, - result, - }); - - Ok(data) - } -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct InstantQuery { - query: Option, - time: Option, - timeout: Option, - db: Option, -} - -#[axum_macros::debug_handler] -pub async fn instant_query( - State(handler): State, - Query(params): Query, - Form(form_params): Form, -) -> Json { - // Extract time from query string, or use current server time if not specified. 
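// Annotation: this route is registered for both GET and POST, so parameters
// may arrive in the query string or in the form body; the `.or(...)` chains
// below give the query-string value precedence.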
- let time = params - .time - .or(form_params.time) - .unwrap_or_else(current_time_rfc3339); - let prom_query = PromQuery { - query: params.query.or(form_params.query).unwrap_or_default(), - start: time.clone(), - end: time, - step: "1s".to_string(), - }; - - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - - let query_ctx = QueryContext::with(catalog, schema); - - let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) { - Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type), - Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()), - }; - PromJsonResponse::from_query_result(result, metric_name, result_type).await -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct RangeQuery { - query: Option, - start: Option, - end: Option, - step: Option, - timeout: Option, - db: Option, -} - -#[axum_macros::debug_handler] -pub async fn range_query( - State(handler): State, - Query(params): Query, - Form(form_params): Form, -) -> Json { - let prom_query = PromQuery { - query: params.query.or(form_params.query).unwrap_or_default(), - start: params.start.or(form_params.start).unwrap_or_default(), - end: params.end.or(form_params.end).unwrap_or_default(), - step: params.step.or(form_params.step).unwrap_or_default(), - }; - - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - - let query_ctx = QueryContext::with(catalog, schema); - - let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; - let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) { - Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()), - Ok((metric_name, _)) => metric_name.unwrap_or_default(), - }; - PromJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await -} - -#[derive(Debug, Default, Serialize, JsonSchema)] -struct Matches(Vec); - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct LabelsQuery { - start: Option, - end: Option, - #[serde(flatten)] - matches: Matches, - db: Option, -} - -// Custom Deserialize method to support parsing repeated match[] -impl<'de> Deserialize<'de> for Matches { - fn deserialize(deserializer: D) -> std::result::Result - where - D: de::Deserializer<'de>, - { - struct MatchesVisitor; - - impl<'d> Visitor<'d> for MatchesVisitor { - type Value = Vec; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a string") - } - - fn visit_map(self, mut access: M) -> std::result::Result - where - M: MapAccess<'d>, - { - let mut matches = Vec::new(); - while let Some((key, value)) = access.next_entry::()? 
{ - if key == "match[]" { - matches.push(value); - } - } - Ok(matches) - } - } - Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?)) - } -} - -#[axum_macros::debug_handler] -pub async fn labels_query( - State(handler): State, - Query(params): Query, - Form(form_params): Form, -) -> Json { - let mut queries = params.matches.0; - if queries.is_empty() { - queries = form_params.matches.0; - } - if queries.is_empty() { - return PromJsonResponse::error("Unsupported", "match[] parameter is required"); - } - - let start = params - .start - .or(form_params.start) - .unwrap_or_else(yesterday_rfc3339); - let end = params - .end - .or(form_params.end) - .unwrap_or_else(current_time_rfc3339); - - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - let query_ctx = Arc::new(QueryContext::with(catalog, schema)); - - let mut labels = HashSet::new(); - let _ = labels.insert(METRIC_NAME.to_string()); - - for query in queries { - let prom_query = PromQuery { - query, - start: start.clone(), - end: end.clone(), - step: DEFAULT_LOOKBACK_STRING.to_string(), - }; - - let result = handler.do_query(&prom_query, query_ctx.clone()).await; - - let response = retrieve_labels_name_from_query_result(result, &mut labels).await; - - if let Err(err) = response { - // Prometheus won't report error if querying nonexist label and metric - if err.status_code() != StatusCode::TableNotFound - && err.status_code() != StatusCode::TableColumnNotFound - { - return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); - } - } - } - - let _ = labels.remove(TIMESTAMP_COLUMN_NAME); - let _ = labels.remove(FIELD_COLUMN_NAME); - - let mut sorted_labels: Vec = labels.into_iter().collect(); - sorted_labels.sort(); - PromJsonResponse::success(PromResponse::Labels(sorted_labels)) -} - -async fn retrieve_series_from_query_result( - result: Result, - series: &mut Vec>, - table_name: &str, -) -> Result<()> { - match result? { - Output::RecordBatches(batches) => { - record_batches_to_series(batches, series, table_name)?; - Ok(()) - } - Output::Stream(stream) => { - let batches = RecordBatches::try_collect(stream) - .await - .context(CollectRecordbatchSnafu)?; - record_batches_to_series(batches, series, table_name)?; - Ok(()) - } - Output::AffectedRows(_) => Err(Error::UnexpectedResult { - reason: "expected data result, but got affected rows".to_string(), - location: Location::default(), - }), - } -} - -/// Retrieve labels name from query result -async fn retrieve_labels_name_from_query_result( - result: Result, - labels: &mut HashSet, -) -> Result<()> { - match result? 
{ - Output::RecordBatches(batches) => { - record_batches_to_labels_name(batches, labels)?; - Ok(()) - } - Output::Stream(stream) => { - let batches = RecordBatches::try_collect(stream) - .await - .context(CollectRecordbatchSnafu)?; - record_batches_to_labels_name(batches, labels)?; - Ok(()) - } - Output::AffectedRows(_) => UnexpectedResultSnafu { - reason: "expected data result, but got affected rows".to_string(), - } - .fail(), - } -} - -fn record_batches_to_series( - batches: RecordBatches, - series: &mut Vec>, - table_name: &str, -) -> Result<()> { - for batch in batches.iter() { - for row in batch.rows() { - let mut element: HashMap = row - .iter() - .enumerate() - .map(|(idx, column)| { - let column_name = batch.schema.column_name_by_index(idx); - (column_name.to_string(), column.to_string()) - }) - .collect(); - let _ = element.insert("__name__".to_string(), table_name.to_string()); - series.push(element); - } - } - Ok(()) -} - -/// Retrieve labels name from record batches -fn record_batches_to_labels_name( - batches: RecordBatches, - labels: &mut HashSet, -) -> Result<()> { - let mut column_indices = Vec::new(); - let mut field_column_indices = Vec::new(); - for (i, column) in batches.schema().column_schemas().iter().enumerate() { - if let ConcreteDataType::Float64(_) = column.data_type { - field_column_indices.push(i); - } - column_indices.push(i); - } - - if field_column_indices.is_empty() { - return Err(Error::Internal { - err_msg: "no value column found".to_string(), - }); - } - - for batch in batches.iter() { - let names = column_indices - .iter() - .map(|c| batches.schema().column_name_by_index(*c).to_string()) - .collect::>(); - - let field_columns = field_column_indices - .iter() - .map(|i| { - batch - .column(*i) - .as_any() - .downcast_ref::() - .unwrap() - }) - .collect::>(); - - for row_index in 0..batch.num_rows() { - // if all field columns are null, skip this row - if field_columns - .iter() - .all(|c| c.get_data(row_index).is_none()) - { - continue; - } - - // if a field is not null, record the tag name and return - names.iter().for_each(|name| { - let _ = labels.insert(name.to_string()); - }); - return Ok(()); - } - } - Ok(()) -} - -pub(crate) fn retrieve_metric_name_and_result_type( - promql: &str, -) -> Result<(Option, ValueType)> { - let promql_expr = promql_parser::parser::parse(promql) - .map_err(|reason| InvalidQuerySnafu { reason }.build())?; - let metric_name = promql_expr_to_metric_name(&promql_expr); - let result_type = promql_expr.value_type(); - - Ok((metric_name, result_type)) -} - -fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option { - match expr { - PromqlExpr::Aggregate(AggregateExpr { expr, .. }) => promql_expr_to_metric_name(expr), - PromqlExpr::Unary(UnaryExpr { expr }) => promql_expr_to_metric_name(expr), - PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => { - promql_expr_to_metric_name(lhs).or(promql_expr_to_metric_name(rhs)) - } - PromqlExpr::Paren(ParenExpr { expr }) => promql_expr_to_metric_name(expr), - PromqlExpr::Subquery(SubqueryExpr { expr, .. }) => promql_expr_to_metric_name(expr), - PromqlExpr::NumberLiteral(_) => Some(String::new()), - PromqlExpr::StringLiteral(_) => Some(String::new()), - PromqlExpr::Extension(_) => None, - PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => { - matchers.find_matchers(METRIC_NAME).pop().cloned() - } - PromqlExpr::MatrixSelector(MatrixSelector { - vector_selector, .. - }) => { - let VectorSelector { matchers, .. 
} = vector_selector; - matchers.find_matchers(METRIC_NAME).pop().cloned() - } - PromqlExpr::Call(Call { args, .. }) => { - args.args.iter().find_map(|e| promql_expr_to_metric_name(e)) - } - } -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct LabelValueQuery { - start: Option, - end: Option, - #[serde(flatten)] - matches: Matches, - db: Option, -} - -#[axum_macros::debug_handler] -pub async fn label_values_query( - State(handler): State, - Path(label_name): Path, - Query(params): Query, -) -> Json { - let queries = params.matches.0; - if queries.is_empty() { - return PromJsonResponse::error("Invalid argument", "match[] parameter is required"); - } - - let start = params.start.unwrap_or_else(yesterday_rfc3339); - let end = params.end.unwrap_or_else(current_time_rfc3339); - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - let query_ctx = Arc::new(QueryContext::with(catalog, schema)); - - let mut label_values = HashSet::new(); - - for query in queries { - let prom_query = PromQuery { - query, - start: start.clone(), - end: end.clone(), - step: DEFAULT_LOOKBACK_STRING.to_string(), - }; - let result = handler.do_query(&prom_query, query_ctx.clone()).await; - let result = retrieve_label_values(result, &label_name, &mut label_values).await; - if let Err(err) = result { - // Prometheus won't report error if querying nonexist label and metric - if err.status_code() != StatusCode::TableNotFound - && err.status_code() != StatusCode::TableColumnNotFound - { - return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); - } - } - } - - let mut label_values: Vec<_> = label_values.into_iter().collect(); - label_values.sort(); - PromJsonResponse::success(PromResponse::LabelValues(label_values)) -} - -async fn retrieve_label_values( - result: Result, - label_name: &str, - labels_values: &mut HashSet, -) -> Result<()> { - match result? 
{ - Output::RecordBatches(batches) => { - retrieve_label_values_from_record_batch(batches, label_name, labels_values).await - } - Output::Stream(stream) => { - let batches = RecordBatches::try_collect(stream) - .await - .context(CollectRecordbatchSnafu)?; - retrieve_label_values_from_record_batch(batches, label_name, labels_values).await - } - Output::AffectedRows(_) => UnexpectedResultSnafu { - reason: "expected data result, but got affected rows".to_string(), - } - .fail(), - } -} - -async fn retrieve_label_values_from_record_batch( - batches: RecordBatches, - label_name: &str, - labels_values: &mut HashSet, -) -> Result<()> { - let Some(label_col_idx) = batches.schema().column_index_by_name(label_name) else { - return Ok(()); - }; - - // check whether label_name belongs to tag column - match batches - .schema() - .column_schema_by_name(label_name) - .unwrap() - .data_type - { - ConcreteDataType::String(_) => {} - _ => return Ok(()), - } - for batch in batches.iter() { - let label_column = batch - .column(label_col_idx) - .as_any() - .downcast_ref::() - .unwrap(); - for row_index in 0..batch.num_rows() { - if let Some(label_value) = label_column.get_data(row_index) { - let _ = labels_values.insert(label_value.to_string()); - } - } - } - - Ok(()) -} - -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] -pub struct SeriesQuery { - start: Option, - end: Option, - #[serde(flatten)] - matches: Matches, - db: Option, -} - -#[axum_macros::debug_handler] -pub async fn series_query( - State(handler): State, - Query(params): Query, - Form(form_params): Form, -) -> Json { - let mut queries: Vec = params.matches.0; - if queries.is_empty() { - queries = form_params.matches.0; - } - if queries.is_empty() { - return PromJsonResponse::error("Unsupported", "match[] parameter is required"); - } - let start = params - .start - .or(form_params.start) - .unwrap_or_else(yesterday_rfc3339); - let end = params - .end - .or(form_params.end) - .unwrap_or_else(current_time_rfc3339); - - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); - let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); - let query_ctx = Arc::new(QueryContext::with(catalog, schema)); - - let mut series = Vec::new(); - for query in queries { - let table_name = query.clone(); - let prom_query = PromQuery { - query, - start: start.clone(), - end: end.clone(), - // TODO: find a better value for step - step: DEFAULT_LOOKBACK_STRING.to_string(), - }; - let result = handler.do_query(&prom_query, query_ctx.clone()).await; - if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await - { - return PromJsonResponse::error(err.status_code().to_string(), err.to_string()); - } - } - PromJsonResponse::success(PromResponse::Series(series)) -} diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs new file mode 100644 index 0000000000..d877eb171e --- /dev/null +++ b/src/servers/src/prom_store.rs @@ -0,0 +1,757 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! prometheus protocol supportings +//! handles prometheus remote_write, remote_read logic +use std::cmp::Ordering; +use std::collections::{BTreeMap, HashMap}; +use std::hash::{Hash, Hasher}; + +use api::prom_store::remote::label_matcher::Type as MatcherType; +use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; +use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; +use common_grpc::writer::{LinesWriter, Precision}; +use common_recordbatch::{RecordBatch, RecordBatches}; +use common_time::timestamp::TimeUnit; +use datafusion::prelude::{col, lit, regexp_match, Expr}; +use datatypes::prelude::{ConcreteDataType, Value}; +use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; +use query::dataframe::DataFrame; +use query::plan::LogicalPlan; +use snafu::{ensure, OptionExt, ResultExt}; +use snap::raw::{Decoder, Encoder}; + +use crate::error::{self, Result}; + +pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; +pub const FIELD_COLUMN_NAME: &str = "greptime_value"; +pub const METRIC_NAME_LABEL: &str = "__name__"; + +/// Metrics for push gateway protocol +pub struct Metrics { + pub exposition: MetricsExposition, +} + +/// Get table name from remote query +pub fn table_name(q: &Query) -> Result { + let label_matches = &q.matchers; + + label_matches + .iter() + .find_map(|m| { + if m.name == METRIC_NAME_LABEL { + Some(m.value.to_string()) + } else { + None + } + }) + .context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in timeseries", + }) +} + +/// Create a DataFrame from a remote Query +pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { + let DataFrame::DataFusion(dataframe) = dataframe; + + let start_timestamp_ms = q.start_timestamp_ms; + let end_timestamp_ms = q.end_timestamp_ms; + + let label_matches = &q.matchers; + + let mut conditions = Vec::with_capacity(label_matches.len() + 1); + + conditions.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit(start_timestamp_ms))); + conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit(end_timestamp_ms))); + + for m in label_matches { + let name = &m.name; + + if name == METRIC_NAME_LABEL { + continue; + } + + let value = &m.value; + let m_type = + MatcherType::from_i32(m.r#type).context(error::InvalidPromRemoteRequestSnafu { + msg: format!("invalid LabelMatcher type: {}", m.r#type), + })?; + + match m_type { + MatcherType::Eq => { + conditions.push(col(name).eq(lit(value))); + } + MatcherType::Neq => { + conditions.push(col(name).not_eq(lit(value))); + } + // Case sensitive regexp match + MatcherType::Re => { + conditions.push(regexp_match(vec![col(name), lit(value)]).is_not_null()); + } + // Case sensitive regexp not match + MatcherType::Nre => { + conditions.push(regexp_match(vec![col(name), lit(value)]).is_null()); + } + } + } + + // Safety: conditions MUST not be empty, reduce always return Some(expr). + let conditions = conditions.into_iter().reduce(Expr::and).unwrap(); + + let dataframe = dataframe + .filter(conditions) + .context(error::DataFrameSnafu)?; + + Ok(LogicalPlan::DfPlan(dataframe.into_parts().1)) +} + +#[inline] +fn new_label(name: String, value: String) -> Label { + Label { name, value } +} + +// A timeseries id +#[derive(Debug)] +struct TimeSeriesId { + labels: Vec
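For orientation, the snappy_compress/snappy_decompress helpers that the HTTP
and gRPC handlers earlier in this patch call are defined in this file. A
minimal sketch of their likely shape, assuming thin wrappers over the `snap`
crate imported above (the actual functions return this crate's error::Result
and attach error context):

use snap::raw::{Decoder, Encoder};

// Raw (block-format) snappy, as required by the Prometheus remote protocol;
// the framed snappy format is not accepted by Prometheus.
pub fn snappy_compress(bytes: &[u8]) -> Result<Vec<u8>, snap::Error> {
    Encoder::new().compress_vec(bytes)
}

pub fn snappy_decompress(bytes: &[u8]) -> Result<Vec<u8>, snap::Error> {
    Decoder::new().decompress_vec(bytes)
}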