From 2b4fb2f32abb65d744dff8455c758b6d65283f9e Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 30 Jul 2025 19:20:03 +0800 Subject: [PATCH] refactor(otlp_metric): make otlp metric compatible with promql (#6543) * chore: tmp save Signed-off-by: shuiyisong * chore: minor update * chore: remove metric metadata and introduce shared attrs Signed-off-by: shuiyisong * chore: replace . with _ in metric name Signed-off-by: shuiyisong * chore: minor update & fix tests Signed-off-by: shuiyisong * chore: add legacy mode param to otlp metrics Signed-off-by: shuiyisong * chore: update test Signed-off-by: shuiyisong * chore: update test & fix Signed-off-by: shuiyisong * chore: add automatically legacy check for otlp metrics Signed-off-by: shuiyisong * chore: fix clippy Signed-off-by: shuiyisong * fix: typos Signed-off-by: shuiyisong * chore: insert table options in compat mode & add test Signed-off-by: shuiyisong * fix: check table options consistency Signed-off-by: shuiyisong * chore: update test and add comments Signed-off-by: shuiyisong * chore: minor tags update Signed-off-by: shuiyisong * chore: minor update about scope labeling Signed-off-by: shuiyisong * chore: update opts using header & update test Signed-off-by: shuiyisong * chore: minor code refactor Signed-off-by: shuiyisong * chore: fix cr issue Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- Cargo.lock | 1 + src/frontend/Cargo.toml | 1 + src/frontend/src/instance.rs | 127 +++- src/frontend/src/instance/builder.rs | 2 + src/frontend/src/instance/otlp.rs | 55 +- src/frontend/src/server.rs | 5 +- .../src/etl/transform/transformer/greptime.rs | 7 +- src/pipeline/src/lib.rs | 5 + src/servers/src/error.rs | 20 +- src/servers/src/http.rs | 19 +- src/servers/src/http/extractor.rs | 46 +- src/servers/src/http/header.rs | 6 + src/servers/src/http/otlp.rs | 40 +- src/servers/src/otel_arrow.rs | 1 + src/servers/src/otlp/metrics.rs | 571 +++++++++++++++--- src/servers/src/otlp/trace.rs | 1 + src/session/src/context.rs | 13 + src/session/src/lib.rs | 1 + src/session/src/protocol_ctx.rs | 39 ++ src/table/src/requests.rs | 11 +- tests-integration/src/otlp.rs | 36 +- tests-integration/src/test_util.rs | 2 +- tests-integration/tests/http.rs | 233 ++++++- 23 files changed, 1060 insertions(+), 182 deletions(-) create mode 100644 src/session/src/protocol_ctx.rs diff --git a/Cargo.lock b/Cargo.lock index 932356dd72..531a74d580 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4807,6 +4807,7 @@ dependencies = [ "common-test-util", "common-time", "common-version", + "dashmap", "datafusion", "datafusion-expr", "datanode", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 761b113ca4..c1fc08fa33 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -39,6 +39,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true common-version.workspace = true +dashmap.workspace = true datafusion.workspace = true datafusion-expr.workspace = true datanode.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 02a36c0ad8..f3676e746e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -42,6 +42,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::runtime_switch::RuntimeSwitchManager; +use common_meta::key::table_name::TableNameKey; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -53,8 +54,10 @@ use common_query::Output; use common_recordbatch::error::StreamTimeoutSnafu; use common_recordbatch::RecordBatchStreamWrapper; use common_telemetry::{debug, error, info, tracing}; +use dashmap::DashMap; use datafusion_expr::LogicalPlan; use futures::{Stream, StreamExt}; +use lazy_static::lazy_static; use log_store::raft_engine::RaftEngineBackend; use operator::delete::DeleterRef; use operator::insert::InserterRef; @@ -68,11 +71,14 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::DescribeResult; use query::QueryEngineRef; -use servers::error as server_error; -use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; +use servers::error::{ + self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu, + OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, +}; use servers::interceptor::{ PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef, }; +use servers::otlp::metrics::legacy_normalize_otlp_name; use servers::prometheus_handler::PrometheusHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::{Channel, QueryContextRef}; @@ -85,6 +91,7 @@ use sql::statements::statement::Statement; use sql::statements::tql::Tql; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; +use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu, @@ -95,6 +102,10 @@ use crate::limiter::LimiterRef; use crate::slow_query_recorder::SlowQueryRecorder; use crate::stream_wrapper::CancellableStreamWrapper; +lazy_static! { + static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string(); +} + /// The frontend instance contains necessary components, and implements many /// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`], /// [`servers::query_handler::sql::SqlQueryHandler`], etc. @@ -111,6 +122,12 @@ pub struct Instance { slow_query_recorder: Option, limiter: Option, process_manager: ProcessManagerRef, + + // cache for otlp metrics + // first layer key: db-string + // key: direct input metric name + // value: if runs in legacy mode + otlp_metrics_table_legacy_cache: DashMap>, } impl Instance { @@ -331,6 +348,112 @@ impl Instance { .await .context(TableOperationSnafu) } + + async fn check_otlp_legacy( + &self, + names: &[&String], + ctx: QueryContextRef, + ) -> server_error::Result { + let db_string = ctx.get_db_string(); + let cache = self + .otlp_metrics_table_legacy_cache + .entry(db_string) + .or_default(); + + // check cache + let hit_cache = names + .iter() + .filter_map(|name| cache.get(*name)) + .collect::>(); + if !hit_cache.is_empty() { + let hit_legacy = hit_cache.iter().any(|en| *en.value()); + let hit_prom = hit_cache.iter().any(|en| !*en.value()); + + // hit but have true and false, means both legacy and new mode are used + // we cannot handle this case, so return error + // add doc links in err msg later + ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu); + + let flag = hit_legacy; + // set cache for all names + names.iter().for_each(|name| { + if !cache.contains_key(*name) { + cache.insert(name.to_string(), flag); + } + }); + return Ok(flag); + } + + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + + // query legacy table names + let normalized_names = names + .iter() + .map(|n| legacy_normalize_otlp_name(n)) + .collect::>(); + let table_names = normalized_names + .iter() + .map(|n| TableNameKey::new(catalog, &schema, n)) + .collect::>(); + let table_values = self + .table_metadata_manager() + .table_name_manager() + .batch_get(table_names) + .await + .context(CommonMetaSnafu)?; + let table_ids = table_values + .into_iter() + .filter_map(|v| v.map(|vi| vi.table_id())) + .collect::>(); + + // means no existing table is found, use new mode + if table_ids.is_empty() { + // set cache + names.iter().for_each(|name| { + cache.insert(name.to_string(), false); + }); + return Ok(false); + } + + // has existing table, check table options + let table_infos = self + .table_metadata_manager() + .table_info_manager() + .batch_get(&table_ids) + .await + .context(CommonMetaSnafu)?; + let options = table_infos + .values() + .map(|info| { + info.table_info + .meta + .options + .extra_options + .get(OTLP_METRIC_COMPAT_KEY) + .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE) + }) + .collect::>(); + if !options.is_empty() { + // check value consistency + let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM); + let has_legacy = options + .iter() + .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str()); + ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu); + let flag = has_legacy; + names.iter().for_each(|name| { + cache.insert(name.to_string(), flag); + }); + Ok(flag) + } else { + // no table info, use new mode + names.iter().for_each(|name| { + cache.insert(name.to_string(), false); + }); + Ok(false) + } + } } /// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements. diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index e9c132da42..e2658a22fb 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -25,6 +25,7 @@ use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; +use dashmap::DashMap; use operator::delete::Deleter; use operator::flow::FlowServiceOperator; use operator::insert::Inserter; @@ -223,6 +224,7 @@ impl FrontendBuilder { slow_query_recorder, limiter, process_manager, + otlp_metrics_table_legacy_cache: DashMap::new(), }) } } diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 4f92f935bb..e729a2797f 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -12,21 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; +use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_telemetry::tracing; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult}; +use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef}; use session::context::QueryContextRef; use snafu::ResultExt; +use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; use crate::instance::Instance; use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; @@ -50,9 +55,37 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; - let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?; + let input_names = request + .resource_metrics + .iter() + .flat_map(|r| r.scope_metrics.iter()) + .flat_map(|s| s.metrics.iter().map(|m| &m.name)) + .collect::>(); + + // If the user uses OTLP metrics ingestion before v0.16, it uses the old path. + // So we call this path 'legacy'. + // After v0.16, we store the OTLP metrics using prometheus compatible format, the new path. + // The difference is how we convert the input data into the final table schema. + let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?; + + let mut metric_ctx = ctx + .protocol_ctx() + .get_otlp_metric_ctx() + .cloned() + .unwrap_or_default(); + metric_ctx.is_legacy = is_legacy; + + let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &metric_ctx)?; OTLP_METRICS_ROWS.inc_by(rows as u64); + let ctx = if !is_legacy { + let mut c = (*ctx).clone(); + c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string()); + Arc::new(c) + } else { + ctx + }; + let _guard = if let Some(limiter) = &self.limiter { let result = limiter.limit_row_inserts(&requests); if result.is_none() { @@ -63,10 +96,22 @@ impl OpenTelemetryProtocolHandler for Instance { None }; - self.handle_row_inserts(requests, ctx, false, false) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu) + // If the user uses the legacy path, it is by default without metric engine. + if metric_ctx.is_legacy || !metric_ctx.with_metric_engine { + self.handle_row_inserts(requests, ctx, false, false) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } else { + let physical_table = ctx + .extension(PHYSICAL_TABLE_PARAM) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + self.handle_metric_row_inserts(requests, ctx, physical_table.to_string()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } } #[tracing::instrument(skip_all)] diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index e1e91080b6..3e91395fde 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -106,7 +106,8 @@ where } if opts.otlp.enable { - builder = builder.with_otlp_handler(self.instance.clone()); + builder = builder + .with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine); } if opts.jaeger.enable { @@ -158,7 +159,7 @@ where let grpc_server = builder .database_handler(greptime_request_handler.clone()) .prometheus_handler(self.instance.clone(), user_provider.clone()) - .otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone())) + .otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone())) .flight_handler(Arc::new(greptime_request_handler)) .frontend_grpc_handler(frontend_grpc_handler) .build(); diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 17332a33e6..5095e49eb0 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -45,7 +45,7 @@ use crate::etl::field::{Field, Fields}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::etl::PipelineDocVersion; -use crate::{unwrap_or_continue_if_err, PipelineContext}; +use crate::{truthy, unwrap_or_continue_if_err, PipelineContext}; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -58,11 +58,6 @@ pub struct GreptimeTransformer { schema: Vec, } -fn truthy>(v: V) -> bool { - let v = v.as_ref().to_lowercase(); - v == "true" || v == "1" || v == "yes" || v == "on" || v == "t" -} - /// Parameters that can be used to configure the greptime pipelines. #[derive(Debug, Default)] pub struct GreptimePipelineParams { diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 8792dded4a..f4b0400249 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -50,3 +50,8 @@ macro_rules! unwrap_or_continue_if_err { } }}; } + +pub fn truthy>(v: V) -> bool { + let v = v.as_ref().to_lowercase(); + v == "true" || v == "1" || v == "yes" || v == "on" || v == "t" +} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 0472263d28..39130b9c7e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -230,6 +230,22 @@ pub enum Error { error: prost::DecodeError, }, + #[snafu(display( + "OTLP metric input have incompatible existing tables, please refer to docs for details" + ))] + OtlpMetricModeIncompatible { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Common Meta error"))] + CommonMeta { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: common_meta::error::Error, + }, + #[snafu(display("Failed to decompress snappy prometheus remote request"))] DecompressSnappyPromRemoteRequest { #[snafu(implicit)] @@ -646,7 +662,8 @@ impl ErrorExt for Error { AddressBind { .. } | AlreadyStarted { .. } - | InvalidPromRemoteReadQueryResult { .. } => StatusCode::IllegalState, + | InvalidPromRemoteReadQueryResult { .. } + | OtlpMetricModeIncompatible { .. } => StatusCode::IllegalState, UnsupportedDataType { .. } => StatusCode::Unsupported, @@ -662,6 +679,7 @@ impl ErrorExt for Error { | CheckDatabaseValidity { source, .. } => source.status_code(), Pipeline { source, .. } => source.status_code(), + CommonMeta { source, .. } => source.status_code(), NotSupported { .. } | InvalidParameter { .. } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 5ed3674b18..1465c900cb 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -58,6 +58,7 @@ use crate::error::{ ToJsonSnafu, }; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; +use crate::http::otlp::OtlpState; use crate::http::prom_store::PromStoreState; use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query, @@ -631,11 +632,15 @@ impl HttpServerBuilder { } } - pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self { + pub fn with_otlp_handler( + self, + handler: OpenTelemetryProtocolHandlerRef, + with_metric_engine: bool, + ) -> Self { Self { router: self.router.nest( &format!("/{HTTP_API_VERSION}/otlp"), - HttpServer::route_otlp(handler), + HttpServer::route_otlp(handler, with_metric_engine), ), ..self } @@ -1100,7 +1105,10 @@ impl HttpServer { .with_state(opentsdb_handler) } - fn route_otlp(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router { + fn route_otlp( + otlp_handler: OpenTelemetryProtocolHandlerRef, + with_metric_engine: bool, + ) -> Router { Router::new() .route("/v1/metrics", routing::post(otlp::metrics)) .route("/v1/traces", routing::post(otlp::traces)) @@ -1109,7 +1117,10 @@ impl HttpServer { ServiceBuilder::new() .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)), ) - .with_state(otlp_handler) + .with_state(OtlpState { + with_metric_engine, + handler: otlp_handler, + }) } fn route_config(state: GreptimeOptionsConfigState) -> Router { diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs index 1cb073ebdb..6cf797f41e 100644 --- a/src/servers/src/http/extractor.rs +++ b/src/servers/src/http/extractor.rs @@ -18,13 +18,15 @@ use axum::extract::FromRequestParts; use axum::http::request::Parts; use axum::http::StatusCode; use http::HeaderMap; -use pipeline::{GreptimePipelineParams, SelectInfo}; +use pipeline::{truthy, GreptimePipelineParams, SelectInfo}; use crate::http::header::constants::{ GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME, - GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER, - GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, + GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME, + GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, + GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME, + GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, }; /// Axum extractor for optional target log table name from HTTP header @@ -129,6 +131,44 @@ where } } +/// Axum extractor for OTLP metric options from HTTP headers. +pub struct OtlpMetricOptions { + /// Persist all resource attributes to the table + /// If false, only persist selected attributes. See [`DEFAULT_ATTRS`] in `otlp/metrics.rs` + pub promote_all_resource_attrs: bool, + /// Persist scope attributes to the table + /// If false, persist none + pub promote_scope_attrs: bool, +} + +impl FromRequestParts for OtlpMetricOptions +where + S: Send + Sync, +{ + type Rejection = (StatusCode, String); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + let headers = &parts.headers; + let promote_all_resource_attrs = string_value_from_header( + headers, + &[GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME], + )? + .map(truthy) + .unwrap_or(false); + let promote_scope_attrs = string_value_from_header( + headers, + &[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME], + )? + .map(truthy) + .unwrap_or(false); + + Ok(OtlpMetricOptions { + promote_all_resource_attrs, + promote_scope_attrs, + }) + } +} + #[inline] fn string_value_from_header( headers: &HeaderMap, diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index ccfdb7c0e8..6572607bb1 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -59,6 +59,12 @@ pub mod constants { pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name"; + // OTLP headers + pub const GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME: &str = + "x-greptime-otlp-metric-promote-all-resource-attrs"; + pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str = + "x-greptime-otlp-metric-promote-scope-attrs"; + /// The header key that contains the pipeline params. pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params"; } diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 95cc2113d8..2ae344ac83 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -33,31 +33,55 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use pipeline::PipelineWay; use prost::Message; use session::context::{Channel, QueryContext}; +use session::protocol_ctx::{OtlpMetricCtx, ProtocolCtx}; use snafu::prelude::*; use crate::error::{self, PipelineSnafu, Result}; -use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName}; +use crate::http::extractor::{ + LogTableName, OtlpMetricOptions, PipelineInfo, SelectInfoWrapper, TraceTableName, +}; +// use crate::http::header::constants::GREPTIME_METRICS_LEGACY_MODE_HEADER_NAME; use crate::http::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED; use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler}; +#[derive(Clone)] +pub struct OtlpState { + pub with_metric_engine: bool, + pub handler: OpenTelemetryProtocolHandlerRef, +} + #[axum_macros::debug_handler] #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))] pub async fn metrics( - State(handler): State, + State(state): State, Extension(mut query_ctx): Extension, - + http_opts: OtlpMetricOptions, bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); - let query_ctx = Arc::new(query_ctx); + let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); let request = ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; + let OtlpState { + with_metric_engine, + handler, + } = state; + + query_ctx.set_protocol_ctx(ProtocolCtx::OtlpMetric(OtlpMetricCtx { + promote_all_resource_attrs: http_opts.promote_all_resource_attrs, + promote_scope_attrs: http_opts.promote_scope_attrs, + with_metric_engine, + // set is_legacy later + is_legacy: false, + })); + let query_ctx = Arc::new(query_ctx); + handler .metrics(request, query_ctx) .await @@ -72,7 +96,7 @@ pub async fn metrics( #[axum_macros::debug_handler] #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( - State(handler): State, + State(state): State, TraceTableName(table_name): TraceTableName, pipeline_info: PipelineInfo, Extension(mut query_ctx): Extension, @@ -100,6 +124,8 @@ pub async fn traces( let pipeline_params = pipeline_info.pipeline_params; + let OtlpState { handler, .. } = state; + // here we use nightly feature `trait_upcasting` to convert handler to // pipeline_handler let pipeline_handler: Arc = handler.clone(); @@ -125,7 +151,7 @@ pub async fn traces( #[axum_macros::debug_handler] #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))] pub async fn logs( - State(handler): State, + State(state): State, Extension(mut query_ctx): Extension, pipeline_info: PipelineInfo, LogTableName(tablename): LogTableName, @@ -149,6 +175,8 @@ pub async fn logs( .context(PipelineSnafu)?; let pipeline_params = pipeline_info.pipeline_params; + let OtlpState { handler, .. } = state; + // here we use nightly feature `trait_upcasting` to convert handler to // pipeline_handler let pipeline_handler: Arc = handler.clone(); diff --git a/src/servers/src/otel_arrow.rs b/src/servers/src/otel_arrow.rs index f279c7f7b8..29df8b54df 100644 --- a/src/servers/src/otel_arrow.rs +++ b/src/servers/src/otel_arrow.rs @@ -85,6 +85,7 @@ impl ArrowMetricsService for OtelArrowServiceHandler -/// - since the name are case-insensitive, we transform them to lowercase for -/// better sql usability -/// - replace `.` and `-` with `_` -fn normalize_otlp_name(name: &str) -> String { - name.to_lowercase().replace(['.', '-'], "_") +const COUNT_TABLE_SUFFIX: &str = "_count"; +const SUM_TABLE_SUFFIX: &str = "_sum"; + +const JOB_KEY: &str = "job"; +const INSTANCE_KEY: &str = "instance"; + +const UNDERSCORE: &str = "_"; + +// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes +// instance and job alias to service.instance.id and service.name that we need to keep +const DEFAULT_ATTRS: [&str; 19] = [ + "service.instance.id", + "service.name", + "service.namespace", + "service.version", + "cloud.availability_zone", + "cloud.region", + "container.name", + "deployment.environment", + "deployment.environment.name", + "k8s.cluster.name", + "k8s.container.name", + "k8s.cronjob.name", + "k8s.daemonset.name", + "k8s.deployment.name", + "k8s.job.name", + "k8s.namespace.name", + "k8s.pod.name", + "k8s.replicaset.name", + "k8s.statefulset.name", +]; + +lazy_static! { + static ref DEFAULT_ATTRS_HASHSET: HashSet = + HashSet::from_iter(DEFAULT_ATTRS.iter().map(|s| s.to_string())); + static ref INVALID_METRIC_NAME: Regex = Regex::new(r"[^a-zA-Z0-9:_]").unwrap(); } +const OTEL_SCOPE_NAME: &str = "name"; +const OTEL_SCOPE_VERSION: &str = "version"; +const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url"; + /// Convert OpenTelemetry metrics to GreptimeDB insert requests /// /// See @@ -44,15 +81,28 @@ fn normalize_otlp_name(name: &str) -> String { /// Returns `InsertRequests` and total number of rows to ingest pub fn to_grpc_insert_requests( request: ExportMetricsServiceRequest, + metric_ctx: &OtlpMetricCtx, ) -> Result<(RowInsertRequests, usize)> { let mut table_writer = MultiTableData::default(); for resource in &request.resource_metrics { - let resource_attrs = resource.resource.as_ref().map(|r| &r.attributes); + let resource_attrs = resource.resource.as_ref().map(|r| { + let mut attrs = r.attributes.clone(); + process_resource_attrs(&mut attrs, metric_ctx); + attrs + }); + for scope in &resource.scope_metrics { - let scope_attrs = scope.scope.as_ref().map(|s| &s.attributes); + let scope_attrs = process_scope_attrs(scope, metric_ctx); + for metric in &scope.metrics { - encode_metrics(&mut table_writer, metric, resource_attrs, scope_attrs)?; + encode_metrics( + &mut table_writer, + metric, + resource_attrs.as_ref(), + scope_attrs.as_ref(), + metric_ctx, + )?; } } } @@ -60,28 +110,153 @@ pub fn to_grpc_insert_requests( Ok(table_writer.into_row_insert_requests()) } +fn process_resource_attrs(attrs: &mut Vec, metric_ctx: &OtlpMetricCtx) { + if metric_ctx.is_legacy { + return; + } + + // check if promote all + if !metric_ctx.promote_all_resource_attrs { + attrs.retain(|kv| DEFAULT_ATTRS_HASHSET.contains(&kv.key)); + } + + // remap service.name and service.instance.id to job and instance + let mut tmp = Vec::with_capacity(2); + for kv in attrs.iter() { + match &kv.key as &str { + KEY_SERVICE_NAME => { + tmp.push(KeyValue { + key: JOB_KEY.to_string(), + value: kv.value.clone(), + }); + } + KEY_SERVICE_INSTANCE_ID => { + tmp.push(KeyValue { + key: INSTANCE_KEY.to_string(), + value: kv.value.clone(), + }); + } + _ => {} + } + } + attrs.extend(tmp); +} + +fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Option> { + if metric_ctx.is_legacy { + return scope.scope.as_ref().map(|s| s.attributes.clone()); + }; + + if !metric_ctx.promote_scope_attrs { + return None; + } + + // persist scope attrs with name, version and schema_url + scope.scope.as_ref().map(|s| { + let mut attrs = s.attributes.clone(); + attrs.push(KeyValue { + key: OTEL_SCOPE_NAME.to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(s.name.clone())), + }), + }); + attrs.push(KeyValue { + key: OTEL_SCOPE_VERSION.to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(s.version.clone())), + }), + }); + attrs.push(KeyValue { + key: OTEL_SCOPE_SCHEMA_URL.to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(scope.schema_url.clone())), + }), + }); + attrs + }) +} + +// replace . with _ +// see: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.38.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus +pub fn normalize_metric_name(name: &str) -> String { + let name = INVALID_METRIC_NAME.replace_all(name, UNDERSCORE); + + if let Some((_, first)) = name.char_indices().next() + && first >= '0' + && first <= '9' + { + format!("_{}", name) + } else { + name.to_string() + } +} + +/// Normalize otlp instrumentation, metric and attribute names +/// +/// +/// - since the name are case-insensitive, we transform them to lowercase for +/// better sql usability +/// - replace `.` and `-` with `_` +pub fn legacy_normalize_otlp_name(name: &str) -> String { + name.to_lowercase().replace(['.', '-'], "_") +} + fn encode_metrics( table_writer: &mut MultiTableData, metric: &Metric, resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { - let name = &metric.name; + let name = if metric_ctx.is_legacy { + legacy_normalize_otlp_name(&metric.name) + } else { + normalize_metric_name(&metric.name) + }; + // note that we don't store description or unit, we might want to deal with // these fields in the future. if let Some(data) = &metric.data { match data { metric::Data::Gauge(gauge) => { - encode_gauge(table_writer, name, gauge, resource_attrs, scope_attrs)?; + encode_gauge( + table_writer, + &name, + gauge, + resource_attrs, + scope_attrs, + metric_ctx, + )?; } metric::Data::Sum(sum) => { - encode_sum(table_writer, name, sum, resource_attrs, scope_attrs)?; + encode_sum( + table_writer, + &name, + sum, + resource_attrs, + scope_attrs, + metric_ctx, + )?; } metric::Data::Summary(summary) => { - encode_summary(table_writer, name, summary, resource_attrs, scope_attrs)?; + encode_summary( + table_writer, + &name, + summary, + resource_attrs, + scope_attrs, + metric_ctx, + )?; } metric::Data::Histogram(hist) => { - encode_histogram(table_writer, name, hist, resource_attrs, scope_attrs)?; + encode_histogram( + table_writer, + &name, + hist, + resource_attrs, + scope_attrs, + metric_ctx, + )?; } // TODO(sunng87) leave ExponentialHistogram for next release metric::Data::ExponentialHistogram(_hist) => {} @@ -91,39 +266,74 @@ fn encode_metrics( Ok(()) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AttributeType { + Resource, + Scope, + DataPoint, + Legacy, +} + fn write_attributes( writer: &mut TableData, row: &mut Vec, attrs: Option<&Vec>, + attribute_type: AttributeType, ) -> Result<()> { - if let Some(attrs) = attrs { - let table_tags = attrs.iter().filter_map(|attr| { - if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) { - let key = normalize_otlp_name(&attr.key); + let Some(attrs) = attrs else { + return Ok(()); + }; + + let tags = attrs.iter().filter_map(|attr| { + attr.value + .as_ref() + .and_then(|v| v.value.as_ref()) + .and_then(|val| { + let key = match attribute_type { + AttributeType::Resource | AttributeType::DataPoint => { + normalize_metric_name(&attr.key) + } + AttributeType::Scope => { + format!("otel_scope_{}", normalize_metric_name(&attr.key)) + } + AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key), + }; match val { - any_value::Value::StringValue(s) => Some((key, s.to_string())), + any_value::Value::StringValue(s) => Some((key, s.clone())), any_value::Value::IntValue(v) => Some((key, v.to_string())), any_value::Value::DoubleValue(v) => Some((key, v.to_string())), _ => None, // TODO(sunng87): allow different type of values } - } else { - None - } - }); + }) + }); + row_writer::write_tags(writer, tags, row)?; - row_writer::write_tags(writer, table_tags, row)?; - } Ok(()) } -fn write_timestamp(table: &mut TableData, row: &mut Vec, time_nano: i64) -> Result<()> { - row_writer::write_ts_to_nanos( - table, - GREPTIME_TIMESTAMP, - Some(time_nano), - Precision::Nanosecond, - row, - ) +fn write_timestamp( + table: &mut TableData, + row: &mut Vec, + time_nano: i64, + legacy_mode: bool, +) -> Result<()> { + if legacy_mode { + row_writer::write_ts_to_nanos( + table, + GREPTIME_TIMESTAMP, + Some(time_nano), + Precision::Nanosecond, + row, + ) + } else { + row_writer::write_ts_to_millis( + table, + GREPTIME_TIMESTAMP, + Some(time_nano / 1000000), + Precision::Millisecond, + row, + ) + } } fn write_data_point_value( @@ -152,12 +362,20 @@ fn write_tags_and_timestamp( scope_attrs: Option<&Vec>, data_point_attrs: Option<&Vec>, timestamp_nanos: i64, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { - write_attributes(table, row, resource_attrs)?; - write_attributes(table, row, scope_attrs)?; - write_attributes(table, row, data_point_attrs)?; + if metric_ctx.is_legacy { + write_attributes(table, row, resource_attrs, AttributeType::Legacy)?; + write_attributes(table, row, scope_attrs, AttributeType::Legacy)?; + write_attributes(table, row, data_point_attrs, AttributeType::Legacy)?; + } else { + // TODO(shuiyisong): check `__type__` and `__unit__` tags in prometheus + write_attributes(table, row, resource_attrs, AttributeType::Resource)?; + write_attributes(table, row, scope_attrs, AttributeType::Scope)?; + write_attributes(table, row, data_point_attrs, AttributeType::DataPoint)?; + } - write_timestamp(table, row, timestamp_nanos)?; + write_timestamp(table, row, timestamp_nanos, metric_ctx.is_legacy)?; Ok(()) } @@ -172,9 +390,10 @@ fn encode_gauge( gauge: &Gauge, resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { let table = table_writer.get_or_default_table_data( - normalize_otlp_name(name), + name, APPROXIMATE_COLUMN_COUNT, gauge.data_points.len(), ); @@ -188,6 +407,7 @@ fn encode_gauge( scope_attrs, Some(data_point.attributes.as_ref()), data_point.time_unix_nano as i64, + metric_ctx, )?; write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; @@ -206,9 +426,10 @@ fn encode_sum( sum: &Sum, resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { let table = table_writer.get_or_default_table_data( - normalize_otlp_name(name), + name, APPROXIMATE_COLUMN_COUNT, sum.data_points.len(), ); @@ -222,6 +443,7 @@ fn encode_sum( scope_attrs, Some(data_point.attributes.as_ref()), data_point.time_unix_nano as i64, + metric_ctx, )?; write_data_point_value(table, &mut row, GREPTIME_VALUE, &data_point.value)?; table.add_row(row); @@ -249,8 +471,9 @@ fn encode_histogram( hist: &Histogram, resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { - let normalized_name = normalize_otlp_name(name); + let normalized_name = name; let bucket_table_name = format!("{}_bucket", normalized_name); let sum_table_name = format!("{}_sum", normalized_name); @@ -273,6 +496,7 @@ fn encode_histogram( scope_attrs, Some(data_point.attributes.as_ref()), data_point.time_unix_nano as i64, + metric_ctx, )?; if let Some(upper_bounds) = data_point.explicit_bounds.get(idx) { @@ -312,6 +536,7 @@ fn encode_histogram( scope_attrs, Some(data_point.attributes.as_ref()), data_point.time_unix_nano as i64, + metric_ctx, )?; row_writer::write_f64(&mut sum_table, GREPTIME_VALUE, sum, &mut sum_row)?; @@ -326,6 +551,7 @@ fn encode_histogram( scope_attrs, Some(data_point.attributes.as_ref()), data_point.time_unix_nano as i64, + metric_ctx, )?; row_writer::write_f64( @@ -356,35 +582,126 @@ fn encode_summary( summary: &Summary, resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, + metric_ctx: &OtlpMetricCtx, ) -> Result<()> { - let table = table_writer.get_or_default_table_data( - normalize_otlp_name(name), - APPROXIMATE_COLUMN_COUNT, - summary.data_points.len(), - ); + if metric_ctx.is_legacy { + let table = table_writer.get_or_default_table_data( + name, + APPROXIMATE_COLUMN_COUNT, + summary.data_points.len(), + ); - for data_point in &summary.data_points { - let mut row = table.alloc_one_row(); - write_tags_and_timestamp( - table, - &mut row, - resource_attrs, - scope_attrs, - Some(data_point.attributes.as_ref()), - data_point.time_unix_nano as i64, - )?; - - for quantile in &data_point.quantile_values { - row_writer::write_f64( + for data_point in &summary.data_points { + let mut row = table.alloc_one_row(); + write_tags_and_timestamp( table, - format!("greptime_p{:02}", quantile.quantile * 100f64), - quantile.value, &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + metric_ctx, )?; - } - row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?; - table.add_row(row); + for quantile in &data_point.quantile_values { + row_writer::write_f64( + table, + format!("greptime_p{:02}", quantile.quantile * 100f64), + quantile.value, + &mut row, + )?; + } + + row_writer::write_f64(table, GREPTIME_COUNT, data_point.count as f64, &mut row)?; + table.add_row(row); + } + } else { + // 1. quantile table + // 2. count table + // 3. sum table + + let metric_name = name; + let count_name = format!("{}{}", metric_name, COUNT_TABLE_SUFFIX); + let sum_name = format!("{}{}", metric_name, SUM_TABLE_SUFFIX); + + for data_point in &summary.data_points { + { + let quantile_table = table_writer.get_or_default_table_data( + metric_name, + APPROXIMATE_COLUMN_COUNT, + summary.data_points.len(), + ); + + for quantile in &data_point.quantile_values { + let mut row = quantile_table.alloc_one_row(); + write_tags_and_timestamp( + quantile_table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + metric_ctx, + )?; + row_writer::write_tag(quantile_table, "quantile", quantile.quantile, &mut row)?; + row_writer::write_f64( + quantile_table, + GREPTIME_VALUE, + quantile.value, + &mut row, + )?; + quantile_table.add_row(row); + } + } + { + let count_table = table_writer.get_or_default_table_data( + &count_name, + APPROXIMATE_COLUMN_COUNT, + summary.data_points.len(), + ); + let mut row = count_table.alloc_one_row(); + write_tags_and_timestamp( + count_table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + metric_ctx, + )?; + + row_writer::write_f64( + count_table, + GREPTIME_VALUE, + data_point.count as f64, + &mut row, + )?; + + count_table.add_row(row); + } + { + let sum_table = table_writer.get_or_default_table_data( + &sum_name, + APPROXIMATE_COLUMN_COUNT, + summary.data_points.len(), + ); + + let mut row = sum_table.alloc_one_row(); + write_tags_and_timestamp( + sum_table, + &mut row, + resource_attrs, + scope_attrs, + Some(data_point.attributes.as_ref()), + data_point.time_unix_nano as i64, + metric_ctx, + )?; + + row_writer::write_f64(sum_table, GREPTIME_VALUE, data_point.sum, &mut row)?; + + sum_table.add_row(row); + } + } } Ok(()) @@ -401,12 +718,27 @@ mod tests { use super::*; #[test] - fn test_normalize_otlp_name() { - assert_eq!(normalize_otlp_name("jvm.memory.free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("jvm-memory-free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("jvm_memory_free"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("JVM_MEMORY_FREE"), "jvm_memory_free"); - assert_eq!(normalize_otlp_name("JVM_memory_FREE"), "jvm_memory_free"); + fn test_legacy_normalize_otlp_name() { + assert_eq!( + legacy_normalize_otlp_name("jvm.memory.free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("jvm-memory-free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("jvm_memory_free"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("JVM_MEMORY_FREE"), + "jvm_memory_free" + ); + assert_eq!( + legacy_normalize_otlp_name("JVM_memory_FREE"), + "jvm_memory_free" + ); } fn keyvalue(key: &str, value: &str) -> KeyValue { @@ -441,14 +773,15 @@ mod tests { &mut tables, "datamon", &gauge, - Some(&vec![keyvalue("resource", "app")]), + Some(&vec![]), Some(&vec![keyvalue("scope", "otel")]), + &OtlpMetricCtx::default(), ) .unwrap(); let table = tables.get_or_default_table_data("datamon", 0, 0); assert_eq!(table.num_rows(), 2); - assert_eq!(table.num_columns(), 5); + assert_eq!(table.num_columns(), 4); assert_eq!( table .columns() @@ -456,8 +789,7 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", "greptime_value" @@ -491,14 +823,15 @@ mod tests { &mut tables, "datamon", &sum, - Some(&vec![keyvalue("resource", "app")]), + Some(&vec![]), Some(&vec![keyvalue("scope", "otel")]), + &OtlpMetricCtx::default(), ) .unwrap(); let table = tables.get_or_default_table_data("datamon", 0, 0); assert_eq!(table.num_rows(), 2); - assert_eq!(table.num_columns(), 5); + assert_eq!(table.num_columns(), 4); assert_eq!( table .columns() @@ -506,8 +839,7 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", "greptime_value" @@ -541,14 +873,15 @@ mod tests { &mut tables, "datamon", &summary, - Some(&vec![keyvalue("resource", "app")]), + Some(&vec![]), Some(&vec![keyvalue("scope", "otel")]), + &OtlpMetricCtx::default(), ) .unwrap(); let table = tables.get_or_default_table_data("datamon", 0, 0); - assert_eq!(table.num_rows(), 1); - assert_eq!(table.num_columns(), 7); + assert_eq!(table.num_rows(), 2); + assert_eq!(table.num_columns(), 5); assert_eq!( table .columns() @@ -556,13 +889,45 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", - "greptime_p90", - "greptime_p95", - "greptime_count" + "quantile", + "greptime_value" + ] + ); + + let table = tables.get_or_default_table_data("datamon_count", 0, 0); + assert_eq!(table.num_rows(), 1); + assert_eq!(table.num_columns(), 4); + assert_eq!( + table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "otel_scope_scope", + "host", + "greptime_timestamp", + "greptime_value" + ] + ); + + let table = tables.get_or_default_table_data("datamon_sum", 0, 0); + assert_eq!(table.num_rows(), 1); + assert_eq!(table.num_columns(), 4); + assert_eq!( + table + .columns() + .iter() + .map(|c| &c.column_name) + .collect::>(), + vec![ + "otel_scope_scope", + "host", + "greptime_timestamp", + "greptime_value" ] ); } @@ -592,8 +957,9 @@ mod tests { &mut tables, "histo", &histogram, - Some(&vec![keyvalue("resource", "app")]), + Some(&vec![]), Some(&vec![keyvalue("scope", "otel")]), + &OtlpMetricCtx::default(), ) .unwrap(); @@ -602,7 +968,7 @@ mod tests { // bucket table let bucket_table = tables.get_or_default_table_data("histo_bucket", 0, 0); assert_eq!(bucket_table.num_rows(), 5); - assert_eq!(bucket_table.num_columns(), 6); + assert_eq!(bucket_table.num_columns(), 5); assert_eq!( bucket_table .columns() @@ -610,8 +976,7 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", "le", @@ -621,7 +986,7 @@ mod tests { let sum_table = tables.get_or_default_table_data("histo_sum", 0, 0); assert_eq!(sum_table.num_rows(), 1); - assert_eq!(sum_table.num_columns(), 5); + assert_eq!(sum_table.num_columns(), 4); assert_eq!( sum_table .columns() @@ -629,17 +994,16 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", - "greptime_value", + "greptime_value" ] ); let count_table = tables.get_or_default_table_data("histo_count", 0, 0); assert_eq!(count_table.num_rows(), 1); - assert_eq!(count_table.num_columns(), 5); + assert_eq!(count_table.num_columns(), 4); assert_eq!( count_table .columns() @@ -647,12 +1011,19 @@ mod tests { .map(|c| &c.column_name) .collect::>(), vec![ - "resource", - "scope", + "otel_scope_scope", "host", "greptime_timestamp", - "greptime_value", + "greptime_value" ] ); } + + #[test] + fn test_normalize_otlp_name() { + assert_eq!(normalize_metric_name("test.123"), "test_123"); + assert_eq!(normalize_metric_name("test_123"), "test_123"); + assert_eq!(normalize_metric_name("test._123"), "test__123"); + assert_eq!(normalize_metric_name("123_test"), "_123_test"); + } } diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index 7c2d9da2f9..19abc74daf 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -42,6 +42,7 @@ pub const RESOURCE_ATTRIBUTES_COLUMN: &str = "resource_attributes"; // const keys pub const KEY_SERVICE_NAME: &str = "service.name"; +pub const KEY_SERVICE_INSTANCE_ID: &str = "service.instance.id"; pub const KEY_SPAN_KIND: &str = "span.kind"; // jaeger const keys, not sure if they are general diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 6ccd9ae0f7..fac891ac82 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -31,6 +31,7 @@ use common_time::Timezone; use derive_builder::Builder; use sql::dialect::{Dialect, GenericDialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect}; +use crate::protocol_ctx::ProtocolCtx; use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle}; use crate::{MutableInner, ReadPreference}; @@ -70,6 +71,9 @@ pub struct QueryContext { /// Connection information #[builder(default)] conn_info: ConnInfo, + /// Protocol specific context + #[builder(default)] + protocol_ctx: ProtocolCtx, } /// This fields hold data that is only valid to current query context @@ -447,6 +451,14 @@ impl QueryContext { pub fn conn_info(&self) -> &ConnInfo { &self.conn_info } + + pub fn protocol_ctx(&self) -> &ProtocolCtx { + &self.protocol_ctx + } + + pub fn set_protocol_ctx(&mut self, protocol_ctx: ProtocolCtx) { + self.protocol_ctx = protocol_ctx; + } } impl QueryContextBuilder { @@ -470,6 +482,7 @@ impl QueryContextBuilder { channel, process_id: self.process_id.unwrap_or_default(), conn_info: self.conn_info.unwrap_or_default(), + protocol_ctx: self.protocol_ctx.unwrap_or_default(), } } diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs index 7688b0b659..ba0484f9c5 100644 --- a/src/session/src/lib.rs +++ b/src/session/src/lib.rs @@ -14,6 +14,7 @@ pub mod context; pub mod hints; +pub mod protocol_ctx; pub mod session_config; pub mod table_name; diff --git a/src/session/src/protocol_ctx.rs b/src/session/src/protocol_ctx.rs new file mode 100644 index 0000000000..71e34b8f40 --- /dev/null +++ b/src/session/src/protocol_ctx.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Protocol specific context +/// for carrying options(like HTTP header options) within the query context +#[derive(Debug, Clone, Default)] +pub enum ProtocolCtx { + #[default] + None, + OtlpMetric(OtlpMetricCtx), +} + +impl ProtocolCtx { + pub fn get_otlp_metric_ctx(&self) -> Option<&OtlpMetricCtx> { + match self { + ProtocolCtx::None => None, + ProtocolCtx::OtlpMetric(opt) => Some(opt), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct OtlpMetricCtx { + pub promote_all_resource_attrs: bool, + pub promote_scope_attrs: bool, + pub with_metric_engine: bool, + pub is_legacy: bool, +} diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4236bb1700..953a2bfa27 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -14,7 +14,7 @@ //! Table and TableEngine requests -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::str::FromStr; @@ -30,6 +30,7 @@ use datatypes::schema::{ ColumnDefaultConstraint, ColumnSchema, FulltextOptions, SkippingIndexOptions, }; use greptime_proto::v1::region::compact_request; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use store_api::metric_engine_consts::{ is_metric_engine_option_key, LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY, @@ -52,11 +53,10 @@ pub const FILE_TABLE_FORMAT_KEY: &str = "format"; pub const TABLE_DATA_MODEL: &str = "table_data_model"; pub const TABLE_DATA_MODEL_TRACE_V1: &str = "greptime_trace_v1"; -use std::collections::HashSet; +pub const OTLP_METRIC_COMPAT_KEY: &str = "otlp_metric_compat"; +pub const OTLP_METRIC_COMPAT_PROM: &str = "prom"; -use once_cell::sync::Lazy; - -pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [ +pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [ // common keys: WRITE_BUFFER_SIZE_KEY, TTL_KEY, @@ -72,6 +72,7 @@ pub const VALID_TABLE_OPTION_KEYS: [&str; 11] = [ LOGICAL_TABLE_METADATA_KEY, // table model info TABLE_DATA_MODEL, + OTLP_METRIC_COMPAT_KEY, ]; // Valid option keys when creating a db. diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index 1b0a37f02f..b9668695f8 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -81,12 +81,12 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| greptimedb | otel | java | testsevrer | 1970-01-01T00:00:00.000000100 | 100.0 | -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000105 | 105.0 | -+------------+-------+--------------------+------------+-------------------------------+----------------+", ++----------------+---------------------+----------------+ +| container_name | greptime_timestamp | greptime_value | ++----------------+---------------------+----------------+ +| testserver | 1970-01-01T00:00:00 | 105.0 | +| testsevrer | 1970-01-01T00:00:00 | 100.0 | ++----------------+---------------------+----------------+", ); let mut output = instance @@ -123,11 +123,11 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 51.0 | -+------------+-------+--------------------+------------+-------------------------------+----------------+", ++------------+---------------------+----------------+ +| host | greptime_timestamp | greptime_value | ++------------+---------------------+----------------+ +| testserver | 1970-01-01T00:00:00 | 51.0 | ++------------+---------------------+----------------+", ); let mut output = instance @@ -141,24 +141,24 @@ mod test { assert_eq!( recordbatches.pretty_print().unwrap(), "\ -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| resource | scope | telemetry_sdk_name | host | greptime_timestamp | greptime_value | -+------------+-------+--------------------+------------+-------------------------------+----------------+ -| greptimedb | otel | java | testserver | 1970-01-01T00:00:00.000000100 | 4.0 | -+------------+-------+--------------------+------------+-------------------------------+----------------+" ++------------+---------------------+----------------+ +| host | greptime_timestamp | greptime_value | ++------------+---------------------+----------------+ +| testserver | 1970-01-01T00:00:00 | 4.0 | ++------------+---------------------+----------------+", ); } fn build_request() -> ExportMetricsServiceRequest { let data_points = vec![ NumberDataPoint { - attributes: vec![keyvalue("host", "testsevrer")], + attributes: vec![keyvalue("container.name", "testsevrer")], time_unix_nano: 100, value: Some(Value::AsInt(100)), ..Default::default() }, NumberDataPoint { - attributes: vec![keyvalue("host", "testserver")], + attributes: vec![keyvalue("container.name", "testserver")], time_unix_nano: 105, value: Some(Value::AsInt(105)), ..Default::default() diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 1f4c47903d..be8dcda090 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -458,7 +458,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( .with_log_ingest_handler(instance.fe_instance().clone(), None, None) .with_logs_handler(instance.fe_instance().clone()) .with_influxdb_handler(instance.fe_instance().clone()) - .with_otlp_handler(instance.fe_instance().clone()) + .with_otlp_handler(instance.fe_instance().clone(), true) .with_jaeger_handler(instance.fe_instance().clone()) .with_greptime_config_options(instance.opts.to_toml().unwrap()); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8682014f19..3bf96af655 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -114,7 +114,7 @@ macro_rules! http_tests { test_pipeline_skip_error, test_pipeline_filter, - test_otlp_metrics, + test_otlp_metrics_new, test_otlp_traces_v0, test_otlp_traces_v1, test_otlp_logs, @@ -3588,13 +3588,14 @@ pub async fn test_pipeline_auto_transform_with_select(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_otlp_metrics(store_type: StorageType) { +pub async fn test_otlp_metrics_new(store_type: StorageType) { // init common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_metrics").await; + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_otlp_metrics_new").await; let content = r#" -{"resourceMetrics":[{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"gen","description":"","unit":"","metadata":[],"gauge":{"dataPoints":[{"attributes":[],"startTimeUnixNano":"0","timeUnixNano":"1736489291872539000","exemplars":[],"flags":0,"asInt":0}]}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.13.0"},{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"gen","description":"","unit":"","metadata":[],"gauge":{"dataPoints":[{"attributes":[],"startTimeUnixNano":"0","timeUnixNano":"1736489291919917000","exemplars":[],"flags":0,"asInt":1}]}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.13.0"}]} +{"resourceMetrics":[{"resource":{"attributes":[{"key":"host.arch","value":{"stringValue":"arm64"}},{"key":"os.type","value":{"stringValue":"darwin"}},{"key":"os.version","value":{"stringValue":"25.0.0"}},{"key":"service.name","value":{"stringValue":"claude-code"}},{"key":"service.version","value":{"stringValue":"1.0.62"}}],"droppedAttributesCount":0},"scopeMetrics":[{"scope":{"name":"com.anthropic.claude_code","version":"1.0.62","attributes":[],"droppedAttributesCount":0},"metrics":[{"name":"claude_code.cost.usage","description":"Cost of the Claude Code session","unit":"USD","metadata":[],"sum":{"dataPoints":[{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"model","value":{"stringValue":"claude-3-5-haiku-20241022"}}],"startTimeUnixNano":"1753780502453000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":0.0052544},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"model","value":{"stringValue":"claude-sonnet-4-20250514"}}],"startTimeUnixNano":"1753780513420000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":2.244618}],"aggregationTemporality":1,"isMonotonic":true}},{"name":"claude_code.token.usage","description":"Number of tokens used","unit":"tokens","metadata":[],"sum":{"dataPoints":[{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"input"}},{"key":"model","value":{"stringValue":"claude-3-5-haiku-20241022"}}],"startTimeUnixNano":"1753780502453000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":6208.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"output"}},{"key":"model","value":{"stringValue":"claude-3-5-haiku-20241022"}}],"startTimeUnixNano":"1753780502453000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":72.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"cacheRead"}},{"key":"model","value":{"stringValue":"claude-3-5-haiku-20241022"}}],"startTimeUnixNano":"1753780502453000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":0.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"cacheCreation"}},{"key":"model","value":{"stringValue":"claude-3-5-haiku-20241022"}}],"startTimeUnixNano":"1753780502453000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":0.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"input"}},{"key":"model","value":{"stringValue":"claude-sonnet-4-20250514"}}],"startTimeUnixNano":"1753780513420000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":743056.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"output"}},{"key":"model","value":{"stringValue":"claude-sonnet-4-20250514"}}],"startTimeUnixNano":"1753780513420000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":1030.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"cacheRead"}},{"key":"model","value":{"stringValue":"claude-sonnet-4-20250514"}}],"startTimeUnixNano":"1753780513420000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":0.0},{"attributes":[{"key":"user.id","value":{"stringValue":"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4"}},{"key":"session.id","value":{"stringValue":"736525A3-F5D4-496B-933E-827AF23A5B97"}},{"key":"terminal.type","value":{"stringValue":"ghostty"}},{"key":"type","value":{"stringValue":"cacheCreation"}},{"key":"model","value":{"stringValue":"claude-sonnet-4-20250514"}}],"startTimeUnixNano":"1753780513420000000","timeUnixNano":"1753780559836000000","exemplars":[],"flags":0,"asDouble":0.0}],"aggregationTemporality":1,"isMonotonic":true}}],"schemaUrl":""}],"schemaUrl":""}]} "#; let req: ExportMetricsServiceRequest = serde_json::from_str(content).unwrap(); @@ -3603,6 +3604,162 @@ pub async fn test_otlp_metrics(store_type: StorageType) { // handshake let client = TestClient::new(app).await; + // write metrics data + // with scope attrs and all resource attrs + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"), + HeaderValue::from_static("true"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-promote-all-resource-attrs"), + HeaderValue::from_static("true"), + ), + ], + "/v1/otlp/v1/metrics", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = "[[\"claude_code_cost_usage\"],[\"claude_code_token_usage\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]"; + validate_data("otlp_metrics_all_tables", &client, "show tables;", expected).await; + + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // "greptime_timestamp" TIMESTAMP(3) NOT NULL, + // "greptime_value" DOUBLE NULL, + // "host_arch" STRING NULL, + // "job" STRING NULL, + // "model" STRING NULL, + // "os_type" STRING NULL, + // "os_version" STRING NULL, + // "otel_scope_name" STRING NULL, + // "otel_scope_schema_url" STRING NULL, + // "otel_scope_version" STRING NULL, + // "service_name" STRING NULL, + // "service_version" STRING NULL, + // "session_id" STRING NULL, + // "terminal_type" STRING NULL, + // "user_id" STRING NULL, + // TIME INDEX ("greptime_timestamp"), + // PRIMARY KEY ("host_arch", "job", "model", "os_type", "os_version", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") + // ) + // ENGINE=metric + // WITH( + // on_physical_table = 'greptime_physical_table', + // otlp_metric_compat = 'prom' + // ) + let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + validate_data( + "otlp_metrics_all_show_create_table", + &client, + "show create table claude_code_cost_usage;", + expected, + ) + .await; + + // select metrics data + let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; + validate_data( + "otlp_metrics_all_select", + &client, + "select * from claude_code_cost_usage;", + expected, + ) + .await; + + // drop table + let res = client + .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .get("/v1/sql?sql=drop table claude_code_token_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // write metrics data + // with scope attrs + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"), + HeaderValue::from_static("true"), + ), + ], + "/v1/otlp/v1/metrics", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // "greptime_timestamp" TIMESTAMP(3) NOT NULL, + // "greptime_value" DOUBLE NULL, + // "job" STRING NULL, + // "model" STRING NULL, + // "otel_scope_name" STRING NULL, + // "otel_scope_schema_url" STRING NULL, + // "otel_scope_version" STRING NULL, + // "service_name" STRING NULL, + // "service_version" STRING NULL, + // "session_id" STRING NULL, + // "terminal_type" STRING NULL, + // "user_id" STRING NULL, + // TIME INDEX ("greptime_timestamp"), + // PRIMARY KEY ("job", "model", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") + // ) + // ENGINE=metric + // WITH( + // on_physical_table = 'greptime_physical_table', + // otlp_metric_compat = 'prom' + // ) + let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + validate_data( + "otlp_metrics_show_create_table", + &client, + "show create table claude_code_cost_usage;", + expected, + ) + .await; + + // select metrics data + let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; + validate_data( + "otlp_metrics_select", + &client, + "select * from claude_code_cost_usage;", + expected, + ) + .await; + + // drop table + let res = client + .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .get("/v1/sql?sql=drop table claude_code_token_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + // write metrics data let res = send_req( &client, @@ -3617,37 +3774,55 @@ pub async fn test_otlp_metrics(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - // select metrics data - let expected = "[[1736489291872539000,0.0],[1736489291919917000,1.0]]"; - validate_data("otlp_metrics", &client, "select * from gen;", expected).await; - - // drop table - let res = client.get("/v1/sql?sql=drop table gen;").send().await; - assert_eq!(res.status(), StatusCode::OK); - - // write metrics data with gzip - let res = send_req( - &client, - vec![( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/x-protobuf"), - )], - "/v1/otlp/v1/metrics", - body.clone(), - true, - ) - .await; - assert_eq!(StatusCode::OK, res.status()); - - // select metrics data again + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // "greptime_timestamp" TIMESTAMP(3) NOT NULL, + // "greptime_value" DOUBLE NULL, + // "job" STRING NULL, + // "model" STRING NULL, + // "service_name" STRING NULL, + // "service_version" STRING NULL, + // "session_id" STRING NULL, + // "terminal_type" STRING NULL, + // "user_id" STRING NULL, + // TIME INDEX ("greptime_timestamp"), + // PRIMARY KEY ("job", "model", "service_name", "service_version", "session_id", "terminal_type", "user_id") + // ) + // ENGINE=metric + // WITH( + // on_physical_table = 'greptime_physical_table', + // otlp_metric_compat = 'prom' + // ) + let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( - "otlp_metrics_with_gzip", + "otlp_metrics_show_create_table_none", &client, - "select * from gen;", + "show create table claude_code_cost_usage;", expected, ) .await; + // select metrics data + let expected = "[[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; + validate_data( + "otlp_metrics_select_none", + &client, + "select * from claude_code_cost_usage;", + expected, + ) + .await; + + // drop table + let res = client + .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .get("/v1/sql?sql=drop table claude_code_token_usage;") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + guard.remove_all().await; }