diff --git a/docs/rfcs/2026-05-28-table-semantic-layer.md b/docs/rfcs/2026-05-28-table-semantic-layer.md index e4d899d704..d652171622 100644 --- a/docs/rfcs/2026-05-28-table-semantic-layer.md +++ b/docs/rfcs/2026-05-28-table-semantic-layer.md @@ -48,35 +48,42 @@ The audience is broader than LLM agents. Alert generators need to choose between ## Vocabulary -All keys are flat strings under the `greptime.semantic.` prefix; values are strings; unknown keys are tolerated so the vocabulary can grow without coordinated rollouts. +All keys are flat strings under the `greptime.semantic.` prefix; values are strings. + +The vocabulary is deliberately small. A key earns its place only when it records +something a consumer cannot cheaply and reliably recover on its own — from the +schema, the column set, or the metric-naming conventions it already understands. +Keys whose value is already in the metric name by convention (Prometheus +`_total`/`_bucket` suffixes), is a constant for the only producer that sets it, +or merely restates an existing column are intentionally omitted rather than +stamped for completeness. This is why, for example, Prometheus remote write +tables carry only the common identity (their type/unit live in the name), and +why resource-attribute lineage is not stamped (it is an ingest/collector-config +concern, not query-time semantics). **Common (all signals)** | Key | Example | | --- | --- | | `greptime.semantic.signal_type` | `trace` / `log` / `metric` / `event` | -| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `elasticsearch` / `loki` / `custom` | -| `greptime.semantic.source_version` | protocol or SDK version, e.g. 
`v2` (Prom remote write), `1.30.0` (optional) | -| `greptime.semantic.pipeline` | `greptime_trace_v1` (subsumes the existing `table_data_model` value) | +| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `influxdb` / `opentsdb` / `elasticsearch` / `loki` / `custom` | +| `greptime.semantic.pipeline` | `greptime_trace_v1` (the signal-agnostic successor to `table_data_model`) | -**Trace**: `greptime.semantic.trace.conventions` (e.g. `otel-semconv-1.27`, lifted from `schema_url`, which is the version of the OpenTelemetry semantic conventions used in this table), `greptime.semantic.trace.has_events`, `greptime.semantic.trace.has_links`. +**Trace**: `greptime.semantic.trace.conventions` (the OTel `schema_url` the rows conform to, or `mixed` / `unknown` when not single-valued). -**Metric** — v1 assumes one metric type per table, which is how both Prom RW and the post-v0.16 OTel ingestion path land data today; mixed-type tables are a follow-up. +**Metric** — v1 assumes one metric type per table, which is how both Prom RW and the post-v0.16 OTel ingestion path land data today; mixed-type tables are a follow-up. These are stamped for OTLP (which declares them on the wire and then discards them); Prometheus carries its type/unit in the name and gets identity only. | Key | Example | | --- | --- | | `greptime.semantic.metric.type` | `counter` / `gauge` / `histogram` / `summary` / `updown_counter` / `gauge_histogram` / `info` / `stateset` | -| `greptime.semantic.metric.unit` | UCUM, e.g. `s`, `By`, `{request}` | -| `greptime.semantic.metric.temporality` | `cumulative` / `delta` (OTel only) | -| `greptime.semantic.metric.monotonic` | `true` / `false` | -| `greptime.semantic.metric.metadata_quality` | `declared` (OTLP / Prom RW v2 / exposition) or `inferred` (Prom RW v1, name-suffix guess) | -| `greptime.semantic.metric.original_name` | Pre-translation OTel name when the table name was Prometheus-ised | +| `greptime.semantic.metric.unit` | UCUM, e.g. 
`s`, `By`, `{request}` (discarded by the row encoders, so unrecoverable once ingested) | +| `greptime.semantic.metric.temporality` | `cumulative` / `delta` (OTel only; invisible in the name) | +| `greptime.semantic.metric.metadata_quality` | `declared` (OTLP / exposition) or `inferred` (Prom RW v1, name-suffix guess) | +| `greptime.semantic.metric.original_name` | Pre-translation OTel name when the table name was Prometheus-ised; the key a consumer uses to look the metric up in the OTel semantic conventions | `metadata_quality = inferred` is the load-bearing field for confidence-aware tooling: an inferred counter should be re-checked before betting on `rate()`-style semantics. -**Log**: `greptime.semantic.log.severity_scheme` (`otlp` / `syslog` / `custom`), `greptime.semantic.log.body_format` (`string` / `json` / `mixed`). - -**Resource / scope preservation**: `greptime.semantic.resource.attributes_preserved` (JSON array string of attrs promoted to columns), `greptime.semantic.resource.attributes_dropped` (boolean), `greptime.semantic.scope.preserved` (boolean). These answer the most common downstream question: "is this data missing because it was dropped, or because it lives on a different column than I think?" List-shaped values use JSON array strings rather than comma-separated text to avoid escaping and ordering ambiguity. +**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter costs an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern); `source_version` (no cheap non-constant value today: Prom RW is v1-only, OTel SDK identity is deferred).
Some are reserved for follow-ups (see Future Work). ## Conflict and update semantics diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index ab63bfb6b2..dbe8b44f73 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -30,6 +30,7 @@ use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use store_api::mito_engine_options::MERGE_MODE_KEY; +use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_INFLUXDB}; use crate::instance::Instance; use crate::service_config::influxdb::InfluxdbMergeMode; @@ -77,6 +78,12 @@ impl InfluxdbLineProtocolHandler for Instance { .await?; let ctx = ctx_with_default_merge_mode(ctx, self.influxdb_default_merge_mode); + let ctx = { + let mut c = (*ctx).clone(); + c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + c.set_extension(SEMANTIC_SOURCE, SOURCE_INFLUXDB); + Arc::new(c) + }; self.handle_influx_row_inserts(requests, ctx) .await diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index cd6105933d..1b3c3822cf 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; @@ -22,6 +24,7 @@ use servers::opentsdb::data_point_to_grpc_row_insert_requests; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use snafu::prelude::*; +use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTSDB}; use crate::instance::Instance; @@ -41,6 +44,13 @@ impl OpentsdbProtocolHandler for Instance { let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; + let ctx = { + let mut c = (*ctx).clone(); + c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTSDB); + Arc::new(c) + }; + // OpenTSDB is single value. let output = self .handle_row_inserts(requests, ctx, true, true) diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 757f657c1e..11abcd7146 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -44,9 +44,9 @@ use servers::query_handler::{ use session::context::QueryContextRef; use snafu::{IntoError, ResultExt}; use table::requests::{ - OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE, - SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_TRACE_HAS_EVENTS, - SEMANTIC_TRACE_HAS_LINKS, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC, + OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PER_TABLE_INDEX_KEY, + SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS, + SEMANTIC_VALUE_MIXED, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC, SIGNAL_TYPE_TRACE, SOURCE_OPENTELEMETRY, TABLE_DATA_MODEL_TRACE_V1, }; @@ -133,13 +133,19 @@ impl OpenTelemetryProtocolHandler for Instance { .unwrap_or_default(); metric_ctx.is_legacy = is_legacy; - let (requests, rows) = 
otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?; + let (requests, rows, semantic_index) = + otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?; OTLP_METRICS_ROWS.inc_by(rows as u64); let ctx = { let mut c = (*ctx).clone(); c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + // Per-table metric specifics ride this internal channel; the + // auto-create path folds them per table name. + if let Some(index) = semantic_index.encode() { + c.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, index); + } if !is_legacy { c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string()); } @@ -185,6 +191,8 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; + // `schema_url` is consumed by `parse`, so derive conventions first. + let conventions = trace_conventions(&request); let spans = otlp::trace::span::parse(request); self.ingest_trace_spans( pipeline_handler, @@ -192,6 +200,7 @@ impl OpenTelemetryProtocolHandler for Instance { &pipeline_params, table_name, spans, + &conventions, ctx, ) .await @@ -262,6 +271,7 @@ impl OpenTelemetryProtocolHandler for Instance { impl Instance { /// Ingest OTLP trace spans with chunk-level writes and span-level fallback on /// deterministic chunk failures.
+ #[allow(clippy::too_many_arguments)] async fn ingest_trace_spans( &self, pipeline_handler: PipelineHandlerRef, @@ -269,6 +279,7 @@ impl Instance { pipeline_params: &GreptimePipelineParams, table_name: String, groups: Vec, + conventions: &str, ctx: QueryContextRef, ) -> ServerResult { let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1); @@ -281,10 +292,7 @@ impl Instance { c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); if is_trace_v1_model { c.set_extension(SEMANTIC_PIPELINE, TABLE_DATA_MODEL_TRACE_V1); - c.set_extension(SEMANTIC_TRACE_HAS_EVENTS, "true"); - c.set_extension(SEMANTIC_TRACE_HAS_LINKS, "true"); - // schema_url is row-level, so conventions is unknown at table level. - c.set_extension(SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_VALUE_UNKNOWN); + c.set_extension(SEMANTIC_TRACE_CONVENTIONS, conventions); } Arc::new(c) }; @@ -838,6 +846,47 @@ where error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err)) } +/// Derives `trace.conventions` from the request's resource/scope `schema_url`s. +/// A single distinct non-empty value is concrete; multiple distinct values are +/// `mixed`; none is `unknown`. `schema_url` is row-level in OTLP, so the +/// table-level value is best-effort per the RFC conflict rule. 
+fn trace_conventions(request: &ExportTraceServiceRequest) -> String { + let mut seen: Option<&str> = None; + let mut mixed = false; + + for resource_spans in &request.resource_spans { + let urls = std::iter::once(resource_spans.schema_url.as_str()).chain( + resource_spans + .scope_spans + .iter() + .map(|s| s.schema_url.as_str()), + ); + for url in urls { + if url.is_empty() { + continue; + } + match seen { + None => seen = Some(url), + Some(prev) if prev == url => {} + Some(_) => { + mixed = true; + break; + } + } + } + if mixed { + break; + } + } + + if mixed { + SEMANTIC_VALUE_MIXED.to_string() + } else { + seen.map(str::to_string) + .unwrap_or_else(|| SEMANTIC_VALUE_UNKNOWN.to_string()) + } +} + #[cfg(test)] mod tests { use common_error::ext::ErrorExt; @@ -1011,4 +1060,47 @@ mod tests { assert_eq!(err.status_code(), StatusCode::TableNotFound); } + + use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans}; + + use super::{ExportTraceServiceRequest, trace_conventions}; + + fn resource_spans(resource_url: &str, scope_urls: &[&str]) -> ResourceSpans { + ResourceSpans { + schema_url: resource_url.to_string(), + scope_spans: scope_urls + .iter() + .map(|u| ScopeSpans { + schema_url: u.to_string(), + ..Default::default() + }) + .collect(), + ..Default::default() + } + } + + #[test] + fn test_trace_conventions() { + let unknown = ExportTraceServiceRequest::default(); + assert_eq!(trace_conventions(&unknown), "unknown"); + + let url = "https://opentelemetry.io/schemas/1.27.0"; + let single = ExportTraceServiceRequest { + resource_spans: vec![resource_spans("", &[url, url])], + }; + assert_eq!(trace_conventions(&single), url); + + let resource_level = ExportTraceServiceRequest { + resource_spans: vec![resource_spans(url, &[""])], + }; + assert_eq!(trace_conventions(&resource_level), url); + + let conflicting = ExportTraceServiceRequest { + resource_spans: vec![resource_spans( + "", + &[url, "https://opentelemetry.io/schemas/1.30.0"], + )], + }; + 
assert_eq!(trace_conventions(&conflicting), "mixed"); + } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 317c324429..0c627af08f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -61,9 +61,9 @@ use store_api::storage::{RegionId, TableId}; use table::TableRef; use table::metadata::TableInfo; use table::requests::{ - AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL, - TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS, - is_semantic_option_key, + AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, SEMANTIC_PER_TABLE_INDEX_KEY, + TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, + VALID_TABLE_OPTION_KEYS, is_semantic_option_key, validate_semantic_option, }; use table::table_reference::TableReference; @@ -861,6 +861,7 @@ impl Inserter { ) -> Result { let mut table_options = std::collections::HashMap::with_capacity(4); fill_table_options_for_create(&mut table_options, create_type, ctx); + apply_per_table_semantic_options(&mut table_options, ctx, &req.table_name); let engine_name = if let AutoCreateTableType::Logical(_) = create_type { // engine should be metric engine when creating logical tables. @@ -1090,9 +1091,9 @@ pub fn fill_table_options_for_create( } } - // Semantic keys are prefix-matched, not in the fixed allowlist above. + // Semantic keys use their own vocabulary instead of the fixed option list. for (key, value) in ctx.extensions() { - if is_semantic_option_key(&key) { + if is_semantic_option_key(&key) && validate_semantic_option(&key, &value) { table_options.insert(key, value); } } @@ -1144,6 +1145,41 @@ pub fn fill_table_options_for_create( } } +/// Folds the semantic keys for `table_name` carried on the internal per-table +/// index extension into `table_options`. 
+/// +/// The index is a `{table_name -> {semantic_key: value}}` JSON blob produced by +/// the OTLP metrics encode path (where one metric can fan out into several +/// tables with distinct keys). Common keys shared by every table in a request +/// travel as plain semantic extensions and are handled by +/// [`fill_table_options_for_create`]; this carries only the per-table tail. +/// Keys are re-checked against the vocabulary defensively. Ingestion paths +/// without a per-table index (logs, traces, Prom RW) carry no extension, so this +/// is a no-op for them. +fn apply_per_table_semantic_options( + table_options: &mut std::collections::HashMap, + ctx: &QueryContextRef, + table_name: &str, +) { + let Some(raw) = ctx.extension(SEMANTIC_PER_TABLE_INDEX_KEY) else { + return; + }; + let Ok(index) = serde_json::from_str::< + std::collections::BTreeMap>, + >(raw) else { + warn!("failed to parse semantic per-table index, skipping per-table options"); + return; + }; + let Some(entry) = index.get(table_name) else { + return; + }; + for (key, value) in entry { + if is_semantic_option_key(key) && validate_semantic_option(key, value) { + table_options.insert(key.clone(), value.clone()); + } + } +} + pub fn build_create_table_expr( table: &TableReference, request_schema: &[ColumnSchema], @@ -1402,13 +1438,14 @@ mod tests { #[test] fn test_fill_table_options_copies_semantic_extensions() { use table::requests::{ - SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, - SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY, + SEMANTIC_METRIC_TYPE, SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, + SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY, }; let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + ctx.set_extension(SEMANTIC_METRIC_TYPE, "bogus"); // The internal transport key must NOT be copied into table options. 
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}"); let ctx = Arc::new(ctx); @@ -1424,9 +1461,59 @@ mod tests { Some(SOURCE_OPENTELEMETRY), table_options.get(SEMANTIC_SOURCE).map(String::as_str) ); + assert!(!table_options.contains_key(SEMANTIC_METRIC_TYPE)); assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY)); } + #[test] + fn test_apply_per_table_semantic_options() { + use table::requests::{ + SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, SEMANTIC_PER_TABLE_INDEX_KEY, + }; + + let index = r#"{ + "http_requests_total": { + "greptime.semantic.metric.type": "counter", + "greptime.semantic.metric.unit": "By", + "greptime.semantic.metric.type_BOGUS": "x" + }, + "other_table": { + "greptime.semantic.metric.type": "gauge" + } + }"#; + let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, index); + let ctx = Arc::new(ctx); + + let mut table_options = std::collections::HashMap::new(); + apply_per_table_semantic_options(&mut table_options, &ctx, "http_requests_total"); + assert_eq!( + table_options.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("counter") + ); + assert_eq!( + table_options.get(SEMANTIC_METRIC_UNIT).map(String::as_str), + Some("By") + ); + // The unknown key is rejected by the vocabulary check; other tables' keys + // never appear. + assert!(!table_options.contains_key("greptime.semantic.metric.type_BOGUS")); + assert_eq!(table_options.len(), 2); + + let mut empty = std::collections::HashMap::new(); + apply_per_table_semantic_options(&mut empty, &ctx, "not_in_index"); + assert!(empty.is_empty()); + + // No extension at all is a no-op (e.g. logs / Prom RW). 
+ let bare = Arc::new(QueryContext::with( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + )); + let mut opts = std::collections::HashMap::new(); + apply_per_table_semantic_options(&mut opts, &bare, "http_requests_total"); + assert!(opts.is_empty()); + } + #[test] fn test_last_non_null_create_options_preserve_default_with_append_mode_false() { let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs index 2a99309dfa..94c04300cc 100644 --- a/src/servers/src/elasticsearch.rs +++ b/src/servers/src/elasticsearch.rs @@ -31,6 +31,9 @@ use pipeline::{ use serde_json::{Deserializer, Value, json}; use session::context::{Channel, QueryContext}; use snafu::{ResultExt, ensure}; +use table::requests::{ + SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_ELASTICSEARCH, +}; use vrl::value::Value as VrlValue; use crate::error::{ @@ -134,6 +137,8 @@ async fn do_handle_bulk_api( // The `schema` is already set in the query_ctx in auth process. 
query_ctx.set_channel(Channel::Elasticsearch); + query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG); + query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_ELASTICSEARCH); let db = query_ctx.current_schema(); diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index f86959f325..81063e371d 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -40,6 +40,7 @@ use prost::Message; use quoted_string::test_utils::TestSpec; use session::context::{Channel, QueryContext}; use snafu::{OptionExt, ResultExt, ensure}; +use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_LOKI}; use vrl::value::{KeyString, Value as VrlValue}; use crate::error::{ @@ -110,6 +111,8 @@ pub async fn loki_ingest( bytes: Bytes, ) -> Result { ctx.set_channel(Channel::Loki); + ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG); + ctx.set_extension(SEMANTIC_SOURCE, SOURCE_LOKI); let ctx = Arc::new(ctx); let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string()); let db = ctx.get_db_string(); diff --git a/src/servers/src/otlp/metrics.rs b/src/servers/src/otlp/metrics.rs index 4c72f665ee..73d05684ec 100644 --- a/src/servers/src/otlp/metrics.rs +++ b/src/servers/src/otlp/metrics.rs @@ -21,13 +21,19 @@ use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetrics use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value}; use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *}; use session::protocol_ctx::{MetricType, OtlpMetricCtx}; +use table::requests::{ + METADATA_QUALITY_DECLARED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_ORIGINAL_NAME, + SEMANTIC_METRIC_TEMPORALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, +}; use crate::error::Result; use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME}; use crate::row_writer::{self, MultiTableData, TableData}; +mod semantic; mod translator; +pub use 
semantic::SemanticIndex; pub use translator::legacy_normalize_otlp_name; use translator::{translate_label_name, translate_metric_name}; @@ -36,6 +42,16 @@ const APPROXIMATE_COLUMN_COUNT: usize = 8; const COUNT_TABLE_SUFFIX: &str = "_count"; const SUM_TABLE_SUFFIX: &str = "_sum"; +const BUCKET_TABLE_SUFFIX: &str = "_bucket"; + +// `greptime.semantic.metric.type` values stamped per emitted table. Must stay +// within the domain accepted by `validate_semantic_option`; the drift-guard test +// asserts this. +const METRIC_TYPE_COUNTER: &str = "counter"; +const METRIC_TYPE_UPDOWN_COUNTER: &str = "updown_counter"; +const METRIC_TYPE_GAUGE: &str = "gauge"; +const METRIC_TYPE_HISTOGRAM: &str = "histogram"; +const METRIC_TYPE_SUMMARY: &str = "summary"; const JOB_KEY: &str = "job"; const INSTANCE_KEY: &str = "instance"; @@ -78,12 +94,14 @@ const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url"; /// /// for data structure of OTLP metrics. /// -/// Returns `InsertRequests` and total number of rows to ingest +/// Returns `InsertRequests`, total number of rows to ingest, and the per-table +/// semantic index for the auto-create path to stamp as table options. pub fn to_grpc_insert_requests( request: ExportMetricsServiceRequest, metric_ctx: &mut OtlpMetricCtx, -) -> Result<(RowInsertRequests, usize)> { +) -> Result<(RowInsertRequests, usize, SemanticIndex)> { let mut table_writer = MultiTableData::default(); + let mut semantic_index = SemanticIndex::default(); for resource in &request.resource_metrics { let resource_attrs = resource.resource.as_ref().map(|r| { @@ -109,12 +127,98 @@ pub fn to_grpc_insert_requests( resource_attrs.as_ref(), scope_attrs.as_ref(), metric_ctx, + &mut semantic_index, )?; } } } - Ok(table_writer.into_row_insert_requests()) + let (requests, rows) = table_writer.into_row_insert_requests(); + Ok((requests, rows, semantic_index)) +} + +/// The tables a metric emits and their per-table `metric.type`. 
Histogram fans +/// out into `_bucket` (the histogram) plus `_sum`/`_count` counters; summary +/// fans out into the quantile table plus `_count`/`_sum` counters (legacy +/// summary stays a single table). +fn emitted_semantic_tables( + metric_type: &MetricType, + is_legacy: bool, + base: &str, +) -> Vec<(String, &'static str)> { + match metric_type { + MetricType::Gauge => vec![(base.to_string(), METRIC_TYPE_GAUGE)], + MetricType::MonotonicSum => vec![(base.to_string(), METRIC_TYPE_COUNTER)], + MetricType::NonMonotonicSum => vec![(base.to_string(), METRIC_TYPE_UPDOWN_COUNTER)], + MetricType::Histogram => vec![ + ( + format!("{base}{BUCKET_TABLE_SUFFIX}"), + METRIC_TYPE_HISTOGRAM, + ), + (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER), + (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER), + ], + MetricType::Summary if is_legacy => vec![(base.to_string(), METRIC_TYPE_SUMMARY)], + MetricType::Summary => vec![ + (base.to_string(), METRIC_TYPE_SUMMARY), + (format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER), + (format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER), + ], + // ExponentialHistogram is a no-op today; Init never reaches encoding. + MetricType::ExponentialHistogram | MetricType::Init => vec![], + } +} + +/// Maps OTLP `aggregation_temporality` to the semantic value, or `None` when the +/// instrument has no temporality (gauge/summary) or it is unspecified. +fn temporality_value(data: &metric::Data) -> Option<&'static str> { + let raw = match data { + metric::Data::Sum(sum) => sum.aggregation_temporality, + metric::Data::Histogram(hist) => hist.aggregation_temporality, + _ => return None, + }; + match AggregationTemporality::try_from(raw) { + Ok(AggregationTemporality::Delta) => Some("delta"), + Ok(AggregationTemporality::Cumulative) => Some("cumulative"), + _ => None, + } +} + +/// Records the declared metric-level semantic keys for every table this metric +/// emits. 
+fn record_metric_semantics( + index: &mut SemanticIndex, + metric: &Metric, + name: &str, + metric_ctx: &OtlpMetricCtx, +) { + let emitted = emitted_semantic_tables(&metric_ctx.metric_type, metric_ctx.is_legacy, name); + if emitted.is_empty() { + return; + } + + let temporality = metric.data.as_ref().and_then(temporality_value); + let unit = metric.unit.trim(); + // `original_name` is meaningful only when translation renamed the metric. + let original_name = (name != metric.name.as_str()).then_some(metric.name.as_str()); + + for (table, metric_type) in &emitted { + index.record_scalar(table, SEMANTIC_METRIC_TYPE, metric_type); + index.record_scalar( + table, + SEMANTIC_METRIC_METADATA_QUALITY, + METADATA_QUALITY_DECLARED, + ); + if let Some(temporality) = temporality { + index.record_scalar(table, SEMANTIC_METRIC_TEMPORALITY, temporality); + } + if !unit.is_empty() { + index.record_scalar(table, SEMANTIC_METRIC_UNIT, unit); + } + if let Some(original_name) = original_name { + index.record_scalar(table, SEMANTIC_METRIC_ORIGINAL_NAME, original_name); + } + } } fn from_metric_type(data: &metric::Data) -> MetricType { @@ -211,6 +315,7 @@ fn encode_metrics( resource_attrs: Option<&Vec>, scope_attrs: Option<&Vec>, metric_ctx: &OtlpMetricCtx, + semantic_index: &mut SemanticIndex, ) -> Result<()> { let name = if metric_ctx.is_legacy { legacy_normalize_otlp_name(&metric.name) @@ -222,8 +327,11 @@ fn encode_metrics( ) }; - // note that we don't store description or unit, we might want to deal with - // these fields in the future. + // Stamp semantic metadata against the same table name(s) the data is written + // to below. `unit` is captured here (it is otherwise discarded by the row + // encoders) along with the declared type/temporality. 
+ record_metric_semantics(semantic_index, metric, &name, metric_ctx); + if let Some(data) = &metric.data { match data { metric::Data::Gauge(gauge) => { @@ -511,9 +619,9 @@ fn encode_histogram( ) -> Result<()> { let normalized_name = name; - let bucket_table_name = format!("{}_bucket", normalized_name); - let sum_table_name = format!("{}_sum", normalized_name); - let count_table_name = format!("{}_count", normalized_name); + let bucket_table_name = format!("{}{}", normalized_name, BUCKET_TABLE_SUFFIX); + let sum_table_name = format!("{}{}", normalized_name, SUM_TABLE_SUFFIX); + let count_table_name = format!("{}{}", normalized_name, COUNT_TABLE_SUFFIX); let data_points_len = hist.data_points.len(); // Note that the row and columns number here is approximate @@ -1032,4 +1140,191 @@ mod tests { ] ); } + + use std::collections::BTreeMap; + + use table::requests::validate_semantic_option; + + fn decode(index: &SemanticIndex) -> BTreeMap> { + serde_json::from_str(&index.encode().expect("non-empty index")).unwrap() + } + + fn record(metric: &Metric, metric_type: MetricType, name: &str) -> SemanticIndex { + let ctx = OtlpMetricCtx { + metric_type, + ..Default::default() + }; + let mut index = SemanticIndex::default(); + record_metric_semantics(&mut index, metric, name, &ctx); + index + } + + #[test] + fn test_metric_type_constants_validate() { + for value in [ + METRIC_TYPE_COUNTER, + METRIC_TYPE_UPDOWN_COUNTER, + METRIC_TYPE_GAUGE, + METRIC_TYPE_HISTOGRAM, + METRIC_TYPE_SUMMARY, + ] { + assert!( + validate_semantic_option(SEMANTIC_METRIC_TYPE, value), + "metric.type value `{value}` must be in the vocabulary domain" + ); + } + for value in ["delta", "cumulative"] { + assert!(validate_semantic_option(SEMANTIC_METRIC_TEMPORALITY, value)); + } + } + + #[test] + fn test_record_monotonic_sum() { + let metric = Metric { + name: "claude_code.cost.usage".to_string(), + unit: "USD".to_string(), + data: Some(metric::Data::Sum(Sum { + aggregation_temporality: 
AggregationTemporality::Delta as i32, + is_monotonic: true, + ..Default::default() + })), + ..Default::default() + }; + let index = record( + &metric, + MetricType::MonotonicSum, + "claude_code_cost_usage_USD_total", + ); + let decoded = decode(&index); + let t = &decoded["claude_code_cost_usage_USD_total"]; + + assert_eq!( + t.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("counter") + ); + assert_eq!( + t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str), + Some("delta") + ); + assert_eq!(t.get(SEMANTIC_METRIC_UNIT).map(String::as_str), Some("USD")); + assert_eq!( + t.get(SEMANTIC_METRIC_ORIGINAL_NAME).map(String::as_str), + Some("claude_code.cost.usage") + ); + assert_eq!( + t.get(SEMANTIC_METRIC_METADATA_QUALITY).map(String::as_str), + Some("declared") + ); + } + + #[test] + fn test_record_non_monotonic_sum() { + let metric = Metric { + name: "queue_size".to_string(), + data: Some(metric::Data::Sum(Sum { + aggregation_temporality: AggregationTemporality::Cumulative as i32, + is_monotonic: false, + ..Default::default() + })), + ..Default::default() + }; + let index = record(&metric, MetricType::NonMonotonicSum, "queue_size"); + let decoded = decode(&index); + let t = &decoded["queue_size"]; + assert_eq!( + t.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("updown_counter") + ); + assert_eq!( + t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str), + Some("cumulative") + ); + // Name unchanged by translation -> no original_name. 
+ assert_eq!(t.get(SEMANTIC_METRIC_ORIGINAL_NAME), None); + } + + #[test] + fn test_record_gauge_has_no_temporality() { + let metric = Metric { + name: "temperature".to_string(), + data: Some(metric::Data::Gauge(Gauge::default())), + ..Default::default() + }; + let index = record(&metric, MetricType::Gauge, "temperature"); + let decoded = decode(&index); + let t = &decoded["temperature"]; + assert_eq!( + t.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("gauge") + ); + assert_eq!(t.get(SEMANTIC_METRIC_TEMPORALITY), None); + } + + #[test] + fn test_record_histogram_fans_out_with_distinct_types() { + let metric = Metric { + name: "request.duration".to_string(), + unit: "s".to_string(), + data: Some(metric::Data::Histogram(Histogram { + aggregation_temporality: AggregationTemporality::Cumulative as i32, + ..Default::default() + })), + ..Default::default() + }; + let index = record(&metric, MetricType::Histogram, "request_duration"); + let decoded = decode(&index); + + let bucket = &decoded["request_duration_bucket"]; + assert_eq!( + bucket.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("histogram") + ); + assert_eq!( + bucket.get(SEMANTIC_METRIC_UNIT).map(String::as_str), + Some("s") + ); + + for companion in ["request_duration_sum", "request_duration_count"] { + let t = &decoded[companion]; + assert_eq!( + t.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("counter") + ); + assert_eq!( + t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str), + Some("cumulative") + ); + } + } + + #[test] + fn test_record_summary_fans_out() { + let metric = Metric { + name: "rpc.latency".to_string(), + data: Some(metric::Data::Summary(Summary::default())), + ..Default::default() + }; + let index = record(&metric, MetricType::Summary, "rpc_latency"); + let decoded = decode(&index); + + assert_eq!( + decoded["rpc_latency"] + .get(SEMANTIC_METRIC_TYPE) + .map(String::as_str), + Some("summary") + ); + // Summary has no temporality. 
+ assert_eq!( + decoded["rpc_latency"].get(SEMANTIC_METRIC_TEMPORALITY), + None + ); + for companion in ["rpc_latency_count", "rpc_latency_sum"] { + assert_eq!( + decoded[companion] + .get(SEMANTIC_METRIC_TYPE) + .map(String::as_str), + Some("counter") + ); + } + } } diff --git a/src/servers/src/otlp/metrics/semantic.rs b/src/servers/src/otlp/metrics/semantic.rs new file mode 100644 index 0000000000..0726dc15c1 --- /dev/null +++ b/src/servers/src/otlp/metrics/semantic.rs @@ -0,0 +1,177 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Per-table semantic metadata accumulated during one OTLP metrics encode pass. +//! +//! A metric emits one or more tables (histogram and summary fan out into +//! `_bucket`/`_sum`/`_count` companions). Each emitted table collects the +//! metric's scalar semantic keys. The resulting index is serialized onto the +//! `greptime.internal.semantic.per_table_index` context extension and folded +//! into each table's options at auto-create time. +//! +//! Conflict handling follows the RFC: when two sources disagree on a +//! single-valued key the value collapses to `mixed` (or `unknown` for keys whose +//! domain has no `mixed`). + +use std::collections::{BTreeMap, HashMap}; + +use table::requests::{SEMANTIC_VALUE_MIXED, SEMANTIC_VALUE_UNKNOWN, validate_semantic_option}; + +/// Index of `{table_name -> {semantic_key -> value}}` built while encoding. 
+#[derive(Debug, Default)] +pub struct SemanticIndex { + /// Per-table scalar keys; conflicting values collapse to `mixed`/`unknown`. + tables: HashMap<String, BTreeMap<&'static str, String>>, +} + +impl SemanticIndex { + pub fn is_empty(&self) -> bool { + self.tables.is_empty() + } + + /// Records a scalar semantic key for `table`. A value conflicting with one + /// already recorded collapses the key to `mixed`/`unknown`; once collapsed + /// it stays collapsed. + pub fn record_scalar(&mut self, table: &str, key: &'static str, value: &str) { + // Avoid allocating the table name (and an empty map) on the common path + // where the table is already present. + if let Some(scalars) = self.tables.get_mut(table) { + match scalars.get(key).map(String::as_str) { + Some(existing) if existing == value => {} + Some(SEMANTIC_VALUE_MIXED) | Some(SEMANTIC_VALUE_UNKNOWN) => {} + Some(_) => { + scalars.insert(key, collapse_value(key)); + } + None => { + scalars.insert(key, value.to_string()); + } + } + } else { + self.tables.insert( + table.to_string(), + BTreeMap::from([(key, value.to_string())]), + ); + } + } + + /// Serializes to the JSON `{table -> {key -> value}}` carried on the context + /// extension, or `None` when nothing was recorded. + pub fn encode(&self) -> Option<String> { + if self.tables.is_empty() { + return None; + } + serde_json::to_string(&self.tables).ok() + } + + #[cfg(test)] + fn options_of(&self, table: &str) -> Option<&BTreeMap<&'static str, String>> { + self.tables.get(table) + } +} + +/// The collapsed value for a conflicting scalar key: `mixed` when the key's +/// domain accepts it, else `unknown`. Uses the vocabulary validator as the +/// single source of truth for which keys allow `mixed`. 
+fn collapse_value(key: &str) -> String { + if validate_semantic_option(key, SEMANTIC_VALUE_MIXED) { + SEMANTIC_VALUE_MIXED.to_string() + } else { + SEMANTIC_VALUE_UNKNOWN.to_string() + } +} + +#[cfg(test)] +mod tests { + use table::requests::{ + SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, + }; + + use super::*; + + #[test] + fn test_scalar_recording_keeps_first_then_collapses_on_conflict() { + let mut index = SemanticIndex::default(); + index.record_scalar("t", SEMANTIC_METRIC_TYPE, "counter"); + index.record_scalar("t", SEMANTIC_METRIC_TYPE, "counter"); + assert_eq!( + index + .options_of("t") + .unwrap() + .get(SEMANTIC_METRIC_TYPE) + .map(String::as_str), + Some("counter") + ); + + // Conflict on a key whose domain has `mixed` collapses to `mixed`. + index.record_scalar("t", SEMANTIC_METRIC_TYPE, "gauge"); + assert_eq!( + index + .options_of("t") + .unwrap() + .get(SEMANTIC_METRIC_TYPE) + .map(String::as_str), + Some("mixed") + ); + // Further writes stay collapsed. + index.record_scalar("t", SEMANTIC_METRIC_TYPE, "histogram"); + assert_eq!( + index + .options_of("t") + .unwrap() + .get(SEMANTIC_METRIC_TYPE) + .map(String::as_str), + Some("mixed") + ); + } + + #[test] + fn test_scalar_conflict_without_mixed_domain_collapses_to_unknown() { + let mut index = SemanticIndex::default(); + index.record_scalar("t", SEMANTIC_METRIC_METADATA_QUALITY, "declared"); + index.record_scalar("t", SEMANTIC_METRIC_METADATA_QUALITY, "inferred"); + // metadata_quality accepts only declared/inferred/unknown, so a conflict + // is `unknown`. 
+ assert_eq!( + index + .options_of("t") + .unwrap() + .get(SEMANTIC_METRIC_METADATA_QUALITY) + .map(String::as_str), + Some("unknown") + ); + } + + #[test] + fn test_encode_is_none_when_empty_and_round_trips() { + let index = SemanticIndex::default(); + assert!(index.is_empty()); + assert_eq!(index.encode(), None); + + let mut index = SemanticIndex::default(); + index.record_scalar("metric_a", SEMANTIC_METRIC_TYPE, "counter"); + index.record_scalar("metric_a", SEMANTIC_METRIC_UNIT, "By"); + let json = index.encode().unwrap(); + let parsed: BTreeMap<String, BTreeMap<String, String>> = + serde_json::from_str(&json).unwrap(); + let table = parsed.get("metric_a").unwrap(); + assert_eq!( + table.get(SEMANTIC_METRIC_TYPE).map(String::as_str), + Some("counter") + ); + assert_eq!( + table.get(SEMANTIC_METRIC_UNIT).map(String::as_str), + Some("By") + ); + } +} diff --git a/src/table/src/requests/semantic.rs b/src/table/src/requests/semantic.rs index 66d5096293..54ea5d0448 100644 --- a/src/table/src/requests/semantic.rs +++ b/src/table/src/requests/semantic.rs @@ -19,8 +19,14 @@ //! align a table with the observability concept it stands for without guessing //! from column names. See `docs/rfcs/2026-05-28-table-semantic-layer.md`. //! -//! All public table-option keys share the [`SEMANTIC_PREFIX`] namespace and are -//! string-valued. [`is_semantic_option_key`] gates them through +//! The vocabulary is intentionally small: a key earns its place only when it +//! records something a consumer cannot cheaply and reliably recover from the +//! schema/data itself. Keys whose value is already in the metric name by +//! convention, is a constant, or duplicates an existing column are deliberately +//! omitted rather than stamped for completeness. +//! +//! All public keys share the [`SEMANTIC_PREFIX`] namespace and are string-valued. +//! [`is_semantic_option_key`] gates them through //! [`crate::requests::validate_table_option`], so they are accepted both on the //! 
ingestion auto-create path and on explicit `CREATE TABLE ... WITH (...)` DDL. @@ -40,56 +46,34 @@ pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_t pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type"; /// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`]. pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source"; -/// Optional protocol or SDK version string, e.g. `v2` (Prom remote write), `1.30.0`. -pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version"; -/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. +/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. The +/// signal-agnostic successor to the engine-specific `table_data_model` option. pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline"; // ---- Trace keys ---- -/// Semantic-conventions version the rows conform to (e.g. `otel-semconv-1.27`), +/// Semantic-conventions version the rows conform to (e.g. the OTel schema URL), /// or [`SEMANTIC_VALUE_UNKNOWN`] / [`SEMANTIC_VALUE_MIXED`] when not single-valued. pub const SEMANTIC_TRACE_CONVENTIONS: &str = "greptime.semantic.trace.conventions"; -/// Whether `span_events` are preserved on the table. -pub const SEMANTIC_TRACE_HAS_EVENTS: &str = "greptime.semantic.trace.has_events"; -/// Whether `span_links` are preserved on the table. -pub const SEMANTIC_TRACE_HAS_LINKS: &str = "greptime.semantic.trace.has_links"; -// ---- Metric keys (populated in Phase 2) ---- +// ---- Metric keys ---- /// Instrument kind: `counter` / `gauge` / `histogram` / `summary` / /// `updown_counter` / `gauge_histogram` / `info` / `stateset`. pub const SEMANTIC_METRIC_TYPE: &str = "greptime.semantic.metric.type"; -/// UCUM unit, e.g. `s`, `By`, `{request}`. +/// UCUM unit, e.g. `s`, `By`, `{request}`. Discarded by the row encoders, so it +/// is unrecoverable once ingested. 
pub const SEMANTIC_METRIC_UNIT: &str = "greptime.semantic.metric.unit"; -/// `cumulative` / `delta` (OTel only). +/// `cumulative` / `delta` (OTel only). Invisible in the metric name, so it is +/// unrecoverable from the table alone. pub const SEMANTIC_METRIC_TEMPORALITY: &str = "greptime.semantic.metric.temporality"; -/// `true` / `false` for sum / counter typed data. -pub const SEMANTIC_METRIC_MONOTONIC: &str = "greptime.semantic.metric.monotonic"; /// [`METADATA_QUALITY_DECLARED`] when the protocol stated the type, or /// [`METADATA_QUALITY_INFERRED`] when guessed from a name suffix. pub const SEMANTIC_METRIC_METADATA_QUALITY: &str = "greptime.semantic.metric.metadata_quality"; -/// Pre-translation OTel metric name when the table name was Prometheus-ised. +/// Pre-translation OTel name when the table name was Prometheus-ised; the key a +/// consumer uses to look the metric up in the OTel semantic conventions. pub const SEMANTIC_METRIC_ORIGINAL_NAME: &str = "greptime.semantic.metric.original_name"; -// ---- Log keys (populated in Phase 3) ---- - -/// `otlp` / `syslog` / `custom` — which mapping to use for `severity_number`. -pub const SEMANTIC_LOG_SEVERITY_SCHEME: &str = "greptime.semantic.log.severity_scheme"; -/// `string` / `json` / `mixed` — how to parse `body`. -pub const SEMANTIC_LOG_BODY_FORMAT: &str = "greptime.semantic.log.body_format"; - -// ---- Resource / scope preservation keys (populated in Phase 3) ---- - -/// JSON array string of resource attributes promoted to first-class columns. -pub const SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED: &str = - "greptime.semantic.resource.attributes_preserved"; -/// `true` / `false` — whether any resource attribute was dropped at ingest. -pub const SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED: &str = - "greptime.semantic.resource.attributes_dropped"; -/// `true` / `false` — whether `scope.name` / `scope.version` survive on the row. 
-pub const SEMANTIC_SCOPE_PRESERVED: &str = "greptime.semantic.scope.preserved"; - // ---- Value constants ---- pub const SIGNAL_TYPE_TRACE: &str = "trace"; @@ -99,6 +83,10 @@ pub const SIGNAL_TYPE_EVENT: &str = "event"; pub const SOURCE_OPENTELEMETRY: &str = "opentelemetry"; pub const SOURCE_PROMETHEUS: &str = "prometheus"; +pub const SOURCE_INFLUXDB: &str = "influxdb"; +pub const SOURCE_OPENTSDB: &str = "opentsdb"; +pub const SOURCE_LOKI: &str = "loki"; +pub const SOURCE_ELASTICSEARCH: &str = "elasticsearch"; pub const METADATA_QUALITY_DECLARED: &str = "declared"; pub const METADATA_QUALITY_INFERRED: &str = "inferred"; @@ -115,22 +103,13 @@ pub const SEMANTIC_VALUE_MIXED: &str = "mixed"; pub const SEMANTIC_OPTION_KEYS: &[&str] = &[ SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, - SEMANTIC_SOURCE_VERSION, SEMANTIC_PIPELINE, SEMANTIC_TRACE_CONVENTIONS, - SEMANTIC_TRACE_HAS_EVENTS, - SEMANTIC_TRACE_HAS_LINKS, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, SEMANTIC_METRIC_TEMPORALITY, - SEMANTIC_METRIC_MONOTONIC, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_ORIGINAL_NAME, - SEMANTIC_LOG_SEVERITY_SCHEME, - SEMANTIC_LOG_BODY_FORMAT, - SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED, - SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED, - SEMANTIC_SCOPE_PRESERVED, ]; /// Returns true if `key` is a recognised semantic table-option key (whitelist). @@ -144,25 +123,25 @@ pub fn is_semantic_option_key(key: &str) -> bool { /// Validates a `greptime.semantic.*` option's `value` against its allowed domain. /// -/// Open-value keys (unit, original_name, version, pipeline, conventions, the -/// preserved-attributes list) accept any non-empty string. Closed-domain keys -/// accept a fixed set, plus the `unknown` sentinel, plus `mixed` for the keys -/// where one long-lived table can legitimately see multiple values. Keys not in -/// [`SEMANTIC_OPTION_KEYS`] are rejected. +/// Open-value keys (unit, original_name, pipeline, conventions) accept any +/// non-empty string. 
Closed-domain keys accept a fixed set, plus the `unknown` +/// sentinel, plus `mixed` for the keys where one long-lived table can +/// legitimately see multiple values. Keys not in [`SEMANTIC_OPTION_KEYS`] are +/// rejected. pub fn validate_semantic_option(key: &str, value: &str) -> bool { match key { - SEMANTIC_SOURCE_VERSION - | SEMANTIC_PIPELINE + SEMANTIC_PIPELINE | SEMANTIC_METRIC_UNIT | SEMANTIC_METRIC_ORIGINAL_NAME - | SEMANTIC_TRACE_CONVENTIONS - | SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED => !value.is_empty(), + | SEMANTIC_TRACE_CONVENTIONS => !value.is_empty(), SEMANTIC_SIGNAL_TYPE => matches!(value, "trace" | "log" | "metric" | "event" | "unknown"), SEMANTIC_SOURCE => matches!( value, "opentelemetry" | "prometheus" + | "influxdb" + | "opentsdb" | "elasticsearch" | "loki" | "custom" @@ -185,14 +164,7 @@ pub fn validate_semantic_option(key: &str, value: &str) -> bool { SEMANTIC_METRIC_TEMPORALITY => { matches!(value, "cumulative" | "delta" | "mixed" | "unknown") } - SEMANTIC_METRIC_MONOTONIC - | SEMANTIC_TRACE_HAS_EVENTS - | SEMANTIC_TRACE_HAS_LINKS - | SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED - | SEMANTIC_SCOPE_PRESERVED => matches!(value, "true" | "false" | "unknown"), SEMANTIC_METRIC_METADATA_QUALITY => matches!(value, "declared" | "inferred" | "unknown"), - SEMANTIC_LOG_SEVERITY_SCHEME => matches!(value, "otlp" | "syslog" | "custom" | "unknown"), - SEMANTIC_LOG_BODY_FORMAT => matches!(value, "string" | "json" | "mixed" | "unknown"), _ => false, } @@ -206,10 +178,18 @@ mod tests { fn test_is_semantic_option_key() { assert!(is_semantic_option_key(SEMANTIC_SIGNAL_TYPE)); assert!(is_semantic_option_key(SEMANTIC_METRIC_TYPE)); + assert!(is_semantic_option_key(SEMANTIC_PIPELINE)); // Unknown keys under the prefix are not whitelisted. assert!(!is_semantic_option_key("greptime.semantic.future.key")); assert!(!is_semantic_option_key("greptime.semantic.unknown_key")); + // Keys cut from the vocabulary are no longer accepted. 
+ assert!(!is_semantic_option_key( + "greptime.semantic.metric.monotonic" + )); + assert!(!is_semantic_option_key( + "greptime.semantic.resource.attributes_dropped" + )); // Near-misses must not match. assert!(!is_semantic_option_key("greptime.semanticx")); assert!(!is_semantic_option_key("semantic.signal_type")); @@ -227,16 +207,23 @@ mod tests { assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "mixed")); assert!(!validate_semantic_option(SEMANTIC_METRIC_TYPE, "bogus")); - // Booleans, sentinels, open values. - assert!(validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "true")); - assert!(!validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "yes")); + // Sentinels and open values. assert!(validate_semantic_option( SEMANTIC_METRIC_TEMPORALITY, "unknown" )); assert!(validate_semantic_option(SEMANTIC_METRIC_UNIT, "By")); assert!(!validate_semantic_option(SEMANTIC_METRIC_UNIT, "")); + assert!(validate_semantic_option( + SEMANTIC_PIPELINE, + "greptime_trace_v1" + )); + // A cut key validates to false regardless of value. + assert!(!validate_semantic_option( + "greptime.semantic.metric.monotonic", + "true" + )); // Unknown key is rejected regardless of value. assert!(!validate_semantic_option( "greptime.semantic.future.key", @@ -265,6 +252,10 @@ mod tests { SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED )); + assert!(validate_semantic_option( + SEMANTIC_METRIC_METADATA_QUALITY, + METADATA_QUALITY_DECLARED + )); assert!(validate_semantic_option( SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_VALUE_UNKNOWN diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 9e757a743c..1e488626de 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -5094,14 +5094,13 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { "[[1]]", ) .await; - // OTLP metric type is declared, so Phase 1 must not stamp `metadata_quality` - // here (Phase 2 adds it as `declared`). 
+ // OTLP metric type is declared, so Phase 2 stamps `metadata_quality=declared`. validate_data( - "otlp_metrics_no_metadata_quality", + "otlp_metrics_metadata_quality_declared", &client, "select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \ - and create_options like '%metadata_quality%';", - "[[0]]", + and create_options like '%greptime.semantic.metric.metadata_quality=declared%';", + "[[1]]", ) .await; @@ -5129,7 +5128,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n 
\\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_all_show_create_table", &client, @@ -5202,7 +5201,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n 
\\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table", &client, @@ -5266,7 
+5265,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 
'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table_none", &client, @@ -5581,14 +5580,12 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { "select count(*) from information_schema.tables where table_name = 'mytable' \ and create_options like '%greptime.semantic.signal_type=trace%' \ and create_options like '%greptime.semantic.source=opentelemetry%' \ - and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%' \ - and create_options like '%greptime.semantic.trace.has_events=true%' \ - and create_options like '%greptime.semantic.trace.has_links=true%';", + and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%';", "[[1]]", ) .await; - let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON 
COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME 
INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5641,7 +5638,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', 
granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 
'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5678,7 +5675,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part4","CREATE TABLE 
IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5715,7 +5712,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = 
'10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n 
trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < 
'60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -6546,7 +6543,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT 
NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]"; validate_data( "loki_pb_schema", &client, @@ -6681,7 +6678,7 @@ processors: // 'comment' = 'Created on insertion', // append_mode = 'true' // ) - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]"; validate_data( "loki_pb_schema", &client, @@ -6751,7 +6748,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) { 
assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]"; validate_data( "loki_json_schema", &client, @@ -6855,7 +6852,7 @@ processors: // 'comment' = 'Created on insertion', // append_mode = 'true' // ) - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" 
STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]"; validate_data( "loki_json_schema", &client, @@ -7824,7 +7821,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND 
trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'unknown',\\n 'greptime.semantic.trace.has_events' = 'true',\\n 'greptime.semantic.trace.has_links' = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]"; + let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON 
NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]"; validate_data( "trace_v1_create_table", &client,