From 9e2f793b04d340e77ce08d1707877982f0f891c2 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:17:12 +0800 Subject: [PATCH] chore(otlp_metric): update metric and label naming (#6624) * chore: update otlp metrics & labels naming Signed-off-by: shuiyisong * fix: typo and test Signed-off-by: shuiyisong * Update src/session/src/protocol_ctx.rs * chore: add test cases for normalizing functions Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong Co-authored-by: Ning Sun --- src/frontend/src/instance/otlp.rs | 2 +- src/servers/src/http/otlp.rs | 3 +- src/servers/src/otlp/metrics.rs | 342 ++++++++++++++++++++++++++++-- src/session/src/protocol_ctx.rs | 20 ++ tests-integration/src/otlp.rs | 14 +- tests-integration/tests/http.rs | 38 ++-- 6 files changed, 374 insertions(+), 45 deletions(-) diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index b50c174b30..d3a0b75d1c 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -72,7 +72,7 @@ impl OpenTelemetryProtocolHandler for Instance { .unwrap_or_default(); metric_ctx.is_legacy = is_legacy; - let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &metric_ctx)?; + let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?; OTLP_METRICS_ROWS.inc_by(rows as u64); let ctx = if !is_legacy { diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 2415773542..7c7847a332 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -33,7 +33,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ use pipeline::PipelineWay; use prost::Message; use session::context::{Channel, QueryContext}; -use session::protocol_ctx::{OtlpMetricCtx, ProtocolCtx}; +use session::protocol_ctx::{MetricType, OtlpMetricCtx, ProtocolCtx}; use snafu::prelude::*; use crate::error::{self, PipelineSnafu, Result}; @@ -80,6 +80,7 @@ pub async fn metrics( with_metric_engine, // set is_legacy later is_legacy: false, + metric_type: MetricType::Init, })); let query_ctx = Arc::new(query_ctx); diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index e6d71cc7de..7e7a8e8a85 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::HashSet; +use ahash::{HashMap, HashSet}; use api::v1::{RowInsertRequests, Value}; use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use itertools::Itertools; use lazy_static::lazy_static; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue}; use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *}; use regex::Regex; -use session::protocol_ctx::OtlpMetricCtx; +use session::protocol_ctx::{MetricType, OtlpMetricCtx}; use crate::error::Result; use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME}; @@ -37,6 +38,9 @@ const JOB_KEY: &str = "job"; const INSTANCE_KEY: &str = "instance"; const UNDERSCORE: &str = "_"; +const DOUBLE_UNDERSCORE: &str = "__"; +const TOTAL: &str = "total"; +const RATIO: &str = "ratio"; // see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [ @@ -64,7 +68,48 @@ const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [ lazy_static! { static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet = HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string())); - static ref INVALID_METRIC_NAME: Regex = Regex::new(r"[^a-zA-Z0-9:_]").unwrap(); + static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap(); + static ref UNIT_MAP: HashMap = [ + // Time + ("d", "days"), + ("h", "hours"), + ("min", "minutes"), + ("s", "seconds"), + ("ms", "milliseconds"), + ("us", "microseconds"), + ("ns", "nanoseconds"), + // Bytes + ("By", "bytes"), + ("KiBy", "kibibytes"), + ("MiBy", "mebibytes"), + ("GiBy", "gibibytes"), + ("TiBy", "tibibytes"), + ("KBy", "kilobytes"), + ("MBy", "megabytes"), + ("GBy", "gigabytes"), + ("TBy", "terabytes"), + // SI + ("m", "meters"), + ("V", "volts"), + ("A", "amperes"), + ("J", "joules"), + ("W", "watts"), + ("g", "grams"), + // Misc + ("Cel", "celsius"), + ("Hz", "hertz"), + ("1", ""), + ("%", "percent"), + ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(); + static ref PER_UNIT_MAP: HashMap = [ + ("s", "second"), + ("m", "minute"), + ("h", "hour"), + ("d", "day"), + ("w", "week"), + ("mo", "month"), + ("y", "year"), + ].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(); } const OTEL_SCOPE_NAME: &str = "name"; @@ -80,7 +125,7 @@ const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url"; /// Returns `InsertRequests` and total number of rows to ingest pub fn to_grpc_insert_requests( request: ExportMetricsServiceRequest, - metric_ctx: &OtlpMetricCtx, + metric_ctx: &mut OtlpMetricCtx, ) -> Result<(RowInsertRequests, usize)> { let mut table_writer = MultiTableData::default(); @@ -95,6 +140,13 @@ pub fn to_grpc_insert_requests( let scope_attrs = process_scope_attrs(scope, metric_ctx); for metric in &scope.metrics { + if metric.data.is_none() { + continue; + } + if let Some(t) = metric.data.as_ref().map(from_metric_type) { + metric_ctx.set_metric_type(t); + } + encode_metrics( &mut table_writer, metric, @@ -109,6 +161,22 @@ pub fn to_grpc_insert_requests( Ok(table_writer.into_row_insert_requests()) } +fn from_metric_type(data: &metric::Data) -> MetricType { + match data { + metric::Data::Gauge(_) => MetricType::Gauge, + metric::Data::Sum(s) => { + if s.is_monotonic { + MetricType::MonotonicSum + } else { + MetricType::NonMonotonicSum + } + } + metric::Data::Histogram(_) => MetricType::Histogram, + metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram, + metric::Data::Summary(_) => MetricType::Summary, + } +} + fn process_resource_attrs(attrs: &mut Vec, metric_ctx: &OtlpMetricCtx) { if metric_ctx.is_legacy { return; @@ -181,10 +249,37 @@ fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Opti }) } -// replace . with _ -// see: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.38.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus -pub fn normalize_metric_name(name: &str) -> String { - let name = INVALID_METRIC_NAME.replace_all(name, UNDERSCORE); +// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55 +pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String { + let mut name_tokens = NON_ALPHA_NUM_CHAR + .split(&metric.name) + .map(|s| s.to_string()) + .collect_vec(); + if !metric.unit.is_empty() { + let (main, per) = build_unit_suffix(&metric.unit); + if let Some(main) = main + && !name_tokens.contains(&main) + { + name_tokens.push(main); + } + if let Some(per) = per + && !name_tokens.contains(&per) + { + name_tokens.push("per".to_string()); + name_tokens.push(per); + } + } + + if matches!(metric_type, MetricType::MonotonicSum) { + name_tokens.retain(|t| t != TOTAL); + name_tokens.push(TOTAL.to_string()); + } + if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) { + name_tokens.retain(|t| t != RATIO); + name_tokens.push(RATIO.to_string()); + } + + let name = name_tokens.join(UNDERSCORE); if let Some((_, first)) = name.char_indices().next() && first >= '0' @@ -192,10 +287,50 @@ pub fn normalize_metric_name(name: &str) -> String { { format!("_{}", name) } else { - name.to_string() + name } } +fn build_unit_suffix(unit: &str) -> (Option, Option) { + let (main, per) = unit.split_once('/').unwrap_or((unit, "")); + (check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP)) +} + +fn check_unit(unit_str: &str, unit_map: &HashMap) -> Option { + let u = unit_str.trim(); + if !u.is_empty() && !u.contains("{}") { + let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u); + let u = clean_unit_name(u); + if !u.is_empty() { + return Some(u); + } + } + None +} + +fn clean_unit_name(name: &str) -> String { + NON_ALPHA_NUM_CHAR.split(name).join(UNDERSCORE) +} + +// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27 +pub fn normalize_label_name(name: &str) -> String { + if name.is_empty() { + return name.to_string(); + } + + let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE); + if let Some((_, first)) = n.char_indices().next() + && first >= '0' + && first <= '9' + { + return format!("key_{}", n); + } + if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) { + return format!("key{}", n); + } + n.to_string() +} + /// Normalize otlp instrumentation, metric and attribute names /// /// @@ -216,7 +351,7 @@ fn encode_metrics( let name = if metric_ctx.is_legacy { legacy_normalize_otlp_name(&metric.name) } else { - normalize_metric_name(&metric.name) + normalize_metric_name(metric, &metric_ctx.metric_type) }; // note that we don't store description or unit, we might want to deal with @@ -296,10 +431,10 @@ fn write_attributes( .and_then(|val| { let key = match attribute_type { AttributeType::Resource | AttributeType::DataPoint => { - normalize_metric_name(&attr.key) + normalize_label_name(&attr.key) } AttributeType::Scope => { - format!("otel_scope_{}", normalize_metric_name(&attr.key)) + format!("otel_scope_{}", normalize_label_name(&attr.key)) } AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key), }; @@ -746,6 +881,181 @@ mod tests { ); } + #[test] + fn test_normalize_metric_name() { + let test_cases = vec![ + // Default case + (Metric::default(), MetricType::Init, ""), + // Basic metric with just name + ( + Metric { + name: "foo".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo", + ), + // Metric with unit "s" should append "seconds" + ( + Metric { + name: "foo".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_seconds", + ), + // Metric already ending with unit suffix should not duplicate + ( + Metric { + name: "foo_seconds".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_seconds", + ), + // Monotonic sum should append "total" + ( + Metric { + name: "foo".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_total", + ), + // Metric already ending with "total" should not duplicate + ( + Metric { + name: "foo_total".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_total", + ), + // Monotonic sum with unit should append both unit and "total" + ( + Metric { + name: "foo".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_seconds_total", + ), + // Metric with unit suffix and monotonic sum + ( + Metric { + name: "foo_seconds".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_seconds_total", + ), + // Metric already ending with "total" and has unit + ( + Metric { + name: "foo_total".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_seconds_total", + ), + // Metric already ending with both unit and "total" + ( + Metric { + name: "foo_seconds_total".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_seconds_total", + ), + // Metric with unusual order (total_seconds) should be normalized + ( + Metric { + name: "foo_total_seconds".to_string(), + unit: "s".to_string(), + ..Default::default() + }, + MetricType::MonotonicSum, + "foo_seconds_total", + ), + // Gauge with unit "1" should append "ratio" + ( + Metric { + name: "foo".to_string(), + unit: "1".to_string(), + ..Default::default() + }, + MetricType::Gauge, + "foo_ratio", + ), + // Complex unit like "m/s" should be converted to "meters_per_second" + ( + Metric { + name: "foo".to_string(), + unit: "m/s".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_meters_per_second", + ), + // Metric with partial unit match + ( + Metric { + name: "foo_second".to_string(), + unit: "m/s".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_second_meters", + ), + // Metric already containing the main unit + ( + Metric { + name: "foo_meters".to_string(), + unit: "m/s".to_string(), + ..Default::default() + }, + MetricType::Init, + "foo_meters_per_second", + ), + ]; + + for (metric, metric_type, expected) in test_cases { + let result = normalize_metric_name(&metric, &metric_type); + assert_eq!( + result, expected, + "Failed for metric name: '{}', unit: '{}', type: {:?}", + metric.name, metric.unit, metric_type + ); + } + } + + #[test] + fn test_normalize_label_name() { + let test_cases = vec![ + ("", ""), + ("foo", "foo"), + ("foo_bar/baz:abc", "foo_bar_baz_abc"), + ("1foo", "key_1foo"), + ("_foo", "key_foo"), + ("__bar", "__bar"), + ]; + + for (input, expected) in test_cases { + let result = normalize_label_name(input); + assert_eq!( + result, expected, + "unexpected result for input '{}'; got '{}'; want '{}'", + input, result, expected + ); + } + } + fn keyvalue(key: &str, value: &str) -> KeyValue { KeyValue { key: key.into(), @@ -1023,12 +1333,4 @@ mod tests { ] ); } - - #[test] - fn test_normalize_otlp_name() { - assert_eq!(normalize_metric_name("test.123"), "test_123"); - assert_eq!(normalize_metric_name("test_123"), "test_123"); - assert_eq!(normalize_metric_name("test._123"), "test__123"); - assert_eq!(normalize_metric_name("123_test"), "_123_test"); - } } diff --git a/src/session/src/protocol_ctx.rs b/src/session/src/protocol_ctx.rs index 0f56a4ded6..2cc1b86174 100644 --- a/src/session/src/protocol_ctx.rs +++ b/src/session/src/protocol_ctx.rs @@ -54,4 +54,24 @@ pub struct OtlpMetricCtx { pub promote_scope_attrs: bool, pub with_metric_engine: bool, pub is_legacy: bool, + pub metric_type: MetricType, +} + +impl OtlpMetricCtx { + pub fn set_metric_type(&mut self, metric_type: MetricType) { + self.metric_type = metric_type; + } +} + +#[derive(Debug, Clone, Default)] +pub enum MetricType { + // default value when initializing the context + #[default] + Init, + NonMonotonicSum, + MonotonicSum, + Gauge, + Histogram, + ExponentialHistogram, + Summary, } diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index b9668695f8..5383ada9fd 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -69,7 +69,7 @@ mod test { let mut output = instance .do_query( - "SELECT * FROM my_test_metric ORDER BY greptime_timestamp", + "SELECT * FROM my_test_metric_my_ignored_unit ORDER BY greptime_timestamp", ctx.clone(), ) .await; @@ -91,7 +91,7 @@ mod test { let mut output = instance .do_query( - "SELECT le, greptime_value FROM my_test_histo_bucket order by le", + "SELECT le, greptime_value FROM my_test_histo_my_ignored_unit_bucket order by le", ctx.clone(), ) .await; @@ -113,7 +113,10 @@ mod test { ); let mut output = instance - .do_query("SELECT * FROM my_test_histo_sum", ctx.clone()) + .do_query( + "SELECT * FROM my_test_histo_my_ignored_unit_sum", + ctx.clone(), + ) .await; let output = output.remove(0).unwrap(); let OutputData::Stream(stream) = output.data else { @@ -131,7 +134,10 @@ mod test { ); let mut output = instance - .do_query("SELECT * FROM my_test_histo_count", ctx.clone()) + .do_query( + "SELECT * FROM my_test_histo_my_ignored_unit_count", + ctx.clone(), + ) .await; let output = output.remove(0).unwrap(); let OutputData::Stream(stream) = output.data else { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fc9737c388..e075320569 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -3636,10 +3636,10 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - let expected = "[[\"claude_code_cost_usage\"],[\"claude_code_token_usage\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\"],[\"claude_code_token_usage_tokens_total\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]"; validate_data("otlp_metrics_all_tables", &client, "show tables;", expected).await; - // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" ( // "greptime_timestamp" TIMESTAMP(3) NOT NULL, // "greptime_value" DOUBLE NULL, // "host_arch" STRING NULL, @@ -3662,11 +3662,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_all_show_create_table", &client, - "show create table claude_code_cost_usage;", + "show create table `claude_code_cost_usage_USD_total`;", expected, ) .await; @@ -3676,19 +3676,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { validate_data( "otlp_metrics_all_select", &client, - "select * from claude_code_cost_usage;", + "select * from `claude_code_cost_usage_USD_total`;", expected, ) .await; // drop table let res = client - .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/v1/sql?sql=drop table claude_code_token_usage;") + .get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -3714,7 +3714,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" ( // "greptime_timestamp" TIMESTAMP(3) NOT NULL, // "greptime_value" DOUBLE NULL, // "job" STRING NULL, @@ -3734,11 +3734,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table", &client, - "show create table claude_code_cost_usage;", + "show create table `claude_code_cost_usage_USD_total`;", expected, ) .await; @@ -3748,19 +3748,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { validate_data( "otlp_metrics_select", &client, - "select * from claude_code_cost_usage;", + "select * from `claude_code_cost_usage_USD_total`;", expected, ) .await; // drop table let res = client - .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/v1/sql?sql=drop table claude_code_token_usage;") + .get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -3779,7 +3779,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" ( + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" ( // "greptime_timestamp" TIMESTAMP(3) NOT NULL, // "greptime_value" DOUBLE NULL, // "job" STRING NULL, @@ -3797,11 +3797,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table_none", &client, - "show create table claude_code_cost_usage;", + "show create table `claude_code_cost_usage_USD_total`;", expected, ) .await; @@ -3811,19 +3811,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { validate_data( "otlp_metrics_select_none", &client, - "select * from claude_code_cost_usage;", + "select * from `claude_code_cost_usage_USD_total`;", expected, ) .await; // drop table let res = client - .get("/v1/sql?sql=drop table claude_code_cost_usage;") + .get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;") .send() .await; assert_eq!(res.status(), StatusCode::OK); let res = client - .get("/v1/sql?sql=drop table claude_code_token_usage;") + .get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;") .send() .await; assert_eq!(res.status(), StatusCode::OK);