diff --git a/Cargo.lock b/Cargo.lock index 110f5f2ce5..98a6e213a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,7 +209,7 @@ dependencies = [ "greptime-proto", "prost", "snafu", - "tonic", + "tonic 0.9.2", "tonic-build", ] @@ -396,7 +396,7 @@ dependencies = [ "paste", "prost", "tokio", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -705,6 +705,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", + "headers", "http", "http-body", "hyper", @@ -1553,7 +1554,7 @@ dependencies = [ "substrait 0.7.5", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tracing", "tracing-subscriber", ] @@ -1780,7 +1781,7 @@ dependencies = [ "rand", "snafu", "tokio", - "tonic", + "tonic 0.9.2", "tower", ] @@ -1948,7 +1949,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-util", "once_cell", - "opentelemetry", + "opentelemetry 0.17.0", "opentelemetry-jaeger", "parking_lot 0.12.1", "serde", @@ -2034,7 +2035,7 @@ checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ "prost", "prost-types", - "tonic", + "tonic 0.9.2", "tracing-core", ] @@ -2056,7 +2057,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tracing", "tracing-core", "tracing-subscriber", @@ -2676,7 +2677,7 @@ dependencies = [ "tokio", "tokio-stream", "toml 0.7.6", - "tonic", + "tonic 0.9.2", "tower", "tower-http", "url", @@ -3068,7 +3069,7 @@ dependencies = [ "prost", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tonic-build", "tower", "tower-service", @@ -3281,6 +3282,7 @@ dependencies = [ "moka 0.9.7", "object-store", "openmetrics-parser", + "opentelemetry-proto", "partition", "prost", "query", @@ -3299,7 +3301,7 @@ dependencies = [ "table", "tokio", "toml 0.7.6", - "tonic", + "tonic 0.9.2", "tower", "uuid", ] @@ -4143,7 +4145,7 @@ dependencies = [ "prost", "serde", "serde_json", - "tonic", + "tonic 0.9.2", "tonic-build", ] @@ -4228,6 +4230,31 @@ dependencies = [ "num-traits", ] +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64 0.13.1", + "bitflags 1.3.2", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -5235,7 +5262,7 @@ dependencies = [ "table", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tower", "tracing", "tracing-subscriber", @@ -5286,7 +5313,7 @@ dependencies = [ "tokio", "tokio-stream", "toml 0.7.6", - "tonic", + "tonic 0.9.2", "tower", "tracing", "tracing-subscriber", @@ -6090,6 +6117,16 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + [[package]] name = "opentelemetry-jaeger" version = "0.16.0" @@ -6098,20 +6135,69 @@ checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229" dependencies = [ "async-trait", "lazy_static", - "opentelemetry", + "opentelemetry 0.17.0", "opentelemetry-semantic-conventions", "thiserror", "thrift 0.15.0", "tokio", ] +[[package]] +name = 
"opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry 0.19.0", + "prost", + "tonic 0.8.3", +] + [[package]] name = "opentelemetry-semantic-conventions" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" dependencies = [ - "opentelemetry", + "opentelemetry 0.17.0", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", ] [[package]] @@ -8713,6 +8799,7 @@ dependencies = [ "derive_builder 0.12.0", "digest", "futures", + "headers", "hex", "hostname", "http-body", @@ -8728,6 +8815,7 @@ dependencies = [ "once_cell", "openmetrics-parser", "opensrv-mysql", + "opentelemetry-proto", "parking_lot 0.12.1", "pgwire", "pin-project", @@ -8759,7 +8847,7 @@ dependencies = [ "tokio-rustls 0.24.0", "tokio-stream", "tokio-test", - "tonic", + "tonic 0.9.2", "tonic-reflection", "tower", "tower-http", @@ -9317,7 +9405,7 @@ dependencies = [ "table", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "tonic-build", "uuid", ] @@ -9776,7 +9864,7 @@ dependencies = [ "table", "tempfile", "tokio", - "tonic", + "tonic 0.9.2", "tower", "uuid", ] @@ -10168,6 +10256,38 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.9.2" @@ -10222,7 +10342,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -10381,7 +10501,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbbe89715c1dbbb790059e2565353978564924ee85017b5fff365c872ff6721f" dependencies = [ "once_cell", - "opentelemetry", + "opentelemetry 0.17.0", "tracing", "tracing-core", "tracing-log", @@ -10805,6 +10925,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 
c0bda55ba5..f6035e3887 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "bec16e50c9322758111f73e42fb5d377c7235e05" } itertools = "0.10" lazy_static = "1.4" +opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] } parquet = "40.0" paste = "1.0" prost = "0.11" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 610603bae3..9293a87688 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -47,6 +47,7 @@ mito = { path = "../mito", features = ["test"] } moka = { version = "0.9", features = ["future"] } object-store = { path = "../object-store" } openmetrics-parser = "0.4" +opentelemetry-proto.workspace = true partition = { path = "../partition" } prost.workspace = true query = { path = "../query" } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 5a60e290ac..91a17a01ca 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -20,8 +20,8 @@ use servers::http::HttpOptions; use servers::Mode; use crate::service_config::{ - GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, - PrometheusOptions, + GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions, + PromStoreOptions, PrometheusOptions, }; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -37,6 +37,7 @@ pub struct FrontendOptions { pub influxdb_options: Option<InfluxdbOptions>, pub prom_store_options: Option<PromStoreOptions>, pub prometheus_options: Option<PrometheusOptions>, + pub otlp_options: Option<OtlpOptions>, pub meta_client_options: Option<MetaClientOptions>, pub logging: LoggingOptions, } @@ -54,6 +55,7 @@ impl Default for FrontendOptions { influxdb_options: Some(InfluxdbOptions::default()), prom_store_options: Some(PromStoreOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + otlp_options: Some(OtlpOptions::default()), meta_client_options: None, logging: LoggingOptions::default(), } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1a907bff19..a9cfc31cad 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -16,6 +16,7 @@ pub mod distributed; mod grpc; mod influxdb; mod opentsdb; +mod otlp; mod prom_store; mod script; mod standalone; @@ -65,7 +66,8 @@ use servers::prometheus::PrometheusHandler; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ - InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PromStoreProtocolHandler, ScriptHandler, + InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler, + PromStoreProtocolHandler, ScriptHandler, }; use session::context::QueryContextRef; use snafu::prelude::*; @@ -97,6 +99,7 @@ pub trait FrontendInstance: + OpentsdbProtocolHandler + InfluxdbLineProtocolHandler + PromStoreProtocolHandler + + OpenTelemetryProtocolHandler + ScriptHandler + PrometheusHandler + Send diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs new file mode 100644 index 0000000000..70ce1642d4 --- /dev/null +++ b/src/frontend/src/instance/otlp.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use metrics::counter; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; +use servers::error::{self, Result as ServerResult}; +use servers::otlp; +use servers::query_handler::OpenTelemetryProtocolHandler; +use session::context::QueryContextRef; +use snafu::ResultExt; + +use crate::instance::Instance; +use crate::metrics::OTLP_METRICS_ROWS; + +#[async_trait] +impl OpenTelemetryProtocolHandler for Instance { + async fn metrics( + &self, + request: ExportMetricsServiceRequest, + ctx: QueryContextRef, + ) -> ServerResult<ExportMetricsServiceResponse> { + let (requests, rows) = otlp::to_grpc_insert_requests(request)?; + let _ = self + .handle_inserts(requests, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + + counter!(OTLP_METRICS_ROWS, rows as u64); + + let resp = ExportMetricsServiceResponse { + // TODO(sunng87): add support for partial_success in a future patch + partial_success: None, + }; + Ok(resp) + } +} diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 2ba922fa24..31c5de4a64 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -24,3 +24,5 @@ pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; /// The samples count of Prometheus remote write.
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; + +pub const OTLP_METRICS_ROWS: &str = "frontend.otlp.metrics.rows"; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index f30e20c149..b68f0129a5 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -38,7 +38,7 @@ use crate::error::Error::StartServer; use crate::error::{self, Result}; use crate::frontend::FrontendOptions; use crate::instance::FrontendInstance; -use crate::service_config::{InfluxdbOptions, PromStoreOptions}; +use crate::service_config::{InfluxdbOptions, OtlpOptions, PromStoreOptions}; pub(crate) struct Services; @@ -178,6 +178,10 @@ impl Services { let _ = http_server_builder.with_prom_handler(instance.clone()); } + if matches!(opts.otlp_options, Some(OtlpOptions { enable: true })) { + let _ = http_server_builder.with_otlp_handler(instance.clone()); + } + let http_server = http_server_builder .with_metrics_handler(MetricsHandler) .with_script_handler(instance.clone()) diff --git a/src/frontend/src/service_config.rs b/src/frontend/src/service_config.rs index 1c4a6f840f..6102b0c37d 100644 --- a/src/frontend/src/service_config.rs +++ b/src/frontend/src/service_config.rs @@ -16,6 +16,7 @@ pub mod grpc; pub mod influxdb; pub mod mysql; pub mod opentsdb; +pub mod otlp; pub mod postgres; pub mod prom_store; pub mod prometheus; @@ -24,6 +25,7 @@ pub use grpc::GrpcOptions; pub use influxdb::InfluxdbOptions; pub use mysql::MysqlOptions; pub use opentsdb::OpentsdbOptions; +pub use otlp::OtlpOptions; pub use postgres::PostgresOptions; pub use prom_store::PromStoreOptions; pub use prometheus::PrometheusOptions; diff --git a/src/frontend/src/service_config/otlp.rs b/src/frontend/src/service_config/otlp.rs new file mode 100644 index 0000000000..60bb98243b --- /dev/null +++ b/src/frontend/src/service_config/otlp.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OtlpOptions { + pub enable: bool, +} + +impl Default for OtlpOptions { + fn default() -> Self { + Self { enable: true } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_otlp_options() { + let default = OtlpOptions::default(); + assert!(default.enable); + } +} diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b6effcf237..7c6d6d8480 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -14,7 +14,7 @@ aide = { version = "0.9", features = ["axum"] } api = { path = "../api" } arrow-flight.workspace = true async-trait = "0.1" -axum = "0.6" +axum = { version = "0.6", features = ["headers"] } axum-macros = "0.3" base64 = "0.13" bytes = "1.2" @@ -40,6 +40,7 @@ datatypes = { path = "../datatypes" } derive_builder = "0.12" digest = "0.10" futures = "0.3" +headers = "0.3" hex = { version = "0.4" } hostname = "0.3.1" http-body = "0.4" @@ -55,6 +56,7 @@ num_cpus = "1.13" once_cell = "1.16" openmetrics-parser = "0.4" opensrv-mysql = "0.4" +opentelemetry-proto.workspace = true parking_lot = "0.12" pgwire = "0.15" pin-project = "1.0" @@ -90,6 +92,7 @@ tower-http = { version = "0.3", features = ["full"] } [dev-dependencies] axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } +catalog = { path = "../catalog", features = ["testing"] } client = { path = "../client" } common-base = { path = "../common/base" } common-test-util = { path = "../common/test-util" } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index cf6d974fc5..ff85e70f96 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -136,6 +136,12 @@ pub enum Error { source: common_grpc::error::Error, }, + #[snafu(display("Failed to write OTLP metrics, source: {}", source))] + OtlpMetricsWrite { + location: Location, + source: common_grpc::error::Error, + }, + #[snafu(display("Failed to convert time precision, name: {}", name))] TimePrecision { name: String, location: Location }, @@ -163,6 +169,12 @@ pub enum Error { source: prost::DecodeError, }, + #[snafu(display("Failed to decode OTLP request, source: {}", source))] + DecodeOtlpRequest { + location: Location, + source: prost::DecodeError, + }, + #[snafu(display("Failed to decompress prometheus remote request, source: {}", source))] DecompressPromRemoteRequest { location: Location, @@ -356,6 +368,7 @@ impl ErrorExt for Error { | InvalidOpentsdbLine { .. } | InvalidOpentsdbJsonRequest { .. } | DecodePromRemoteRequest { .. } + | DecodeOtlpRequest { .. } | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } | InvalidFlightTicket { .. } @@ -364,9 +377,9 @@ impl ErrorExt for Error { | PreparedStmtTypeMismatch { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, - InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } => { - source.status_code() - } + InfluxdbLinesWrite { source, .. } + | PromSeriesWrite { source, .. } + | OtlpMetricsWrite { source, .. } => source.status_code(), Hyper { .. } => StatusCode::Unknown, TlsRequired { .. } => StatusCode::Unknown, @@ -478,6 +491,7 @@ impl IntoResponse for Error { | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } | Error::DecodePromRemoteRequest { .. } + | Error::DecodeOtlpRequest { .. } | Error::DecompressPromRemoteRequest { .. } | Error::InvalidPromRemoteRequest { .. } | Error::InvalidQuery { .. 
} diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 87637dc078..f84fcc4ee8 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -15,9 +15,11 @@ mod admin; pub mod authorize; pub mod handler; +pub mod header; pub mod influxdb; pub mod mem_prof; pub mod opentsdb; +pub mod otlp; mod pprof; pub mod prom_store; pub mod script; @@ -73,8 +75,8 @@ use crate::metrics_handler::MetricsHandler; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ - InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PromStoreProtocolHandlerRef, - ScriptHandlerRef, + InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, + PromStoreProtocolHandlerRef, ScriptHandlerRef, }; use crate::server::Server; @@ -119,6 +121,7 @@ pub struct HttpServer { influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>, opentsdb_handler: Option<OpentsdbProtocolHandlerRef>, prom_handler: Option<PromStoreProtocolHandlerRef>, + otlp_handler: Option<OpenTelemetryProtocolHandlerRef>, script_handler: Option<ScriptHandlerRef>, shutdown_tx: Mutex<Option<Sender<()>>>, user_provider: Option<UserProviderRef>, @@ -399,6 +402,7 @@ impl HttpServerBuilder { opentsdb_handler: None, influxdb_handler: None, prom_handler: None, + otlp_handler: None, user_provider: None, script_handler: None, metrics_handler: None, @@ -439,6 +443,11 @@ self } + pub fn with_otlp_handler(&mut self, handler: OpenTelemetryProtocolHandlerRef) -> &mut Self { + let _ = self.inner.otlp_handler.get_or_insert(handler); + self + } + pub fn with_user_provider(&mut self, user_provider: UserProviderRef) -> &mut Self { let _ = self.inner.user_provider.get_or_insert(user_provider); self @@ -521,6 +530,13 @@ impl HttpServer { ); } + if let Some(otlp_handler) = self.otlp_handler.clone() { + router = router.nest( + &format!("/{HTTP_API_VERSION}/otlp"), + self.route_otlp(otlp_handler), + ); + } + if let Some(metrics_handler) = self.metrics_handler { router = router.nest("", self.route_metrics(metrics_handler)); } @@ -647,6 +663,12 @@ .with_state(opentsdb_handler) } + fn route_otlp(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router { + Router::new() + .route("/v1/metrics", routing::post(otlp::metrics)) + .with_state(otlp_handler) + } + fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router { Router::new() .route("/flush", routing::post(flush)) diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 7a0bad87b6..49e4e32e7f 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -19,6 +19,7 @@ use axum::response::Response; use common_error::ext::ErrorExt; use common_telemetry::warn; use futures::future::BoxFuture; +use headers::Header; use http_body::Body; use metrics::increment_counter; use secrecy::SecretString; @@ -26,6 +27,7 @@ use session::context::UserInfo; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use tower_http::auth::AsyncAuthorizeRequest; +use super::header::GreptimeDbName; use super::PUBLIC_APIS; use crate::auth::Error::IllegalParam; use crate::auth::{Identity, IllegalParamSnafu, UserProviderRef}; @@ -141,14 +143,22 @@ where fn extract_catalog_and_schema<B>( request: &Request<B>, ) -> crate::auth::Result<(&str, &str)> { - // try to get the database name - let query = request.uri().query().unwrap_or_default(); - let input_database = extract_db_from_query(query).context(IllegalParamSnafu { - msg: "db not provided or corrupted", - })?; + // parse database from header + let dbname = request + .headers() + .get(GreptimeDbName::name()) + // eat this invalid
ASCII error and give the user the final IllegalParam error + .and_then(|header| header.to_str().ok()) + .or_else(|| { + let query = request.uri().query().unwrap_or_default(); + extract_db_from_query(query) + }) + .context(IllegalParamSnafu { + msg: "db not provided or corrupted", + })?; Ok(crate::parse_catalog_and_schema_from_client_database_name( - input_database, + dbname, )) } @@ -391,6 +401,19 @@ mod tests { Ok(req.body(()).unwrap()) } + #[test] + fn test_db_name_header() { + let http_api_version = crate::http::HTTP_API_VERSION; + let req = Request::builder() + .uri(format!("http://localhost/{http_api_version}/sql").as_str()) + .header(GreptimeDbName::name(), "greptime-tomcat") + .body(()) + .unwrap(); + + let db = extract_catalog_and_schema(&req).unwrap(); + assert_eq!(db, ("greptime", "tomcat")); + } + #[test] fn test_extract_db() { assert_matches!(extract_db_from_query(""), None); diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs new file mode 100644 index 0000000000..04db669550 --- /dev/null +++ b/src/servers/src/http/header.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use headers::{Header, HeaderName, HeaderValue}; + +pub static GREPTIME_DB_NAME_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name"); + +pub struct GreptimeDbName(Option<String>); + +impl Header for GreptimeDbName { + fn name() -> &'static HeaderName { + &GREPTIME_DB_NAME_HEADER_NAME + } + + fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> + where + Self: Sized, + I: Iterator<Item = &'i HeaderValue>, + { + if let Some(value) = values.next() { + let str_value = value.to_str().map_err(|_| headers::Error::invalid())?; + Ok(Self(Some(str_value.to_owned()))) + } else { + Ok(Self(None)) + } + } + + fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) { + if let Some(name) = &self.0 { + if let Ok(value) = HeaderValue::from_str(name) { + values.extend(std::iter::once(value)); + } + } + } +} + +impl GreptimeDbName { + pub fn value(&self) -> Option<&String> { + self.0.as_ref() + } +} diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs new file mode 100644 index 0000000000..5ddc34806c --- /dev/null +++ b/src/servers/src/http/otlp.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::sync::Arc; + +use axum::extract::{RawBody, State}; +use axum::http::header; +use axum::response::IntoResponse; +use axum::TypedHeader; +use common_telemetry::timer; +use hyper::Body; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; +use prost::Message; +use session::context::QueryContext; +use snafu::prelude::*; + +use crate::error::{self, Result}; +use crate::http::header::GreptimeDbName; +use crate::parse_catalog_and_schema_from_client_database_name; +use crate::query_handler::OpenTelemetryProtocolHandlerRef; + +#[axum_macros::debug_handler] +pub async fn metrics( + State(handler): State<OpenTelemetryProtocolHandlerRef>, + TypedHeader(db): TypedHeader<GreptimeDbName>, + RawBody(body): RawBody, +) -> Result<OtlpResponse> { + let ctx = if let Some(db) = db.value() { + let (catalog, schema) = parse_catalog_and_schema_from_client_database_name(db); + Arc::new(QueryContext::with(catalog, schema)) + } else { + QueryContext::arc() + }; + let _timer = timer!( + crate::metrics::METRIC_HTTP_OPENTELEMETRY_ELAPSED, + &[(crate::metrics::METRIC_DB_LABEL, ctx.get_db_string())] + ); + let request = parse_body(body).await?; + handler.metrics(request, ctx).await.map(OtlpResponse) +} + +async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> { + hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu) + .and_then(|buf| { + ExportMetricsServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu) + }) +} + +pub struct OtlpResponse(ExportMetricsServiceResponse); + +impl IntoResponse for OtlpResponse { + fn into_response(self) -> axum::response::Response { + ( + [(header::CONTENT_TYPE, "application/x-protobuf")], + self.0.encode_to_vec(), + ) + .into_response() + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 1d47739bdd..b77bb9c941 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -33,6 +33,7 @@ mod metrics; pub mod metrics_handler; pub mod mysql; pub mod opentsdb; +pub mod otlp; pub mod postgres; pub mod prom_store; pub mod prometheus; diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 6795cde38f..cbeaaa9ab8 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -43,6 +43,7 @@ pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influx pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str = "servers.http_prometheus_write_elapsed"; pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed"; +pub(crate) const METRIC_HTTP_OPENTELEMETRY_ELAPSED: &str = "servers.http_otlp_elapsed"; pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str = "servers.opentsdb_line_write_elapsed"; diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs new file mode 100644 index 0000000000..e65f33f5a3 --- /dev/null +++ b/src/servers/src/otlp.rs @@ -0,0 +1,457 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use api::v1::{InsertRequest, InsertRequests}; +use common_grpc::writer::{LinesWriter, Precision}; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; +use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *}; +use snafu::ResultExt; + +use crate::error::{self, Result}; + +const GREPTIME_TIMESTAMP: &str = "greptime_timestamp"; +const GREPTIME_VALUE: &str = "greptime_value"; + +/// Normalize OTLP instrumentation, metric and attribute names +/// +/// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax +/// - since the names are case-insensitive, we transform them to lowercase for +/// better SQL usability +/// - replace `.` and `-` with `_` +fn normalize_otlp_name(name: &str) -> String { + name.to_lowercase().replace(|c| c == '.' || c == '-', "_") +} + +/// Convert OpenTelemetry metrics to GreptimeDB insert requests +/// +/// See +/// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto#L162 +/// for the data structure of OTLP metrics. +/// +/// Returns `InsertRequests` and the total number of rows to ingest +pub fn to_grpc_insert_requests( + request: ExportMetricsServiceRequest, +) -> Result<(InsertRequests, usize)> { + let metrics = request + .resource_metrics + .iter() + .flat_map(|resource_metrics| &resource_metrics.scope_metrics) + .flat_map(|scope_metrics| &scope_metrics.metrics); + + let mut insert_batch = Vec::with_capacity(metrics.size_hint().0); + let mut rows = 0; + for metric in metrics { + if let Some(insert) = encode_metrics(metric)? { + rows += insert.row_count; + insert_batch.push(insert); + } + } + + let inserts = InsertRequests { + inserts: insert_batch, + }; + + Ok((inserts, rows as usize)) +} + +fn encode_metrics(metric: &Metric) -> Result<Option<InsertRequest>> { + let name = &metric.name; + // note that we don't store description or unit; we might want to deal with + // these fields in the future.
+ if let Some(data) = &metric.data { + match data { + metric::Data::Gauge(gauge) => encode_gauge(name, gauge).map(Some), + metric::Data::Sum(sum) => encode_sum(name, sum).map(Some), + metric::Data::Summary(summary) => encode_summary(name, summary).map(Some), + // TODO(sunng87) leave histogram for next release + metric::Data::Histogram(_hist) => Ok(None), + metric::Data::ExponentialHistogram(_hist) => Ok(None), + } + } else { + Ok(None) + } +} + +fn write_attribute(lines: &mut LinesWriter, attr: &KeyValue) -> Result<()> { + if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) { + match val { + // normalize the attribute key for all value types, per the + // doc comment on `normalize_otlp_name` + any_value::Value::StringValue(s) => lines + .write_tag(&normalize_otlp_name(&attr.key), s) + .context(error::OtlpMetricsWriteSnafu)?, + + any_value::Value::IntValue(v) => lines + .write_tag(&normalize_otlp_name(&attr.key), &v.to_string()) + .context(error::OtlpMetricsWriteSnafu)?, + any_value::Value::DoubleValue(v) => lines + .write_tag(&normalize_otlp_name(&attr.key), &v.to_string()) + .context(error::OtlpMetricsWriteSnafu)?, + // TODO(sunng87): allow different types of values + _ => {} + } + } + + Ok(()) +} + +fn write_timestamp(lines: &mut LinesWriter, time_nano: i64) -> Result<()> { + lines + .write_ts(GREPTIME_TIMESTAMP, (time_nano, Precision::Nanosecond)) + .context(error::OtlpMetricsWriteSnafu)?; + Ok(()) +} + +fn write_data_point_value( + lines: &mut LinesWriter, + field: &str, + value: &Option<number_data_point::Value>, +) -> Result<()> { + match value { + Some(number_data_point::Value::AsInt(val)) => { + // we coerce all values to f64 + lines + .write_f64(field, *val as f64) + .context(error::OtlpMetricsWriteSnafu)? + } + Some(number_data_point::Value::AsDouble(val)) => lines + .write_f64(field, *val) + .context(error::OtlpMetricsWriteSnafu)?, + _ => {} + } + Ok(()) +} + +/// Encode this gauge metric +/// +/// note that there can be multiple data points in the request; they are going +/// to be stored as multiple rows +fn encode_gauge(name: &str, gauge: &Gauge) -> Result<InsertRequest> { + let mut lines = LinesWriter::with_lines(gauge.data_points.len()); + + for data_point in &gauge.data_points { + for attr in &data_point.attributes { + write_attribute(&mut lines, attr)?; + } + + write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; + + write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?; + + lines.commit(); + } + + let (columns, row_count) = lines.finish(); + Ok(InsertRequest { + table_name: normalize_otlp_name(name), + region_number: 0, + columns, + row_count, + }) +} + +/// Encode this sum metric +/// +/// `aggregation_temporality` and `monotonic` are ignored for now +fn encode_sum(name: &str, sum: &Sum) -> Result<InsertRequest> { + let mut lines = LinesWriter::with_lines(sum.data_points.len()); + + for data_point in &sum.data_points { + for attr in &data_point.attributes { + write_attribute(&mut lines, attr)?; + } + + write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; + + write_data_point_value(&mut lines, GREPTIME_VALUE, &data_point.value)?; + + lines.commit(); + } + + let (columns, row_count) = lines.finish(); + Ok(InsertRequest { + table_name: normalize_otlp_name(name), + region_number: 0, + columns, + row_count, + }) +} + +// TODO(sunng87): we may need a better implementation for histograms +#[allow(dead_code)] +fn encode_histogram(name: &str, hist: &Histogram) -> Result<InsertRequest> { + let mut lines = LinesWriter::with_lines(hist.data_points.len()); + + for data_point in &hist.data_points { + for attr in &data_point.attributes { + write_attribute(&mut lines, attr)?; + } + + write_timestamp(&mut lines, 
data_point.time_unix_nano as i64)?; + + for (idx, count) in data_point.bucket_counts.iter().enumerate() { + // here we don't store bucket boundary + lines + .write_u64(&format!("bucket_{}", idx), *count) + .context(error::OtlpMetricsWriteSnafu)?; + } + + if let Some(min) = data_point.min { + lines + .write_f64("min", min) + .context(error::OtlpMetricsWriteSnafu)?; + } + + if let Some(max) = data_point.max { + lines + .write_f64("max", max) + .context(error::OtlpMetricsWriteSnafu)?; + } + + lines.commit(); + } + + let (columns, row_count) = lines.finish(); + Ok(InsertRequest { + table_name: normalize_otlp_name(name), + region_number: 0, + columns, + row_count, + }) +} + +#[allow(dead_code)] +fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Result<InsertRequest> { + let mut lines = LinesWriter::with_lines(hist.data_points.len()); + + for data_point in &hist.data_points { + for attr in &data_point.attributes { + write_attribute(&mut lines, attr)?; + } + + write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; + + // TODO(sunng87): confirm if this is working + if let Some(positive_buckets) = &data_point.positive { + for (idx, count) in positive_buckets.bucket_counts.iter().enumerate() { + // here we don't store bucket boundary + lines + .write_u64( + &format!("bucket_{}", idx + positive_buckets.offset as usize), + *count, + ) + .context(error::OtlpMetricsWriteSnafu)?; + } + } + + if let Some(negative_buckets) = &data_point.negative { + for (idx, count) in negative_buckets.bucket_counts.iter().enumerate() { + lines + .write_u64( + &format!("bucket_{}", idx + negative_buckets.offset as usize), + *count, + ) + .context(error::OtlpMetricsWriteSnafu)?; + } + } + + if let Some(min) = data_point.min { + lines + .write_f64("min", min) + .context(error::OtlpMetricsWriteSnafu)?; + } + + if let Some(max) = data_point.max { + lines + .write_f64("max", max) + .context(error::OtlpMetricsWriteSnafu)?; + } + + lines.commit(); + } + + let (columns, row_count) = lines.finish(); + Ok(InsertRequest { + table_name: normalize_otlp_name(name), + region_number: 0, + columns, + row_count, + }) +} + +fn encode_summary(name: &str, summary: &Summary) -> Result<InsertRequest> { + let mut lines = LinesWriter::with_lines(summary.data_points.len()); + + for data_point in &summary.data_points { + for attr in &data_point.attributes { + write_attribute(&mut lines, attr)?; + } + + write_timestamp(&mut lines, data_point.time_unix_nano as i64)?; + + for quantile in &data_point.quantile_values { + // each quantile value becomes a column named like `p90`, `p95` + lines + .write_f64( + &format!("p{:02}", quantile.quantile * 100f64), + quantile.value, + ) + .context(error::OtlpMetricsWriteSnafu)?; + } + + lines + .write_u64("count", data_point.count) + .context(error::OtlpMetricsWriteSnafu)?; + + lines.commit(); + } + + let (columns, row_count) = lines.finish(); + Ok(InsertRequest { + table_name: normalize_otlp_name(name), + region_number: 0, + columns, + row_count, + }) +} + +#[cfg(test)] +mod tests { + use opentelemetry_proto::tonic::common::v1::any_value::Value as Val; + use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; + use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value; + use opentelemetry_proto::tonic::metrics::v1::summary_data_point::ValueAtQuantile; + use opentelemetry_proto::tonic::metrics::v1::NumberDataPoint; + + use super::*; + + #[test] + fn test_normalize_otlp_name() { + assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("jvm-memory-free"), 
"jvm_memory_free"); + assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free"); + assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free"); + } + + fn keyvalue(key: &str, value: &str) -> KeyValue { + KeyValue { + key: key.into(), + value: Some(AnyValue { + value: Some(Val::StringValue(value.into())), + }), + } + } + + #[test] + fn test_encode_gauge() { + let data_points = vec![ + NumberDataPoint { + attributes: vec![keyvalue("host", "testsevrer")], + time_unix_nano: 100, + value: Some(Value::AsInt(100)), + ..Default::default() + }, + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 105, + value: Some(Value::AsInt(105)), + ..Default::default() + }, + ]; + let gauge = Gauge { data_points }; + let inserts = encode_gauge("datamon", &gauge).unwrap(); + + assert_eq!(inserts.table_name, "datamon"); + assert_eq!(inserts.row_count, 2); + assert_eq!(inserts.columns.len(), 3); + assert_eq!( + inserts + .columns + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec!["host", "greptime_timestamp", "greptime_value"] + ); + } + + #[test] + fn test_encode_sum() { + let data_points = vec![ + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + value: Some(Value::AsInt(100)), + ..Default::default() + }, + NumberDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 105, + value: Some(Value::AsInt(0)), + ..Default::default() + }, + ]; + let sum = Sum { + data_points, + ..Default::default() + }; + let inserts = encode_sum("datamon", &sum).unwrap(); + + assert_eq!(inserts.table_name, "datamon"); + assert_eq!(inserts.row_count, 2); + assert_eq!(inserts.columns.len(), 3); + assert_eq!( + inserts + .columns + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec!["host", "greptime_timestamp", "greptime_value"] + ); + } + + #[test] + fn test_encode_summary() { + let data_points = vec![SummaryDataPoint { + attributes: vec![keyvalue("host", "testserver")], + time_unix_nano: 100, + count: 25, + sum: 5400.0, + quantile_values: vec![ + ValueAtQuantile { + quantile: 0.90, + value: 1000.0, + }, + ValueAtQuantile { + quantile: 0.95, + value: 3030.0, + }, + ], + ..Default::default() + }]; + let summary = Summary { data_points }; + let inserts = encode_summary("datamon", &summary).unwrap(); + + assert_eq!(inserts.table_name, "datamon"); + assert_eq!(inserts.row_count, 1); + assert_eq!(inserts.columns.len(), 5); + assert_eq!( + inserts + .columns + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec!["host", "greptime_timestamp", "p90", "p95", "count"] + ); + } +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 4ded774502..14172bce0b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -31,6 +31,9 @@ use std::sync::Arc; use api::prom_store::remote::{ReadRequest, WriteRequest}; use async_trait::async_trait; use common_query::Output; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; use session::context::QueryContextRef; use crate::error::Result; @@ -41,6 +44,7 @@ use crate::prom_store::Metrics; pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PromStoreProtocolHandlerRef = Arc; +pub type OpenTelemetryProtocolHandlerRef = Arc; pub type ScriptHandlerRef = Arc; #[async_trait] @@ -83,3 +87,13 @@ pub trait 
PromStoreProtocolHandler { /// Handling push gateway requests async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>; } + +#[async_trait] +pub trait OpenTelemetryProtocolHandler { + /// Handling OpenTelemetry metrics requests + async fn metrics( + &self, + request: ExportMetricsServiceRequest, + ctx: QueryContextRef, + ) -> Result<ExportMetricsServiceResponse>; +}
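For reference, a minimal client sketch showing how the endpoint added by this patch is expected to be exercised end to end. The route `/v1/otlp/v1/metrics` (from `route_otlp` nested under `/{HTTP_API_VERSION}/otlp`), the protobuf request body, the `application/x-protobuf` content type, and the optional `x-greptime-db-name` header all come from the diff above; the `reqwest` and `tokio` dependencies, the server address and port, and the database name are assumptions for illustration only, not part of this change:

use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{
    metric, number_data_point, Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics,
};
use prost::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One gauge with a single data point. Per `normalize_otlp_name`, the
    // metric name "jvm.memory.free" lands in table `jvm_memory_free`, with
    // columns `host` (tag), `greptime_timestamp` and `greptime_value`.
    let request = ExportMetricsServiceRequest {
        resource_metrics: vec![ResourceMetrics {
            scope_metrics: vec![ScopeMetrics {
                metrics: vec![Metric {
                    name: "jvm.memory.free".to_string(),
                    data: Some(metric::Data::Gauge(Gauge {
                        data_points: vec![NumberDataPoint {
                            attributes: vec![KeyValue {
                                key: "host".to_string(),
                                value: Some(AnyValue {
                                    value: Some(any_value::Value::StringValue(
                                        "testserver".to_string(),
                                    )),
                                }),
                            }],
                            time_unix_nano: 1_686_000_000_000_000_000,
                            value: Some(number_data_point::Value::AsDouble(1024.0)),
                            ..Default::default()
                        }],
                    })),
                    ..Default::default()
                }],
                ..Default::default()
            }],
            ..Default::default()
        }],
    };

    // POST the prost-encoded request to the route added in `route_otlp`.
    // 127.0.0.1:4000 and "mydb" are placeholders; when the header is omitted,
    // the auth layer falls back to the `db` query parameter and the handler
    // uses the default catalog/schema.
    let response = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/otlp/v1/metrics")
        .header("content-type", "application/x-protobuf")
        .header("x-greptime-db-name", "mydb")
        .body(request.encode_to_vec())
        .send()
        .await?;

    // On success the body is a protobuf-encoded ExportMetricsServiceResponse.
    println!("status: {}", response.status());
    Ok(())
}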