feat: table semantic layer per-table enrichment (Phase 2) (#8218)

* feat: table semantic layer per-table enrichment (Phase 2)

Phase 2 of the table semantic layer, plus a vocabulary trim so the layer only
records what a machine consumer cannot cheaply recover on its own.

Per-table metric enrichment (OTLP), via an internal per-table channel:
- A `SemanticIndex` accumulator records, per emitted table, the declared metric
  keys: type / unit / temporality / metadata_quality=declared / original_name.
  Conflicting single-valued keys collapse to `mixed`/`unknown`.
- Recording happens at the `encode_metrics` level where the base name, metric
  type, and proto fields are all in scope, so histogram/summary fan-out gets the
  correct per-subtable type (`_bucket`=histogram, `_sum`/`_count`=counter)
  without threading state through every encoder.
- The index is serialized onto the `greptime.internal.semantic.per_table_index`
  context extension; `apply_per_table_semantic_options` folds each table's keys
  into its options at auto-create time.
- `trace.conventions` is refined from the request's resource/scope `schema_url`s
  (concrete when uniform, else `mixed`/`unknown`).

Vocabulary trimmed to only meaningful keys. Kept: signal_type, source, pipeline,
trace.conventions, metric.{type,unit,temporality,metadata_quality,original_name}.
Dropped: metric.monotonic (a function of type), trace.has_events/has_links
(constant + derivable from columns), log.severity_scheme/body_format (constant /
derivable, and body_format cost an O(rows) scan), resource/scope lineage
(restates columns / collector-config concern), source_version (no cheap
non-constant value today). Prometheus carries type/unit in the metric name by
convention, so it gets identity only — no inferred enrichment.

Identity (signal_type + source) extended to the remaining ingest protocols so
the discovery view is complete: InfluxDB and OpenTSDB (metric), Loki and
Elasticsearch (log). These protocols carry no type/unit metadata, so identity is
all that applies.

Tests: unit coverage for the accumulator, per-metric-type fan-out, and trace
conventions; integration goldens updated for the OTLP metric/trace SHOW CREATE
output and the new Loki identity.

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: validate the option value

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
dennis zhuang
2026-06-04 20:52:00 +08:00
committed by GitHub
parent 6b7772e457
commit 31c2c1f6db
11 changed files with 790 additions and 119 deletions

View File

@@ -48,35 +48,42 @@ The audience is broader than LLM agents. Alert generators need to choose between
## Vocabulary
All keys are flat strings under the `greptime.semantic.` prefix; values are strings; unknown keys are tolerated so the vocabulary can grow without coordinated rollouts.
All keys are flat strings under the `greptime.semantic.` prefix; values are strings.
The vocabulary is deliberately small. A key earns its place only when it records
something a consumer cannot cheaply and reliably recover on its own — from the
schema, the column set, or the metric-naming conventions it already understands.
Keys whose value is already in the metric name by convention (Prometheus
`_total`/`_bucket` suffixes), is a constant for the only producer that sets it,
or merely restates an existing column are intentionally omitted rather than
stamped for completeness. This is why, for example, Prometheus remote write
tables carry only the common identity (their type/unit live in the name), and
why resource-attribute lineage is not stamped (it is an ingest/collector-config
concern, not query-time semantics).
**Common (all signals)**
| Key | Example |
| --- | --- |
| `greptime.semantic.signal_type` | `trace` / `log` / `metric` / `event` |
| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `elasticsearch` / `loki` / `custom` |
| `greptime.semantic.source_version` | protocol or SDK version, e.g. `v2` (Prom remote write), `1.30.0` (optional) |
| `greptime.semantic.pipeline` | `greptime_trace_v1` (subsumes the existing `table_data_model` value) |
| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `influxdb` / `opentsdb` / `elasticsearch` / `loki` / `custom` |
| `greptime.semantic.pipeline` | `greptime_trace_v1` (the signal-agnostic successor to `table_data_model`) |
**Trace**: `greptime.semantic.trace.conventions` (e.g. `otel-semconv-1.27`, lifted from `schema_url`, which is the version of the OpenTelemetry semantic conventions used in this table), `greptime.semantic.trace.has_events`, `greptime.semantic.trace.has_links`.
**Trace**: `greptime.semantic.trace.conventions` (the OTel `schema_url` the rows conform to, or `mixed` / `unknown` when not single-valued).
**Metric** — v1 assumes one metric type per table, which is how both Prom RW and the post-v0.16 OTel ingestion path land data today; mixed-type tables are a follow-up.
**Metric** — v1 assumes one metric type per table, which is how both Prom RW and the post-v0.16 OTel ingestion path land data today; mixed-type tables are a follow-up. These are stamped for OTLP (which declares them on the wire and then discards them); Prometheus carries its type/unit in the name and gets identity only.
| Key | Example |
| --- | --- |
| `greptime.semantic.metric.type` | `counter` / `gauge` / `histogram` / `summary` / `updown_counter` / `gauge_histogram` / `info` / `stateset` |
| `greptime.semantic.metric.unit` | UCUM, e.g. `s`, `By`, `{request}` |
| `greptime.semantic.metric.temporality` | `cumulative` / `delta` (OTel only) |
| `greptime.semantic.metric.monotonic` | `true` / `false` |
| `greptime.semantic.metric.metadata_quality` | `declared` (OTLP / Prom RW v2 / exposition) or `inferred` (Prom RW v1, name-suffix guess) |
| `greptime.semantic.metric.original_name` | Pre-translation OTel name when the table name was Prometheus-ised |
| `greptime.semantic.metric.unit` | UCUM, e.g. `s`, `By`, `{request}` (discarded by the row encoders, so unrecoverable once ingested) |
| `greptime.semantic.metric.temporality` | `cumulative` / `delta` (OTel only; invisible in the name) |
| `greptime.semantic.metric.metadata_quality` | `declared` (OTLP / exposition) or `inferred` (Prom RW v1, name-suffix guess) |
| `greptime.semantic.metric.original_name` | Pre-translation OTel name when the table name was Prometheus-ised; the key a consumer uses to look the metric up in the OTel semantic conventions |
`metadata_quality = inferred` is the load-bearing field for confidence-aware tooling: an inferred counter should be re-checked before betting on `rate()`-style semantics.
**Log**: `greptime.semantic.log.severity_scheme` (`otlp` / `syslog` / `custom`), `greptime.semantic.log.body_format` (`string` / `json` / `mixed`).
**Resource / scope preservation**: `greptime.semantic.resource.attributes_preserved` (JSON array string of attrs promoted to columns), `greptime.semantic.resource.attributes_dropped` (boolean), `greptime.semantic.scope.preserved` (boolean). These answer the most common downstream question: "is this data missing because it was dropped, or because it lives on a different column than I think?" List-shaped values use JSON array strings rather than comma-separated text to avoid escaping and ordering ambiguity.
**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter cost an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern); `source_version` (no cheap non-constant value today — Prom RW is v1-only, OTel SDK identity is deferred). Some are reserved for follow-ups (see Future Work).
## Conflict and update semantics

View File

@@ -30,6 +30,7 @@ use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::mito_engine_options::MERGE_MODE_KEY;
use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_INFLUXDB};
use crate::instance::Instance;
use crate::service_config::influxdb::InfluxdbMergeMode;
@@ -77,6 +78,12 @@ impl InfluxdbLineProtocolHandler for Instance {
.await?;
let ctx = ctx_with_default_merge_mode(ctx, self.influxdb_default_merge_mode);
let ctx = {
let mut c = (*ctx).clone();
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
c.set_extension(SEMANTIC_SOURCE, SOURCE_INFLUXDB);
Arc::new(c)
};
self.handle_influx_row_inserts(requests, ctx)
.await

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
@@ -22,6 +24,7 @@ use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTSDB};
use crate::instance::Instance;
@@ -41,6 +44,13 @@ impl OpentsdbProtocolHandler for Instance {
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let ctx = {
let mut c = (*ctx).clone();
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTSDB);
Arc::new(c)
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true, true)

View File

@@ -44,9 +44,9 @@ use servers::query_handler::{
use session::context::QueryContextRef;
use snafu::{IntoError, ResultExt};
use table::requests::{
OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_TRACE_HAS_EVENTS,
SEMANTIC_TRACE_HAS_LINKS, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC,
OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PER_TABLE_INDEX_KEY,
SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS,
SEMANTIC_VALUE_MIXED, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC,
SIGNAL_TYPE_TRACE, SOURCE_OPENTELEMETRY, TABLE_DATA_MODEL_TRACE_V1,
};
@@ -133,13 +133,19 @@ impl OpenTelemetryProtocolHandler for Instance {
.unwrap_or_default();
metric_ctx.is_legacy = is_legacy;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
let (requests, rows, semantic_index) =
otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);
let ctx = {
let mut c = (*ctx).clone();
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
// Per-table metric specifics + resource/scope lineage ride this
// internal channel; the auto-create path folds them per table name.
if let Some(index) = semantic_index.encode() {
c.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, index);
}
if !is_legacy {
c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
}
@@ -185,6 +191,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
// `schema_url` is consumed by `parse`, so derive conventions first.
let conventions = trace_conventions(&request);
let spans = otlp::trace::span::parse(request);
self.ingest_trace_spans(
pipeline_handler,
@@ -192,6 +200,7 @@ impl OpenTelemetryProtocolHandler for Instance {
&pipeline_params,
table_name,
spans,
&conventions,
ctx,
)
.await
@@ -262,6 +271,7 @@ impl OpenTelemetryProtocolHandler for Instance {
impl Instance {
/// Ingest OTLP trace spans with chunk-level writes and span-level fallback on
/// deterministic chunk failures.
#[allow(clippy::too_many_arguments)]
async fn ingest_trace_spans(
&self,
pipeline_handler: PipelineHandlerRef,
@@ -269,6 +279,7 @@ impl Instance {
pipeline_params: &GreptimePipelineParams,
table_name: String,
groups: Vec<TraceSpanGroup>,
conventions: &str,
ctx: QueryContextRef,
) -> ServerResult<TraceIngestOutcome> {
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
@@ -281,10 +292,7 @@ impl Instance {
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
if is_trace_v1_model {
c.set_extension(SEMANTIC_PIPELINE, TABLE_DATA_MODEL_TRACE_V1);
c.set_extension(SEMANTIC_TRACE_HAS_EVENTS, "true");
c.set_extension(SEMANTIC_TRACE_HAS_LINKS, "true");
// schema_url is row-level, so conventions is unknown at table level.
c.set_extension(SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_VALUE_UNKNOWN);
c.set_extension(SEMANTIC_TRACE_CONVENTIONS, conventions);
}
Arc::new(c)
};
@@ -838,6 +846,47 @@ where
error::ExecuteGrpcQuerySnafu.into_error(BoxedError::new(err))
}
/// Derives `trace.conventions` from the request's resource/scope `schema_url`s.
/// A single distinct non-empty value is concrete; multiple distinct values are
/// `mixed`; none is `unknown`. `schema_url` is row-level in OTLP, so the
/// table-level value is best-effort per the RFC conflict rule.
fn trace_conventions(request: &ExportTraceServiceRequest) -> String {
let mut seen: Option<&str> = None;
let mut mixed = false;
for resource_spans in &request.resource_spans {
let urls = std::iter::once(resource_spans.schema_url.as_str()).chain(
resource_spans
.scope_spans
.iter()
.map(|s| s.schema_url.as_str()),
);
for url in urls {
if url.is_empty() {
continue;
}
match seen {
None => seen = Some(url),
Some(prev) if prev == url => {}
Some(_) => {
mixed = true;
break;
}
}
}
if mixed {
break;
}
}
if mixed {
SEMANTIC_VALUE_MIXED.to_string()
} else {
seen.map(str::to_string)
.unwrap_or_else(|| SEMANTIC_VALUE_UNKNOWN.to_string())
}
}
#[cfg(test)]
mod tests {
use common_error::ext::ErrorExt;
@@ -1011,4 +1060,47 @@ mod tests {
assert_eq!(err.status_code(), StatusCode::TableNotFound);
}
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans};
use super::{ExportTraceServiceRequest, trace_conventions};
fn resource_spans(resource_url: &str, scope_urls: &[&str]) -> ResourceSpans {
ResourceSpans {
schema_url: resource_url.to_string(),
scope_spans: scope_urls
.iter()
.map(|u| ScopeSpans {
schema_url: u.to_string(),
..Default::default()
})
.collect(),
..Default::default()
}
}
#[test]
fn test_trace_conventions() {
let unknown = ExportTraceServiceRequest::default();
assert_eq!(trace_conventions(&unknown), "unknown");
let url = "https://opentelemetry.io/schemas/1.27.0";
let single = ExportTraceServiceRequest {
resource_spans: vec![resource_spans("", &[url, url])],
};
assert_eq!(trace_conventions(&single), url);
let resource_level = ExportTraceServiceRequest {
resource_spans: vec![resource_spans(url, &[""])],
};
assert_eq!(trace_conventions(&resource_level), url);
let conflicting = ExportTraceServiceRequest {
resource_spans: vec![resource_spans(
"",
&[url, "https://opentelemetry.io/schemas/1.30.0"],
)],
};
assert_eq!(trace_conventions(&conflicting), "mixed");
}
}

View File

@@ -61,9 +61,9 @@ use store_api::storage::{RegionId, TableId};
use table::TableRef;
use table::metadata::TableInfo;
use table::requests::{
AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
is_semantic_option_key,
AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, SEMANTIC_PER_TABLE_INDEX_KEY,
TABLE_DATA_MODEL, TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY,
VALID_TABLE_OPTION_KEYS, is_semantic_option_key, validate_semantic_option,
};
use table::table_reference::TableReference;
@@ -861,6 +861,7 @@ impl Inserter {
) -> Result<CreateTableExpr> {
let mut table_options = std::collections::HashMap::with_capacity(4);
fill_table_options_for_create(&mut table_options, create_type, ctx);
apply_per_table_semantic_options(&mut table_options, ctx, &req.table_name);
let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
// engine should be metric engine when creating logical tables.
@@ -1090,9 +1091,9 @@ pub fn fill_table_options_for_create(
}
}
// Semantic keys are prefix-matched, not in the fixed allowlist above.
// Semantic keys use their own vocabulary instead of the fixed option list.
for (key, value) in ctx.extensions() {
if is_semantic_option_key(&key) {
if is_semantic_option_key(&key) && validate_semantic_option(&key, &value) {
table_options.insert(key, value);
}
}
@@ -1144,6 +1145,41 @@ pub fn fill_table_options_for_create(
}
}
/// Folds the semantic keys for `table_name` carried on the internal per-table
/// index extension into `table_options`.
///
/// The index is a `{table_name -> {semantic_key: value}}` JSON blob produced by
/// the OTLP metrics encode path (where one metric can fan out into several
/// tables with distinct keys). Common keys shared by every table in a request
/// travel as plain semantic extensions and are handled by
/// [`fill_table_options_for_create`]; this carries only the per-table tail.
/// Keys are re-checked against the vocabulary defensively. Ingestion paths
/// without a per-table index (logs, traces, Prom RW) carry no extension, so this
/// is a no-op for them.
fn apply_per_table_semantic_options(
table_options: &mut std::collections::HashMap<String, String>,
ctx: &QueryContextRef,
table_name: &str,
) {
let Some(raw) = ctx.extension(SEMANTIC_PER_TABLE_INDEX_KEY) else {
return;
};
let Ok(index) = serde_json::from_str::<
std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
>(raw) else {
warn!("failed to parse semantic per-table index, skipping per-table options");
return;
};
let Some(entry) = index.get(table_name) else {
return;
};
for (key, value) in entry {
if is_semantic_option_key(key) && validate_semantic_option(key, value) {
table_options.insert(key.clone(), value.clone());
}
}
}
pub fn build_create_table_expr(
table: &TableReference,
request_schema: &[ColumnSchema],
@@ -1402,13 +1438,14 @@ mod tests {
#[test]
fn test_fill_table_options_copies_semantic_extensions() {
use table::requests::{
SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE,
SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY,
SEMANTIC_METRIC_TYPE, SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY,
};
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
ctx.set_extension(SEMANTIC_METRIC_TYPE, "bogus");
// The internal transport key must NOT be copied into table options.
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}");
let ctx = Arc::new(ctx);
@@ -1424,9 +1461,59 @@ mod tests {
Some(SOURCE_OPENTELEMETRY),
table_options.get(SEMANTIC_SOURCE).map(String::as_str)
);
assert!(!table_options.contains_key(SEMANTIC_METRIC_TYPE));
assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY));
}
#[test]
fn test_apply_per_table_semantic_options() {
use table::requests::{
SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, SEMANTIC_PER_TABLE_INDEX_KEY,
};
let index = r#"{
"http_requests_total": {
"greptime.semantic.metric.type": "counter",
"greptime.semantic.metric.unit": "By",
"greptime.semantic.metric.type_BOGUS": "x"
},
"other_table": {
"greptime.semantic.metric.type": "gauge"
}
}"#;
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, index);
let ctx = Arc::new(ctx);
let mut table_options = std::collections::HashMap::new();
apply_per_table_semantic_options(&mut table_options, &ctx, "http_requests_total");
assert_eq!(
table_options.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("counter")
);
assert_eq!(
table_options.get(SEMANTIC_METRIC_UNIT).map(String::as_str),
Some("By")
);
// The unknown key is rejected by the vocabulary check; other tables' keys
// never appear.
assert!(!table_options.contains_key("greptime.semantic.metric.type_BOGUS"));
assert_eq!(table_options.len(), 2);
let mut empty = std::collections::HashMap::new();
apply_per_table_semantic_options(&mut empty, &ctx, "not_in_index");
assert!(empty.is_empty());
// No extension at all is a no-op (e.g. logs / Prom RW).
let bare = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
));
let mut opts = std::collections::HashMap::new();
apply_per_table_semantic_options(&mut opts, &bare, "http_requests_total");
assert!(opts.is_empty());
}
#[test]
fn test_last_non_null_create_options_preserve_default_with_append_mode_false() {
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);

View File

@@ -31,6 +31,9 @@ use pipeline::{
use serde_json::{Deserializer, Value, json};
use session::context::{Channel, QueryContext};
use snafu::{ResultExt, ensure};
use table::requests::{
SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_ELASTICSEARCH,
};
use vrl::value::Value as VrlValue;
use crate::error::{
@@ -134,6 +137,8 @@ async fn do_handle_bulk_api(
// The `schema` is already set in the query_ctx in auth process.
query_ctx.set_channel(Channel::Elasticsearch);
query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_ELASTICSEARCH);
let db = query_ctx.current_schema();

View File

@@ -40,6 +40,7 @@ use prost::Message;
use quoted_string::test_utils::TestSpec;
use session::context::{Channel, QueryContext};
use snafu::{OptionExt, ResultExt, ensure};
use table::requests::{SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_LOKI};
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
@@ -110,6 +111,8 @@ pub async fn loki_ingest(
bytes: Bytes,
) -> Result<HttpResponse> {
ctx.set_channel(Channel::Loki);
ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
ctx.set_extension(SEMANTIC_SOURCE, SOURCE_LOKI);
let ctx = Arc::new(ctx);
let table_name = table_name.unwrap_or_else(|| LOKI_TABLE_NAME.to_string());
let db = ctx.get_db_string();

View File

@@ -21,13 +21,19 @@ use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetrics
use otel_arrow_rust::proto::opentelemetry::common::v1::{AnyValue, KeyValue, any_value};
use otel_arrow_rust::proto::opentelemetry::metrics::v1::{metric, number_data_point, *};
use session::protocol_ctx::{MetricType, OtlpMetricCtx};
use table::requests::{
METADATA_QUALITY_DECLARED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_ORIGINAL_NAME,
SEMANTIC_METRIC_TEMPORALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT,
};
use crate::error::Result;
use crate::otlp::trace::{KEY_SERVICE_INSTANCE_ID, KEY_SERVICE_NAME};
use crate::row_writer::{self, MultiTableData, TableData};
mod semantic;
mod translator;
pub use semantic::SemanticIndex;
pub use translator::legacy_normalize_otlp_name;
use translator::{translate_label_name, translate_metric_name};
@@ -36,6 +42,16 @@ const APPROXIMATE_COLUMN_COUNT: usize = 8;
const COUNT_TABLE_SUFFIX: &str = "_count";
const SUM_TABLE_SUFFIX: &str = "_sum";
const BUCKET_TABLE_SUFFIX: &str = "_bucket";
// `greptime.semantic.metric.type` values stamped per emitted table. Must stay
// within the domain accepted by `validate_semantic_option`; the drift-guard test
// asserts this.
const METRIC_TYPE_COUNTER: &str = "counter";
const METRIC_TYPE_UPDOWN_COUNTER: &str = "updown_counter";
const METRIC_TYPE_GAUGE: &str = "gauge";
const METRIC_TYPE_HISTOGRAM: &str = "histogram";
const METRIC_TYPE_SUMMARY: &str = "summary";
const JOB_KEY: &str = "job";
const INSTANCE_KEY: &str = "instance";
@@ -78,12 +94,14 @@ const OTEL_SCOPE_SCHEMA_URL: &str = "schema_url";
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto>
/// for data structure of OTLP metrics.
///
/// Returns `InsertRequests` and total number of rows to ingest
/// Returns `InsertRequests`, total number of rows to ingest, and the per-table
/// semantic index for the auto-create path to stamp as table options.
pub fn to_grpc_insert_requests(
request: ExportMetricsServiceRequest,
metric_ctx: &mut OtlpMetricCtx,
) -> Result<(RowInsertRequests, usize)> {
) -> Result<(RowInsertRequests, usize, SemanticIndex)> {
let mut table_writer = MultiTableData::default();
let mut semantic_index = SemanticIndex::default();
for resource in &request.resource_metrics {
let resource_attrs = resource.resource.as_ref().map(|r| {
@@ -109,12 +127,98 @@ pub fn to_grpc_insert_requests(
resource_attrs.as_ref(),
scope_attrs.as_ref(),
metric_ctx,
&mut semantic_index,
)?;
}
}
}
Ok(table_writer.into_row_insert_requests())
let (requests, rows) = table_writer.into_row_insert_requests();
Ok((requests, rows, semantic_index))
}
/// The tables a metric emits and their per-table `metric.type`. Histogram fans
/// out into `_bucket` (the histogram) plus `_sum`/`_count` counters; summary
/// fans out into the quantile table plus `_count`/`_sum` counters (legacy
/// summary stays a single table).
fn emitted_semantic_tables(
metric_type: &MetricType,
is_legacy: bool,
base: &str,
) -> Vec<(String, &'static str)> {
match metric_type {
MetricType::Gauge => vec![(base.to_string(), METRIC_TYPE_GAUGE)],
MetricType::MonotonicSum => vec![(base.to_string(), METRIC_TYPE_COUNTER)],
MetricType::NonMonotonicSum => vec![(base.to_string(), METRIC_TYPE_UPDOWN_COUNTER)],
MetricType::Histogram => vec![
(
format!("{base}{BUCKET_TABLE_SUFFIX}"),
METRIC_TYPE_HISTOGRAM,
),
(format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
(format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
],
MetricType::Summary if is_legacy => vec![(base.to_string(), METRIC_TYPE_SUMMARY)],
MetricType::Summary => vec![
(base.to_string(), METRIC_TYPE_SUMMARY),
(format!("{base}{COUNT_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
(format!("{base}{SUM_TABLE_SUFFIX}"), METRIC_TYPE_COUNTER),
],
// ExponentialHistogram is a no-op today; Init never reaches encoding.
MetricType::ExponentialHistogram | MetricType::Init => vec![],
}
}
/// Maps OTLP `aggregation_temporality` to the semantic value, or `None` when the
/// instrument has no temporality (gauge/summary) or it is unspecified.
fn temporality_value(data: &metric::Data) -> Option<&'static str> {
let raw = match data {
metric::Data::Sum(sum) => sum.aggregation_temporality,
metric::Data::Histogram(hist) => hist.aggregation_temporality,
_ => return None,
};
match AggregationTemporality::try_from(raw) {
Ok(AggregationTemporality::Delta) => Some("delta"),
Ok(AggregationTemporality::Cumulative) => Some("cumulative"),
_ => None,
}
}
/// Records the declared metric-level semantic keys for every table this metric
/// emits.
fn record_metric_semantics(
index: &mut SemanticIndex,
metric: &Metric,
name: &str,
metric_ctx: &OtlpMetricCtx,
) {
let emitted = emitted_semantic_tables(&metric_ctx.metric_type, metric_ctx.is_legacy, name);
if emitted.is_empty() {
return;
}
let temporality = metric.data.as_ref().and_then(temporality_value);
let unit = metric.unit.trim();
// `original_name` is meaningful only when translation renamed the metric.
let original_name = (name != metric.name.as_str()).then_some(metric.name.as_str());
for (table, metric_type) in &emitted {
index.record_scalar(table, SEMANTIC_METRIC_TYPE, metric_type);
index.record_scalar(
table,
SEMANTIC_METRIC_METADATA_QUALITY,
METADATA_QUALITY_DECLARED,
);
if let Some(temporality) = temporality {
index.record_scalar(table, SEMANTIC_METRIC_TEMPORALITY, temporality);
}
if !unit.is_empty() {
index.record_scalar(table, SEMANTIC_METRIC_UNIT, unit);
}
if let Some(original_name) = original_name {
index.record_scalar(table, SEMANTIC_METRIC_ORIGINAL_NAME, original_name);
}
}
}
fn from_metric_type(data: &metric::Data) -> MetricType {
@@ -211,6 +315,7 @@ fn encode_metrics(
resource_attrs: Option<&Vec<KeyValue>>,
scope_attrs: Option<&Vec<KeyValue>>,
metric_ctx: &OtlpMetricCtx,
semantic_index: &mut SemanticIndex,
) -> Result<()> {
let name = if metric_ctx.is_legacy {
legacy_normalize_otlp_name(&metric.name)
@@ -222,8 +327,11 @@ fn encode_metrics(
)
};
// note that we don't store description or unit, we might want to deal with
// these fields in the future.
// Stamp semantic metadata against the same table name(s) the data is written
// to below. `unit` is captured here (it is otherwise discarded by the row
// encoders) along with the declared type/temporality.
record_metric_semantics(semantic_index, metric, &name, metric_ctx);
if let Some(data) = &metric.data {
match data {
metric::Data::Gauge(gauge) => {
@@ -511,9 +619,9 @@ fn encode_histogram(
) -> Result<()> {
let normalized_name = name;
let bucket_table_name = format!("{}_bucket", normalized_name);
let sum_table_name = format!("{}_sum", normalized_name);
let count_table_name = format!("{}_count", normalized_name);
let bucket_table_name = format!("{}{}", normalized_name, BUCKET_TABLE_SUFFIX);
let sum_table_name = format!("{}{}", normalized_name, SUM_TABLE_SUFFIX);
let count_table_name = format!("{}{}", normalized_name, COUNT_TABLE_SUFFIX);
let data_points_len = hist.data_points.len();
// Note that the row and columns number here is approximate
@@ -1032,4 +1140,191 @@ mod tests {
]
);
}
use std::collections::BTreeMap;
use table::requests::validate_semantic_option;
fn decode(index: &SemanticIndex) -> BTreeMap<String, BTreeMap<String, String>> {
serde_json::from_str(&index.encode().expect("non-empty index")).unwrap()
}
fn record(metric: &Metric, metric_type: MetricType, name: &str) -> SemanticIndex {
let ctx = OtlpMetricCtx {
metric_type,
..Default::default()
};
let mut index = SemanticIndex::default();
record_metric_semantics(&mut index, metric, name, &ctx);
index
}
#[test]
fn test_metric_type_constants_validate() {
for value in [
METRIC_TYPE_COUNTER,
METRIC_TYPE_UPDOWN_COUNTER,
METRIC_TYPE_GAUGE,
METRIC_TYPE_HISTOGRAM,
METRIC_TYPE_SUMMARY,
] {
assert!(
validate_semantic_option(SEMANTIC_METRIC_TYPE, value),
"metric.type value `{value}` must be in the vocabulary domain"
);
}
for value in ["delta", "cumulative"] {
assert!(validate_semantic_option(SEMANTIC_METRIC_TEMPORALITY, value));
}
}
#[test]
fn test_record_monotonic_sum() {
let metric = Metric {
name: "claude_code.cost.usage".to_string(),
unit: "USD".to_string(),
data: Some(metric::Data::Sum(Sum {
aggregation_temporality: AggregationTemporality::Delta as i32,
is_monotonic: true,
..Default::default()
})),
..Default::default()
};
let index = record(
&metric,
MetricType::MonotonicSum,
"claude_code_cost_usage_USD_total",
);
let decoded = decode(&index);
let t = &decoded["claude_code_cost_usage_USD_total"];
assert_eq!(
t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("counter")
);
assert_eq!(
t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
Some("delta")
);
assert_eq!(t.get(SEMANTIC_METRIC_UNIT).map(String::as_str), Some("USD"));
assert_eq!(
t.get(SEMANTIC_METRIC_ORIGINAL_NAME).map(String::as_str),
Some("claude_code.cost.usage")
);
assert_eq!(
t.get(SEMANTIC_METRIC_METADATA_QUALITY).map(String::as_str),
Some("declared")
);
}
#[test]
fn test_record_non_monotonic_sum() {
let metric = Metric {
name: "queue_size".to_string(),
data: Some(metric::Data::Sum(Sum {
aggregation_temporality: AggregationTemporality::Cumulative as i32,
is_monotonic: false,
..Default::default()
})),
..Default::default()
};
let index = record(&metric, MetricType::NonMonotonicSum, "queue_size");
let decoded = decode(&index);
let t = &decoded["queue_size"];
assert_eq!(
t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("updown_counter")
);
assert_eq!(
t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
Some("cumulative")
);
// Name unchanged by translation -> no original_name.
assert_eq!(t.get(SEMANTIC_METRIC_ORIGINAL_NAME), None);
}
#[test]
fn test_record_gauge_has_no_temporality() {
let metric = Metric {
name: "temperature".to_string(),
data: Some(metric::Data::Gauge(Gauge::default())),
..Default::default()
};
let index = record(&metric, MetricType::Gauge, "temperature");
let decoded = decode(&index);
let t = &decoded["temperature"];
assert_eq!(
t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("gauge")
);
assert_eq!(t.get(SEMANTIC_METRIC_TEMPORALITY), None);
}
#[test]
fn test_record_histogram_fans_out_with_distinct_types() {
let metric = Metric {
name: "request.duration".to_string(),
unit: "s".to_string(),
data: Some(metric::Data::Histogram(Histogram {
aggregation_temporality: AggregationTemporality::Cumulative as i32,
..Default::default()
})),
..Default::default()
};
let index = record(&metric, MetricType::Histogram, "request_duration");
let decoded = decode(&index);
let bucket = &decoded["request_duration_bucket"];
assert_eq!(
bucket.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("histogram")
);
assert_eq!(
bucket.get(SEMANTIC_METRIC_UNIT).map(String::as_str),
Some("s")
);
for companion in ["request_duration_sum", "request_duration_count"] {
let t = &decoded[companion];
assert_eq!(
t.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("counter")
);
assert_eq!(
t.get(SEMANTIC_METRIC_TEMPORALITY).map(String::as_str),
Some("cumulative")
);
}
}
#[test]
fn test_record_summary_fans_out() {
let metric = Metric {
name: "rpc.latency".to_string(),
data: Some(metric::Data::Summary(Summary::default())),
..Default::default()
};
let index = record(&metric, MetricType::Summary, "rpc_latency");
let decoded = decode(&index);
assert_eq!(
decoded["rpc_latency"]
.get(SEMANTIC_METRIC_TYPE)
.map(String::as_str),
Some("summary")
);
// Summary has no temporality.
assert_eq!(
decoded["rpc_latency"].get(SEMANTIC_METRIC_TEMPORALITY),
None
);
for companion in ["rpc_latency_count", "rpc_latency_sum"] {
assert_eq!(
decoded[companion]
.get(SEMANTIC_METRIC_TYPE)
.map(String::as_str),
Some("counter")
);
}
}
}

View File

@@ -0,0 +1,177 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Per-table semantic metadata accumulated during one OTLP metrics encode pass.
//!
//! A metric emits one or more tables (histogram and summary fan out into
//! `_bucket`/`_sum`/`_count` companions). Each emitted table collects the
//! metric's scalar semantic keys. The resulting index is serialized onto the
//! `greptime.internal.semantic.per_table_index` context extension and folded
//! into each table's options at auto-create time.
//!
//! Conflict handling follows the RFC: when two sources disagree on a
//! single-valued key the value collapses to `mixed` (or `unknown` for keys whose
//! domain has no `mixed`).
use std::collections::{BTreeMap, HashMap};
use table::requests::{SEMANTIC_VALUE_MIXED, SEMANTIC_VALUE_UNKNOWN, validate_semantic_option};
/// Index of `{table_name -> {semantic_key -> value}}` built while encoding.
#[derive(Debug, Default)]
pub struct SemanticIndex {
/// Per-table scalar keys; conflicting values collapse to `mixed`/`unknown`.
tables: HashMap<String, BTreeMap<&'static str, String>>,
}
impl SemanticIndex {
pub fn is_empty(&self) -> bool {
self.tables.is_empty()
}
/// Records a scalar semantic key for `table`. A value conflicting with one
/// already recorded collapses the key to `mixed`/`unknown`; once collapsed
/// it stays collapsed.
pub fn record_scalar(&mut self, table: &str, key: &'static str, value: &str) {
// Avoid allocating the table name (and an empty map) on the common path
// where the table is already present.
if let Some(scalars) = self.tables.get_mut(table) {
match scalars.get(key).map(String::as_str) {
Some(existing) if existing == value => {}
Some(SEMANTIC_VALUE_MIXED) | Some(SEMANTIC_VALUE_UNKNOWN) => {}
Some(_) => {
scalars.insert(key, collapse_value(key));
}
None => {
scalars.insert(key, value.to_string());
}
}
} else {
self.tables.insert(
table.to_string(),
BTreeMap::from([(key, value.to_string())]),
);
}
}
/// Serializes to the JSON `{table -> {key -> value}}` carried on the context
/// extension, or `None` when nothing was recorded.
pub fn encode(&self) -> Option<String> {
if self.tables.is_empty() {
return None;
}
serde_json::to_string(&self.tables).ok()
}
#[cfg(test)]
fn options_of(&self, table: &str) -> Option<&BTreeMap<&'static str, String>> {
self.tables.get(table)
}
}
/// The collapsed value for a conflicting scalar key: `mixed` when the key's
/// domain accepts it, else `unknown`. Uses the vocabulary validator as the
/// single source of truth for which keys allow `mixed`.
fn collapse_value(key: &str) -> String {
if validate_semantic_option(key, SEMANTIC_VALUE_MIXED) {
SEMANTIC_VALUE_MIXED.to_string()
} else {
SEMANTIC_VALUE_UNKNOWN.to_string()
}
}
#[cfg(test)]
mod tests {
use table::requests::{
SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT,
};
use super::*;
#[test]
fn test_scalar_recording_keeps_first_then_collapses_on_conflict() {
let mut index = SemanticIndex::default();
index.record_scalar("t", SEMANTIC_METRIC_TYPE, "counter");
index.record_scalar("t", SEMANTIC_METRIC_TYPE, "counter");
assert_eq!(
index
.options_of("t")
.unwrap()
.get(SEMANTIC_METRIC_TYPE)
.map(String::as_str),
Some("counter")
);
// Conflict on a key whose domain has `mixed` collapses to `mixed`.
index.record_scalar("t", SEMANTIC_METRIC_TYPE, "gauge");
assert_eq!(
index
.options_of("t")
.unwrap()
.get(SEMANTIC_METRIC_TYPE)
.map(String::as_str),
Some("mixed")
);
// Further writes stay collapsed.
index.record_scalar("t", SEMANTIC_METRIC_TYPE, "histogram");
assert_eq!(
index
.options_of("t")
.unwrap()
.get(SEMANTIC_METRIC_TYPE)
.map(String::as_str),
Some("mixed")
);
}
#[test]
fn test_scalar_conflict_without_mixed_domain_collapses_to_unknown() {
let mut index = SemanticIndex::default();
index.record_scalar("t", SEMANTIC_METRIC_METADATA_QUALITY, "declared");
index.record_scalar("t", SEMANTIC_METRIC_METADATA_QUALITY, "inferred");
// metadata_quality accepts only declared/inferred/unknown, so a conflict
// is `unknown`.
assert_eq!(
index
.options_of("t")
.unwrap()
.get(SEMANTIC_METRIC_METADATA_QUALITY)
.map(String::as_str),
Some("unknown")
);
}
#[test]
fn test_encode_is_none_when_empty_and_round_trips() {
let index = SemanticIndex::default();
assert!(index.is_empty());
assert_eq!(index.encode(), None);
let mut index = SemanticIndex::default();
index.record_scalar("metric_a", SEMANTIC_METRIC_TYPE, "counter");
index.record_scalar("metric_a", SEMANTIC_METRIC_UNIT, "By");
let json = index.encode().unwrap();
let parsed: BTreeMap<String, BTreeMap<String, String>> =
serde_json::from_str(&json).unwrap();
let table = parsed.get("metric_a").unwrap();
assert_eq!(
table.get(SEMANTIC_METRIC_TYPE).map(String::as_str),
Some("counter")
);
assert_eq!(
table.get(SEMANTIC_METRIC_UNIT).map(String::as_str),
Some("By")
);
}
}

View File

@@ -19,8 +19,14 @@
//! align a table with the observability concept it stands for without guessing
//! from column names. See `docs/rfcs/2026-05-28-table-semantic-layer.md`.
//!
//! All public table-option keys share the [`SEMANTIC_PREFIX`] namespace and are
//! string-valued. [`is_semantic_option_key`] gates them through
//! The vocabulary is intentionally small: a key earns its place only when it
//! records something a consumer cannot cheaply and reliably recover from the
//! schema/data itself. Keys whose value is already in the metric name by
//! convention, is a constant, or duplicates an existing column are deliberately
//! omitted rather than stamped for completeness.
//!
//! All public keys share the [`SEMANTIC_PREFIX`] namespace and are string-valued.
//! [`is_semantic_option_key`] gates them through
//! [`crate::requests::validate_table_option`], so they are accepted both on the
//! ingestion auto-create path and on explicit `CREATE TABLE ... WITH (...)` DDL.
@@ -40,56 +46,34 @@ pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_t
pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type";
/// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`].
pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source";
/// Optional protocol or SDK version string, e.g. `v2` (Prom remote write), `1.30.0`.
pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version";
/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`.
/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. The
/// signal-agnostic successor to the engine-specific `table_data_model` option.
pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline";
// ---- Trace keys ----
/// Semantic-conventions version the rows conform to (e.g. `otel-semconv-1.27`),
/// Semantic-conventions version the rows conform to (e.g. the OTel schema URL),
/// or [`SEMANTIC_VALUE_UNKNOWN`] / [`SEMANTIC_VALUE_MIXED`] when not single-valued.
pub const SEMANTIC_TRACE_CONVENTIONS: &str = "greptime.semantic.trace.conventions";
/// Whether `span_events` are preserved on the table.
pub const SEMANTIC_TRACE_HAS_EVENTS: &str = "greptime.semantic.trace.has_events";
/// Whether `span_links` are preserved on the table.
pub const SEMANTIC_TRACE_HAS_LINKS: &str = "greptime.semantic.trace.has_links";
// ---- Metric keys (populated in Phase 2) ----
// ---- Metric keys ----
/// Instrument kind: `counter` / `gauge` / `histogram` / `summary` /
/// `updown_counter` / `gauge_histogram` / `info` / `stateset`.
pub const SEMANTIC_METRIC_TYPE: &str = "greptime.semantic.metric.type";
/// UCUM unit, e.g. `s`, `By`, `{request}`.
/// UCUM unit, e.g. `s`, `By`, `{request}`. Discarded by the row encoders, so it
/// is unrecoverable once ingested.
pub const SEMANTIC_METRIC_UNIT: &str = "greptime.semantic.metric.unit";
/// `cumulative` / `delta` (OTel only).
/// `cumulative` / `delta` (OTel only). Invisible in the metric name, so it is
/// unrecoverable from the table alone.
pub const SEMANTIC_METRIC_TEMPORALITY: &str = "greptime.semantic.metric.temporality";
/// `true` / `false` for sum / counter typed data.
pub const SEMANTIC_METRIC_MONOTONIC: &str = "greptime.semantic.metric.monotonic";
/// [`METADATA_QUALITY_DECLARED`] when the protocol stated the type, or
/// [`METADATA_QUALITY_INFERRED`] when guessed from a name suffix.
pub const SEMANTIC_METRIC_METADATA_QUALITY: &str = "greptime.semantic.metric.metadata_quality";
/// Pre-translation OTel metric name when the table name was Prometheus-ised.
/// Pre-translation OTel name when the table name was Prometheus-ised; the key a
/// consumer uses to look the metric up in the OTel semantic conventions.
pub const SEMANTIC_METRIC_ORIGINAL_NAME: &str = "greptime.semantic.metric.original_name";
// ---- Log keys (populated in Phase 3) ----
/// `otlp` / `syslog` / `custom` — which mapping to use for `severity_number`.
pub const SEMANTIC_LOG_SEVERITY_SCHEME: &str = "greptime.semantic.log.severity_scheme";
/// `string` / `json` / `mixed` — how to parse `body`.
pub const SEMANTIC_LOG_BODY_FORMAT: &str = "greptime.semantic.log.body_format";
// ---- Resource / scope preservation keys (populated in Phase 3) ----
/// JSON array string of resource attributes promoted to first-class columns.
pub const SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED: &str =
"greptime.semantic.resource.attributes_preserved";
/// `true` / `false` — whether any resource attribute was dropped at ingest.
pub const SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED: &str =
"greptime.semantic.resource.attributes_dropped";
/// `true` / `false` — whether `scope.name` / `scope.version` survive on the row.
pub const SEMANTIC_SCOPE_PRESERVED: &str = "greptime.semantic.scope.preserved";
// ---- Value constants ----
pub const SIGNAL_TYPE_TRACE: &str = "trace";
@@ -99,6 +83,10 @@ pub const SIGNAL_TYPE_EVENT: &str = "event";
pub const SOURCE_OPENTELEMETRY: &str = "opentelemetry";
pub const SOURCE_PROMETHEUS: &str = "prometheus";
pub const SOURCE_INFLUXDB: &str = "influxdb";
pub const SOURCE_OPENTSDB: &str = "opentsdb";
pub const SOURCE_LOKI: &str = "loki";
pub const SOURCE_ELASTICSEARCH: &str = "elasticsearch";
pub const METADATA_QUALITY_DECLARED: &str = "declared";
pub const METADATA_QUALITY_INFERRED: &str = "inferred";
@@ -115,22 +103,13 @@ pub const SEMANTIC_VALUE_MIXED: &str = "mixed";
pub const SEMANTIC_OPTION_KEYS: &[&str] = &[
SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE,
SEMANTIC_SOURCE_VERSION,
SEMANTIC_PIPELINE,
SEMANTIC_TRACE_CONVENTIONS,
SEMANTIC_TRACE_HAS_EVENTS,
SEMANTIC_TRACE_HAS_LINKS,
SEMANTIC_METRIC_TYPE,
SEMANTIC_METRIC_UNIT,
SEMANTIC_METRIC_TEMPORALITY,
SEMANTIC_METRIC_MONOTONIC,
SEMANTIC_METRIC_METADATA_QUALITY,
SEMANTIC_METRIC_ORIGINAL_NAME,
SEMANTIC_LOG_SEVERITY_SCHEME,
SEMANTIC_LOG_BODY_FORMAT,
SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED,
SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED,
SEMANTIC_SCOPE_PRESERVED,
];
/// Returns true if `key` is a recognised semantic table-option key (whitelist).
@@ -144,25 +123,25 @@ pub fn is_semantic_option_key(key: &str) -> bool {
/// Validates a `greptime.semantic.*` option's `value` against its allowed domain.
///
/// Open-value keys (unit, original_name, version, pipeline, conventions, the
/// preserved-attributes list) accept any non-empty string. Closed-domain keys
/// accept a fixed set, plus the `unknown` sentinel, plus `mixed` for the keys
/// where one long-lived table can legitimately see multiple values. Keys not in
/// [`SEMANTIC_OPTION_KEYS`] are rejected.
/// Open-value keys (unit, original_name, pipeline, conventions) accept any
/// non-empty string. Closed-domain keys accept a fixed set, plus the `unknown`
/// sentinel, plus `mixed` for the keys where one long-lived table can
/// legitimately see multiple values. Keys not in [`SEMANTIC_OPTION_KEYS`] are
/// rejected.
pub fn validate_semantic_option(key: &str, value: &str) -> bool {
match key {
SEMANTIC_SOURCE_VERSION
| SEMANTIC_PIPELINE
SEMANTIC_PIPELINE
| SEMANTIC_METRIC_UNIT
| SEMANTIC_METRIC_ORIGINAL_NAME
| SEMANTIC_TRACE_CONVENTIONS
| SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED => !value.is_empty(),
| SEMANTIC_TRACE_CONVENTIONS => !value.is_empty(),
SEMANTIC_SIGNAL_TYPE => matches!(value, "trace" | "log" | "metric" | "event" | "unknown"),
SEMANTIC_SOURCE => matches!(
value,
"opentelemetry"
| "prometheus"
| "influxdb"
| "opentsdb"
| "elasticsearch"
| "loki"
| "custom"
@@ -185,14 +164,7 @@ pub fn validate_semantic_option(key: &str, value: &str) -> bool {
SEMANTIC_METRIC_TEMPORALITY => {
matches!(value, "cumulative" | "delta" | "mixed" | "unknown")
}
SEMANTIC_METRIC_MONOTONIC
| SEMANTIC_TRACE_HAS_EVENTS
| SEMANTIC_TRACE_HAS_LINKS
| SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED
| SEMANTIC_SCOPE_PRESERVED => matches!(value, "true" | "false" | "unknown"),
SEMANTIC_METRIC_METADATA_QUALITY => matches!(value, "declared" | "inferred" | "unknown"),
SEMANTIC_LOG_SEVERITY_SCHEME => matches!(value, "otlp" | "syslog" | "custom" | "unknown"),
SEMANTIC_LOG_BODY_FORMAT => matches!(value, "string" | "json" | "mixed" | "unknown"),
_ => false,
}
@@ -206,10 +178,18 @@ mod tests {
fn test_is_semantic_option_key() {
assert!(is_semantic_option_key(SEMANTIC_SIGNAL_TYPE));
assert!(is_semantic_option_key(SEMANTIC_METRIC_TYPE));
assert!(is_semantic_option_key(SEMANTIC_PIPELINE));
// Unknown keys under the prefix are not whitelisted.
assert!(!is_semantic_option_key("greptime.semantic.future.key"));
assert!(!is_semantic_option_key("greptime.semantic.unknown_key"));
// Keys cut from the vocabulary are no longer accepted.
assert!(!is_semantic_option_key(
"greptime.semantic.metric.monotonic"
));
assert!(!is_semantic_option_key(
"greptime.semantic.resource.attributes_dropped"
));
// Near-misses must not match.
assert!(!is_semantic_option_key("greptime.semanticx"));
assert!(!is_semantic_option_key("semantic.signal_type"));
@@ -227,16 +207,23 @@ mod tests {
assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "mixed"));
assert!(!validate_semantic_option(SEMANTIC_METRIC_TYPE, "bogus"));
// Booleans, sentinels, open values.
assert!(validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "true"));
assert!(!validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "yes"));
// Sentinels and open values.
assert!(validate_semantic_option(
SEMANTIC_METRIC_TEMPORALITY,
"unknown"
));
assert!(validate_semantic_option(SEMANTIC_METRIC_UNIT, "By"));
assert!(!validate_semantic_option(SEMANTIC_METRIC_UNIT, ""));
assert!(validate_semantic_option(
SEMANTIC_PIPELINE,
"greptime_trace_v1"
));
// A cut key validates to false regardless of value.
assert!(!validate_semantic_option(
"greptime.semantic.metric.monotonic",
"true"
));
// Unknown key is rejected regardless of value.
assert!(!validate_semantic_option(
"greptime.semantic.future.key",
@@ -265,6 +252,10 @@ mod tests {
SEMANTIC_METRIC_METADATA_QUALITY,
METADATA_QUALITY_INFERRED
));
assert!(validate_semantic_option(
SEMANTIC_METRIC_METADATA_QUALITY,
METADATA_QUALITY_DECLARED
));
assert!(validate_semantic_option(
SEMANTIC_TRACE_CONVENTIONS,
SEMANTIC_VALUE_UNKNOWN

View File

@@ -5094,14 +5094,13 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
"[[1]]",
)
.await;
// OTLP metric type is declared, so Phase 1 must not stamp `metadata_quality`
// here (Phase 2 adds it as `declared`).
// OTLP metric type is declared, so Phase 2 stamps `metadata_quality=declared`.
validate_data(
"otlp_metrics_no_metadata_quality",
"otlp_metrics_metadata_quality_declared",
&client,
"select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \
and create_options like '%metadata_quality%';",
"[[0]]",
and create_options like '%greptime.semantic.metric.metadata_quality=declared%';",
"[[1]]",
)
.await;
@@ -5129,7 +5128,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_all_show_create_table",
&client,
@@ -5202,7 +5201,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table",
&client,
@@ -5266,7 +5265,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'declared',\\n 'greptime.semantic.metric.original_name' = 'claude_code.cost.usage',\\n 'greptime.semantic.metric.temporality' = 'delta',\\n 'greptime.semantic.metric.type' = 'counter',\\n 'greptime.semantic.metric.unit' = 'USD',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table_none",
&client,
@@ -5581,14 +5580,12 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
"select count(*) from information_schema.tables where table_name = 'mytable' \
and create_options like '%greptime.semantic.signal_type=trace%' \
and create_options like '%greptime.semantic.source=opentelemetry%' \
and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%' \
and create_options like '%greptime.semantic.trace.has_events=true%' \
and create_options like '%greptime.semantic.trace.has_links=true%';",
and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%';",
"[[1]]",
)
.await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -5641,7 +5638,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -5678,7 +5675,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -5715,7 +5712,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -6546,7 +6543,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
@@ -6681,7 +6678,7 @@ processors:
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
@@ -6751,7 +6748,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -6855,7 +6852,7 @@ processors:
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.signal_type' = 'log',\\n 'greptime.semantic.source' = 'loki'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -7824,7 +7821,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'unknown',\\n 'greptime.semantic.trace.has_events' = 'true',\\n 'greptime.semantic.trace.has_links' = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'https://opentelemetry.io/schemas/1.4.0',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
validate_data(
"trace_v1_create_table",
&client,