diff --git a/Cargo.lock b/Cargo.lock index 531a74d580..0f18b3b37f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11424,6 +11424,7 @@ dependencies = [ name = "session" version = "0.16.0" dependencies = [ + "ahash 0.8.11", "api", "arc-swap", "auth", diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index e729a2797f..b50c174b30 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -62,10 +62,7 @@ impl OpenTelemetryProtocolHandler for Instance { .flat_map(|s| s.metrics.iter().map(|m| &m.name)) .collect::>(); - // If the user uses OTLP metrics ingestion before v0.16, it uses the old path. - // So we call this path 'legacy'. - // After v0.16, we store the OTLP metrics using prometheus compatible format, the new path. - // The difference is how we convert the input data into the final table schema. + // See [`OtlpMetricCtx`] for details let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?; let mut metric_ctx = ctx diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs index 6cf797f41e..28370fe4b6 100644 --- a/src/servers/src/http/extractor.rs +++ b/src/servers/src/http/extractor.rs @@ -14,6 +14,7 @@ use core::str; +use ahash::HashSet; use axum::extract::FromRequestParts; use axum::http::request::Parts; use axum::http::StatusCode; @@ -23,7 +24,9 @@ use pipeline::{truthy, GreptimePipelineParams, SelectInfo}; use crate::http::header::constants::{ GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME, GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME, + GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME, GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME, 
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, @@ -134,8 +137,13 @@ where /// Axum extractor for OTLP metric options from HTTP headers. pub struct OtlpMetricOptions { /// Persist all resource attributes to the table - /// If false, only persist selected attributes. See [`DEFAULT_ATTRS`] in `otlp/metrics.rs` + /// If false, persist selected attributes. See [`OtlpMetricOptions::resource_attrs`]. pub promote_all_resource_attrs: bool, + + /// If `promote_all_resource_attrs` is true, then this list is an exclude list taken from the `x-greptime-otlp-metric-ignore-resource-attrs` header. + /// If `promote_all_resource_attrs` is false, then this list is an include list taken from the `x-greptime-otlp-metric-promote-resource-attrs` header. + pub resource_attrs: HashSet, + /// Persist scope attributes to the table /// If false, persist none pub promote_scope_attrs: bool, @@ -155,6 +163,17 @@ where )? .map(truthy) .unwrap_or(false); + + let attr_header = if promote_all_resource_attrs { + [GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME] + } else { + [GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME] + }; + + let resource_attrs = string_value_from_header(headers, &attr_header)? 
+ .map(|s| s.split(';').map(|s| s.trim().to_string()).collect()) + .unwrap_or_default(); + let promote_scope_attrs = string_value_from_header( headers, &[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME], @@ -164,6 +183,7 @@ where Ok(OtlpMetricOptions { promote_all_resource_attrs, + resource_attrs, promote_scope_attrs, }) } diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index 6572607bb1..4af50c3e9f 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -62,6 +62,10 @@ pub mod constants { // OTLP headers pub const GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME: &str = "x-greptime-otlp-metric-promote-all-resource-attrs"; + pub const GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME: &str = + "x-greptime-otlp-metric-promote-resource-attrs"; + pub const GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME: &str = + "x-greptime-otlp-metric-ignore-resource-attrs"; pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str = "x-greptime-otlp-metric-promote-scope-attrs"; diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 2ae344ac83..2415773542 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -75,6 +75,7 @@ pub async fn metrics( query_ctx.set_protocol_ctx(ProtocolCtx::OtlpMetric(OtlpMetricCtx { promote_all_resource_attrs: http_opts.promote_all_resource_attrs, + resource_attrs: http_opts.resource_attrs, promote_scope_attrs: http_opts.promote_scope_attrs, with_metric_engine, // set is_legacy later diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index fb69144e45..e6d71cc7de 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -39,8 +39,7 @@ const INSTANCE_KEY: &str = "instance"; const UNDERSCORE: &str = "_"; // see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes -// instance and job alias to service.instance.id and service.name that 
we need to keep -const DEFAULT_ATTRS: [&str; 19] = [ +const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [ "service.instance.id", "service.name", "service.namespace", @@ -63,8 +62,8 @@ const DEFAULT_ATTRS: [&str; 19] = [ ]; lazy_static! { - static ref DEFAULT_ATTRS_HASHSET: HashSet = - HashSet::from_iter(DEFAULT_ATTRS.iter().map(|s| s.to_string())); + static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet = + HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string())); static ref INVALID_METRIC_NAME: Regex = Regex::new(r"[^a-zA-Z0-9:_]").unwrap(); } @@ -115,11 +114,6 @@ fn process_resource_attrs(attrs: &mut Vec, metric_ctx: &OtlpMetricCtx) return; } - // check if promote all - if !metric_ctx.promote_all_resource_attrs { - attrs.retain(|kv| DEFAULT_ATTRS_HASHSET.contains(&kv.key)); - } - // remap service.name and service.instance.id to job and instance let mut tmp = Vec::with_capacity(2); for kv in attrs.iter() { @@ -139,6 +133,17 @@ fn process_resource_attrs(attrs: &mut Vec, metric_ctx: &OtlpMetricCtx) _ => {} } } + + // if promote all, then exclude the list, else, include the list + if metric_ctx.promote_all_resource_attrs { + attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key)); + } else { + attrs.retain(|kv| { + metric_ctx.resource_attrs.contains(&kv.key) + || DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key) + }); + } + attrs.extend(tmp); } diff --git a/src/session/Cargo.toml b/src/session/Cargo.toml index c263c21616..abe38c7ac3 100644 --- a/src/session/Cargo.toml +++ b/src/session/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +ahash.workspace = true api.workspace = true arc-swap = "1.5" auth.workspace = true diff --git a/src/session/src/protocol_ctx.rs b/src/session/src/protocol_ctx.rs index 71e34b8f40..0f56a4ded6 100644 --- a/src/session/src/protocol_ctx.rs +++ b/src/session/src/protocol_ctx.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use ahash::HashSet; + /// Protocol specific context /// for carrying options (like HTTP header options) within the query context #[derive(Debug, Clone, Default)] @@ -30,9 +32,25 @@ impl ProtocolCtx { } } +/// The context information for OTLP metrics ingestion. +/// - `promote_all_resource_attrs` +/// If true, all resource attributes will be promoted to the final table schema. +/// - `resource_attrs` +/// If `promote_all_resource_attrs` is true, then this list is an exclude list (from the `x-greptime-otlp-metric-ignore-resource-attrs` header). +/// If `promote_all_resource_attrs` is false, then this list is an include list (from the `x-greptime-otlp-metric-promote-resource-attrs` header), applied in addition to the default promoted attributes. +/// - `promote_scope_attrs` +/// If true, all scope attributes will be promoted to the final table schema. +/// Along with the scope name, scope version and scope schema URL. +/// - `with_metric_engine` +/// - `is_legacy` +/// If the user uses OTLP metrics ingestion before v0.16, it uses the old path. +/// So we call this path 'legacy'. +/// After v0.16, we store the OTLP metrics using prometheus compatible format, the new path. +/// The difference is how we convert the input data into the final table schema. 
#[derive(Debug, Clone, Default)] pub struct OtlpMetricCtx { pub promote_all_resource_attrs: bool, + pub resource_attrs: HashSet, pub promote_scope_attrs: bool, pub with_metric_engine: bool, pub is_legacy: bool, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 3bf96af655..cdeb10cf5f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -3621,6 +3621,10 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { HeaderName::from_static("x-greptime-otlp-metric-promote-all-resource-attrs"), HeaderValue::from_static("true"), ), + ( + HeaderName::from_static("x-greptime-otlp-metric-ignore-resource-attrs"), + HeaderValue::from_static("os.type"), + ), ], "/v1/otlp/v1/metrics", body.clone(), @@ -3638,7 +3642,6 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // "host_arch" STRING NULL, // "job" STRING NULL, // "model" STRING NULL, - // "os_type" STRING NULL, // "os_version" STRING NULL, // "otel_scope_name" STRING NULL, // "otel_scope_schema_url" STRING NULL, @@ -3649,14 +3652,14 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // "terminal_type" STRING NULL, // "user_id" STRING NULL, // TIME INDEX ("greptime_timestamp"), - // PRIMARY KEY ("host_arch", "job", "model", "os_type", "os_version", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") + // PRIMARY KEY ("host_arch", "job", "model", "os_version", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") // ) // ENGINE=metric // WITH( // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" 
STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( 
"otlp_metrics_all_show_create_table", &client, @@ -3666,7 +3669,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { .await; // select metrics data - let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; + let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; validate_data( "otlp_metrics_all_select", &client, @@ -3697,8 +3700,8 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { HeaderValue::from_static("application/x-protobuf"), ), ( - HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"), - HeaderValue::from_static("true"), + HeaderName::from_static("x-greptime-otlp-metric-promote-resource-attrs"), + HeaderValue::from_static("os.type;os.version"), ), ], "/v1/otlp/v1/metrics", @@ -3713,23 +3716,22 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // "greptime_value" DOUBLE NULL, // "job" STRING NULL, // "model" STRING NULL, - // "otel_scope_name" STRING NULL, - // "otel_scope_schema_url" STRING NULL, - // "otel_scope_version" STRING 
NULL, + // "os_type" STRING NULL, + // "os_version" STRING NULL, // "service_name" STRING NULL, // "service_version" STRING NULL, // "session_id" STRING NULL, // "terminal_type" STRING NULL, // "user_id" STRING NULL, // TIME INDEX ("greptime_timestamp"), - // PRIMARY KEY ("job", "model", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") + // PRIMARY KEY ("job", "model", "os_type", "os_version", "service_name", "service_version", "session_id", "terminal_type", "user_id") // ) // ENGINE=metric // WITH( // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n 
\\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table", &client, @@ -3739,7 +3741,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { .await; // select metrics data - let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; + let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]"; validate_data( "otlp_metrics_select", &client,