diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 434a413fed..757f657c1e 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -43,7 +43,12 @@ use servers::query_handler::{ }; use session::context::QueryContextRef; use snafu::{IntoError, ResultExt}; -use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM}; +use table::requests::{ + OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE, + SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_TRACE_HAS_EVENTS, + SEMANTIC_TRACE_HAS_LINKS, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC, + SIGNAL_TYPE_TRACE, SOURCE_OPENTELEMETRY, TABLE_DATA_MODEL_TRACE_V1, +}; use crate::instance::Instance; use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type; @@ -131,12 +136,14 @@ impl OpenTelemetryProtocolHandler for Instance { let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?; OTLP_METRICS_ROWS.inc_by(rows as u64); - let ctx = if !is_legacy { + let ctx = { let mut c = (*ctx).clone(); - c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string()); + c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + if !is_legacy { + c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string()); + } Arc::new(c) - } else { - ctx }; // If the user uses the legacy path, it is by default without metric engine. @@ -211,6 +218,15 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; + // `as_req_iter` clones this ctx into each `temp_ctx`, so identity set here + // reaches the context that drives table auto-create. + let ctx = { + let mut c = (*ctx).clone(); + c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG); + c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + Arc::new(c) + }; + let opt_req = otlp::logs::to_grpc_insert_requests( request, pipeline, @@ -256,6 +272,23 @@ impl Instance { ctx: QueryContextRef, ) -> ServerResult { let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1); + + // Only the main span table gets the identity; the derived `_services` / + // `_operations` lookup tables keep the unstamped `ctx`. + let main_ctx = { + let mut c = (*ctx).clone(); + c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_TRACE); + c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + if is_trace_v1_model { + c.set_extension(SEMANTIC_PIPELINE, TABLE_DATA_MODEL_TRACE_V1); + c.set_extension(SEMANTIC_TRACE_HAS_EVENTS, "true"); + c.set_extension(SEMANTIC_TRACE_HAS_LINKS, "true"); + // schema_url is row-level, so conventions is unknown at table level. + c.set_extension(SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_VALUE_UNKNOWN); + } + Arc::new(c) + }; + let ingest_ctx = TraceChunkIngestContext { pipeline_handler, pipeline, @@ -278,7 +311,7 @@ impl Instance { .map(|chunk| chunk.collect::>()) .collect::>(); for chunk in chunks { - self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state) + self.ingest_trace_chunk(&ingest_ctx, chunk, main_ctx.clone(), &mut ingest_state) .await?; } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 9ef4d2ff94..317c324429 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -63,6 +63,7 @@ use table::metadata::TableInfo; use table::requests::{ AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS, + is_semantic_option_key, }; use table::table_reference::TableReference; @@ -1089,6 +1090,13 @@ pub fn fill_table_options_for_create( } } + // Semantic keys are prefix-matched, not in the fixed allowlist above. + for (key, value) in ctx.extensions() { + if is_semantic_option_key(&key) { + table_options.insert(key, value); + } + } + match create_type { AutoCreateTableType::Logical(physical_table) => { table_options.insert( @@ -1391,6 +1399,34 @@ mod tests { assert!(!table_options.contains_key(APPEND_MODE_KEY)); } + #[test] + fn test_fill_table_options_copies_semantic_extensions() { + use table::requests::{ + SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, + SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY, + }; + + let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + // The internal transport key must NOT be copied into table options. + ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}"); + let ctx = Arc::new(ctx); + let mut table_options = Default::default(); + + fill_table_options_for_create(&mut table_options, &AutoCreateTableType::Physical, &ctx); + + assert_eq!( + Some(SIGNAL_TYPE_METRIC), + table_options.get(SEMANTIC_SIGNAL_TYPE).map(String::as_str) + ); + assert_eq!( + Some(SOURCE_OPENTELEMETRY), + table_options.get(SEMANTIC_SOURCE).map(String::as_str) + ); + assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY)); + } + #[test] fn test_last_non_null_create_options_preserve_default_with_append_mode_false() { let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index bfc072e84e..280c0655d7 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -31,6 +31,10 @@ use prost::Message; use serde::{Deserialize, Serialize}; use session::context::{Channel, QueryContext}; use snafu::prelude::*; +use table::requests::{ + METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE, + SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS, +}; use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::http::extractor::PipelineInfo; @@ -108,6 +112,13 @@ pub async fn remote_write( .clone() .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string()); query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table.clone()); + // Stamp the Prometheus metric identity here, before `as_req_iter` splits into the + // batched and direct write paths, so both inherit it (the batched path bypasses + // `PromStoreProtocolHandler::write`). Prom RW v1 metadata is weak, so the type is + // inferred from naming. + query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS); + query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED); let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED .with_label_values(&[db.as_str()]) diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs index 0306bd859d..239f3155ca 100644 --- a/src/sql/src/parsers/utils.rs +++ b/src/sql/src/parsers/utils.rs @@ -40,7 +40,7 @@ use snafu::{ResultExt, ensure}; use sqlparser::dialect::Dialect; use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; -use table::requests::validate_table_option; +use table::requests::{SEMANTIC_PREFIX, validate_semantic_option, validate_table_option}; use crate::error::{ ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, InvalidTableOptionSnafu, ParseSqlValueSnafu, @@ -395,8 +395,18 @@ pub fn parse_with_options(parser: &mut Parser) -> Result { .into_iter() .map(parse_option_string) .collect::>>()?; - for key in options.keys() { - ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); + for (key, value) in &options { + if key.starts_with(SEMANTIC_PREFIX) { + // Semantic keys are whitelisted and value-checked against their domain, + // so a user cannot set an unknown key or an out-of-range value. + let value = value.as_string().unwrap_or_default(); + ensure!( + validate_semantic_option(key, value), + InvalidTableOptionSnafu { key } + ); + } else { + ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); + } } Ok(OptionMap::new(options)) } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 74ab8aee18..67742b853d 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -868,7 +868,25 @@ ENGINE=mito "; let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); - assert_matches!(result, Err(Error::InvalidTableOption { .. })) + assert_matches!(result, Err(Error::InvalidTableOption { .. })); + + // A whitelisted semantic key with an in-domain value is accepted. + let semantic = |with: &str| { + let sql = + format!("create table demo(host string, ts timestamp time index) with({with});"); + ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default()) + }; + assert!(semantic("'greptime.semantic.signal_type'='metric'").is_ok()); + // An out-of-domain value is rejected. + assert_matches!( + semantic("'greptime.semantic.signal_type'='spans'"), + Err(Error::InvalidTableOption { .. }) + ); + // An unknown key under the semantic prefix is rejected. + assert_matches!( + semantic("'greptime.semantic.bogus'='x'"), + Err(Error::InvalidTableOption { .. }) + ); } #[test] diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 4506c6ae65..6f27d5a20b 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -48,6 +48,9 @@ use crate::error::{ParseTableOptionSnafu, Result}; use crate::metadata::{TableId, TableVersion}; use crate::table_reference::TableReference; +mod semantic; +pub use semantic::*; + pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta"; pub const FILE_TABLE_LOCATION_KEY: &str = "location"; pub const FILE_TABLE_PATTERN_KEY: &str = "pattern"; @@ -129,6 +132,12 @@ pub fn validate_table_option(key: &str) -> bool { return true; } + // Semantic-layer keys share a reserved prefix instead of a fixed allowlist so + // the vocabulary can grow without touching this gate. See `semantic` module. + if is_semantic_option_key(key) { + return true; + } + VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key) } @@ -490,6 +499,14 @@ mod tests { assert!(validate_table_option(STORAGE_KEY)); assert!(validate_table_option(MEMTABLE_BULK_MERGE_THRESHOLD)); assert!(!validate_table_option("foo")); + + // Only whitelisted semantic keys are accepted. + assert!(validate_table_option(SEMANTIC_SIGNAL_TYPE)); + assert!(validate_table_option(SEMANTIC_METRIC_TYPE)); + // Unknown semantic key, near-miss, and the internal transport key are rejected. + assert!(!validate_table_option("greptime.semantic.future.key")); + assert!(!validate_table_option("greptime.semanticx")); + assert!(!validate_table_option(SEMANTIC_PER_TABLE_INDEX_KEY)); } #[test] diff --git a/src/table/src/requests/semantic.rs b/src/table/src/requests/semantic.rs new file mode 100644 index 0000000000..66d5096293 --- /dev/null +++ b/src/table/src/requests/semantic.rs @@ -0,0 +1,280 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Table semantic layer vocabulary. +//! +//! A thin layer of semantic metadata attached to a table via `table_options`, so +//! machine consumers (LLM agents, alert/dashboard builders, MCP servers, ETL) can +//! align a table with the observability concept it stands for without guessing +//! from column names. See `docs/rfcs/2026-05-28-table-semantic-layer.md`. +//! +//! All public table-option keys share the [`SEMANTIC_PREFIX`] namespace and are +//! string-valued. [`is_semantic_option_key`] gates them through +//! [`crate::requests::validate_table_option`], so they are accepted both on the +//! ingestion auto-create path and on explicit `CREATE TABLE ... WITH (...)` DDL. + +/// Reserved prefix for every public semantic table-option key. +pub const SEMANTIC_PREFIX: &str = "greptime.semantic."; + +/// Internal `QueryContext` extension key carrying the per-table semantic index +/// (a `{table_name -> {semantic_key: value}}` JSON blob) from the ingestion +/// encode path to the auto-create site. Deliberately OUTSIDE [`SEMANTIC_PREFIX`] +/// so it is not a valid table option and never leaks into a table's options. +pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_table_index"; + +// ---- Common keys (all signals) ---- + +/// Signal kind: one of [`SIGNAL_TYPE_TRACE`] / [`SIGNAL_TYPE_LOG`] / +/// [`SIGNAL_TYPE_METRIC`] / [`SIGNAL_TYPE_EVENT`]. +pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type"; +/// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`]. +pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source"; +/// Optional protocol or SDK version string, e.g. `v2` (Prom remote write), `1.30.0`. +pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version"; +/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. +pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline"; + +// ---- Trace keys ---- + +/// Semantic-conventions version the rows conform to (e.g. `otel-semconv-1.27`), +/// or [`SEMANTIC_VALUE_UNKNOWN`] / [`SEMANTIC_VALUE_MIXED`] when not single-valued. +pub const SEMANTIC_TRACE_CONVENTIONS: &str = "greptime.semantic.trace.conventions"; +/// Whether `span_events` are preserved on the table. +pub const SEMANTIC_TRACE_HAS_EVENTS: &str = "greptime.semantic.trace.has_events"; +/// Whether `span_links` are preserved on the table. +pub const SEMANTIC_TRACE_HAS_LINKS: &str = "greptime.semantic.trace.has_links"; + +// ---- Metric keys (populated in Phase 2) ---- + +/// Instrument kind: `counter` / `gauge` / `histogram` / `summary` / +/// `updown_counter` / `gauge_histogram` / `info` / `stateset`. +pub const SEMANTIC_METRIC_TYPE: &str = "greptime.semantic.metric.type"; +/// UCUM unit, e.g. `s`, `By`, `{request}`. +pub const SEMANTIC_METRIC_UNIT: &str = "greptime.semantic.metric.unit"; +/// `cumulative` / `delta` (OTel only). +pub const SEMANTIC_METRIC_TEMPORALITY: &str = "greptime.semantic.metric.temporality"; +/// `true` / `false` for sum / counter typed data. +pub const SEMANTIC_METRIC_MONOTONIC: &str = "greptime.semantic.metric.monotonic"; +/// [`METADATA_QUALITY_DECLARED`] when the protocol stated the type, or +/// [`METADATA_QUALITY_INFERRED`] when guessed from a name suffix. +pub const SEMANTIC_METRIC_METADATA_QUALITY: &str = "greptime.semantic.metric.metadata_quality"; +/// Pre-translation OTel metric name when the table name was Prometheus-ised. +pub const SEMANTIC_METRIC_ORIGINAL_NAME: &str = "greptime.semantic.metric.original_name"; + +// ---- Log keys (populated in Phase 3) ---- + +/// `otlp` / `syslog` / `custom` — which mapping to use for `severity_number`. +pub const SEMANTIC_LOG_SEVERITY_SCHEME: &str = "greptime.semantic.log.severity_scheme"; +/// `string` / `json` / `mixed` — how to parse `body`. +pub const SEMANTIC_LOG_BODY_FORMAT: &str = "greptime.semantic.log.body_format"; + +// ---- Resource / scope preservation keys (populated in Phase 3) ---- + +/// JSON array string of resource attributes promoted to first-class columns. +pub const SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED: &str = + "greptime.semantic.resource.attributes_preserved"; +/// `true` / `false` — whether any resource attribute was dropped at ingest. +pub const SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED: &str = + "greptime.semantic.resource.attributes_dropped"; +/// `true` / `false` — whether `scope.name` / `scope.version` survive on the row. +pub const SEMANTIC_SCOPE_PRESERVED: &str = "greptime.semantic.scope.preserved"; + +// ---- Value constants ---- + +pub const SIGNAL_TYPE_TRACE: &str = "trace"; +pub const SIGNAL_TYPE_LOG: &str = "log"; +pub const SIGNAL_TYPE_METRIC: &str = "metric"; +pub const SIGNAL_TYPE_EVENT: &str = "event"; + +pub const SOURCE_OPENTELEMETRY: &str = "opentelemetry"; +pub const SOURCE_PROMETHEUS: &str = "prometheus"; + +pub const METADATA_QUALITY_DECLARED: &str = "declared"; +pub const METADATA_QUALITY_INFERRED: &str = "inferred"; + +/// Sentinel for a key that cannot be determined at stamp time. +pub const SEMANTIC_VALUE_UNKNOWN: &str = "unknown"; +/// Sentinel for a single-valued key that saw conflicting sources. +pub const SEMANTIC_VALUE_MIXED: &str = "mixed"; + +/// Every recognised public semantic table-option key. The set is a closed +/// whitelist: keys under [`SEMANTIC_PREFIX`] that are not listed here are rejected, +/// so an unknown key like `greptime.semantic.unknown_key` does not silently land +/// in a table's options. Adding a key to the vocabulary means adding it here. +pub const SEMANTIC_OPTION_KEYS: &[&str] = &[ + SEMANTIC_SIGNAL_TYPE, + SEMANTIC_SOURCE, + SEMANTIC_SOURCE_VERSION, + SEMANTIC_PIPELINE, + SEMANTIC_TRACE_CONVENTIONS, + SEMANTIC_TRACE_HAS_EVENTS, + SEMANTIC_TRACE_HAS_LINKS, + SEMANTIC_METRIC_TYPE, + SEMANTIC_METRIC_UNIT, + SEMANTIC_METRIC_TEMPORALITY, + SEMANTIC_METRIC_MONOTONIC, + SEMANTIC_METRIC_METADATA_QUALITY, + SEMANTIC_METRIC_ORIGINAL_NAME, + SEMANTIC_LOG_SEVERITY_SCHEME, + SEMANTIC_LOG_BODY_FORMAT, + SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED, + SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED, + SEMANTIC_SCOPE_PRESERVED, +]; + +/// Returns true if `key` is a recognised semantic table-option key (whitelist). +/// +/// Note this is membership, not a prefix test: unknown keys under +/// [`SEMANTIC_PREFIX`] are rejected, and the internal +/// [`SEMANTIC_PER_TABLE_INDEX_KEY`] (outside the prefix) never matches. +pub fn is_semantic_option_key(key: &str) -> bool { + SEMANTIC_OPTION_KEYS.contains(&key) +} + +/// Validates a `greptime.semantic.*` option's `value` against its allowed domain. +/// +/// Open-value keys (unit, original_name, version, pipeline, conventions, the +/// preserved-attributes list) accept any non-empty string. Closed-domain keys +/// accept a fixed set, plus the `unknown` sentinel, plus `mixed` for the keys +/// where one long-lived table can legitimately see multiple values. Keys not in +/// [`SEMANTIC_OPTION_KEYS`] are rejected. +pub fn validate_semantic_option(key: &str, value: &str) -> bool { + match key { + SEMANTIC_SOURCE_VERSION + | SEMANTIC_PIPELINE + | SEMANTIC_METRIC_UNIT + | SEMANTIC_METRIC_ORIGINAL_NAME + | SEMANTIC_TRACE_CONVENTIONS + | SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED => !value.is_empty(), + + SEMANTIC_SIGNAL_TYPE => matches!(value, "trace" | "log" | "metric" | "event" | "unknown"), + SEMANTIC_SOURCE => matches!( + value, + "opentelemetry" + | "prometheus" + | "elasticsearch" + | "loki" + | "custom" + | "mixed" + | "unknown" + ), + SEMANTIC_METRIC_TYPE => matches!( + value, + "counter" + | "gauge" + | "histogram" + | "summary" + | "updown_counter" + | "gauge_histogram" + | "info" + | "stateset" + | "mixed" + | "unknown" + ), + SEMANTIC_METRIC_TEMPORALITY => { + matches!(value, "cumulative" | "delta" | "mixed" | "unknown") + } + SEMANTIC_METRIC_MONOTONIC + | SEMANTIC_TRACE_HAS_EVENTS + | SEMANTIC_TRACE_HAS_LINKS + | SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED + | SEMANTIC_SCOPE_PRESERVED => matches!(value, "true" | "false" | "unknown"), + SEMANTIC_METRIC_METADATA_QUALITY => matches!(value, "declared" | "inferred" | "unknown"), + SEMANTIC_LOG_SEVERITY_SCHEME => matches!(value, "otlp" | "syslog" | "custom" | "unknown"), + SEMANTIC_LOG_BODY_FORMAT => matches!(value, "string" | "json" | "mixed" | "unknown"), + + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_semantic_option_key() { + assert!(is_semantic_option_key(SEMANTIC_SIGNAL_TYPE)); + assert!(is_semantic_option_key(SEMANTIC_METRIC_TYPE)); + + // Unknown keys under the prefix are not whitelisted. + assert!(!is_semantic_option_key("greptime.semantic.future.key")); + assert!(!is_semantic_option_key("greptime.semantic.unknown_key")); + // Near-misses must not match. + assert!(!is_semantic_option_key("greptime.semanticx")); + assert!(!is_semantic_option_key("semantic.signal_type")); + assert!(!is_semantic_option_key("table_data_model")); + // The internal transport key must never be treated as a table option. + assert!(!is_semantic_option_key(SEMANTIC_PER_TABLE_INDEX_KEY)); + } + + #[test] + fn test_validate_semantic_option() { + // Enum keys reject out-of-domain values. + assert!(validate_semantic_option(SEMANTIC_SIGNAL_TYPE, "metric")); + assert!(!validate_semantic_option(SEMANTIC_SIGNAL_TYPE, "spans")); + assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "counter")); + assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "mixed")); + assert!(!validate_semantic_option(SEMANTIC_METRIC_TYPE, "bogus")); + + // Booleans, sentinels, open values. + assert!(validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "true")); + assert!(!validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "yes")); + assert!(validate_semantic_option( + SEMANTIC_METRIC_TEMPORALITY, + "unknown" + )); + assert!(validate_semantic_option(SEMANTIC_METRIC_UNIT, "By")); + assert!(!validate_semantic_option(SEMANTIC_METRIC_UNIT, "")); + + // Unknown key is rejected regardless of value. + assert!(!validate_semantic_option( + "greptime.semantic.future.key", + "x" + )); + + // Drift guard: every value stamped by the ingestion path must validate. + assert!(validate_semantic_option( + SEMANTIC_SIGNAL_TYPE, + SIGNAL_TYPE_TRACE + )); + assert!(validate_semantic_option( + SEMANTIC_SIGNAL_TYPE, + SIGNAL_TYPE_METRIC + )); + assert!(validate_semantic_option( + SEMANTIC_SIGNAL_TYPE, + SIGNAL_TYPE_LOG + )); + assert!(validate_semantic_option( + SEMANTIC_SOURCE, + SOURCE_OPENTELEMETRY + )); + assert!(validate_semantic_option(SEMANTIC_SOURCE, SOURCE_PROMETHEUS)); + assert!(validate_semantic_option( + SEMANTIC_METRIC_METADATA_QUALITY, + METADATA_QUALITY_INFERRED + )); + assert!(validate_semantic_option( + SEMANTIC_TRACE_CONVENTIONS, + SEMANTIC_VALUE_UNKNOWN + )); + // An empty value never validates, for any whitelisted key. + for key in SEMANTIC_OPTION_KEYS { + assert!( + !validate_semantic_option(key, ""), + "empty value should never validate for {key}" + ); + } + } +} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 60e0d9143e..ff231f6054 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -16,6 +16,7 @@ use std::env; use std::fmt::Display; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef}; use axum::Router; @@ -49,6 +50,7 @@ use servers::http::{HttpOptions, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::otel_arrow::OtelArrowServiceHandler; +use servers::pending_rows_batcher::PendingRowsBatcher; use servers::postgres::PostgresServer; use servers::prom_remote_write::validation::PromValidationMode; use servers::query_handler::sql::SqlQueryHandler; @@ -564,6 +566,24 @@ async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) { pub async fn setup_test_prom_app_with_frontend( store_type: StorageType, name: &str, +) -> (Router, TestGuard) { + setup_test_prom_app_with_frontend_inner(store_type, name, false).await +} + +/// Like [`setup_test_prom_app_with_frontend`] but enables the pending-rows batcher, +/// so Prometheus remote write goes through the batched (metric-engine) path instead +/// of the direct `PromStoreProtocolHandler::write` path. +pub async fn setup_test_prom_app_with_frontend_batched( + store_type: StorageType, + name: &str, +) -> (Router, TestGuard) { + setup_test_prom_app_with_frontend_inner(store_type, name, true).await +} + +async fn setup_test_prom_app_with_frontend_inner( + store_type: StorageType, + name: &str, + enable_batcher: bool, ) -> (Router, TestGuard) { unsafe { std::env::set_var("TZ", "UTC"); @@ -617,6 +637,24 @@ pub async fn setup_test_prom_app_with_frontend( ..Default::default() }; let frontend_ref = instance.fe_instance().clone(); + // Mirror the production wiring at `frontend::server`: build the batcher from the + // instance's managers. A short flush interval keeps the test responsive. + let pending_rows_batcher = if enable_batcher { + PendingRowsBatcher::try_new( + frontend_ref.partition_manager().clone(), + frontend_ref.node_manager().clone(), + frontend_ref.catalog_manager().clone(), + true, + frontend_ref.clone(), + Duration::from_millis(50), + 1000, + 4, + 64, + 64, + ) + } else { + None + }; let http_server = HttpServerBuilder::new(http_opts) .with_sql_handler(frontend_ref.clone()) .with_logs_handler(instance.fe_instance().clone()) @@ -625,7 +663,7 @@ pub async fn setup_test_prom_app_with_frontend( Some(frontend_ref.clone()), true, PromValidationMode::Strict, - None, + pending_rows_batcher, ) .with_prometheus_handler(frontend_ref) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 5fe22359f3..9e757a743c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -71,6 +71,7 @@ use tests_integration::test_util::{ StorageType, setup_test_http_app, setup_test_http_app_with_frontend, setup_test_http_app_with_frontend_and_slow_query_threshold, setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, + setup_test_prom_app_with_frontend_batched, }; use urlencoding::encode; use yaml_rust::YamlLoader; @@ -117,6 +118,7 @@ macro_rules! http_tests { test_dashboard_path, test_dashboard_api, test_prometheus_remote_write, + test_prometheus_remote_write_batched, test_prometheus_remote_special_labels, test_prometheus_remote_schema_labels, test_prometheus_remote_write_with_pipeline, @@ -1956,6 +1958,18 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) { ) .await; + // Prom RW tables carry the metric identity; type is inferred from naming. + validate_data( + "prometheus_remote_write_semantic_identity", + &client, + "select count(*) from information_schema.tables where table_name = 'metric2' \ + and create_options like '%greptime.semantic.signal_type=metric%' \ + and create_options like '%greptime.semantic.source=prometheus%' \ + and create_options like '%greptime.semantic.metric.metadata_quality=inferred%';", + "[[1]]", + ) + .await; + // Write snappy encoded data with new labels let write_request = WriteRequest { timeseries: mock_timeseries_new_label(), @@ -1977,6 +1991,48 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) { guard.remove_all().await; } +/// Covers the batched (pending-rows-batcher) Prometheus remote write path, which +/// bypasses `PromStoreProtocolHandler::write`. Verifies the metric table is created +/// asynchronously and still carries the Prometheus semantic identity stamped on the +/// shared request context. +pub async fn test_prometheus_remote_write_batched(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_prom_app_with_frontend_batched(store_type, "prometheus_remote_write_batched") + .await; + let client = TestClient::new(app).await; + + let write_request = WriteRequest { + timeseries: prom_store::mock_timeseries(), + ..Default::default() + }; + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // The batcher flushes asynchronously, so poll until the table exists and carries + // the semantic identity (signal_type/source/metadata_quality). + wait_for_data( + &client, + "select count(*) from information_schema.tables where table_name = 'metric2' \ + and create_options like '%greptime.semantic.signal_type=metric%' \ + and create_options like '%greptime.semantic.source=prometheus%' \ + and create_options like '%greptime.semantic.metric.metadata_quality=inferred%'", + "[[1]]", + ) + .await; + + guard.remove_all().await; +} + pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = @@ -2025,7 +2081,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f1'\\n)\"]]"; + let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f1'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc3_show_create_table", &client, @@ -2051,7 +2107,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f2'\\n)\"]]"; + let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f2'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc4_show_create_table", &client, @@ -5027,6 +5083,28 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { let expected = "[[\"claude_code_cost_usage_USD_total\"],[\"claude_code_token_usage_tokens_total\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]"; validate_data("otlp_metrics_all_tables", &client, "show tables;", expected).await; + // Metric-engine logical table carries the semantic identity. Match substrings + // because extra_options ordering is not stable. + validate_data( + "otlp_metrics_semantic_identity", + &client, + "select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \ + and create_options like '%greptime.semantic.signal_type=metric%' \ + and create_options like '%greptime.semantic.source=opentelemetry%';", + "[[1]]", + ) + .await; + // OTLP metric type is declared, so Phase 1 must not stamp `metadata_quality` + // here (Phase 2 adds it as `declared`). + validate_data( + "otlp_metrics_no_metadata_quality", + &client, + "select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \ + and create_options like '%metadata_quality%';", + "[[0]]", + ) + .await; + // CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" ( // "greptime_timestamp" TIMESTAMP(3) NOT NULL, // "greptime_value" DOUBLE NULL, @@ -5051,7 +5129,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_all_show_create_table", &client, @@ -5124,7 +5202,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table", &client, @@ -5188,7 +5266,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) { // on_physical_table = 'greptime_physical_table', // otlp_metric_compat = 'prom' // ) - let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; + let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]"; validate_data( "otlp_metrics_show_create_table_none", &client, @@ -5495,7 +5573,22 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; - let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + // The trace v1 main table carries the trace identity (events/links preserved as + // JSON columns by the v1 model). + validate_data( + "otlp_traces_semantic_identity", + &client, + "select count(*) from information_schema.tables where table_name = 'mytable' \ + and create_options like '%greptime.semantic.signal_type=trace%' \ + and create_options like '%greptime.semantic.source=opentelemetry%' \ + and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%' \ + and create_options like '%greptime.semantic.trace.has_events=true%' \ + and create_options like '%greptime.semantic.trace.has_links=true%';", + "[[1]]", + ) + .await; + + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5548,7 +5641,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5585,7 +5678,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -5622,7 +5715,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client, @@ -6285,6 +6378,17 @@ pub async fn test_otlp_logs(store_type: StorageType) { expected, ) .await; + + // The auto-created log table carries the log identity. + validate_data( + "otlp_logs_semantic_identity", + &client, + "select count(*) from information_schema.tables where table_name = 'opentelemetry_logs' \ + and create_options like '%greptime.semantic.signal_type=log%' \ + and create_options like '%greptime.semantic.source=opentelemetry%';", + "[[1]]", + ) + .await; } { @@ -7720,7 +7824,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]"; + let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'unknown',\\n 'greptime.semantic.trace.has_events' = 'true',\\n 'greptime.semantic.trace.has_links' = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]"; validate_data( "trace_v1_create_table", &client,