chore(prom)!: rename prometheus(remote storage) to prom-store and promql(HTTP server) to prometheus (#1931)

* chore(prom): rename prometheus(remote storage) to prom-store and promql(HTTP server) to prometheus

* chore: apply clippy suggestions

* chore: adjust format according to rustfmt
This commit is contained in:
Eugene Tolbakov
2023-07-12 07:47:09 +01:00
committed by GitHub
parent 4fa8340572
commit 674bfd85c7
31 changed files with 1803 additions and 1782 deletions

View File

@@ -47,12 +47,12 @@ runtime_size = 2
[influxdb_options]
enable = true
# Prometheus protocol options, see `standalone.example.toml`.
[prometheus_options]
# Prometheus remote storage options, see `standalone.example.toml`.
[prom_store_options]
enable = true
# Prometheus protocol options, see `standalone.example.toml`.
[prom_options]
[prometheus_options]
addr = "127.0.0.1:4004"
# Metasrv client options, see `datanode.example.toml`.

View File

@@ -69,13 +69,13 @@ runtime_size = 2
# Whether to enable InfluxDB protocol in HTTP API, true by default.
enable = true
# Prometheus protocol options.
[prometheus_options]
# Prometheus remote storage options
[prom_store_options]
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true
# Prom protocol options.
[prom_options]
# Prometheus protocol options
[prometheus_options]
# Prometheus API server address, "127.0.0.1:4004" by default.
addr = "127.0.0.1:4004"

View File

@@ -15,7 +15,7 @@
pub mod error;
pub mod helper;
pub mod prometheus {
pub mod prom_store {
pub mod remote {
pub use greptime_proto::prometheus::remote::*;
}

View File

@@ -19,7 +19,7 @@ use common_base::Plugins;
use common_telemetry::logging;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{InfluxdbOptions, PromOptions};
use frontend::service_config::{InfluxdbOptions, PrometheusOptions};
use meta_client::MetaClientOptions;
use servers::auth::UserProviderRef;
use servers::tls::{TlsMode, TlsOption};
@@ -172,7 +172,7 @@ impl StartCommand {
}
if let Some(addr) = &self.prom_addr {
opts.prom_options = Some(PromOptions { addr: addr.clone() });
opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() });
}
if let Some(addr) = &self.postgres_addr {
@@ -274,7 +274,10 @@ mod tests {
opts.opentsdb_options.as_ref().unwrap().addr,
"127.0.0.1:4321"
);
assert_eq!(opts.prom_options.as_ref().unwrap().addr, "127.0.0.1:4444");
assert_eq!(
opts.prometheus_options.as_ref().unwrap().addr,
"127.0.0.1:4444"
);
let default_opts = FrontendOptions::default();
assert_eq!(

View File

@@ -23,7 +23,7 @@ use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
PrometheusOptions,
};
use serde::{Deserialize, Serialize};
@@ -89,8 +89,8 @@ pub struct StandaloneOptions {
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: ProcedureConfig,
@@ -108,8 +108,8 @@ impl Default for StandaloneOptions {
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: ProcedureConfig::default(),
@@ -128,8 +128,8 @@ impl StandaloneOptions {
postgres_options: self.postgres_options,
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prom_store_options: self.prom_store_options,
prometheus_options: self.prometheus_options,
prom_options: self.prom_options,
meta_client_options: None,
logging: self.logging,
..Default::default()
@@ -269,7 +269,7 @@ impl StartCommand {
}
if let Some(addr) = &self.prom_addr {
opts.prom_options = Some(PromOptions { addr: addr.clone() })
opts.prometheus_options = Some(PrometheusOptions { addr: addr.clone() })
}
if let Some(addr) = &self.postgres_addr {

View File

@@ -394,7 +394,7 @@ pub enum Error {
"Failed to create logical plan for prometheus query, source: {}",
source
))]
PrometheusRemoteQueryPlan {
PromStoreRemoteQueryPlan {
#[snafu(backtrace)]
source: servers::error::Error,
},
@@ -611,7 +611,7 @@ impl ErrorExt for Error {
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),
Error::RuntimeResource { source, .. } => source.status_code(),
Error::PrometheusRemoteQueryPlan { source, .. }
Error::PromStoreRemoteQueryPlan { source, .. }
| Error::ExecutePromql { source, .. } => source.status_code(),
Error::SqlExecIntercepted { source, .. } => source.status_code(),

View File

@@ -19,7 +19,7 @@ use servers::http::HttpOptions;
use servers::Mode;
use crate::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromOptions,
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
PrometheusOptions,
};
@@ -35,8 +35,8 @@ pub struct FrontendOptions {
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prom_store_options: Option<PromStoreOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub meta_client_options: Option<MetaClientOptions>,
pub logging: LoggingOptions,
}
@@ -53,8 +53,8 @@ impl Default for FrontendOptions {
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prom_store_options: Some(PromStoreOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
meta_client_options: None,
logging: LoggingOptions::default(),
}

View File

@@ -16,7 +16,7 @@ pub mod distributed;
mod grpc;
mod influxdb;
mod opentsdb;
mod prometheus;
mod prom_store;
mod script;
mod standalone;
@@ -60,11 +60,11 @@ use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::prom::PromHandler;
use servers::prometheus::PrometheusHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler,
};
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -95,9 +95,9 @@ pub trait FrontendInstance:
+ SqlQueryHandler<Error = Error>
+ OpentsdbProtocolHandler
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ PromStoreProtocolHandler
+ ScriptHandler
+ PromHandler
+ PrometheusHandler
+ Send
+ Sync
+ 'static
@@ -524,7 +524,7 @@ impl SqlQueryHandler for Instance {
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let result = PromHandler::do_query(self, query, query_ctx)
let result = PrometheusHandler::do_query(self, query, query_ctx)
.await
.with_context(|_| ExecutePromqlSnafu {
query: format!("{query:?}"),
@@ -566,7 +566,7 @@ impl SqlQueryHandler for Instance {
}
#[async_trait]
impl PromHandler for Instance {
impl PrometheusHandler for Instance {
async fn do_query(
&self,
query: &PromQuery,

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
@@ -23,17 +23,17 @@ use common_telemetry::logging;
use metrics::counter;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::prom_store::{self, Metrics};
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PrometheusRemoteQueryPlanSnafu, ReadTableSnafu, Result,
CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES;
use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -72,7 +72,7 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
.await
.context(error::CollectRecordbatchSnafu)?;
Ok(QueryResult {
timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?,
timeseries: prom_store::recordbatches_to_timeseries(table_name, recordbatches)?,
})
}
@@ -102,7 +102,7 @@ impl Instance {
})?;
let logical_plan =
prometheus::query_to_plan(dataframe, query).context(PrometheusRemoteQueryPlanSnafu)?;
prom_store::query_to_plan(dataframe, query).context(PromStoreRemoteQueryPlanSnafu)?;
logging::debug!(
"Prometheus remote read, table: {}, logical plan: {}",
@@ -127,7 +127,7 @@ impl Instance {
let schema_name = ctx.current_schema();
for query in queries {
let table_name = prometheus::table_name(query)?;
let table_name = prom_store::table_name(query)?;
let output = self
.handle_remote_query(&ctx, &catalog_name, &schema_name, &table_name, query)
@@ -144,16 +144,16 @@ impl Instance {
}
#[async_trait]
impl PrometheusProtocolHandler for Instance {
impl PromStoreProtocolHandler for Instance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
let (requests, samples) = prometheus::to_grpc_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_insert_requests(request)?;
let _ = self
.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
counter!(PROMETHEUS_REMOTE_WRITE_SAMPLES, samples as u64);
counter!(PROM_STORE_REMOTE_WRITE_SAMPLES, samples as u64);
Ok(())
}
@@ -161,7 +161,7 @@ impl PrometheusProtocolHandler for Instance {
&self,
request: ReadRequest,
ctx: QueryContextRef,
) -> ServerResult<PrometheusResponse> {
) -> ServerResult<PromStoreResponse> {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speedup query if possible
@@ -179,10 +179,10 @@ impl PrometheusProtocolHandler for Instance {
};
// TODO(dennis): may consume too much memory, adds flow control
Ok(PrometheusResponse {
Ok(PromStoreResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
body: prometheus::snappy_compress(&response.encode_to_vec())?,
body: prom_store::snappy_compress(&response.encode_to_vec())?,
})
}
ResponseType::StreamedXorChunks => error::NotSupportedSnafu {

View File

@@ -23,4 +23,4 @@ pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows";
/// The samples count of Prometheus remote write.
pub const PROMETHEUS_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";

View File

@@ -28,7 +28,7 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::prom::PromServer;
use servers::prometheus::PrometheusServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -38,7 +38,7 @@ use crate::error::Error::StartServer;
use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::instance::FrontendInstance;
use crate::service_config::{InfluxdbOptions, PrometheusOptions};
use crate::service_config::{InfluxdbOptions, PromStoreOptions};
pub(crate) struct Services;
@@ -172,8 +172,8 @@ impl Services {
}
if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
opts.prom_store_options,
Some(PromStoreOptions { enable: true })
) {
let _ = http_server_builder.with_prom_handler(instance.clone());
}
@@ -187,10 +187,10 @@ impl Services {
result.push((Box::new(http_server), http_addr));
}
if let Some(prom_options) = &opts.prom_options {
let prom_addr = parse_addr(&prom_options.addr)?;
if let Some(prometheus_options) = &opts.prometheus_options {
let prom_addr = parse_addr(&prometheus_options.addr)?;
let mut prom_server = PromServer::create_server(instance);
let mut prom_server = PrometheusServer::create_server(instance);
if let Some(user_provider) = user_provider {
prom_server.set_user_provider(user_provider);
}

View File

@@ -17,7 +17,7 @@ pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prom_store;
pub mod prometheus;
pub use grpc::GrpcOptions;
@@ -25,5 +25,5 @@ pub use influxdb::InfluxdbOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;
pub use postgres::PostgresOptions;
pub use prom::PromOptions;
pub use prom_store::PromStoreOptions;
pub use prometheus::PrometheusOptions;

View File

@@ -15,25 +15,23 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromOptions {
pub addr: String,
pub struct PromStoreOptions {
pub enable: bool,
}
impl Default for PromOptions {
impl Default for PromStoreOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4004".to_string(),
}
Self { enable: true }
}
}
#[cfg(test)]
mod tests {
use super::PromOptions;
use super::PromStoreOptions;
#[test]
fn test_prometheus_options() {
let default = PromOptions::default();
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
fn test_prom_store_options() {
let default = PromStoreOptions::default();
assert!(default.enable);
}
}

View File

@@ -16,12 +16,14 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PrometheusOptions {
pub enable: bool,
pub addr: String,
}
impl Default for PrometheusOptions {
fn default() -> Self {
Self { enable: true }
Self {
addr: "127.0.0.1:4004".to_string(),
}
}
}
@@ -32,6 +34,6 @@ mod tests {
#[test]
fn test_prometheus_options() {
let default = PrometheusOptions::default();
assert!(default.enable);
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
}
}

View File

@@ -46,7 +46,7 @@ use crate::error::{
use crate::grpc::database::DatabaseService;
use crate::grpc::flight::FlightHandler;
use crate::grpc::handler::GreptimeRequestHandler;
use crate::prom::PromHandlerRef;
use crate::prometheus::PrometheusHandlerRef;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::server::Server;
@@ -56,7 +56,7 @@ pub struct GrpcServer {
shutdown_tx: Mutex<Option<Sender<()>>>,
request_handler: Arc<GreptimeRequestHandler>,
/// Handler for Prometheus-compatible PromQL queries. Only present for frontend server.
promql_handler: Option<PromHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
/// gRPC serving state receiver. Only present if the gRPC server is started.
/// Used to wait for the server to stop, performing the old blocking fashion.
@@ -66,7 +66,7 @@ pub struct GrpcServer {
impl GrpcServer {
pub fn new(
query_handler: ServerGrpcQueryHandlerRef,
promql_handler: Option<PromHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
@@ -78,7 +78,7 @@ impl GrpcServer {
Self {
shutdown_tx: Mutex::new(None),
request_handler,
promql_handler,
prometheus_handler,
serve_state: Mutex::new(None),
}
}
@@ -97,7 +97,7 @@ impl GrpcServer {
pub fn create_prom_query_gateway_service(
&self,
handler: PromHandlerRef,
handler: PrometheusHandlerRef,
) -> PrometheusGatewayServer<impl PrometheusGateway> {
PrometheusGatewayServer::new(PrometheusGatewayService::new(handler))
}
@@ -178,9 +178,9 @@ impl Server for GrpcServer {
.add_service(self.create_flight_service())
.add_service(self.create_database_service())
.add_service(self.create_healthcheck_service());
if let Some(promql_handler) = &self.promql_handler {
builder =
builder.add_service(self.create_prom_query_gateway_service(promql_handler.clone()))
if let Some(prometheus_handler) = &self.prometheus_handler {
builder = builder
.add_service(self.create_prom_query_gateway_service(prometheus_handler.clone()))
}
let builder = builder.add_service(reflection_service);

View File

@@ -33,10 +33,12 @@ use tonic::{Request, Response};
use crate::error::InvalidQuerySnafu;
use crate::grpc::handler::create_query_context;
use crate::grpc::TonicResult;
use crate::prom::{retrieve_metric_name_and_result_type, PromHandlerRef, PromJsonResponse};
use crate::prometheus::{
retrieve_metric_name_and_result_type, PrometheusHandlerRef, PrometheusJsonResponse,
};
pub struct PrometheusGatewayService {
handler: PromHandlerRef,
handler: PrometheusHandlerRef,
}
#[async_trait]
@@ -86,7 +88,7 @@ impl PrometheusGateway for PrometheusGatewayService {
}
impl PrometheusGatewayService {
pub fn new(handler: PromHandlerRef) -> Self {
pub fn new(handler: PrometheusHandlerRef) -> Self {
Self { handler }
}
@@ -95,7 +97,7 @@ impl PrometheusGatewayService {
query: PromQuery,
ctx: Arc<QueryContext>,
is_range_query: bool,
) -> PromJsonResponse {
) -> PrometheusJsonResponse {
let _timer = timer!(
crate::metrics::METRIC_SERVER_GRPC_PROM_REQUEST_TIMER,
&[(crate::metrics::METRIC_DB_LABEL, ctx.get_db_string())]
@@ -106,8 +108,11 @@ impl PrometheusGatewayService {
match retrieve_metric_name_and_result_type(&query.query) {
Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
Err(err) => {
return PromJsonResponse::error(err.status_code().to_string(), err.to_string())
.0
return PrometheusJsonResponse::error(
err.status_code().to_string(),
err.to_string(),
)
.0
}
};
// range query only returns matrix
@@ -115,7 +120,7 @@ impl PrometheusGatewayService {
result_type = ValueType::Matrix;
};
PromJsonResponse::from_query_result(result, metric_name, result_type)
PrometheusJsonResponse::from_query_result(result, metric_name, result_type)
.await
.0
}

View File

@@ -19,7 +19,7 @@ pub mod influxdb;
pub mod mem_prof;
pub mod opentsdb;
mod pprof;
pub mod prometheus;
pub mod prom_store;
pub mod script;
#[cfg(feature = "dashboard")]
@@ -73,7 +73,7 @@ use crate::metrics_handler::MetricsHandler;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef,
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef,
ScriptHandlerRef,
};
use crate::server::Server;
@@ -118,7 +118,7 @@ pub struct HttpServer {
options: HttpOptions,
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
prom_handler: Option<PrometheusProtocolHandlerRef>,
prom_handler: Option<PromStoreProtocolHandlerRef>,
script_handler: Option<ScriptHandlerRef>,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
@@ -434,7 +434,7 @@ impl HttpServerBuilder {
self
}
pub fn with_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) -> &mut Self {
pub fn with_prom_handler(&mut self, handler: PromStoreProtocolHandlerRef) -> &mut Self {
let _ = self.inner.prom_handler.get_or_insert(handler);
self
}
@@ -625,10 +625,10 @@ impl HttpServer {
.with_state(api_state)
}
fn route_prom<S>(&self, prom_handler: PrometheusProtocolHandlerRef) -> Router<S> {
fn route_prom<S>(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(prometheus::remote_write))
.route("/read", routing::post(prometheus::remote_read))
.route("/write", routing::post(prom_store::remote_write))
.route("/read", routing::post(prom_store::remote_read))
.with_state(prom_handler)
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::{ReadRequest, WriteRequest};
use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
@@ -29,8 +29,8 @@ use snafu::prelude::*;
use crate::error::{self, Result};
use crate::parse_catalog_and_schema_from_client_database_name;
use crate::prometheus::snappy_decompress;
use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse};
use crate::prom_store::snappy_decompress;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
@@ -47,14 +47,14 @@ impl Default for DatabaseQuery {
#[axum_macros::debug_handler]
pub async fn remote_write(
State(handler): State<PrometheusProtocolHandlerRef>,
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
RawBody(body): RawBody,
) -> Result<(StatusCode, ())> {
let request = decode_remote_write_request(body).await?;
let _timer = timer!(
crate::metrics::METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED,
crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED,
&[(
crate::metrics::METRIC_DB_LABEL,
params.db.clone().unwrap_or_default()
@@ -72,7 +72,7 @@ pub async fn remote_write(
Ok((StatusCode::NO_CONTENT, ()))
}
impl IntoResponse for PrometheusResponse {
impl IntoResponse for PromStoreResponse {
fn into_response(self) -> axum::response::Response {
(
[
@@ -87,14 +87,14 @@ impl IntoResponse for PrometheusResponse {
#[axum_macros::debug_handler]
pub async fn remote_read(
State(handler): State<PrometheusProtocolHandlerRef>,
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
RawBody(body): RawBody,
) -> Result<PrometheusResponse> {
) -> Result<PromStoreResponse> {
let request = decode_remote_read_request(body).await?;
let _timer = timer!(
crate::metrics::METRIC_HTTP_PROMETHEUS_READ_ELAPSED,
crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED,
&[(
crate::metrics::METRIC_DB_LABEL,
params.db.clone().unwrap_or_default()

View File

@@ -33,7 +33,7 @@ pub mod metrics_handler;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prom;
pub mod prom_store;
pub mod prometheus;
pub mod query_handler;
pub mod server;

View File

@@ -40,9 +40,9 @@ pub(crate) const METRIC_HTTP_SQL_ELAPSED: &str = "servers.http_sql_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_ELAPSED: &str = "servers.http_promql_elapsed";
pub(crate) const METRIC_AUTH_FAILURE: &str = "servers.auth_failure_count";
pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influxdb_write_elapsed";
pub(crate) const METRIC_HTTP_PROMETHEUS_WRITE_ELAPSED: &str =
pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str =
"servers.http_prometheus_write_elapsed";
pub(crate) const METRIC_HTTP_PROMETHEUS_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str =
"servers.opentsdb_line_write_elapsed";

View File

@@ -1,921 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! `prom` provides a server that is compatible with the Prometheus HTTP API.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use axum::body::BoxBody;
use axum::extract::{Path, Query, State};
use axum::{middleware, routing, Form, Json, Router};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use futures::FutureExt;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
UnaryExpr, ValueType, VectorSelector,
};
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, Location, OptionExt, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::compression::CompressionLayer;
use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{
AlreadyStartedSnafu, CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result,
StartHttpSnafu, UnexpectedResultSnafu,
};
use crate::http::authorize::HttpAuth;
use crate::http::track_metrics;
use crate::prometheus::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::server::Server;
/// Version segment of the served API; the routes are nested under `/api/{PROM_API_VERSION}`.
pub const PROM_API_VERSION: &str = "v1";
/// Reference-counted, `Send + Sync` handle to a [`PromHandler`] trait object.
pub type PromHandlerRef = Arc<dyn PromHandler + Send + Sync>;
/// Backend that executes a [`PromQuery`] for the API server.
#[async_trait]
pub trait PromHandler {
/// Runs `query` with the given query context and returns the resulting [`Output`].
async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result<Output>;
}
/// `PromServer` is the HTTP server that provides compatibility with the Prometheus HTTP API.
pub struct PromServer {
/// Handler shared with every API route as router state.
query_handler: PromHandlerRef,
/// Shutdown signal sender: stored by `start` and taken by `shutdown`.
shutdown_tx: Mutex<Option<Sender<()>>>,
/// Optional user provider handed to the HTTP authorization layer.
user_provider: Option<UserProviderRef>,
}
impl PromServer {
    /// Builds a boxed server around `query_handler`, with no shutdown channel
    /// and no user provider configured yet.
    pub fn create_server(query_handler: PromHandlerRef) -> Box<Self> {
        let server = Self {
            query_handler,
            shutdown_tx: Mutex::new(None),
            user_provider: None,
        };
        Box::new(server)
    }

    /// Installs the user provider used by the authorization layer.
    ///
    /// Debug builds assert that no provider has been set before.
    pub fn set_user_provider(&mut self, user_provider: UserProviderRef) {
        debug_assert!(self.user_provider.is_none());
        self.user_provider = Some(user_provider);
    }

    /// Assembles the axum application: the API routes nested under
    /// `/api/{PROM_API_VERSION}`, wrapped by tracing, compression,
    /// authorization and metrics middlewares.
    pub fn make_app(&self) -> Router {
        // TODO(ruihang): implement format_query, series, values, query_examplars and targets methods
        let api_routes = Router::new()
            .route("/query", routing::post(instant_query).get(instant_query))
            .route("/query_range", routing::post(range_query).get(range_query))
            .route("/labels", routing::post(labels_query).get(labels_query))
            .route("/series", routing::post(series_query).get(series_query))
            .route(
                "/label/:label_name/values",
                routing::get(label_values_query),
            )
            .with_state(self.query_handler.clone());

        // Custom authorization layer, fed with the (optional) user provider.
        let authorization = AsyncRequireAuthorizationLayer::new(HttpAuth::<BoxBody>::new(
            self.user_provider.clone(),
        ));
        let middlewares = ServiceBuilder::new()
            .layer(TraceLayer::new_for_http())
            .layer(CompressionLayer::new())
            .layer(authorization);

        let api_prefix = format!("/api/{PROM_API_VERSION}");
        Router::new()
            .nest(&api_prefix, api_routes)
            .layer(middlewares)
            // This server runs as its own HTTP server, so the metrics layer
            // has to be registered here as well.
            .route_layer(middleware::from_fn(track_metrics))
    }
}
/// Name returned by the `Server::name` implementation of [`PromServer`].
pub const PROM_SERVER: &str = "PROM_SERVER";
#[async_trait]
impl Server for PromServer {
    /// Signals the running server, if any, to shut down gracefully.
    ///
    /// When no shutdown sender is stored (never started, or already shut
    /// down) this only logs and returns `Ok(())`.
    async fn shutdown(&self) -> Result<()> {
        let mut shutdown_tx = self.shutdown_tx.lock().await;
        if let Some(tx) = shutdown_tx.take() {
            // `send` fails only when the receiving half was dropped, which
            // means the serving future is no longer waiting for the signal.
            if tx.send(()).is_err() {
                info!("Receiver dropped, the Prometheus API server has already exited");
            }
        }
        info!("Shutdown Prometheus API server");

        Ok(())
    }

    /// Binds to `listening` and serves the API until `shutdown` is called.
    ///
    /// Returns the address the server was actually bound to once serving has
    /// finished.
    ///
    /// # Errors
    ///
    /// Fails with the `AlreadyStarted` error when a shutdown sender is already
    /// stored, and with the `StartHttp` error when serving ends with an error.
    async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
        let (tx, rx) = oneshot::channel();
        let server = {
            // Hold the lock only while checking and publishing the shutdown
            // sender, not for the whole lifetime of the server.
            let mut shutdown_tx = self.shutdown_tx.lock().await;
            ensure!(
                shutdown_tx.is_none(),
                AlreadyStartedSnafu {
                    server: "Prometheus"
                }
            );

            let app = self.make_app();
            // NOTE(review): `bind` presumably panics when the address cannot
            // be bound (hyper `Server::bind`) — confirm; `try_bind` would
            // allow returning an error instead.
            let server = axum::Server::bind(&listening).serve(app.into_make_service());

            *shutdown_tx = Some(tx);

            server
        };
        let listening = server.local_addr();
        info!("Prometheus API server is bound to {}", listening);

        // Any completion of `rx` (a sent signal or a dropped sender) triggers
        // the graceful shutdown.
        let graceful = server.with_graceful_shutdown(rx.map(drop));
        graceful.await.context(StartHttpSnafu)?;

        Ok(listening)
    }

    /// Returns the server name, [`PROM_SERVER`].
    fn name(&self) -> &str {
        PROM_SERVER
    }
}
/// A single series entry of a query result.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeries {
// NOTE(review): presumably maps label names to label values — confirm against the producer.
pub metric: HashMap<String, String>,
/// For [ValueType::Matrix] result type
// NOTE(review): each pair presumably holds (timestamp, sample value as text) — confirm.
pub values: Vec<(f64, String)>,
/// For [ValueType::Vector] result type
// Left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}
/// Query result payload: the result type name together with its series.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromData {
/// Textual name of the result type; (de)serialized under the key `resultType`.
#[serde(rename = "resultType")]
pub result_type: String,
/// Series contained in this result.
pub result: Vec<PromSeries>,
}
/// The `data` part of a [`PromJsonResponse`].
///
/// The enum is `#[serde(untagged)]`, so only the inner value of a variant is
/// written out, without any variant name.
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
#[serde(untagged)]
pub enum PromResponse {
/// Query result data.
PromData(PromData),
/// A list of strings. NOTE(review): presumably label names — confirm against the `/labels` handler.
Labels(Vec<String>),
/// A list of string-to-string maps, one per series.
Series(Vec<HashMap<String, String>>),
/// A list of strings. NOTE(review): presumably the values of one label — confirm against the handler.
LabelValues(Vec<String>),
}
impl Default for PromResponse {
fn default() -> Self {
PromResponse::PromData(Default::default())
}
}
/// JSON envelope returned by the API handlers.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromJsonResponse {
/// Either `"success"` or `"error"`, as set by the `success` and `error` constructors.
pub status: String,
/// Response payload.
pub data: PromResponse,
/// Error reason; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Error type, (de)serialized as `errorType`; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errorType")]
pub error_type: Option<String>,
/// Optional warnings; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub warnings: Option<Vec<String>>,
}
impl PromJsonResponse {
    /// Builds an `"error"` response carrying `error_type` and `reason`.
    ///
    /// The `data` field is filled with the default (empty) payload.
    pub fn error<S1, S2>(error_type: S1, reason: S2) -> Json<Self>
    where
        S1: Into<String>,
        S2: Into<String>,
    {
        Json(PromJsonResponse {
            status: "error".to_string(),
            data: PromResponse::default(),
            error: Some(reason.into()),
            error_type: Some(error_type.into()),
            warnings: None,
        })
    }

    /// Builds a `"success"` response wrapping `data`.
    pub fn success(data: PromResponse) -> Json<Self> {
        Json(PromJsonResponse {
            status: "success".to_string(),
            data,
            error: None,
            error_type: None,
            warnings: None,
        })
    }

    /// Convert from `Result<Output>`
    ///
    /// Record batches (materialized or streamed) are converted into [`PromData`] of the
    /// requested `result_type`. "Table not found" and "column not found" failures are
    /// reported as an empty success, every other failure as an error response.
    pub async fn from_query_result(
        result: Result<Output>,
        metric_name: String,
        result_type: ValueType,
    ) -> Json<Self> {
        let response: Result<Json<Self>> = try {
            let json = match result? {
                Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data(
                    batches,
                    metric_name,
                    result_type,
                )?),
                Output::Stream(stream) => {
                    let record_batches = RecordBatches::try_collect(stream)
                        .await
                        .context(CollectRecordbatchSnafu)?;
                    Self::success(Self::record_batches_to_data(
                        record_batches,
                        metric_name,
                        result_type,
                    )?)
                }
                Output::AffectedRows(_) => {
                    Self::error("Unexpected", "expected data result, but got affected rows")
                }
            };
            json
        };

        match response {
            Ok(resp) => resp,
            Err(err) => {
                let code = err.status_code();
                // Prometheus won't report error if querying nonexist label and metric
                if code == StatusCode::TableNotFound || code == StatusCode::TableColumnNotFound {
                    Self::success(PromResponse::PromData(PromData {
                        result_type: result_type.to_string(),
                        ..Default::default()
                    }))
                } else {
                    Self::error(code.to_string(), err.to_string())
                }
            }
        }
    }

    /// Convert [RecordBatches] to [PromData]
    ///
    /// Column roles are inferred from the schema: the first millisecond timestamp column
    /// is the time index, the first `Float64` column is the value and every string column
    /// is a tag. Rows are grouped by their tag set (plus the metric name); rows whose
    /// value is NULL are skipped.
    ///
    /// # Errors
    ///
    /// Returns an internal error when no timestamp or value column exists, or when a row
    /// with a value has a NULL timestamp.
    fn record_batches_to_data(
        batches: RecordBatches,
        metric_name: String,
        result_type: ValueType,
    ) -> Result<PromResponse> {
        // infer semantic type of each column from schema.
        // TODO(ruihang): wish there is a better way to do this.
        let mut timestamp_column_index = None;
        let mut tag_column_indices = Vec::new();
        let mut first_field_column_index = None;
        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
            match column.data_type {
                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
                    if timestamp_column_index.is_none() {
                        timestamp_column_index = Some(i);
                    }
                }
                ConcreteDataType::Float64(_) => {
                    if first_field_column_index.is_none() {
                        first_field_column_index = Some(i);
                    }
                }
                ConcreteDataType::String(_) => {
                    tag_column_indices.push(i);
                }
                _ => {}
            }
        }
        let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
            err_msg: "no timestamp column found".to_string(),
        })?;
        let first_field_column_index = first_field_column_index.context(InternalSnafu {
            err_msg: "no value column found".to_string(),
        })?;

        // Tag names only depend on the schema, so resolve them once for all batches.
        let tag_names = tag_column_indices
            .iter()
            .map(|c| batches.schema().column_name_by_index(*c).to_string())
            .collect::<Vec<_>>();
        let metric_name = (METRIC_NAME.to_string(), metric_name);

        // Series are keyed by their sorted-as-inserted tag list so that the output order
        // is deterministic.
        let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
        for batch in batches.iter() {
            // The downcasts below cannot fail: the concrete vector types were selected
            // from the schema's data types above.
            let tag_columns = tag_column_indices
                .iter()
                .map(|i| {
                    batch
                        .column(*i)
                        .as_any()
                        .downcast_ref::<StringVector>()
                        .unwrap()
                })
                .collect::<Vec<_>>();
            let timestamp_column = batch
                .column(timestamp_column_index)
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap();
            let field_column = batch
                .column(first_field_column_index)
                .as_any()
                .downcast_ref::<Float64Vector>()
                .unwrap();

            // assemble rows
            for row_index in 0..batch.num_rows() {
                // Rows without a value contribute nothing; skip them before doing any work.
                let Some(v) = field_column.get_data(row_index) else {
                    continue;
                };

                // retrieve timestamp; report a NULL as an error instead of panicking.
                let timestamp_millis: i64 = timestamp_column
                    .get_data(row_index)
                    .context(InternalSnafu {
                        err_msg: "unexpected NULL value in timestamp column".to_string(),
                    })?
                    .into();
                let timestamp = timestamp_millis as f64 / 1000.0;

                // retrieve tags
                // TODO(ruihang): push table name `__metric__`
                let mut tags = Vec::with_capacity(tag_columns.len() + 1);
                tags.push(metric_name.clone());
                for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                    // TODO(ruihang): add test for NULL tag
                    if let Some(tag_value) = tag_column.get_data(row_index) {
                        tags.push((tag_name.to_string(), tag_value.to_string()));
                    }
                }

                buffer
                    .entry(tags)
                    .or_default()
                    .push((timestamp, Into::<f64>::into(v).to_string()));
            }
        }

        let result = buffer
            .into_iter()
            .map(|(tags, mut values)| {
                let metric = tags.into_iter().collect();
                match result_type {
                    // Non-matrix results carry a single sample: the last one collected.
                    ValueType::Vector | ValueType::Scalar | ValueType::String => PromSeries {
                        metric,
                        value: values.pop(),
                        ..Default::default()
                    },
                    ValueType::Matrix => PromSeries {
                        metric,
                        values,
                        ..Default::default()
                    },
                }
            })
            .collect::<Vec<_>>();

        Ok(PromResponse::PromData(PromData {
            result_type: result_type.to_string(),
            result,
        }))
    }
}
// Parameters accepted by `instant_query`, parsed from the URL query string and from
// the form body. Every field is optional.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
// PromQL expression; an empty string is used when absent.
query: Option<String>,
// Evaluation time; the handler falls back to the current server time.
time: Option<String>,
// Accepted for compatibility; `instant_query` does not read it.
timeout: Option<String>,
// Database name used to build the query context.
db: Option<String>,
}
/// Handles an instant PromQL query.
///
/// Every parameter is read from the URL query string first and from the form body
/// second. `time` defaults to the current server time and `db` to
/// `DEFAULT_SCHEMA_NAME`. The query is evaluated at a single point in time
/// (`start == end`, step `1s`).
#[axum_macros::debug_handler]
pub async fn instant_query(
    State(handler): State<PromHandlerRef>,
    Query(params): Query<InstantQuery>,
    Form(form_params): Form<InstantQuery>,
) -> Json<PromJsonResponse> {
    // Extract time from query string, or use current server time if not specified.
    let time = params
        .time
        .or(form_params.time)
        .unwrap_or_else(current_time_rfc3339);
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: time.clone(),
        end: time,
        step: "1s".to_string(),
    };

    // `db` follows the same query-string-then-form precedence as the other parameters.
    let db = params
        .db
        .or(form_params.db)
        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
    let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(&db);
    let query_ctx = QueryContext::with(catalog, schema);

    let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
    let (metric_name, result_type) = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Ok((metric_name, result_type)) => (metric_name.unwrap_or_default(), result_type),
        Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()),
    };
    PromJsonResponse::from_query_result(result, metric_name, result_type).await
}
// Parameters accepted by `range_query`, parsed from the URL query string and from the
// form body. Every field is optional; missing strings default to empty.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct RangeQuery {
// PromQL expression.
query: Option<String>,
// Start of the evaluated range.
start: Option<String>,
// End of the evaluated range.
end: Option<String>,
// Query resolution step.
step: Option<String>,
// Accepted for compatibility; `range_query` does not read it.
timeout: Option<String>,
// Database name used to build the query context.
db: Option<String>,
}
/// Handles a range PromQL query; the result is always rendered as a matrix.
///
/// Every parameter is read from the URL query string first and from the form body
/// second. Missing `query`/`start`/`end`/`step` default to the empty string and `db`
/// defaults to `DEFAULT_SCHEMA_NAME`.
#[axum_macros::debug_handler]
pub async fn range_query(
    State(handler): State<PromHandlerRef>,
    Query(params): Query<RangeQuery>,
    Form(form_params): Form<RangeQuery>,
) -> Json<PromJsonResponse> {
    let prom_query = PromQuery {
        query: params.query.or(form_params.query).unwrap_or_default(),
        start: params.start.or(form_params.start).unwrap_or_default(),
        end: params.end.or(form_params.end).unwrap_or_default(),
        step: params.step.or(form_params.step).unwrap_or_default(),
    };

    // `db` follows the same query-string-then-form precedence as the other parameters.
    let db = params
        .db
        .or(form_params.db)
        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
    let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(&db);
    let query_ctx = QueryContext::with(catalog, schema);

    let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await;
    let metric_name = match retrieve_metric_name_and_result_type(&prom_query.query) {
        Err(err) => return PromJsonResponse::error(err.status_code().to_string(), err.to_string()),
        Ok((metric_name, _)) => metric_name.unwrap_or_default(),
    };
    PromJsonResponse::from_query_result(result, metric_name, ValueType::Matrix).await
}
// Values of the repeated `match[]` request parameter, in the order they were received.
// `Deserialize` is implemented by hand for this type rather than derived.
#[derive(Debug, Default, Serialize, JsonSchema)]
struct Matches(Vec<String>);
// Parameters accepted by `labels_query`.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LabelsQuery {
// Start of the inspected time range; the handler defaults it via `yesterday_rfc3339`.
start: Option<String>,
// End of the inspected time range; the handler defaults it via `current_time_rfc3339`.
end: Option<String>,
// Flattened so that `Matches` sees all request keys and can pick out `match[]`.
#[serde(flatten)]
matches: Matches,
// Database name used to build the query context.
db: Option<String>,
}
// Custom Deserialize method to support parsing repeated match[]
impl<'de> Deserialize<'de> for Matches {
fn deserialize<D>(deserializer: D) -> std::result::Result<Matches, D::Error>
where
D: de::Deserializer<'de>,
{
struct MatchesVisitor;
impl<'d> Visitor<'d> for MatchesVisitor {
type Value = Vec<String>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string")
}
fn visit_map<M>(self, mut access: M) -> std::result::Result<Self::Value, M::Error>
where
M: MapAccess<'d>,
{
let mut matches = Vec::new();
while let Some((key, value)) = access.next_entry::<String, String>()? {
if key == "match[]" {
matches.push(value);
}
}
Ok(matches)
}
}
Ok(Matches(deserializer.deserialize_map(MatchesVisitor)?))
}
}
/// Lists the label names of the series selected by the `match[]` parameters.
///
/// Every parameter is read from the URL query string first and from the form body
/// second. `start` defaults to `yesterday_rfc3339()`, `end` to `current_time_rfc3339()`
/// and `db` to `DEFAULT_SCHEMA_NAME`. At least one `match[]` selector is required. The
/// returned names are sorted, always contain the metric name label and never contain
/// the timestamp or value column names.
#[axum_macros::debug_handler]
pub async fn labels_query(
    State(handler): State<PromHandlerRef>,
    Query(params): Query<LabelsQuery>,
    Form(form_params): Form<LabelsQuery>,
) -> Json<PromJsonResponse> {
    let mut queries = params.matches.0;
    if queries.is_empty() {
        queries = form_params.matches.0;
    }
    if queries.is_empty() {
        return PromJsonResponse::error("Unsupported", "match[] parameter is required");
    }

    let start = params
        .start
        .or(form_params.start)
        .unwrap_or_else(yesterday_rfc3339);
    let end = params
        .end
        .or(form_params.end)
        .unwrap_or_else(current_time_rfc3339);

    // `db` follows the same query-string-then-form precedence as the other parameters.
    let db = params
        .db
        .or(form_params.db)
        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
    let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(&db);
    let query_ctx = Arc::new(QueryContext::with(catalog, schema));

    let mut labels = HashSet::new();
    let _ = labels.insert(METRIC_NAME.to_string());

    for query in queries {
        let prom_query = PromQuery {
            query,
            start: start.clone(),
            end: end.clone(),
            step: DEFAULT_LOOKBACK_STRING.to_string(),
        };
        let result = handler.do_query(&prom_query, query_ctx.clone()).await;
        let response = retrieve_labels_name_from_query_result(result, &mut labels).await;
        if let Err(err) = response {
            // Prometheus won't report error if querying nonexist label and metric
            if err.status_code() != StatusCode::TableNotFound
                && err.status_code() != StatusCode::TableColumnNotFound
            {
                return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
            }
        }
    }

    // The timestamp and value columns are storage details, not labels.
    let _ = labels.remove(TIMESTAMP_COLUMN_NAME);
    let _ = labels.remove(FIELD_COLUMN_NAME);

    let mut sorted_labels: Vec<String> = labels.into_iter().collect();
    sorted_labels.sort();
    PromJsonResponse::success(PromResponse::Labels(sorted_labels))
}
async fn retrieve_series_from_query_result(
result: Result<Output>,
series: &mut Vec<HashMap<String, String>>,
table_name: &str,
) -> Result<()> {
match result? {
Output::RecordBatches(batches) => {
record_batches_to_series(batches, series, table_name)?;
Ok(())
}
Output::Stream(stream) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
record_batches_to_series(batches, series, table_name)?;
Ok(())
}
Output::AffectedRows(_) => Err(Error::UnexpectedResult {
reason: "expected data result, but got affected rows".to_string(),
location: Location::default(),
}),
}
}
/// Collects the label names found in a query result into `labels`.
///
/// Streams are materialized into record batches before inspection. A query error is
/// propagated, and an "affected rows" output is rejected as an unexpected result.
async fn retrieve_labels_name_from_query_result(
    result: Result<Output>,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let record_batches = match result? {
        Output::RecordBatches(batches) => batches,
        Output::Stream(stream) => RecordBatches::try_collect(stream)
            .await
            .context(CollectRecordbatchSnafu)?,
        Output::AffectedRows(_) => {
            return UnexpectedResultSnafu {
                reason: "expected data result, but got affected rows".to_string(),
            }
            .fail();
        }
    };
    record_batches_to_labels_name(record_batches, labels)
}
/// Turns every row of `batches` into a `column name -> stringified value` map and
/// appends it to `series`.
///
/// Each map additionally gets a `__name__` entry set to `table_name`; it is inserted
/// last, so it overrides a column of the same name.
fn record_batches_to_series(
    batches: RecordBatches,
    series: &mut Vec<HashMap<String, String>>,
    table_name: &str,
) -> Result<()> {
    for batch in batches.iter() {
        for row in batch.rows() {
            let mut labels: HashMap<String, String> = HashMap::new();
            for (idx, value) in row.iter().enumerate() {
                let column_name = batch.schema.column_name_by_index(idx);
                let _ = labels.insert(column_name.to_string(), value.to_string());
            }
            let _ = labels.insert("__name__".to_string(), table_name.to_string());
            series.push(labels);
        }
    }
    Ok(())
}
/// Retrieve labels name from record batches
///
/// If at least one row has a non-NULL value in any `Float64` column, the names of
/// all columns in the schema are inserted into `labels`; otherwise `labels` is left
/// untouched.
///
/// # Errors
///
/// Returns an internal error when the schema has no `Float64` column.
fn record_batches_to_labels_name(
    batches: RecordBatches,
    labels: &mut HashSet<String>,
) -> Result<()> {
    let field_column_indices: Vec<usize> = batches
        .schema()
        .column_schemas()
        .iter()
        .enumerate()
        .filter(|(_, column)| matches!(column.data_type, ConcreteDataType::Float64(_)))
        .map(|(i, _)| i)
        .collect();
    if field_column_indices.is_empty() {
        return Err(Error::Internal {
            err_msg: "no value column found".to_string(),
        });
    }

    // Stop at the first row that carries a value; one is enough to report the labels.
    let has_value = batches.iter().any(|batch| {
        // The downcast cannot fail: the indices were selected by `Float64` data type.
        let field_columns = field_column_indices
            .iter()
            .map(|i| {
                batch
                    .column(*i)
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
            })
            .collect::<Vec<_>>();
        (0..batch.num_rows()).any(|row_index| {
            field_columns
                .iter()
                .any(|c| c.get_data(row_index).is_some())
        })
    });

    if has_value {
        // Column names depend only on the schema, so they are resolved once here.
        let column_count = batches.schema().column_schemas().len();
        for index in 0..column_count {
            let _ = labels.insert(batches.schema().column_name_by_index(index).to_string());
        }
    }
    Ok(())
}
/// Parses `promql` and returns the metric name it refers to (if one can be found)
/// together with the value type the expression evaluates to.
///
/// A parse failure is reported as an invalid-query error.
pub(crate) fn retrieve_metric_name_and_result_type(
    promql: &str,
) -> Result<(Option<String>, ValueType)> {
    match promql_parser::parser::parse(promql) {
        Ok(parsed) => {
            let metric_name = promql_expr_to_metric_name(&parsed);
            Ok((metric_name, parsed.value_type()))
        }
        Err(reason) => Err(InvalidQuerySnafu { reason }.build()),
    }
}
/// Walks a PromQL expression looking for the metric name it selects.
///
/// Selectors yield the value of their metric-name matcher, literals yield an empty
/// name, extensions yield nothing. Composite expressions delegate to their operands:
/// a binary expression prefers its left side, a call uses its first argument that
/// produces a name.
fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
    match expr {
        // Leaves.
        PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => {
            matchers.find_matchers(METRIC_NAME).pop().cloned()
        }
        PromqlExpr::MatrixSelector(MatrixSelector {
            vector_selector, ..
        }) => vector_selector
            .matchers
            .find_matchers(METRIC_NAME)
            .pop()
            .cloned(),
        PromqlExpr::NumberLiteral(_) | PromqlExpr::StringLiteral(_) => Some(String::new()),
        PromqlExpr::Extension(_) => None,

        // Single-operand wrappers.
        PromqlExpr::Aggregate(AggregateExpr { expr: inner, .. }) => {
            promql_expr_to_metric_name(inner)
        }
        PromqlExpr::Unary(UnaryExpr { expr: inner }) => promql_expr_to_metric_name(inner),
        PromqlExpr::Paren(ParenExpr { expr: inner }) => promql_expr_to_metric_name(inner),
        PromqlExpr::Subquery(SubqueryExpr { expr: inner, .. }) => {
            promql_expr_to_metric_name(inner)
        }

        // Multi-operand expressions.
        PromqlExpr::Binary(BinaryExpr { lhs, rhs, .. }) => {
            promql_expr_to_metric_name(lhs).or_else(|| promql_expr_to_metric_name(rhs))
        }
        PromqlExpr::Call(Call { args, .. }) => {
            args.args.iter().find_map(|arg| promql_expr_to_metric_name(arg))
        }
    }
}
// Parameters accepted by `label_values_query` (URL query string only).
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LabelValueQuery {
// Start of the inspected time range; the handler defaults it via `yesterday_rfc3339`.
start: Option<String>,
// End of the inspected time range; the handler defaults it via `current_time_rfc3339`.
end: Option<String>,
// Flattened so that `Matches` sees all request keys and can pick out `match[]`.
#[serde(flatten)]
matches: Matches,
// Database name; the handler defaults it to `DEFAULT_SCHEMA_NAME`.
db: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn label_values_query(
State(handler): State<PromHandlerRef>,
Path(label_name): Path<String>,
Query(params): Query<LabelValueQuery>,
) -> Json<PromJsonResponse> {
let queries = params.matches.0;
if queries.is_empty() {
return PromJsonResponse::error("Invalid argument", "match[] parameter is required");
}
let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut label_values = HashSet::new();
for query in queries {
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
let result = retrieve_label_values(result, &label_name, &mut label_values).await;
if let Err(err) = result {
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
&& err.status_code() != StatusCode::TableColumnNotFound
{
return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
}
}
}
let mut label_values: Vec<_> = label_values.into_iter().collect();
label_values.sort();
PromJsonResponse::success(PromResponse::LabelValues(label_values))
}
/// Collects the values of label `label_name` found in a query result into
/// `labels_values`.
///
/// Streams are materialized into record batches before inspection. A query error is
/// propagated, and an "affected rows" output is rejected as an unexpected result.
async fn retrieve_label_values(
    result: Result<Output>,
    label_name: &str,
    labels_values: &mut HashSet<String>,
) -> Result<()> {
    let record_batches = match result? {
        Output::RecordBatches(batches) => batches,
        Output::Stream(stream) => RecordBatches::try_collect(stream)
            .await
            .context(CollectRecordbatchSnafu)?,
        Output::AffectedRows(_) => {
            return UnexpectedResultSnafu {
                reason: "expected data result, but got affected rows".to_string(),
            }
            .fail();
        }
    };
    retrieve_label_values_from_record_batch(record_batches, label_name, labels_values).await
}
/// Inserts every non-NULL value of column `label_name` into `labels_values`.
///
/// Nothing is collected (and `Ok(())` is returned) when the column does not exist or
/// is not a string column.
async fn retrieve_label_values_from_record_batch(
    batches: RecordBatches,
    label_name: &str,
    labels_values: &mut HashSet<String>,
) -> Result<()> {
    let Some(label_col_idx) = batches.schema().column_index_by_name(label_name) else {
        return Ok(());
    };

    // check whether label_name belongs to tag column
    let is_tag_column = matches!(
        batches
            .schema()
            .column_schema_by_name(label_name)
            .unwrap()
            .data_type,
        ConcreteDataType::String(_)
    );
    if !is_tag_column {
        return Ok(());
    }

    for batch in batches.iter() {
        let label_column = batch
            .column(label_col_idx)
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        labels_values.extend(
            (0..batch.num_rows())
                .filter_map(|row_index| label_column.get_data(row_index))
                .map(|label_value| label_value.to_string()),
        );
    }
    Ok(())
}
// Parameters accepted by `series_query`.
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct SeriesQuery {
// Start of the inspected time range; the handler defaults it via `yesterday_rfc3339`.
start: Option<String>,
// End of the inspected time range; the handler defaults it via `current_time_rfc3339`.
end: Option<String>,
// Flattened so that `Matches` sees all request keys and can pick out `match[]`.
#[serde(flatten)]
matches: Matches,
// Database name used to build the query context.
db: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn series_query(
State(handler): State<PromHandlerRef>,
Query(params): Query<SeriesQuery>,
Form(form_params): Form<SeriesQuery>,
) -> Json<PromJsonResponse> {
let mut queries: Vec<String> = params.matches.0;
if queries.is_empty() {
queries = form_params.matches.0;
}
if queries.is_empty() {
return PromJsonResponse::error("Unsupported", "match[] parameter is required");
}
let start = params
.start
.or(form_params.start)
.unwrap_or_else(yesterday_rfc3339);
let end = params
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
let query_ctx = Arc::new(QueryContext::with(catalog, schema));
let mut series = Vec::new();
for query in queries {
let table_name = query.clone();
let prom_query = PromQuery {
query,
start: start.clone(),
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await
{
return PromJsonResponse::error(err.status_code().to_string(), err.to_string());
}
}
PromJsonResponse::success(PromResponse::Series(series))
}

View File

@@ -0,0 +1,757 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Prometheus protocol support.
//! Handles the Prometheus remote_write and remote_read logic.
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use snafu::{ensure, OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use crate::error::{self, Result};
/// Name of the column that stores a sample's timestamp.
pub const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
/// Name of the column that stores a sample's value.
pub const FIELD_COLUMN_NAME: &str = "greptime_value";
/// Name of the special label that carries the metric name; its value is used as the
/// table name in this module.
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Metrics for push gateway protocol
pub struct Metrics {
/// The parsed metrics exposition, as produced by the `openmetrics_parser` crate.
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
/// Extracts the table name from a remote read query.
///
/// The table name is the value of the first matcher whose name is `__name__`.
///
/// # Errors
///
/// Returns an invalid-request error when the query has no such matcher.
pub fn table_name(q: &Query) -> Result<String> {
    q.matchers
        .iter()
        .find(|matcher| matcher.name == METRIC_NAME_LABEL)
        .map(|matcher| matcher.value.to_string())
        .context(error::InvalidPromRemoteRequestSnafu {
            msg: "missing '__name__' label in timeseries",
        })
}
/// Create a DataFrame from a remote Query
///
/// The resulting plan filters `dataframe` by the query's inclusive timestamp range and
/// by one predicate per label matcher. The `__name__` matcher is skipped here because
/// it selects the table rather than rows.
///
/// # Errors
///
/// Returns an invalid-request error for an unknown matcher type, or a data frame
/// error when the filter cannot be applied.
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
    let DataFrame::DataFusion(dataframe) = dataframe;

    let start_timestamp_ms = q.start_timestamp_ms;
    let end_timestamp_ms = q.end_timestamp_ms;
    let label_matches = &q.matchers;

    // Two timestamp predicates plus at most one predicate per matcher.
    let mut conditions = Vec::with_capacity(label_matches.len() + 2);
    conditions.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit(start_timestamp_ms)));
    conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit(end_timestamp_ms)));

    for m in label_matches {
        let name = &m.name;
        if name == METRIC_NAME_LABEL {
            continue;
        }
        let value = &m.value;
        let m_type =
            MatcherType::from_i32(m.r#type).context(error::InvalidPromRemoteRequestSnafu {
                msg: format!("invalid LabelMatcher type: {}", m.r#type),
            })?;
        match m_type {
            MatcherType::Eq => {
                conditions.push(col(name).eq(lit(value)));
            }
            MatcherType::Neq => {
                conditions.push(col(name).not_eq(lit(value)));
            }
            // Case sensitive regexp match
            // NOTE(review): Prometheus regex matchers are fully anchored; confirm that
            // `regexp_match` is given an anchored pattern where that matters.
            MatcherType::Re => {
                conditions.push(regexp_match(vec![col(name), lit(value)]).is_not_null());
            }
            // Case sensitive regexp not match
            MatcherType::Nre => {
                conditions.push(regexp_match(vec![col(name), lit(value)]).is_null());
            }
        }
    }

    // Safety: conditions MUST not be empty, reduce always return Some(expr).
    let conditions = conditions.into_iter().reduce(Expr::and).unwrap();
    let dataframe = dataframe
        .filter(conditions)
        .context(error::DataFrameSnafu)?;
    Ok(LogicalPlan::DfPlan(dataframe.into_parts().1))
}
/// Builds a protobuf [`Label`] from an owned name/value pair.
#[inline]
fn new_label(name: String, value: String) -> Label {
Label { name, value }
}
// A timeseries id
//
// Identifies a series by its ordered list of labels; the order of the labels is
// significant for equality, hashing and ordering.
#[derive(Debug)]
struct TimeSeriesId {
// The labels of the series, in the order they were collected.
labels: Vec<Label>,
}
/// `Label` in protobuf doesn't implement `Eq`, so equality is spelled out here:
/// two ids are equal when they have the same number of labels and every pair of
/// labels at the same position has the same name and value.
impl PartialEq for TimeSeriesId {
    fn eq(&self, other: &Self) -> bool {
        self.labels.len() == other.labels.len()
            && self
                .labels
                .iter()
                .zip(&other.labels)
                .all(|(lhs, rhs)| lhs.name == rhs.name && lhs.value == rhs.value)
    }
}
// Marker impl: equality on `TimeSeriesId` compares `String` names and values, so it is
// a full equivalence relation.
impl Eq for TimeSeriesId {}
/// Hashes every label's name and value in order, mirroring what equality compares.
impl Hash for TimeSeriesId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.labels.iter().for_each(|label| {
            label.name.hash(state);
            label.value.hash(state);
        });
    }
}
/// For Sorting timeseries
///
/// Ids are ordered by label count first, then by the first differing label, comparing
/// the name before the value.
impl Ord for TimeSeriesId {
    fn cmp(&self, other: &Self) -> Ordering {
        self.labels
            .len()
            .cmp(&other.labels.len())
            .then_with(|| {
                self.labels
                    .iter()
                    .zip(other.labels.iter())
                    .map(|(lhs, rhs)| {
                        lhs.name
                            .cmp(&rhs.name)
                            .then_with(|| lhs.value.cmp(&rhs.value))
                    })
                    .find(|ordering| *ordering != Ordering::Equal)
                    .unwrap_or(Ordering::Equal)
            })
    }
}
/// The partial order is the total order defined by `Ord`.
impl PartialOrd for TimeSeriesId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Collect each row's timeseries id
/// This processing is ugly, hope https://github.com/GreptimeTeam/greptimedb/issues/336 making some progress in future.
///
/// The id of a row starts with the `__name__` label (set to `table_name`), followed by
/// one label per column that is neither the value nor the timestamp column and whose
/// value in that row is not NULL. The result has one id per row, in row order.
fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<TimeSeriesId> {
    // Which columns are labels depends only on the schema, so select them once instead
    // of re-checking the column names for every row.
    let label_columns: Vec<_> = recordbatch
        .schema
        .column_schemas()
        .iter()
        .enumerate()
        .filter(|(_, column_schema)| {
            column_schema.name != FIELD_COLUMN_NAME
                && column_schema.name != TIMESTAMP_COLUMN_NAME
        })
        .map(|(i, column_schema)| (&column_schema.name, &recordbatch.columns()[i]))
        .collect();

    (0..recordbatch.num_rows())
        .map(|row| {
            // One slot per label column plus the metric name; computed without
            // subtraction so that a batch with no columns cannot underflow.
            let mut labels = Vec::with_capacity(label_columns.len() + 1);
            labels.push(new_label(
                METRIC_NAME_LABEL.to_string(),
                table_name.to_string(),
            ));
            for &(name, column) in &label_columns {
                // A label with an empty label value is considered equivalent to a label that does not exist.
                if column.is_null(row) {
                    continue;
                }
                labels.push(new_label(name.clone(), column.get(row).to_string()));
            }
            TimeSeriesId { labels }
        })
        .collect()
}
/// Converts every record batch of a query result into Prometheus time series and
/// concatenates them in batch order.
///
/// Series are grouped per batch only; the first conversion error aborts the whole
/// operation.
pub fn recordbatches_to_timeseries(
    table_name: &str,
    recordbatches: RecordBatches,
) -> Result<Vec<TimeSeries>> {
    let mut all_series = Vec::new();
    for recordbatch in recordbatches.take() {
        all_series.extend(recordbatch_to_timeseries(table_name, recordbatch)?);
    }
    Ok(all_series)
}
/// Groups the rows of one record batch into Prometheus time series.
///
/// The batch must contain a millisecond timestamp column named `greptime_timestamp`
/// and a `Float64` column named `greptime_value`; otherwise an invalid-query-result
/// error is returned. Rows with a NULL timestamp or value add no sample, but their
/// series is still emitted. The returned series are sorted by their id.
fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Vec<TimeSeries>> {
    // Validate the timestamp column.
    let ts_column = recordbatch.column_by_name(TIMESTAMP_COLUMN_NAME).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_timestamp column in query result",
        },
    )?;
    ensure!(
        ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
                ts_column.data_type()
            )
        }
    );

    // Validate the value column.
    let field_column = recordbatch.column_by_name(FIELD_COLUMN_NAME).context(
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: "missing greptime_value column in query result",
        },
    )?;
    ensure!(
        field_column.data_type() == ConcreteDataType::float64_datatype(),
        error::InvalidPromRemoteReadQueryResultSnafu {
            msg: format!(
                "Expect value column of datatype Float64, actual {:?}",
                field_column.data_type()
            )
        }
    );

    // One id per row, then one series per distinct id.
    let row_ids = collect_timeseries_ids(table, &recordbatch);
    let mut grouped: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
    for (row, id) in row_ids.iter().enumerate() {
        // Register the series before looking at the sample so that rows without a
        // usable sample still produce their series.
        let series = grouped.entry(id).or_insert_with(|| TimeSeries {
            labels: id.labels.clone(),
            ..Default::default()
        });

        let has_sample = !ts_column.is_null(row) && !field_column.is_null(row);
        if has_sample {
            let sample = match (field_column.get(row), ts_column.get(row)) {
                (Value::Float64(value), Value::Timestamp(t))
                    if t.unit() == TimeUnit::Millisecond =>
                {
                    Sample {
                        value: value.into(),
                        timestamp: t.value(),
                    }
                }
                _ => unreachable!("checked by the \"ensure\" above"),
            };
            series.samples.push(sample);
        }
    }
    Ok(grouped.into_values().collect())
}
pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
let mut writers: HashMap<String, LinesWriter> = HashMap::new();
for timeseries in &request.timeseries {
let table_name = timeseries
.labels
.iter()
.find(|label| {
// The metric name is a special label
label.name == METRIC_NAME_LABEL
})
.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?
.value
.clone();
let writer = writers
.entry(table_name)
.or_insert_with(|| LinesWriter::with_lines(16));
// For each sample
for sample in &timeseries.samples {
// Insert labels first.
for label in &timeseries.labels {
// The metric name is a special label
if label.name == METRIC_NAME_LABEL {
continue;
}
writer
.write_tag(&label.name, &label.value)
.context(error::PromSeriesWriteSnafu)?;
}
// Insert sample timestamp.
writer
.write_ts(
TIMESTAMP_COLUMN_NAME,
(sample.timestamp, Precision::Millisecond),
)
.context(error::PromSeriesWriteSnafu)?;
// Insert sample value.
writer
.write_f64(FIELD_COLUMN_NAME, sample.value)
.context(error::PromSeriesWriteSnafu)?;
writer.commit();
}
}
let mut sample_counts = 0;
let inserts = writers
.into_iter()
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
sample_counts += row_count as usize;
GrpcInsertRequest {
table_name,
region_number: 0,
columns,
row_count,
}
})
.collect();
Ok((InsertRequests { inserts }, sample_counts))
}
/// Decompresses a raw (unframed) snappy buffer.
///
/// # Errors
///
/// Returns a decompress error when `buf` is not valid snappy data.
#[inline]
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
    Decoder::new()
        .decompress_vec(buf)
        .context(error::DecompressPromRemoteRequestSnafu)
}
/// Compresses `buf` into a raw (unframed) snappy buffer.
///
/// NOTE(review): a compression failure is reported through the *decompress* error
/// selector — confirm whether a dedicated compress error should be used instead.
#[inline]
pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
    Encoder::new()
        .compress_vec(buf)
        .context(error::DecompressPromRemoteRequestSnafu)
}
/// Mock timeseries for test, it is both used in servers and frontend crate
/// So we present it here
///
/// Returns three series (`metric1`, `metric2`, `metric3`) with two, two and three
/// samples respectively.
pub fn mock_timeseries() -> Vec<TimeSeries> {
    let label = |name: &str, value: &str| new_label(name.to_string(), value.to_string());
    let samples = |points: &[(f64, i64)]| {
        points
            .iter()
            .map(|&(value, timestamp)| Sample { value, timestamp })
            .collect::<Vec<_>>()
    };

    vec![
        TimeSeries {
            labels: vec![label(METRIC_NAME_LABEL, "metric1"), label("job", "spark")],
            samples: samples(&[(1.0, 1000), (2.0, 2000)]),
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                label(METRIC_NAME_LABEL, "metric2"),
                label("instance", "test_host1"),
                label("idc", "z001"),
            ],
            samples: samples(&[(3.0, 1000), (4.0, 2000)]),
            ..Default::default()
        },
        TimeSeries {
            labels: vec![
                label(METRIC_NAME_LABEL, "metric3"),
                label("idc", "z002"),
                label("app", "biz"),
            ],
            samples: samples(&[(5.0, 1000), (6.0, 2000), (7.0, 3000)]),
            ..Default::default()
        },
    ]
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::prom_store::remote::LabelMatcher;
use datafusion::prelude::SessionContext;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use table::table::adapter::DfTableProviderAdapter;
use table::test_util::MemTable;
use super::*;
const EQ_TYPE: i32 = MatcherType::Eq as i32;
const NEQ_TYPE: i32 = MatcherType::Neq as i32;
const RE_TYPE: i32 = MatcherType::Re as i32;
#[test]
fn test_table_name() {
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![],
..Default::default()
};
let err = table_name(&q).unwrap_err();
assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
}],
..Default::default()
};
assert_eq!("test", table_name(&q).unwrap());
}
#[test]
fn test_query_to_plan() {
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
}],
..Default::default()
};
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(
TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
ColumnSchema::new(
FIELD_COLUMN_NAME,
ConcreteDataType::float64_datatype(),
true,
),
ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("job", ConcreteDataType::string_datatype(), true),
]));
let recordbatch = RecordBatch::new(
schema,
vec![
Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
Arc::new(StringVector::from(vec!["host1"])) as _,
Arc::new(StringVector::from(vec!["job"])) as _,
],
)
.unwrap();
let ctx = SessionContext::new();
let table = Arc::new(MemTable::new("test", recordbatch));
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
let dataframe = ctx.read_table(table_provider.clone()).unwrap();
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000)\n TableScan: ?table?", display_string);
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![
LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
},
LabelMatcher {
name: "job".to_string(),
value: "*prom*".to_string(),
r#type: RE_TYPE,
},
LabelMatcher {
name: "instance".to_string(),
value: "localhost".to_string(),
r#type: NEQ_TYPE,
},
],
..Default::default()
};
let dataframe = ctx.read_table(table_provider).unwrap();
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());
assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
}
/// Converts the mock remote-write payload into gRPC insert requests and verifies, for each of
/// the three resulting tables, the row count plus the name and values of every column.
#[test]
fn test_write_request_to_insert_exprs() {
    // Checks one column: first its name, then the selected typed value vector.
    macro_rules! assert_column {
        ($column:expr, $name:expr, $field:ident, $expected:expr) => {{
            assert_eq!($column.column_name, $name);
            assert_eq!($column.values.as_ref().unwrap().$field, $expected);
        }};
    }

    let request = WriteRequest {
        timeseries: mock_timeseries(),
        ..Default::default()
    };

    let mut inserts = to_grpc_insert_requests(request).unwrap().0.inserts;
    // Order the requests by table name so they can be addressed by position below.
    inserts.sort_unstable_by(|a, b| a.table_name.cmp(&b.table_name));

    assert_eq!(3, inserts.len());
    assert_eq!("metric1", inserts[0].table_name);
    assert_eq!("metric2", inserts[1].table_name);
    assert_eq!("metric3", inserts[2].table_name);

    // metric1: two rows; columns are timestamp, value and `job`.
    let insert = &mut inserts[0];
    // Columns are sorted by name as well, for the same positional reason.
    insert
        .columns
        .sort_unstable_by(|a, b| a.column_name.cmp(&b.column_name));
    assert_eq!(2, insert.row_count);
    let cols = &insert.columns;
    assert_eq!(cols.len(), 3);
    assert_column!(
        cols[0],
        TIMESTAMP_COLUMN_NAME,
        ts_millisecond_values,
        vec![1000, 2000]
    );
    assert_column!(cols[1], FIELD_COLUMN_NAME, f64_values, vec![1.0, 2.0]);
    assert_column!(cols[2], "job", string_values, vec!["spark", "spark"]);

    // metric2: two rows; columns are timestamp, value, `idc` and `instance`.
    let insert = &mut inserts[1];
    insert
        .columns
        .sort_unstable_by(|a, b| a.column_name.cmp(&b.column_name));
    assert_eq!(2, insert.row_count);
    let cols = &insert.columns;
    assert_eq!(cols.len(), 4);
    assert_column!(
        cols[0],
        TIMESTAMP_COLUMN_NAME,
        ts_millisecond_values,
        vec![1000, 2000]
    );
    assert_column!(cols[1], FIELD_COLUMN_NAME, f64_values, vec![3.0, 4.0]);
    assert_column!(cols[2], "idc", string_values, vec!["z001", "z001"]);
    assert_column!(
        cols[3],
        "instance",
        string_values,
        vec!["test_host1", "test_host1"]
    );

    // metric3: three rows; columns are `app`, timestamp, value and `idc`.
    let insert = &mut inserts[2];
    insert
        .columns
        .sort_unstable_by(|a, b| a.column_name.cmp(&b.column_name));
    assert_eq!(3, insert.row_count);
    let cols = &insert.columns;
    assert_eq!(cols.len(), 4);
    assert_column!(cols[0], "app", string_values, vec!["biz", "biz", "biz"]);
    assert_column!(
        cols[1],
        TIMESTAMP_COLUMN_NAME,
        ts_millisecond_values,
        vec![1000, 2000, 3000]
    );
    assert_column!(cols[2], FIELD_COLUMN_NAME, f64_values, vec![5.0, 6.0, 7.0]);
    assert_column!(
        cols[3],
        "idc",
        string_values,
        vec!["z002", "z002", "z002"]
    );
}
/// Feeds two single-row record batches sharing one schema (timestamp, value, `instance`) to
/// `recordbatches_to_timeseries` and expects two series back, each with the metric-name
/// label, the `instance` label and exactly one sample.
#[test]
fn test_recordbatches_to_timeseries() {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new(
            TIMESTAMP_COLUMN_NAME,
            ConcreteDataType::timestamp_millisecond_datatype(),
            true,
        ),
        ColumnSchema::new(
            FIELD_COLUMN_NAME,
            ConcreteDataType::float64_datatype(),
            true,
        ),
        ColumnSchema::new("instance", ConcreteDataType::string_datatype(), true),
    ]));

    // One batch per host, each holding a single row.
    let host1_batch = RecordBatch::new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondVector::from_vec(vec![1000])) as _,
            Arc::new(Float64Vector::from_vec(vec![3.0])) as _,
            Arc::new(StringVector::from(vec!["host1"])) as _,
        ],
    )
    .unwrap();
    let host2_batch = RecordBatch::new(
        schema.clone(),
        vec![
            Arc::new(TimestampMillisecondVector::from_vec(vec![2000])) as _,
            Arc::new(Float64Vector::from_vec(vec![7.0])) as _,
            Arc::new(StringVector::from(vec!["host2"])) as _,
        ],
    )
    .unwrap();
    let recordbatches = RecordBatches::try_new(schema, vec![host1_batch, host2_batch]).unwrap();

    let timeseries = recordbatches_to_timeseries("metric1", recordbatches).unwrap();
    assert_eq!(2, timeseries.len());

    // Label set expected on a series: the metric name first, then the instance.
    let expected_labels = |instance: &str| {
        vec![
            Label {
                name: METRIC_NAME_LABEL.to_string(),
                value: "metric1".to_string(),
            },
            Label {
                name: "instance".to_string(),
                value: instance.to_string(),
            },
        ]
    };

    let first = &timeseries[0];
    assert_eq!(expected_labels("host1"), first.labels);
    assert_eq!(
        first.samples,
        vec![Sample {
            value: 3.0,
            timestamp: 1000,
        }]
    );

    let second = &timeseries[1];
    assert_eq!(expected_labels("host2"), second.labels);
    assert_eq!(
        second.samples,
        vec![Sample {
            value: 7.0,
            timestamp: 2000,
        }]
    );
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -28,7 +28,7 @@ pub mod sql;
use std::collections::HashMap;
use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::{ReadRequest, WriteRequest};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
@@ -36,11 +36,11 @@ use session::context::QueryContextRef;
use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prometheus::Metrics;
use crate::prom_store::Metrics;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PrometheusProtocolHandlerRef = Arc<dyn PrometheusProtocolHandler + Send + Sync>;
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;
#[async_trait]
@@ -68,18 +68,18 @@ pub trait OpentsdbProtocolHandler {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
}
pub struct PrometheusResponse {
pub struct PromStoreResponse {
pub content_type: String,
pub content_encoding: String,
pub body: Vec<u8>,
}
#[async_trait]
pub trait PrometheusProtocolHandler {
pub trait PromStoreProtocolHandler {
/// Handling prometheus remote write requests
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PrometheusResponse>;
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
/// Handling push gateway requests
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}

View File

@@ -17,4 +17,4 @@ mod http_handler_test;
mod http_test;
mod influxdb_test;
mod opentsdb_test;
mod prometheus_test;
mod prom_store_test;

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::prometheus::remote::{
use api::prom_store::remote::{
LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
};
use api::v1::greptime_request::Request;
@@ -29,11 +29,11 @@ use query::plan::LogicalPlan;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::prometheus;
use servers::prometheus::{snappy_compress, Metrics};
use servers::prom_store;
use servers::prom_store::{snappy_compress, Metrics};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use tokio::sync::mpsc;
@@ -55,7 +55,7 @@ impl GrpcQueryHandler for DummyInstance {
}
#[async_trait]
impl PrometheusProtocolHandler for DummyInstance {
impl PromStoreProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> {
let _ = self
.tx
@@ -64,7 +64,7 @@ impl PrometheusProtocolHandler for DummyInstance {
Ok(())
}
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PrometheusResponse> {
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
let _ = self
.tx
.send((ctx.current_schema(), request.encode_to_vec()))
@@ -72,11 +72,11 @@ impl PrometheusProtocolHandler for DummyInstance {
let response = ReadResponse {
results: vec![QueryResult {
timeseries: prometheus::mock_timeseries(),
timeseries: prom_store::mock_timeseries(),
}],
};
Ok(PrometheusResponse {
Ok(PromStoreResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
body: response.encode_to_vec(),
@@ -148,7 +148,7 @@ async fn test_prometheus_remote_write_read() {
let client = TestClient::new(app);
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
@@ -174,7 +174,7 @@ async fn test_prometheus_remote_write_read() {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
@@ -204,7 +204,7 @@ async fn test_prometheus_remote_write_read() {
assert_eq!(response.results.len(), 1);
assert_eq!(
response.results[0].timeseries,
prometheus::mock_timeseries()
prom_store::mock_timeseries()
);
// Read from public database

View File

@@ -18,7 +18,7 @@ mod grpc;
mod influxdb;
mod instance;
mod opentsdb;
mod prometheus;
mod prom_store;
pub mod test_util;
// TODO(LFC): Refactor: move instance structs out of mod "tests", like the `GreptimeDbCluster`.

View File

@@ -16,39 +16,39 @@
mod tests {
use std::sync::Arc;
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, WriteRequest,
};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use frontend::instance::Instance;
use prost::Message;
use servers::prometheus;
use servers::prom_store;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::PrometheusProtocolHandler;
use servers::query_handler::PromStoreProtocolHandler;
use session::context::QueryContext;
use crate::tests;
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_prometheus_remote_rw() {
async fn test_standalone_prom_store_remote_rw() {
let standalone =
tests::create_standalone_instance("test_standalone_prometheus_remote_rw").await;
tests::create_standalone_instance("test_standalone_prom_store_remote_rw").await;
let instance = &standalone.instance;
test_prometheus_remote_rw(instance).await;
test_prom_store_remote_rw(instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_prometheus_remote_rw() {
async fn test_distributed_prom_store_remote_rw() {
let distributed =
tests::create_distributed_instance("test_distributed_prometheus_remote_rw").await;
test_prometheus_remote_rw(&distributed.frontend()).await;
tests::create_distributed_instance("test_distributed_prom_store_remote_rw").await;
test_prom_store_remote_rw(&distributed.frontend()).await;
}
async fn test_prometheus_remote_rw(instance: &Arc<Instance>) {
async fn test_prom_store_remote_rw(instance: &Arc<Instance>) {
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
@@ -73,7 +73,7 @@ mod tests {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
@@ -84,7 +84,7 @@ mod tests {
end_timestamp_ms: 3000,
matchers: vec![
LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
r#type: 0,
},
@@ -103,7 +103,7 @@ mod tests {
let resp = instance.read(read_request, ctx).await.unwrap();
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.body).unwrap();
let body = prom_store::snappy_decompress(&resp.body).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
@@ -114,7 +114,7 @@ mod tests {
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
@@ -145,7 +145,7 @@ mod tests {
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
name: prom_store::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {

View File

@@ -53,7 +53,7 @@ use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::postgres::PostgresServer;
use servers::prom::PromServer;
use servers::prometheus::PrometheusServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -522,7 +522,7 @@ pub async fn setup_test_prom_app_with_frontend(
.with_prom_handler(frontend_ref.clone())
.with_greptime_config_options(opts.to_toml_string())
.build();
let prom_server = PromServer::create_server(frontend_ref);
let prom_server = PrometheusServer::create_server(frontend_ref);
let app = http_server.build(http_server.make_app());
let app = app.merge(prom_server.make_app());
(app, guard)

View File

@@ -23,7 +23,7 @@ use api::v1::{
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_query::Output;
use servers::prom::{PromData, PromJsonResponse, PromResponse, PromSeries};
use servers::prometheus::{PromData, PromSeries, PrometheusJsonResponse, PrometheusResponse};
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
@@ -379,10 +379,11 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
.unwrap()
.into_inner()
.body;
let instant_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
let instant_query_result =
serde_json::from_slice::<PrometheusJsonResponse>(&json_bytes).unwrap();
let expected = PrometheusJsonResponse {
status: "success".to_string(),
data: PromResponse::PromData(PromData {
data: PrometheusResponse::PromData(PromData {
result_type: "vector".to_string(),
result: vec![
PromSeries {
@@ -430,10 +431,10 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
.unwrap()
.into_inner()
.body;
let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
let range_query_result = serde_json::from_slice::<PrometheusJsonResponse>(&json_bytes).unwrap();
let expected = PrometheusJsonResponse {
status: "success".to_string(),
data: PromResponse::PromData(PromData {
data: PrometheusResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![
PromSeries {
@@ -481,10 +482,10 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
.unwrap()
.into_inner()
.body;
let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
let range_query_result = serde_json::from_slice::<PrometheusJsonResponse>(&json_bytes).unwrap();
let expected = PrometheusJsonResponse {
status: "success".to_string(),
data: PromResponse::PromData(PromData {
data: PrometheusResponse::PromData(PromData {
result_type: "matrix".to_string(),
result: vec![],
}),

View File

@@ -18,7 +18,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::handler::HealthResponse;
use servers::http::{JsonOutput, JsonResponse};
use servers::prom::{PromJsonResponse, PromResponse};
use servers::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_prom_app_with_frontend,
StorageType,
@@ -329,12 +329,14 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(["__name__", "cpu", "host", "memory", "ts"]))
.unwrap()
serde_json::from_value::<PrometheusResponse>(json!([
"__name__", "cpu", "host", "memory", "ts"
]))
.unwrap()
);
// labels query with multiple match[] params
@@ -356,11 +358,11 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(
serde_json::from_value::<PrometheusResponse>(json!(
[{"__name__" : "demo","ts":"1970-01-01 00:00:00+0000","cpu":"1.1","host":"host1","memory":"2.2"}]
))
.unwrap()
@@ -377,7 +379,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
// should return error if there is no match[]
let res = client.get("/api/v1/label/instance/values").send().await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PromJsonResponse>().await;
let prom_resp = res.json::<PrometheusJsonResponse>().await;
assert_eq!(prom_resp.status, "error");
assert!(prom_resp.error.is_some_and(|err| !err.is_empty()));
assert!(prom_resp.error_type.is_some_and(|err| !err.is_empty()));
@@ -388,11 +390,11 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PromJsonResponse>(&res.text().await).unwrap();
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PromResponse>(json!(["host1", "host2"])).unwrap()
serde_json::from_value::<PrometheusResponse>(json!(["host1", "host2"])).unwrap()
);
// multiple match[]
@@ -401,7 +403,7 @@ pub async fn test_prom_http_api(store_type: StorageType) {
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let prom_resp = res.json::<PromJsonResponse>().await;
let prom_resp = res.json::<PrometheusJsonResponse>().await;
assert_eq!(prom_resp.status, "success");
assert!(prom_resp.error.is_none());
assert!(prom_resp.error_type.is_none());