mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 13:50:40 +00:00
feat: table semantic layer identity (Phase 1) (#8210)
* feat: table semantic layer identity (Phase 1) Attach a thin layer of semantic metadata to ingested tables via the existing `table_options` slot, so machine consumers (LLM agents, alert/dashboard builders, MCP servers, ETL) can align a table with the observability concept it stands for without guessing from column names. See docs/rfcs/2026-05-28-table-semantic-layer.md. Phase 1 (identity) only: - New `table::requests::semantic` module: the `greptime.semantic.*` vocabulary (signal/source/source_version/pipeline + trace/metric/log/resource-scope keys, defined now, populated by later phases), value constants, the internal `greptime.internal.semantic.per_table_index` transport key (reserved for Phase 2, deliberately outside the public namespace), and `is_semantic_option_key`. - `validate_table_option` accepts the `greptime.semantic.*` prefix, so the keys are valid both on the auto-create path and on explicit `CREATE TABLE ... WITH (...)`. - `fill_table_options_for_create` copies every semantic ctx extension into the new table's options (prefix passthrough alongside the fixed allowlist). - Frontend stamps identity on the context at each ingest entry: OTLP metrics (metric/opentelemetry), traces (+pipeline, has_events/has_links/conventions for the v1 model), logs (log/opentelemetry), and Prometheus remote write (metric/prometheus, metadata_quality=inferred). OTLP metric metadata_quality is left for Phase 2 (declared). - Trace identity is stamped only on the main span table; the derived `_services` / `_operations` lookup tables keep the unstamped context and carry no semantic identity (cross-table relationships are out of scope). Semantic options appear in SHOW CREATE TABLE (like table_data_model / otlp_metric_compat) and in information_schema, so an LLM inspecting a table sees its semantics directly. Tests: unit (validation prefix + internal-key rejection, ctx passthrough) and integration assertions that the common keys land for OTLP metrics (metric-engine logical table), traces, logs, and Prometheus remote write; SHOW CREATE goldens updated. Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * fix: prom batcher not cover and white list for semantic keys/values Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * fix: typo Signed-off-by: Dennis Zhuang <killme2008@gmail.com> --------- Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
@@ -43,7 +43,12 @@ use servers::query_handler::{
|
||||
};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{IntoError, ResultExt};
|
||||
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
|
||||
use table::requests::{
|
||||
OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM, SEMANTIC_PIPELINE, SEMANTIC_SIGNAL_TYPE,
|
||||
SEMANTIC_SOURCE, SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_TRACE_HAS_EVENTS,
|
||||
SEMANTIC_TRACE_HAS_LINKS, SEMANTIC_VALUE_UNKNOWN, SIGNAL_TYPE_LOG, SIGNAL_TYPE_METRIC,
|
||||
SIGNAL_TYPE_TRACE, SOURCE_OPENTELEMETRY, TABLE_DATA_MODEL_TRACE_V1,
|
||||
};
|
||||
|
||||
use crate::instance::Instance;
|
||||
use crate::instance::otlp::trace_semconv::trace_semconv_fixed_type;
|
||||
@@ -131,12 +136,14 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request, &mut metric_ctx)?;
|
||||
OTLP_METRICS_ROWS.inc_by(rows as u64);
|
||||
|
||||
let ctx = if !is_legacy {
|
||||
let ctx = {
|
||||
let mut c = (*ctx).clone();
|
||||
c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
|
||||
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
|
||||
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
|
||||
if !is_legacy {
|
||||
c.set_extension(OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM.to_string());
|
||||
}
|
||||
Arc::new(c)
|
||||
} else {
|
||||
ctx
|
||||
};
|
||||
|
||||
// If the user uses the legacy path, it is by default without metric engine.
|
||||
@@ -211,6 +218,15 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
|
||||
interceptor_ref.pre_execute(ctx.clone())?;
|
||||
|
||||
// `as_req_iter` clones this ctx into each `temp_ctx`, so identity set here
|
||||
// reaches the context that drives table auto-create.
|
||||
let ctx = {
|
||||
let mut c = (*ctx).clone();
|
||||
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
|
||||
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
|
||||
Arc::new(c)
|
||||
};
|
||||
|
||||
let opt_req = otlp::logs::to_grpc_insert_requests(
|
||||
request,
|
||||
pipeline,
|
||||
@@ -256,6 +272,23 @@ impl Instance {
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<TraceIngestOutcome> {
|
||||
let is_trace_v1_model = matches!(pipeline, PipelineWay::OtlpTraceDirectV1);
|
||||
|
||||
// Only the main span table gets the identity; the derived `_services` /
|
||||
// `_operations` lookup tables keep the unstamped `ctx`.
|
||||
let main_ctx = {
|
||||
let mut c = (*ctx).clone();
|
||||
c.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_TRACE);
|
||||
c.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
|
||||
if is_trace_v1_model {
|
||||
c.set_extension(SEMANTIC_PIPELINE, TABLE_DATA_MODEL_TRACE_V1);
|
||||
c.set_extension(SEMANTIC_TRACE_HAS_EVENTS, "true");
|
||||
c.set_extension(SEMANTIC_TRACE_HAS_LINKS, "true");
|
||||
// schema_url is row-level, so conventions is unknown at table level.
|
||||
c.set_extension(SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_VALUE_UNKNOWN);
|
||||
}
|
||||
Arc::new(c)
|
||||
};
|
||||
|
||||
let ingest_ctx = TraceChunkIngestContext {
|
||||
pipeline_handler,
|
||||
pipeline,
|
||||
@@ -278,7 +311,7 @@ impl Instance {
|
||||
.map(|chunk| chunk.collect::<Vec<_>>())
|
||||
.collect::<Vec<_>>();
|
||||
for chunk in chunks {
|
||||
self.ingest_trace_chunk(&ingest_ctx, chunk, ctx.clone(), &mut ingest_state)
|
||||
self.ingest_trace_chunk(&ingest_ctx, chunk, main_ctx.clone(), &mut ingest_state)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ use table::metadata::TableInfo;
|
||||
use table::requests::{
|
||||
AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
|
||||
TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
|
||||
is_semantic_option_key,
|
||||
};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
@@ -1089,6 +1090,13 @@ pub fn fill_table_options_for_create(
|
||||
}
|
||||
}
|
||||
|
||||
// Semantic keys are prefix-matched, not in the fixed allowlist above.
|
||||
for (key, value) in ctx.extensions() {
|
||||
if is_semantic_option_key(&key) {
|
||||
table_options.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
match create_type {
|
||||
AutoCreateTableType::Logical(physical_table) => {
|
||||
table_options.insert(
|
||||
@@ -1391,6 +1399,34 @@ mod tests {
|
||||
assert!(!table_options.contains_key(APPEND_MODE_KEY));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fill_table_options_copies_semantic_extensions() {
|
||||
use table::requests::{
|
||||
SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE,
|
||||
SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY,
|
||||
};
|
||||
|
||||
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
|
||||
ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
|
||||
ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
|
||||
// The internal transport key must NOT be copied into table options.
|
||||
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}");
|
||||
let ctx = Arc::new(ctx);
|
||||
let mut table_options = Default::default();
|
||||
|
||||
fill_table_options_for_create(&mut table_options, &AutoCreateTableType::Physical, &ctx);
|
||||
|
||||
assert_eq!(
|
||||
Some(SIGNAL_TYPE_METRIC),
|
||||
table_options.get(SEMANTIC_SIGNAL_TYPE).map(String::as_str)
|
||||
);
|
||||
assert_eq!(
|
||||
Some(SOURCE_OPENTELEMETRY),
|
||||
table_options.get(SEMANTIC_SOURCE).map(String::as_str)
|
||||
);
|
||||
assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_last_non_null_create_options_preserve_default_with_append_mode_false() {
|
||||
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
|
||||
|
||||
@@ -31,6 +31,10 @@ use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::{Channel, QueryContext};
|
||||
use snafu::prelude::*;
|
||||
use table::requests::{
|
||||
METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE,
|
||||
SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS,
|
||||
};
|
||||
|
||||
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
|
||||
use crate::http::extractor::PipelineInfo;
|
||||
@@ -108,6 +112,13 @@ pub async fn remote_write(
|
||||
.clone()
|
||||
.unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
|
||||
query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table.clone());
|
||||
// Stamp the Prometheus metric identity here, before `as_req_iter` splits into the
|
||||
// batched and direct write paths, so both inherit it (the batched path bypasses
|
||||
// `PromStoreProtocolHandler::write`). Prom RW v1 metadata is weak, so the type is
|
||||
// inferred from naming.
|
||||
query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
|
||||
query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS);
|
||||
query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED);
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
|
||||
.with_label_values(&[db.as_str()])
|
||||
|
||||
@@ -40,7 +40,7 @@ use snafu::{ResultExt, ensure};
|
||||
use sqlparser::dialect::Dialect;
|
||||
use sqlparser::keywords::Keyword;
|
||||
use sqlparser::parser::Parser;
|
||||
use table::requests::validate_table_option;
|
||||
use table::requests::{SEMANTIC_PREFIX, validate_semantic_option, validate_table_option};
|
||||
|
||||
use crate::error::{
|
||||
ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, InvalidTableOptionSnafu, ParseSqlValueSnafu,
|
||||
@@ -395,8 +395,18 @@ pub fn parse_with_options(parser: &mut Parser) -> Result<OptionMap> {
|
||||
.into_iter()
|
||||
.map(parse_option_string)
|
||||
.collect::<Result<HashMap<String, OptionValue>>>()?;
|
||||
for key in options.keys() {
|
||||
ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
|
||||
for (key, value) in &options {
|
||||
if key.starts_with(SEMANTIC_PREFIX) {
|
||||
// Semantic keys are whitelisted and value-checked against their domain,
|
||||
// so a user cannot set an unknown key or an out-of-range value.
|
||||
let value = value.as_string().unwrap_or_default();
|
||||
ensure!(
|
||||
validate_semantic_option(key, value),
|
||||
InvalidTableOptionSnafu { key }
|
||||
);
|
||||
} else {
|
||||
ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
|
||||
}
|
||||
}
|
||||
Ok(OptionMap::new(options))
|
||||
}
|
||||
|
||||
@@ -868,7 +868,25 @@ ENGINE=mito
|
||||
";
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
|
||||
assert_matches!(result, Err(Error::InvalidTableOption { .. }))
|
||||
assert_matches!(result, Err(Error::InvalidTableOption { .. }));
|
||||
|
||||
// A whitelisted semantic key with an in-domain value is accepted.
|
||||
let semantic = |with: &str| {
|
||||
let sql =
|
||||
format!("create table demo(host string, ts timestamp time index) with({with});");
|
||||
ParserContext::create_with_dialect(&sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
};
|
||||
assert!(semantic("'greptime.semantic.signal_type'='metric'").is_ok());
|
||||
// An out-of-domain value is rejected.
|
||||
assert_matches!(
|
||||
semantic("'greptime.semantic.signal_type'='spans'"),
|
||||
Err(Error::InvalidTableOption { .. })
|
||||
);
|
||||
// An unknown key under the semantic prefix is rejected.
|
||||
assert_matches!(
|
||||
semantic("'greptime.semantic.bogus'='x'"),
|
||||
Err(Error::InvalidTableOption { .. })
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -48,6 +48,9 @@ use crate::error::{ParseTableOptionSnafu, Result};
|
||||
use crate::metadata::{TableId, TableVersion};
|
||||
use crate::table_reference::TableReference;
|
||||
|
||||
mod semantic;
|
||||
pub use semantic::*;
|
||||
|
||||
pub const FILE_TABLE_META_KEY: &str = "__private.file_table_meta";
|
||||
pub const FILE_TABLE_LOCATION_KEY: &str = "location";
|
||||
pub const FILE_TABLE_PATTERN_KEY: &str = "pattern";
|
||||
@@ -129,6 +132,12 @@ pub fn validate_table_option(key: &str) -> bool {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Semantic-layer keys share a reserved prefix instead of a fixed allowlist so
|
||||
// the vocabulary can grow without touching this gate. See `semantic` module.
|
||||
if is_semantic_option_key(key) {
|
||||
return true;
|
||||
}
|
||||
|
||||
VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
|
||||
}
|
||||
|
||||
@@ -490,6 +499,14 @@ mod tests {
|
||||
assert!(validate_table_option(STORAGE_KEY));
|
||||
assert!(validate_table_option(MEMTABLE_BULK_MERGE_THRESHOLD));
|
||||
assert!(!validate_table_option("foo"));
|
||||
|
||||
// Only whitelisted semantic keys are accepted.
|
||||
assert!(validate_table_option(SEMANTIC_SIGNAL_TYPE));
|
||||
assert!(validate_table_option(SEMANTIC_METRIC_TYPE));
|
||||
// Unknown semantic key, near-miss, and the internal transport key are rejected.
|
||||
assert!(!validate_table_option("greptime.semantic.future.key"));
|
||||
assert!(!validate_table_option("greptime.semanticx"));
|
||||
assert!(!validate_table_option(SEMANTIC_PER_TABLE_INDEX_KEY));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
280
src/table/src/requests/semantic.rs
Normal file
280
src/table/src/requests/semantic.rs
Normal file
@@ -0,0 +1,280 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Table semantic layer vocabulary.
|
||||
//!
|
||||
//! A thin layer of semantic metadata attached to a table via `table_options`, so
|
||||
//! machine consumers (LLM agents, alert/dashboard builders, MCP servers, ETL) can
|
||||
//! align a table with the observability concept it stands for without guessing
|
||||
//! from column names. See `docs/rfcs/2026-05-28-table-semantic-layer.md`.
|
||||
//!
|
||||
//! All public table-option keys share the [`SEMANTIC_PREFIX`] namespace and are
|
||||
//! string-valued. [`is_semantic_option_key`] gates them through
|
||||
//! [`crate::requests::validate_table_option`], so they are accepted both on the
|
||||
//! ingestion auto-create path and on explicit `CREATE TABLE ... WITH (...)` DDL.
|
||||
|
||||
/// Reserved prefix for every public semantic table-option key.
|
||||
pub const SEMANTIC_PREFIX: &str = "greptime.semantic.";
|
||||
|
||||
/// Internal `QueryContext` extension key carrying the per-table semantic index
|
||||
/// (a `{table_name -> {semantic_key: value}}` JSON blob) from the ingestion
|
||||
/// encode path to the auto-create site. Deliberately OUTSIDE [`SEMANTIC_PREFIX`]
|
||||
/// so it is not a valid table option and never leaks into a table's options.
|
||||
pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_table_index";
|
||||
|
||||
// ---- Common keys (all signals) ----
|
||||
|
||||
/// Signal kind: one of [`SIGNAL_TYPE_TRACE`] / [`SIGNAL_TYPE_LOG`] /
|
||||
/// [`SIGNAL_TYPE_METRIC`] / [`SIGNAL_TYPE_EVENT`].
|
||||
pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type";
|
||||
/// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`].
|
||||
pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source";
|
||||
/// Optional protocol or SDK version string, e.g. `v2` (Prom remote write), `1.30.0`.
|
||||
pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version";
|
||||
/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`.
|
||||
pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline";
|
||||
|
||||
// ---- Trace keys ----
|
||||
|
||||
/// Semantic-conventions version the rows conform to (e.g. `otel-semconv-1.27`),
|
||||
/// or [`SEMANTIC_VALUE_UNKNOWN`] / [`SEMANTIC_VALUE_MIXED`] when not single-valued.
|
||||
pub const SEMANTIC_TRACE_CONVENTIONS: &str = "greptime.semantic.trace.conventions";
|
||||
/// Whether `span_events` are preserved on the table.
|
||||
pub const SEMANTIC_TRACE_HAS_EVENTS: &str = "greptime.semantic.trace.has_events";
|
||||
/// Whether `span_links` are preserved on the table.
|
||||
pub const SEMANTIC_TRACE_HAS_LINKS: &str = "greptime.semantic.trace.has_links";
|
||||
|
||||
// ---- Metric keys (populated in Phase 2) ----
|
||||
|
||||
/// Instrument kind: `counter` / `gauge` / `histogram` / `summary` /
|
||||
/// `updown_counter` / `gauge_histogram` / `info` / `stateset`.
|
||||
pub const SEMANTIC_METRIC_TYPE: &str = "greptime.semantic.metric.type";
|
||||
/// UCUM unit, e.g. `s`, `By`, `{request}`.
|
||||
pub const SEMANTIC_METRIC_UNIT: &str = "greptime.semantic.metric.unit";
|
||||
/// `cumulative` / `delta` (OTel only).
|
||||
pub const SEMANTIC_METRIC_TEMPORALITY: &str = "greptime.semantic.metric.temporality";
|
||||
/// `true` / `false` for sum / counter typed data.
|
||||
pub const SEMANTIC_METRIC_MONOTONIC: &str = "greptime.semantic.metric.monotonic";
|
||||
/// [`METADATA_QUALITY_DECLARED`] when the protocol stated the type, or
|
||||
/// [`METADATA_QUALITY_INFERRED`] when guessed from a name suffix.
|
||||
pub const SEMANTIC_METRIC_METADATA_QUALITY: &str = "greptime.semantic.metric.metadata_quality";
|
||||
/// Pre-translation OTel metric name when the table name was Prometheus-ised.
|
||||
pub const SEMANTIC_METRIC_ORIGINAL_NAME: &str = "greptime.semantic.metric.original_name";
|
||||
|
||||
// ---- Log keys (populated in Phase 3) ----
|
||||
|
||||
/// `otlp` / `syslog` / `custom` — which mapping to use for `severity_number`.
|
||||
pub const SEMANTIC_LOG_SEVERITY_SCHEME: &str = "greptime.semantic.log.severity_scheme";
|
||||
/// `string` / `json` / `mixed` — how to parse `body`.
|
||||
pub const SEMANTIC_LOG_BODY_FORMAT: &str = "greptime.semantic.log.body_format";
|
||||
|
||||
// ---- Resource / scope preservation keys (populated in Phase 3) ----
|
||||
|
||||
/// JSON array string of resource attributes promoted to first-class columns.
|
||||
pub const SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED: &str =
|
||||
"greptime.semantic.resource.attributes_preserved";
|
||||
/// `true` / `false` — whether any resource attribute was dropped at ingest.
|
||||
pub const SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED: &str =
|
||||
"greptime.semantic.resource.attributes_dropped";
|
||||
/// `true` / `false` — whether `scope.name` / `scope.version` survive on the row.
|
||||
pub const SEMANTIC_SCOPE_PRESERVED: &str = "greptime.semantic.scope.preserved";
|
||||
|
||||
// ---- Value constants ----
|
||||
|
||||
pub const SIGNAL_TYPE_TRACE: &str = "trace";
|
||||
pub const SIGNAL_TYPE_LOG: &str = "log";
|
||||
pub const SIGNAL_TYPE_METRIC: &str = "metric";
|
||||
pub const SIGNAL_TYPE_EVENT: &str = "event";
|
||||
|
||||
pub const SOURCE_OPENTELEMETRY: &str = "opentelemetry";
|
||||
pub const SOURCE_PROMETHEUS: &str = "prometheus";
|
||||
|
||||
pub const METADATA_QUALITY_DECLARED: &str = "declared";
|
||||
pub const METADATA_QUALITY_INFERRED: &str = "inferred";
|
||||
|
||||
/// Sentinel for a key that cannot be determined at stamp time.
|
||||
pub const SEMANTIC_VALUE_UNKNOWN: &str = "unknown";
|
||||
/// Sentinel for a single-valued key that saw conflicting sources.
|
||||
pub const SEMANTIC_VALUE_MIXED: &str = "mixed";
|
||||
|
||||
/// Every recognised public semantic table-option key. The set is a closed
|
||||
/// whitelist: keys under [`SEMANTIC_PREFIX`] that are not listed here are rejected,
|
||||
/// so an unknown key like `greptime.semantic.unknown_key` does not silently land
|
||||
/// in a table's options. Adding a key to the vocabulary means adding it here.
|
||||
pub const SEMANTIC_OPTION_KEYS: &[&str] = &[
|
||||
SEMANTIC_SIGNAL_TYPE,
|
||||
SEMANTIC_SOURCE,
|
||||
SEMANTIC_SOURCE_VERSION,
|
||||
SEMANTIC_PIPELINE,
|
||||
SEMANTIC_TRACE_CONVENTIONS,
|
||||
SEMANTIC_TRACE_HAS_EVENTS,
|
||||
SEMANTIC_TRACE_HAS_LINKS,
|
||||
SEMANTIC_METRIC_TYPE,
|
||||
SEMANTIC_METRIC_UNIT,
|
||||
SEMANTIC_METRIC_TEMPORALITY,
|
||||
SEMANTIC_METRIC_MONOTONIC,
|
||||
SEMANTIC_METRIC_METADATA_QUALITY,
|
||||
SEMANTIC_METRIC_ORIGINAL_NAME,
|
||||
SEMANTIC_LOG_SEVERITY_SCHEME,
|
||||
SEMANTIC_LOG_BODY_FORMAT,
|
||||
SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED,
|
||||
SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED,
|
||||
SEMANTIC_SCOPE_PRESERVED,
|
||||
];
|
||||
|
||||
/// Returns true if `key` is a recognised semantic table-option key (whitelist).
|
||||
///
|
||||
/// Note this is membership, not a prefix test: unknown keys under
|
||||
/// [`SEMANTIC_PREFIX`] are rejected, and the internal
|
||||
/// [`SEMANTIC_PER_TABLE_INDEX_KEY`] (outside the prefix) never matches.
|
||||
pub fn is_semantic_option_key(key: &str) -> bool {
|
||||
SEMANTIC_OPTION_KEYS.contains(&key)
|
||||
}
|
||||
|
||||
/// Validates a `greptime.semantic.*` option's `value` against its allowed domain.
|
||||
///
|
||||
/// Open-value keys (unit, original_name, version, pipeline, conventions, the
|
||||
/// preserved-attributes list) accept any non-empty string. Closed-domain keys
|
||||
/// accept a fixed set, plus the `unknown` sentinel, plus `mixed` for the keys
|
||||
/// where one long-lived table can legitimately see multiple values. Keys not in
|
||||
/// [`SEMANTIC_OPTION_KEYS`] are rejected.
|
||||
pub fn validate_semantic_option(key: &str, value: &str) -> bool {
|
||||
match key {
|
||||
SEMANTIC_SOURCE_VERSION
|
||||
| SEMANTIC_PIPELINE
|
||||
| SEMANTIC_METRIC_UNIT
|
||||
| SEMANTIC_METRIC_ORIGINAL_NAME
|
||||
| SEMANTIC_TRACE_CONVENTIONS
|
||||
| SEMANTIC_RESOURCE_ATTRIBUTES_PRESERVED => !value.is_empty(),
|
||||
|
||||
SEMANTIC_SIGNAL_TYPE => matches!(value, "trace" | "log" | "metric" | "event" | "unknown"),
|
||||
SEMANTIC_SOURCE => matches!(
|
||||
value,
|
||||
"opentelemetry"
|
||||
| "prometheus"
|
||||
| "elasticsearch"
|
||||
| "loki"
|
||||
| "custom"
|
||||
| "mixed"
|
||||
| "unknown"
|
||||
),
|
||||
SEMANTIC_METRIC_TYPE => matches!(
|
||||
value,
|
||||
"counter"
|
||||
| "gauge"
|
||||
| "histogram"
|
||||
| "summary"
|
||||
| "updown_counter"
|
||||
| "gauge_histogram"
|
||||
| "info"
|
||||
| "stateset"
|
||||
| "mixed"
|
||||
| "unknown"
|
||||
),
|
||||
SEMANTIC_METRIC_TEMPORALITY => {
|
||||
matches!(value, "cumulative" | "delta" | "mixed" | "unknown")
|
||||
}
|
||||
SEMANTIC_METRIC_MONOTONIC
|
||||
| SEMANTIC_TRACE_HAS_EVENTS
|
||||
| SEMANTIC_TRACE_HAS_LINKS
|
||||
| SEMANTIC_RESOURCE_ATTRIBUTES_DROPPED
|
||||
| SEMANTIC_SCOPE_PRESERVED => matches!(value, "true" | "false" | "unknown"),
|
||||
SEMANTIC_METRIC_METADATA_QUALITY => matches!(value, "declared" | "inferred" | "unknown"),
|
||||
SEMANTIC_LOG_SEVERITY_SCHEME => matches!(value, "otlp" | "syslog" | "custom" | "unknown"),
|
||||
SEMANTIC_LOG_BODY_FORMAT => matches!(value, "string" | "json" | "mixed" | "unknown"),
|
||||
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_is_semantic_option_key() {
|
||||
assert!(is_semantic_option_key(SEMANTIC_SIGNAL_TYPE));
|
||||
assert!(is_semantic_option_key(SEMANTIC_METRIC_TYPE));
|
||||
|
||||
// Unknown keys under the prefix are not whitelisted.
|
||||
assert!(!is_semantic_option_key("greptime.semantic.future.key"));
|
||||
assert!(!is_semantic_option_key("greptime.semantic.unknown_key"));
|
||||
// Near-misses must not match.
|
||||
assert!(!is_semantic_option_key("greptime.semanticx"));
|
||||
assert!(!is_semantic_option_key("semantic.signal_type"));
|
||||
assert!(!is_semantic_option_key("table_data_model"));
|
||||
// The internal transport key must never be treated as a table option.
|
||||
assert!(!is_semantic_option_key(SEMANTIC_PER_TABLE_INDEX_KEY));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_semantic_option() {
|
||||
// Enum keys reject out-of-domain values.
|
||||
assert!(validate_semantic_option(SEMANTIC_SIGNAL_TYPE, "metric"));
|
||||
assert!(!validate_semantic_option(SEMANTIC_SIGNAL_TYPE, "spans"));
|
||||
assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "counter"));
|
||||
assert!(validate_semantic_option(SEMANTIC_METRIC_TYPE, "mixed"));
|
||||
assert!(!validate_semantic_option(SEMANTIC_METRIC_TYPE, "bogus"));
|
||||
|
||||
// Booleans, sentinels, open values.
|
||||
assert!(validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "true"));
|
||||
assert!(!validate_semantic_option(SEMANTIC_TRACE_HAS_EVENTS, "yes"));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_METRIC_TEMPORALITY,
|
||||
"unknown"
|
||||
));
|
||||
assert!(validate_semantic_option(SEMANTIC_METRIC_UNIT, "By"));
|
||||
assert!(!validate_semantic_option(SEMANTIC_METRIC_UNIT, ""));
|
||||
|
||||
// Unknown key is rejected regardless of value.
|
||||
assert!(!validate_semantic_option(
|
||||
"greptime.semantic.future.key",
|
||||
"x"
|
||||
));
|
||||
|
||||
// Drift guard: every value stamped by the ingestion path must validate.
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_SIGNAL_TYPE,
|
||||
SIGNAL_TYPE_TRACE
|
||||
));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_SIGNAL_TYPE,
|
||||
SIGNAL_TYPE_METRIC
|
||||
));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_SIGNAL_TYPE,
|
||||
SIGNAL_TYPE_LOG
|
||||
));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_SOURCE,
|
||||
SOURCE_OPENTELEMETRY
|
||||
));
|
||||
assert!(validate_semantic_option(SEMANTIC_SOURCE, SOURCE_PROMETHEUS));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_METRIC_METADATA_QUALITY,
|
||||
METADATA_QUALITY_INFERRED
|
||||
));
|
||||
assert!(validate_semantic_option(
|
||||
SEMANTIC_TRACE_CONVENTIONS,
|
||||
SEMANTIC_VALUE_UNKNOWN
|
||||
));
|
||||
// An empty value never validates, for any whitelisted key.
|
||||
for key in SEMANTIC_OPTION_KEYS {
|
||||
assert!(
|
||||
!validate_semantic_option(key, ""),
|
||||
"empty value should never validate for {key}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef};
|
||||
use axum::Router;
|
||||
@@ -49,6 +50,7 @@ use servers::http::{HttpOptions, HttpServerBuilder};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||
use servers::otel_arrow::OtelArrowServiceHandler;
|
||||
use servers::pending_rows_batcher::PendingRowsBatcher;
|
||||
use servers::postgres::PostgresServer;
|
||||
use servers::prom_remote_write::validation::PromValidationMode;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
@@ -564,6 +566,24 @@ async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) {
|
||||
pub async fn setup_test_prom_app_with_frontend(
|
||||
store_type: StorageType,
|
||||
name: &str,
|
||||
) -> (Router, TestGuard) {
|
||||
setup_test_prom_app_with_frontend_inner(store_type, name, false).await
|
||||
}
|
||||
|
||||
/// Like [`setup_test_prom_app_with_frontend`] but enables the pending-rows batcher,
|
||||
/// so Prometheus remote write goes through the batched (metric-engine) path instead
|
||||
/// of the direct `PromStoreProtocolHandler::write` path.
|
||||
pub async fn setup_test_prom_app_with_frontend_batched(
|
||||
store_type: StorageType,
|
||||
name: &str,
|
||||
) -> (Router, TestGuard) {
|
||||
setup_test_prom_app_with_frontend_inner(store_type, name, true).await
|
||||
}
|
||||
|
||||
async fn setup_test_prom_app_with_frontend_inner(
|
||||
store_type: StorageType,
|
||||
name: &str,
|
||||
enable_batcher: bool,
|
||||
) -> (Router, TestGuard) {
|
||||
unsafe {
|
||||
std::env::set_var("TZ", "UTC");
|
||||
@@ -617,6 +637,24 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
..Default::default()
|
||||
};
|
||||
let frontend_ref = instance.fe_instance().clone();
|
||||
// Mirror the production wiring at `frontend::server`: build the batcher from the
|
||||
// instance's managers. A short flush interval keeps the test responsive.
|
||||
let pending_rows_batcher = if enable_batcher {
|
||||
PendingRowsBatcher::try_new(
|
||||
frontend_ref.partition_manager().clone(),
|
||||
frontend_ref.node_manager().clone(),
|
||||
frontend_ref.catalog_manager().clone(),
|
||||
true,
|
||||
frontend_ref.clone(),
|
||||
Duration::from_millis(50),
|
||||
1000,
|
||||
4,
|
||||
64,
|
||||
64,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let http_server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(frontend_ref.clone())
|
||||
.with_logs_handler(instance.fe_instance().clone())
|
||||
@@ -625,7 +663,7 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
Some(frontend_ref.clone()),
|
||||
true,
|
||||
PromValidationMode::Strict,
|
||||
None,
|
||||
pending_rows_batcher,
|
||||
)
|
||||
.with_prometheus_handler(frontend_ref)
|
||||
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
|
||||
|
||||
@@ -71,6 +71,7 @@ use tests_integration::test_util::{
|
||||
StorageType, setup_test_http_app, setup_test_http_app_with_frontend,
|
||||
setup_test_http_app_with_frontend_and_slow_query_threshold,
|
||||
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
|
||||
setup_test_prom_app_with_frontend_batched,
|
||||
};
|
||||
use urlencoding::encode;
|
||||
use yaml_rust::YamlLoader;
|
||||
@@ -117,6 +118,7 @@ macro_rules! http_tests {
|
||||
test_dashboard_path,
|
||||
test_dashboard_api,
|
||||
test_prometheus_remote_write,
|
||||
test_prometheus_remote_write_batched,
|
||||
test_prometheus_remote_special_labels,
|
||||
test_prometheus_remote_schema_labels,
|
||||
test_prometheus_remote_write_with_pipeline,
|
||||
@@ -1956,6 +1958,18 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Prom RW tables carry the metric identity; type is inferred from naming.
|
||||
validate_data(
|
||||
"prometheus_remote_write_semantic_identity",
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'metric2' \
|
||||
and create_options like '%greptime.semantic.signal_type=metric%' \
|
||||
and create_options like '%greptime.semantic.source=prometheus%' \
|
||||
and create_options like '%greptime.semantic.metric.metadata_quality=inferred%';",
|
||||
"[[1]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
// Write snappy encoded data with new labels
|
||||
let write_request = WriteRequest {
|
||||
timeseries: mock_timeseries_new_label(),
|
||||
@@ -1977,6 +1991,48 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
/// Covers the batched (pending-rows-batcher) Prometheus remote write path, which
|
||||
/// bypasses `PromStoreProtocolHandler::write`. Verifies the metric table is created
|
||||
/// asynchronously and still carries the Prometheus semantic identity stamped on the
|
||||
/// shared request context.
|
||||
pub async fn test_prometheus_remote_write_batched(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_prom_app_with_frontend_batched(store_type, "prometheus_remote_write_batched")
|
||||
.await;
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
let write_request = WriteRequest {
|
||||
timeseries: prom_store::mock_timeseries(),
|
||||
..Default::default()
|
||||
};
|
||||
let serialized_request = write_request.encode_to_vec();
|
||||
let compressed_request =
|
||||
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
|
||||
|
||||
let res = client
|
||||
.post("/v1/prometheus/write")
|
||||
.header("Content-Encoding", "snappy")
|
||||
.body(compressed_request)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// The batcher flushes asynchronously, so poll until the table exists and carries
|
||||
// the semantic identity (signal_type/source/metadata_quality).
|
||||
wait_for_data(
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'metric2' \
|
||||
and create_options like '%greptime.semantic.signal_type=metric%' \
|
||||
and create_options like '%greptime.semantic.source=prometheus%' \
|
||||
and create_options like '%greptime.semantic.metric.metadata_quality=inferred%'",
|
||||
"[[1]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
@@ -2025,7 +2081,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f1'\\n)\"]]";
|
||||
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f1'\\n)\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc3_show_create_table",
|
||||
&client,
|
||||
@@ -2051,7 +2107,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f2'\\n)\"]]";
|
||||
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f2'\\n)\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc4_show_create_table",
|
||||
&client,
|
||||
@@ -5027,6 +5083,28 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\"],[\"claude_code_token_usage_tokens_total\"],[\"demo\"],[\"greptime_physical_table\"],[\"numbers\"]]";
|
||||
validate_data("otlp_metrics_all_tables", &client, "show tables;", expected).await;
|
||||
|
||||
// Metric-engine logical table carries the semantic identity. Match substrings
|
||||
// because extra_options ordering is not stable.
|
||||
validate_data(
|
||||
"otlp_metrics_semantic_identity",
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \
|
||||
and create_options like '%greptime.semantic.signal_type=metric%' \
|
||||
and create_options like '%greptime.semantic.source=opentelemetry%';",
|
||||
"[[1]]",
|
||||
)
|
||||
.await;
|
||||
// OTLP metric type is declared, so Phase 1 must not stamp `metadata_quality`
|
||||
// here (Phase 2 adds it as `declared`).
|
||||
validate_data(
|
||||
"otlp_metrics_no_metadata_quality",
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'claude_code_cost_usage_USD_total' \
|
||||
and create_options like '%metadata_quality%';",
|
||||
"[[0]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
// CREATE TABLE IF NOT EXISTS "claude_code_cost_usage_USD_total" (
|
||||
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
|
||||
// "greptime_value" DOUBLE NULL,
|
||||
@@ -5051,7 +5129,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
|
||||
// on_physical_table = 'greptime_physical_table',
|
||||
// otlp_metric_compat = 'prom'
|
||||
// )
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
validate_data(
|
||||
"otlp_metrics_all_show_create_table",
|
||||
&client,
|
||||
@@ -5124,7 +5202,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
|
||||
// on_physical_table = 'greptime_physical_table',
|
||||
// otlp_metric_compat = 'prom'
|
||||
// )
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
validate_data(
|
||||
"otlp_metrics_show_create_table",
|
||||
&client,
|
||||
@@ -5188,7 +5266,7 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
|
||||
// on_physical_table = 'greptime_physical_table',
|
||||
// otlp_metric_compat = 'prom'
|
||||
// )
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'opentelemetry',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
|
||||
validate_data(
|
||||
"otlp_metrics_show_create_table_none",
|
||||
&client,
|
||||
@@ -5495,7 +5573,22 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
|
||||
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
|
||||
|
||||
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
// The trace v1 main table carries the trace identity (events/links preserved as
|
||||
// JSON columns by the v1 model).
|
||||
validate_data(
|
||||
"otlp_traces_semantic_identity",
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'mytable' \
|
||||
and create_options like '%greptime.semantic.signal_type=trace%' \
|
||||
and create_options like '%greptime.semantic.source=opentelemetry%' \
|
||||
and create_options like '%greptime.semantic.pipeline=greptime_trace_v1%' \
|
||||
and create_options like '%greptime.semantic.trace.has_events=true%' \
|
||||
and create_options like '%greptime.semantic.trace.has_links=true%';",
|
||||
"[[1]]",
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -5548,7 +5641,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -5585,7 +5678,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -5622,7 +5715,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
)
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\n 'greptime.semantic.signal_type' = 'trace',\n 'greptime.semantic.source' = 'opentelemetry',\n 'greptime.semantic.trace.conventions' = 'unknown',\n 'greptime.semantic.trace.has_events' = 'true',\n 'greptime.semantic.trace.has_links' = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
@@ -6285,6 +6378,17 @@ pub async fn test_otlp_logs(store_type: StorageType) {
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// The auto-created log table carries the log identity.
|
||||
validate_data(
|
||||
"otlp_logs_semantic_identity",
|
||||
&client,
|
||||
"select count(*) from information_schema.tables where table_name = 'opentelemetry_logs' \
|
||||
and create_options like '%greptime.semantic.signal_type=log%' \
|
||||
and create_options like '%greptime.semantic.source=opentelemetry%';",
|
||||
"[[1]]",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
{
|
||||
@@ -7720,7 +7824,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
|
||||
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n 'greptime.semantic.pipeline' = 'greptime_trace_v1',\\n 'greptime.semantic.signal_type' = 'trace',\\n 'greptime.semantic.source' = 'opentelemetry',\\n 'greptime.semantic.trace.conventions' = 'unknown',\\n 'greptime.semantic.trace.has_events' = 'true',\\n 'greptime.semantic.trace.has_links' = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
|
||||
validate_data(
|
||||
"trace_v1_create_table",
|
||||
&client,
|
||||
|
||||
Reference in New Issue
Block a user