chore(otlp_metric): update metric and label naming (#6624)

* chore: update otlp metrics & labels naming

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typo and test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* Update src/session/src/protocol_ctx.rs

* chore: add test cases for normalizing functions

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Ning Sun <classicning@gmail.com>
This commit is contained in:
shuiyisong
2025-08-01 16:17:12 +08:00
committed by GitHub
parent 52466fdd92
commit 9e2f793b04
6 changed files with 374 additions and 45 deletions

View File

@@ -72,7 +72,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.unwrap_or_default();
metric_ctx.is_legacy = is_legacy;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &metric_ctx)?;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);
let ctx = if !is_legacy {

View File

@@ -33,7 +33,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
use pipeline::PipelineWay;
use prost::Message;
use session::context::{Channel, QueryContext};
use session::protocol_ctx::{OtlpMetricCtx, ProtocolCtx};
use session::protocol_ctx::{MetricType, OtlpMetricCtx, ProtocolCtx};
use snafu::prelude::*;
use crate::error::{self, PipelineSnafu, Result};
@@ -80,6 +80,7 @@ pub async fn metrics(
with_metric_engine,
// set is_legacy later
is_legacy: false,
metric_type: MetricType::Init,
}));
let query_ctx = Arc::new(query_ctx);

View File

@@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::HashSet;
use ahash::{HashMap, HashSet};
use api::v1::{RowInsertRequests, Value};
use common_grpc::precision::Precision;
use common_query::prelude::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use itertools::Itertools;
use lazy_static::lazy_static;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};
use regex::Regex;
use session::protocol_ctx::OtlpMetricCtx;
use session::protocol_ctx::{MetricType, OtlpMetricCtx};
use crate::error::Result;
use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
@@ -37,6 +38,9 @@ const JOB_KEY: &str = "job";
const INSTANCE_KEY: &str = "instance";
const UNDERSCORE: &str = "_";
const DOUBLE_UNDERSCORE: &str = "__";
const TOTAL: &str = "total";
const RATIO: &str = "ratio";
// see: https://prometheus.io/docs/guides/opentelemetry/#promoting-resource-attributes
const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
@@ -64,7 +68,48 @@ const DEFAULT_PROMOTE_ATTRS: [&str; 19] = [
lazy_static! {
static ref DEFAULT_PROMOTE_ATTRS_SET: HashSet<String> =
HashSet::from_iter(DEFAULT_PROMOTE_ATTRS.iter().map(|s| s.to_string()));
static ref INVALID_METRIC_NAME: Regex = Regex::new(r"[^a-zA-Z0-9:_]").unwrap();
static ref NON_ALPHA_NUM_CHAR: Regex = Regex::new(r"[^a-zA-Z0-9]").unwrap();
static ref UNIT_MAP: HashMap<String, String> = [
// Time
("d", "days"),
("h", "hours"),
("min", "minutes"),
("s", "seconds"),
("ms", "milliseconds"),
("us", "microseconds"),
("ns", "nanoseconds"),
// Bytes
("By", "bytes"),
("KiBy", "kibibytes"),
("MiBy", "mebibytes"),
("GiBy", "gibibytes"),
("TiBy", "tibibytes"),
("KBy", "kilobytes"),
("MBy", "megabytes"),
("GBy", "gigabytes"),
("TBy", "terabytes"),
// SI
("m", "meters"),
("V", "volts"),
("A", "amperes"),
("J", "joules"),
("W", "watts"),
("g", "grams"),
// Misc
("Cel", "celsius"),
("Hz", "hertz"),
("1", ""),
("%", "percent"),
].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
static ref PER_UNIT_MAP: HashMap<String, String> = [
("s", "second"),
("m", "minute"),
("h", "hour"),
("d", "day"),
("w", "week"),
("mo", "month"),
("y", "year"),
].iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
}
const OTEL_SCOPE_NAME: &str = "name";
@@ -80,7 +125,7 @@ const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
metric_ctx: &OtlpMetricCtx,
metric_ctx: &mut OtlpMetricCtx,
) -> Result<(RowInsertRequests, usize)> {
let mut table_writer = MultiTableData::default();
@@ -95,6 +140,13 @@ pub fn to_grpc_insert_requests(
let scope_attrs = process_scope_attrs(scope, metric_ctx);
for metric in &scope.metrics {
if metric.data.is_none() {
continue;
}
if let Some(t) = metric.data.as_ref().map(from_metric_type) {
metric_ctx.set_metric_type(t);
}
encode_metrics(
&mut table_writer,
metric,
@@ -109,6 +161,22 @@ pub fn to_grpc_insert_requests(
Ok(table_writer.into_row_insert_requests())
}
fn from_metric_type(data: &metric::Data) -> MetricType {
match data {
metric::Data::Gauge(_) => MetricType::Gauge,
metric::Data::Sum(s) => {
if s.is_monotonic {
MetricType::MonotonicSum
} else {
MetricType::NonMonotonicSum
}
}
metric::Data::Histogram(_) => MetricType::Histogram,
metric::Data::ExponentialHistogram(_) => MetricType::ExponentialHistogram,
metric::Data::Summary(_) => MetricType::Summary,
}
}
fn process_resource_attrs(attrs: &mut Vec<KeyValue>, metric_ctx: &OtlpMetricCtx) {
if metric_ctx.is_legacy {
return;
@@ -181,10 +249,37 @@ fn process_scope_attrs(scope: &ScopeMetrics, metric_ctx: &OtlpMetricCtx) -> Opti
})
}
// replace . with _
// see: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.38.0/specification/compatibility/prometheus_and_openmetrics.md#otlp-metric-points-to-prometheus
pub fn normalize_metric_name(name: &str) -> String {
let name = INVALID_METRIC_NAME.replace_all(name, UNDERSCORE);
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_name.go#L55
pub fn normalize_metric_name(metric: &Metric, metric_type: &MetricType) -> String {
let mut name_tokens = NON_ALPHA_NUM_CHAR
.split(&metric.name)
.map(|s| s.to_string())
.collect_vec();
if !metric.unit.is_empty() {
let (main, per) = build_unit_suffix(&metric.unit);
if let Some(main) = main
&& !name_tokens.contains(&main)
{
name_tokens.push(main);
}
if let Some(per) = per
&& !name_tokens.contains(&per)
{
name_tokens.push("per".to_string());
name_tokens.push(per);
}
}
if matches!(metric_type, MetricType::MonotonicSum) {
name_tokens.retain(|t| t != TOTAL);
name_tokens.push(TOTAL.to_string());
}
if metric.unit == "1" && matches!(metric_type, MetricType::Gauge) {
name_tokens.retain(|t| t != RATIO);
name_tokens.push(RATIO.to_string());
}
let name = name_tokens.join(UNDERSCORE);
if let Some((_, first)) = name.char_indices().next()
&& first >= '0'
@@ -192,10 +287,50 @@ pub fn normalize_metric_name(name: &str) -> String {
{
format!("_{}", name)
} else {
name.to_string()
name
}
}
fn build_unit_suffix(unit: &str) -> (Option<String>, Option<String>) {
let (main, per) = unit.split_once('/').unwrap_or((unit, ""));
(check_unit(main, &UNIT_MAP), check_unit(per, &PER_UNIT_MAP))
}
fn check_unit(unit_str: &str, unit_map: &HashMap<String, String>) -> Option<String> {
let u = unit_str.trim();
if !u.is_empty() && !u.contains("{}") {
let u = unit_map.get(u).map(|s| s.as_ref()).unwrap_or(u);
let u = clean_unit_name(u);
if !u.is_empty() {
return Some(u);
}
}
None
}
fn clean_unit_name(name: &str) -> String {
NON_ALPHA_NUM_CHAR.split(name).join(UNDERSCORE)
}
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/145942706622aba5c276ca47f48df438228bfea4/pkg/translator/prometheus/normalize_label.go#L27
pub fn normalize_label_name(name: &str) -> String {
if name.is_empty() {
return name.to_string();
}
let n = NON_ALPHA_NUM_CHAR.replace_all(name, UNDERSCORE);
if let Some((_, first)) = n.char_indices().next()
&& first >= '0'
&& first <= '9'
{
return format!("key_{}", n);
}
if n.starts_with(UNDERSCORE) && !n.starts_with(DOUBLE_UNDERSCORE) {
return format!("key{}", n);
}
n.to_string()
}
/// Normalize otlp instrumentation, metric and attribute names
///
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#instrument-name-syntax>
@@ -216,7 +351,7 @@ fn encode_metrics(
let name = if metric_ctx.is_legacy {
legacy_normalize_otlp_name(&metric.name)
} else {
normalize_metric_name(&metric.name)
normalize_metric_name(metric, &metric_ctx.metric_type)
};
// note that we don't store description or unit, we might want to deal with
@@ -296,10 +431,10 @@ fn write_attributes(
.and_then(|val| {
let key = match attribute_type {
AttributeType::Resource | AttributeType::DataPoint => {
normalize_metric_name(&attr.key)
normalize_label_name(&attr.key)
}
AttributeType::Scope => {
format!("otel_scope_{}", normalize_metric_name(&attr.key))
format!("otel_scope_{}", normalize_label_name(&attr.key))
}
AttributeType::Legacy => legacy_normalize_otlp_name(&attr.key),
};
@@ -746,6 +881,181 @@ mod tests {
);
}
#[test]
fn test_normalize_metric_name() {
let test_cases = vec![
// Default case
(Metric::default(), MetricType::Init, ""),
// Basic metric with just name
(
Metric {
name: "foo".to_string(),
..Default::default()
},
MetricType::Init,
"foo",
),
// Metric with unit "s" should append "seconds"
(
Metric {
name: "foo".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::Init,
"foo_seconds",
),
// Metric already ending with unit suffix should not duplicate
(
Metric {
name: "foo_seconds".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::Init,
"foo_seconds",
),
// Monotonic sum should append "total"
(
Metric {
name: "foo".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_total",
),
// Metric already ending with "total" should not duplicate
(
Metric {
name: "foo_total".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_total",
),
// Monotonic sum with unit should append both unit and "total"
(
Metric {
name: "foo".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_seconds_total",
),
// Metric with unit suffix and monotonic sum
(
Metric {
name: "foo_seconds".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_seconds_total",
),
// Metric already ending with "total" and has unit
(
Metric {
name: "foo_total".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_seconds_total",
),
// Metric already ending with both unit and "total"
(
Metric {
name: "foo_seconds_total".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_seconds_total",
),
// Metric with unusual order (total_seconds) should be normalized
(
Metric {
name: "foo_total_seconds".to_string(),
unit: "s".to_string(),
..Default::default()
},
MetricType::MonotonicSum,
"foo_seconds_total",
),
// Gauge with unit "1" should append "ratio"
(
Metric {
name: "foo".to_string(),
unit: "1".to_string(),
..Default::default()
},
MetricType::Gauge,
"foo_ratio",
),
// Complex unit like "m/s" should be converted to "meters_per_second"
(
Metric {
name: "foo".to_string(),
unit: "m/s".to_string(),
..Default::default()
},
MetricType::Init,
"foo_meters_per_second",
),
// Metric with partial unit match
(
Metric {
name: "foo_second".to_string(),
unit: "m/s".to_string(),
..Default::default()
},
MetricType::Init,
"foo_second_meters",
),
// Metric already containing the main unit
(
Metric {
name: "foo_meters".to_string(),
unit: "m/s".to_string(),
..Default::default()
},
MetricType::Init,
"foo_meters_per_second",
),
];
for (metric, metric_type, expected) in test_cases {
let result = normalize_metric_name(&metric, &metric_type);
assert_eq!(
result, expected,
"Failed for metric name: '{}', unit: '{}', type: {:?}",
metric.name, metric.unit, metric_type
);
}
}
#[test]
fn test_normalize_label_name() {
let test_cases = vec![
("", ""),
("foo", "foo"),
("foo_bar/baz:abc", "foo_bar_baz_abc"),
("1foo", "key_1foo"),
("_foo", "key_foo"),
("__bar", "__bar"),
];
for (input, expected) in test_cases {
let result = normalize_label_name(input);
assert_eq!(
result, expected,
"unexpected result for input '{}'; got '{}'; want '{}'",
input, result, expected
);
}
}
fn keyvalue(key: &str, value: &str) -> KeyValue {
KeyValue {
key: key.into(),
@@ -1023,12 +1333,4 @@ mod tests {
]
);
}
#[test]
fn test_normalize_otlp_name() {
assert_eq!(normalize_metric_name("test.123"), "test_123");
assert_eq!(normalize_metric_name("test_123"), "test_123");
assert_eq!(normalize_metric_name("test._123"), "test__123");
assert_eq!(normalize_metric_name("123_test"), "_123_test");
}
}

View File

@@ -54,4 +54,24 @@ pub struct OtlpMetricCtx {
pub promote_scope_attrs: bool,
pub with_metric_engine: bool,
pub is_legacy: bool,
pub metric_type: MetricType,
}
impl OtlpMetricCtx {
pub fn set_metric_type(&mut self, metric_type: MetricType) {
self.metric_type = metric_type;
}
}
#[derive(Debug, Clone, Default)]
pub enum MetricType {
// default value when initializing the context
#[default]
Init,
NonMonotonicSum,
MonotonicSum,
Gauge,
Histogram,
ExponentialHistogram,
Summary,
}

View File

@@ -69,7 +69,7 @@ mod test {
let mut output = instance
.do_query(
"SELECT * FROM my_test_metric ORDER BY greptime_timestamp",
"SELECT * FROM my_test_metric_my_ignored_unit ORDER BY greptime_timestamp",
ctx.clone(),
)
.await;
@@ -91,7 +91,7 @@ mod test {
let mut output = instance
.do_query(
"SELECT le, greptime_value FROM my_test_histo_bucket order by le",
"SELECT le, greptime_value FROM my_test_histo_my_ignored_unit_bucket order by le",
ctx.clone(),
)
.await;
@@ -113,7 +113,10 @@ mod test {
);
let mut output = instance
.do_query("SELECT * FROM my_test_histo_sum", ctx.clone())
.do_query(
"SELECT * FROM my_test_histo_my_ignored_unit_sum",
ctx.clone(),
)
.await;
let output = output.remove(0).unwrap();
let OutputData::Stream(stream) = output.data else {
@@ -131,7 +134,10 @@ mod test {
);
let mut output = instance
.do_query("SELECT * FROM my_test_histo_count", ctx.clone())
.do_query(
"SELECT * FROM my_test_histo_my_ignored_unit_count",
ctx.clone(),
)
.await;
let output = output.remove(0).unwrap();
let OutputData::Stream(stream) = output.data else {

View File

@@ -3636,10 +3636,10 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = "[[\"claude_code_cost_usage\"],[\"claude_code_token_usage\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\"],[\"claude_code_token_usage_tokens_total\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]";
validate_data("otlp_metrics_all_tables", &client, "show tables;", expected).await;
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" (
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" (
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
// "greptime_value" DOUBLE NULL,
// "host_arch" STRING NULL,
@@ -3662,11 +3662,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_all_show_create_table",
&client,
"show create table claude_code_cost_usage;",
"show create table `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
@@ -3676,19 +3676,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
validate_data(
"otlp_metrics_all_select",
&client,
"select * from claude_code_cost_usage;",
"select * from `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
// drop table
let res = client
.get("/v1/sql?sql=drop table claude_code_cost_usage;")
.get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/sql?sql=drop table claude_code_token_usage;")
.get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -3714,7 +3714,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" (
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" (
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
// "greptime_value" DOUBLE NULL,
// "job" STRING NULL,
@@ -3734,11 +3734,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table",
&client,
"show create table claude_code_cost_usage;",
"show create table `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
@@ -3748,19 +3748,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
validate_data(
"otlp_metrics_select",
&client,
"select * from claude_code_cost_usage;",
"select * from `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
// drop table
let res = client
.get("/v1/sql?sql=drop table claude_code_cost_usage;")
.get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/sql?sql=drop table claude_code_token_usage;")
.get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -3779,7 +3779,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage" (
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" (
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
// "greptime_value" DOUBLE NULL,
// "job" STRING NULL,
@@ -3797,11 +3797,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table_none",
&client,
"show create table claude_code_cost_usage;",
"show create table `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
@@ -3811,19 +3811,19 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
validate_data(
"otlp_metrics_select_none",
&client,
"select * from claude_code_cost_usage;",
"select * from `claude_code_cost_usage_USD_total`;",
expected,
)
.await;
// drop table
let res = client
.get("/v1/sql?sql=drop table claude_code_cost_usage;")
.get("/v1/sql?sql=drop table `claude_code_cost_usage_USD_total`;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/sql?sql=drop table claude_code_token_usage;")
.get("/v1/sql?sql=drop table claude_code_token_usage_tokens_total;")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);