chore(otlp_metric): support attr list in header opts (#6617)

* chore: support attr list in OTLP metrics

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix cr issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-07-31 16:54:45 +08:00
committed by GitHub
parent a265499325
commit 51dc393371
9 changed files with 75 additions and 26 deletions

1
Cargo.lock generated
View File

@@ -11424,6 +11424,7 @@ dependencies = [
name = "session"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
"arc-swap",
"auth",

View File

@@ -62,10 +62,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.flat_map(|s| s.metrics.iter().map(|m| &m.name))
.collect::<Vec<_>>();
// If the user uses OTLP metrics ingestion before v0.16, it uses the old path.
// So we call this path 'legacy'.
// After v0.16, we store the OTLP metrics using prometheus compatible format, the new path.
// The difference is how we convert the input data into the final table schema.
// See [`OtlpMetricCtx`] for details
let is_legacy = self.check_otlp_legacy(&input_names, ctx.clone()).await?;
let mut metric_ctx = ctx

View File

@@ -14,6 +14,7 @@
use core::str;
use ahash::HashSet;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use axum::http::StatusCode;
@@ -23,7 +24,9 @@ use pipeline::{truthy, GreptimePipelineParams, SelectInfo};
use crate::http::header::constants::{
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME,
GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME,
GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME,
GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
@@ -134,8 +137,13 @@ where
/// Axum extractor for OTLP metric options from HTTP headers.
pub struct OtlpMetricOptions {
/// Persist all resource attributes to the table
/// If false, only persist selected attributes. See [`DEFAULT_ATTRS`] in `otlp/metrics.rs`
/// If false, persist selected attributes. See [`promote_resource_attrs`].
pub promote_all_resource_attrs: bool,
/// If `promote_all_resource_attrs` is true, then the list is an exclude list from `ignore_resource_attrs`.
/// If `promote_all_resource_attrs` is false, then this list is a include list from `promote_resource_attrs`.
pub resource_attrs: HashSet<String>,
/// Persist scope attributes to the table
/// If false, persist none
pub promote_scope_attrs: bool,
@@ -155,6 +163,17 @@ where
)?
.map(truthy)
.unwrap_or(false);
let attr_header = if promote_all_resource_attrs {
[GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME]
} else {
[GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME]
};
let resource_attrs = string_value_from_header(headers, &attr_header)?
.map(|s| s.split(';').map(|s| s.trim().to_string()).collect())
.unwrap_or_default();
let promote_scope_attrs = string_value_from_header(
headers,
&[GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME],
@@ -164,6 +183,7 @@ where
Ok(OtlpMetricOptions {
promote_all_resource_attrs,
resource_attrs,
promote_scope_attrs,
})
}

View File

@@ -62,6 +62,10 @@ pub mod constants {
// OTLP headers
pub const GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME: &str =
"x-greptime-otlp-metric-promote-all-resource-attrs";
pub const GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME: &str =
"x-greptime-otlp-metric-promote-resource-attrs";
pub const GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME: &str =
"x-greptime-otlp-metric-ignore-resource-attrs";
pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str =
"x-greptime-otlp-metric-promote-scope-attrs";

View File

@@ -75,6 +75,7 @@ pub async fn metrics(
query_ctx.set_protocol_ctx(ProtocolCtx::OtlpMetric(OtlpMetricCtx {
promote_all_resource_attrs: http_opts.promote_all_resource_attrs,
resource_attrs: http_opts.resource_attrs,
promote_scope_attrs: http_opts.promote_scope_attrs,
with_metric_engine,
// set is_legacy later

View File

@@ -39,8 +39,7 @@ const INSTANCE_KEY: &str = "instance";
const UNDERSCORE: &str = "_";
// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
// instance and job alias to service.instance.id and service.name that we need to keep
const DEFAULT_ATTRS: [&str; 19] = [
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
"service.instance.id",
"service.name",
"service.namespace",
@@ -63,8 +62,8 @@ const DEFAULT_ATTRS: [&str; 19] = [
];
lazy_static! {
static ref DEFAULT_ATTRS_HASHSET: HashSet<String> =
HashSet::from_iter(DEFAULT_ATTRS.iter().map(|s| s.to_string()));
static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
static ref INVALID_METRIC_NAME: Regex = Regex::new(r"[^a-zA-Z0-9:_]").unwrap();
}
@@ -115,11 +114,6 @@ fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx)
return;
}
// check if promote all
if !metric_ctx.promote_all_resource_attrs {
attrs.retain(|kv| DEFAULT_ATTRS_HASHSET.contains(&kv.key));
}
// remap service.name and service.instance.id to job and instance
let mut tmp = Vec::with_capacity(2);
for kv in attrs.iter() {
@@ -139,6 +133,17 @@ fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx)
_ => {}
}
}
// if promote all, then exclude the list, else, include the list
if metric_ctx.promote_all_resource_attrs {
attrs.retain(|kv| !metric_ctx.resource_attrs.contains(&kv.key));
} else {
attrs.retain(|kv| {
metric_ctx.resource_attrs.contains(&kv.key)
|| DEFAULT_PROMOTE_ATTRS_SET.contains(&kv.key)
});
}
attrs.extend(tmp);
}

View File

@@ -11,6 +11,7 @@ testing = []
workspace = true
[dependencies]
ahash.workspace = true
api.workspace = true
arc-swap = "1.5"
auth.workspace = true

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
/// Protocol specific context
/// for carrying options(like HTTP header options) within the query context
#[derive(Debug, Clone, Default)]
@@ -30,9 +32,25 @@ impl ProtocolCtx {
}
}
/// The context information for OTLP metrics ingestion.
/// - `promote_all_resource_attrs`
/// If true, all resource attributes will be promoted to the final table schema.
/// - `resource_attrs`
/// If `promote_all_resource_attrs` is true, then the list is an exclude list from `ignore_resource_attrs`.
/// If `promote_all_resource_attrs` is false, then this list is a include list from `promote_resource_attrs`.
/// - `promote_scope_attrs`
/// If true, all scope attributes will be promoted to the final table schema.
/// Along with the scope name, scope version and scope schema URL.
/// - `with_metric_engine`
/// - `is_legacy`
/// If the user uses OTLP metrics ingestion before v0.16, it uses the old path.
/// So we call this path 'legacy'.
/// After v0.16, we store the OTLP metrics using prometheus compatible format, the new path.
/// The difference is how we convert the input data into the final table schema.
#[derive(Debug, Clone, Default)]
pub struct OtlpMetricCtx {
pub promote_all_resource_attrs: bool,
pub resource_attrs: HashSet<String>,
pub promote_scope_attrs: bool,
pub with_metric_engine: bool,
pub is_legacy: bool,

View File

@@ -3621,6 +3621,10 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
HeaderName::from_static("x-greptime-otlp-metric-promote-all-resource-attrs"),
HeaderValue::from_static("true"),
),
(
HeaderName::from_static("x-greptime-otlp-metric-ignore-resource-attrs"),
HeaderValue::from_static("os.type"),
),
],
"/v1/otlp/v1/metrics",
body.clone(),
@@ -3638,7 +3642,6 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// "host_arch" STRING NULL,
// "job" STRING NULL,
// "model" STRING NULL,
// "os_type" STRING NULL,
// "os_version" STRING NULL,
// "otel_scope_name" STRING NULL,
// "otel_scope_schema_url" STRING NULL,
@@ -3649,14 +3652,14 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// "terminal_type" STRING NULL,
// "user_id" STRING NULL,
// TIME INDEX ("greptime_timestamp"),
// PRIMARY KEY ("host_arch", "job", "model", "os_type", "os_version", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id")
// PRIMARY KEY ("host_arch", "job", "model", "os_version", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id")
// )
// ENGINE=metric
// WITH(
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_all_show_create_table",
&client,
@@ -3666,7 +3669,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
.await;
// select metrics data
let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
let expected = "[[1753780559836,0.0052544,\"arm64\",\"claude-code\",\"claude-3-5-haiku-20241022\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,2.244618,\"arm64\",\"claude-code\",\"claude-sonnet-4-20250514\",\"25.0.0\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
validate_data(
"otlp_metrics_all_select",
&client,
@@ -3697,8 +3700,8 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-otlp-metric-promote-scope-attrs"),
HeaderValue::from_static("true"),
HeaderName::from_static("x-greptime-otlp-metric-promote-resource-attrs"),
HeaderValue::from_static("os.type;os.version"),
),
],
"/v1/otlp/v1/metrics",
@@ -3713,23 +3716,22 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// "greptime_value" DOUBLE NULL,
// "job" STRING NULL,
// "model" STRING NULL,
// "otel_scope_name" STRING NULL,
// "otel_scope_schema_url" STRING NULL,
// "otel_scope_version" STRING NULL,
// "os_type" STRING NULL,
// "os_version" STRING NULL,
// "service_name" STRING NULL,
// "service_version" STRING NULL,
// "session_id" STRING NULL,
// "terminal_type" STRING NULL,
// "user_id" STRING NULL,
// TIME INDEX ("greptime_timestamp"),
// PRIMARY KEY ("job", "model", "otel_scope_name", "otel_scope_schema_url", "otel_scope_version", "service_name", "service_version", "session_id", "terminal_type", "user_id")
// PRIMARY KEY ("job", "model", "os_type", "os_version", "service_name", "service_version", "session_id", "terminal_type", "user_id")
// )
// ENGINE=metric
// WITH(
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table",
&client,
@@ -3739,7 +3741,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
.await;
// select metrics data
let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"com.anthropic.claude_code\",\"\",\"1.0.62\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
let expected = "[[1753780559836,2.244618,\"claude-code\",\"claude-sonnet-4-20250514\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"],[1753780559836,0.0052544,\"claude-code\",\"claude-3-5-haiku-20241022\",\"darwin\",\"25.0.0\",\"claude-code\",\"1.0.62\",\"736525A3-F5D4-496B-933E-827AF23A5B97\",\"ghostty\",\"6DA02FD9-B5C5-4E61-9355-9FE8EC9A0CF4\"]]";
validate_data(
"otlp_metrics_select",
&client,