From 37f8341963f832d635cf3dc672c7c9e324290833 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 4 Mar 2025 20:08:52 -0800 Subject: [PATCH] feat: opentelemetry trace new data modeling (#5622) * feat: include trace v1 encoding * feat: add trace ingestion in inserter * feat: add partition rules and index for trace_id * chore: format * chore: fmt * fix: issue introduced with merge * feat: adjust index and add integration test for v1 * refactor: remove comment key * fix: update default value of skip index granularity * fix: update default value of skip index granularity * refactor: rename some functions * feat: remove skipping index from span_id * refactor: made span_id part of primary key for potential dedup purpose * feat: move the special attribute resource_attribute.service.name to top level --------- Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com> --- src/common/catalog/src/consts.rs | 7 + src/datatypes/src/schema/column_schema.rs | 11 +- src/frontend/src/instance/jaeger.rs | 4 +- src/frontend/src/instance/log_handler.rs | 22 +++ src/frontend/src/instance/otlp.rs | 12 +- src/operator/src/error.rs | 10 + src/operator/src/insert.rs | 90 ++++++++- src/servers/src/http/jaeger.rs | 4 +- src/servers/src/otlp/trace.rs | 23 ++- src/servers/src/otlp/trace/attributes.rs | 4 + src/servers/src/otlp/trace/span.rs | 48 ++++- src/servers/src/otlp/trace/v0.rs | 84 ++------ src/servers/src/otlp/trace/v1.rs | 226 ++++++++++++++++++++++ src/sql/src/lib.rs | 1 + src/sql/src/partition.rs | 165 ++++++++++++++++ tests-integration/tests/http.rs | 100 +++++++++- 16 files changed, 714 insertions(+), 97 deletions(-) create mode 100644 src/servers/src/otlp/trace/v1.rs create mode 100644 src/sql/src/partition.rs diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 34c6fa0fdb..0d39a27b9d 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -130,3 +130,10 @@ pub const SEMANTIC_TYPE_TIME_INDEX: &str = "TIMESTAMP"; pub fn is_readonly_schema(schema: &str) -> bool { matches!(schema, INFORMATION_SCHEMA_NAME) } + +// ---- special table and fields ---- +pub const TRACE_ID_COLUMN: &str = "trace_id"; +pub const SPAN_ID_COLUMN: &str = "span_id"; +pub const SPAN_NAME_COLUMN: &str = "span_name"; +pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id"; +// ---- End of special table and fields ---- diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 6547886dfb..24f9b8ac3f 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -597,7 +597,7 @@ impl fmt::Display for FulltextAnalyzer { } /// Skipping options for a column. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)] #[serde(rename_all = "kebab-case")] pub struct SkippingIndexOptions { /// The granularity of the skip index. 
@@ -607,6 +607,15 @@ pub struct SkippingIndexOptions {
     pub index_type: SkippingIndexType,
 }
 
+impl Default for SkippingIndexOptions {
+    fn default() -> Self {
+        Self {
+            granularity: DEFAULT_GRANULARITY,
+            index_type: SkippingIndexType::default(),
+        }
+    }
+}
+
 impl fmt::Display for SkippingIndexOptions {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "granularity={}", self.granularity)?;
diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs
index cfc89a8ed9..6a314e4b0c 100644
--- a/src/frontend/src/instance/jaeger.rs
+++ b/src/frontend/src/instance/jaeger.rs
@@ -36,11 +36,11 @@ use servers::error::{
     TableNotFoundSnafu,
 };
 use servers::http::jaeger::QueryTraceParams;
-use servers::otlp::trace::v0::{
+use servers::otlp::trace::{
     DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
     SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
+    TRACE_TABLE_NAME,
 };
-use servers::otlp::trace::TRACE_TABLE_NAME;
 use servers::query_handler::JaegerQueryHandler;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs
index 671caf1de7..bb8bc5727e 100644
--- a/src/frontend/src/instance/log_handler.rs
+++ b/src/frontend/src/instance/log_handler.rs
@@ -127,4 +127,26 @@ impl Instance {
             .map_err(BoxedError::new)
             .context(ExecuteGrpcRequestSnafu)
     }
+
+    pub async fn handle_trace_inserts(
+        &self,
+        rows: RowInsertRequests,
+        ctx: QueryContextRef,
+    ) -> ServerResult<Output> {
+        let _guard = if let Some(limiter) = &self.limiter {
+            let result = limiter.limit_row_inserts(&rows);
+            if result.is_none() {
+                return InFlightWriteBytesExceededSnafu.fail();
+            }
+            result
+        } else {
+            None
+        };
+
+        self.inserter
+            .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
+            .await
+            .map_err(BoxedError::new)
+            .context(ExecuteGrpcRequestSnafu)
+    }
 }
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index f1642da1cb..98a1baf66a 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -101,17 +101,7 @@ impl OpenTelemetryProtocolHandler for Instance {
 
         OTLP_TRACES_ROWS.inc_by(rows as u64);
 
-        let _guard = if let Some(limiter) = &self.limiter {
-            let result = limiter.limit_row_inserts(&requests);
-            if result.is_none() {
-                return InFlightWriteBytesExceededSnafu.fail();
-            }
-            result
-        } else {
-            None
-        };
-
-        self.handle_log_inserts(requests, ctx)
+        self.handle_trace_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index 990f6123a3..6cfbab5646 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -799,6 +799,14 @@ pub enum Error {
 
     #[snafu(display("A cursor named {name} already exists"))]
     CursorExists { name: String },
+
+    #[snafu(display("Column options error"))]
+    ColumnOptions {
+        #[snafu(source)]
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -950,6 +958,8 @@ impl ErrorExt for Error {
             Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
 
             Error::StatementTimeout { .. } => StatusCode::Cancelled,
+
+            Error::ColumnOptions { source, .. } => source.status_code(),
         }
     }
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 2bd4304510..a1c2c59d50 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
 use api::v1::alter_table_expr::Kind;
+use api::v1::column_def::options_from_skipping;
 use api::v1::region::{
     InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
     RegionRequestHeader,
@@ -26,7 +27,9 @@ use api::v1::{
 };
 use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
-use common_catalog::consts::default_engine;
+use common_catalog::consts::{
+    default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
+};
 use common_grpc_expr::util::ColumnExpr;
 use common_meta::cache::TableFlownodeSetCacheRef;
 use common_meta::node_manager::{AffectedRows, NodeManagerRef};
@@ -34,13 +37,16 @@ use common_meta::peer::Peer;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_query::Output;
 use common_telemetry::tracing_context::TracingContext;
-use common_telemetry::{error, info};
+use common_telemetry::{error, info, warn};
+use datatypes::schema::SkippingIndexOptions;
 use futures_util::future;
 use meter_macros::write_meter;
 use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
 use snafu::prelude::*;
 use snafu::ResultExt;
+use sql::partition::partition_rule_for_hexstring;
+use sql::statements::create::Partitions;
 use sql::statements::insert::Insert;
 use store_api::metric_engine_consts::{
     LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
@@ -53,8 +59,8 @@ use table::table_reference::TableReference;
 use table::TableRef;
 
 use crate::error::{
-    CatalogSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu,
-    RequestInsertsSnafu, Result, TableNotFoundSnafu,
+    CatalogSnafu, ColumnOptionsSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu,
+    JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
 };
 use crate::expr_helper;
 use crate::region_req_factory::RegionRequestFactory;
@@ -84,6 +90,8 @@ enum AutoCreateTableType {
     Log,
     /// A table that merges rows by `last_non_null` strategy.
     LastNonNull,
+    /// A table that is created with skipping indexes and default partition rules on `trace_id`.
+    Trace,
 }
 
 impl AutoCreateTableType {
@@ -93,6 +101,7 @@ impl AutoCreateTableType {
             AutoCreateTableType::Physical => "physical",
             AutoCreateTableType::Log => "log",
             AutoCreateTableType::LastNonNull => "last_non_null",
+            AutoCreateTableType::Trace => "trace",
         }
     }
 }
@@ -171,6 +180,21 @@ impl Inserter {
         .await
     }
 
+    pub async fn handle_trace_inserts(
+        &self,
+        requests: RowInsertRequests,
+        ctx: QueryContextRef,
+        statement_executor: &StatementExecutor,
+    ) -> Result<Output> {
+        self.handle_row_inserts_with_create_type(
+            requests,
+            ctx,
+            statement_executor,
+            AutoCreateTableType::Trace,
+        )
+        .await
+    }
+
     /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
     pub async fn handle_last_non_null_inserts(
         &self,
@@ -528,7 +552,56 @@ impl Inserter {
                 // for it's a very unexpected behavior and should be set by user explicitly
                 for create_table in create_tables {
                     let table = self
-                        .create_physical_table(create_table, ctx, statement_executor)
+                        .create_physical_table(create_table, None, ctx, statement_executor)
                         .await?;
                     let table_info = table.table_info();
                     if table_info.is_ttl_instant_table() {
                         instant_table_ids.insert(table_info.table_id());
                     }
                     table_infos.insert(table_info.table_id(), table.table_info());
                 }
                 for alter_expr in alter_tables.into_iter() {
                     statement_executor
                         .alter_table_inner(alter_expr, ctx.clone())
                         .await?;
                 }
             }
 
+            AutoCreateTableType::Trace => {
+                // note that an auto-created table shouldn't be a TTL instant table,
+                // as that is very unexpected behavior and should be set by the user explicitly
+                for mut create_table in create_tables {
+                    // prebuilt partition rules for hex-encoded trace_id values: see
+                    // `partition_rule_for_hexstring` for more information
+                    let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN);
+                    // add a skipping index to
+                    // - trace_id: for searching by trace id
+                    // - parent_span_id: for searching the root span
+                    // - span_name: for searching certain types of spans
+                    let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
+                    for index_column in index_columns {
+                        if let Some(col) = create_table
+                            .column_defs
+                            .iter_mut()
+                            .find(|c| c.name == index_column)
+                        {
+                            col.options = options_from_skipping(&SkippingIndexOptions::default())
+                                .context(ColumnOptionsSnafu)?;
+                        } else {
+                            warn!(
+                                "Column {} not found when creating index for trace table: {}.",
+                                index_column, create_table.table_name
+                            );
+                        }
+                    }
+
+                    let table = self
+                        .create_physical_table(
+                            create_table,
+                            Some(partitions),
+                            ctx,
+                            statement_executor,
+                        )
                         .await?;
                     let table_info = table.table_info();
                     if table_info.is_ttl_instant_table() {
                         instant_table_ids.insert(table_info.table_id());
                     }
@@ -658,6 +731,9 @@ impl Inserter {
             AutoCreateTableType::LastNonNull => {
                 table_options.push((MERGE_MODE_KEY, "last_non_null"));
             }
+            AutoCreateTableType::Trace => {
+                table_options.push((APPEND_MODE_KEY, "true"));
+            }
         }
 
         let schema = ctx.current_schema();
         let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
         let mut create_table_expr =
             build_create_table_expr(&table_ref, request_schema, engine_name)?;
+        info!("Table `{table_ref}` does not exist, try creating table");
 
         for (k, v) in table_options {
             create_table_expr
@@ -707,6 +784,7 @@ impl Inserter {
     async fn create_physical_table(
         &self,
         mut create_table_expr: CreateTableExpr,
+        partitions: Option<Partitions>,
         ctx: &QueryContextRef,
         statement_executor: &StatementExecutor,
     ) -> Result<TableRef> {
@@ -720,7 +798,7 @@ impl Inserter {
             info!("Table `{table_ref}` does not exist, try creating table");
         }
         let res = statement_executor
-            .create_table_inner(&mut create_table_expr, None, ctx.clone())
+            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
             .await;
 
         let table_ref = TableReference::full(
diff --git a/src/servers/src/http/jaeger.rs b/src/servers/src/http/jaeger.rs
index 9ae56703b5..ed03f93aff 100644
--- a/src/servers/src/http/jaeger.rs
+++ b/src/servers/src/http/jaeger.rs
@@ -34,11 +34,11 @@ use crate::error::{
 };
 use crate::http::HttpRecordsOutput;
 use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
-use crate::otlp::trace::v0::{
+use crate::otlp::trace::{
     DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
     SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
+    TRACE_TABLE_NAME,
 };
-use crate::otlp::trace::TRACE_TABLE_NAME;
 use crate::query_handler::JaegerQueryHandlerRef;
 
 /// JaegerAPIResponse is the response of Jaeger HTTP API.
diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs
index 1ec8ce4825..8cc51b532a 100644
--- a/src/servers/src/otlp/trace.rs
+++ b/src/servers/src/otlp/trace.rs
@@ -15,8 +15,12 @@
 pub mod attributes;
 pub mod span;
 pub mod v0;
+pub mod v1;
 
 use api::v1::RowInsertRequests;
+pub use common_catalog::consts::{
+    PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
+};
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use pipeline::{GreptimePipelineParams, PipelineWay};
 use session::context::QueryContextRef;
@@ -26,6 +30,15 @@ use crate::query_handler::PipelineHandlerRef;
 
 pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
 
+pub const SERVICE_NAME_COLUMN: &str = "service_name";
+pub const TIMESTAMP_COLUMN: &str = "timestamp";
+pub const DURATION_NANO_COLUMN: &str = "duration_nano";
+pub const SPAN_KIND_COLUMN: &str = "span_kind";
+pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
+/// The span kind prefix in the database.
+/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
+pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
+
 /// Convert SpanTraces to GreptimeDB row insert requests.
 /// Returns `InsertRequests` and total number of rows to ingest
 pub fn to_grpc_insert_requests(
@@ -45,8 +58,16 @@ pub fn to_grpc_insert_requests(
             query_ctx,
             pipeline_handler,
         ),
+        PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_insert_requests(
+            request,
+            pipeline,
+            pipeline_params,
+            table_name,
+            query_ctx,
+            pipeline_handler,
+        ),
         _ => NotSupportedSnafu {
-            feat: "Unsupported pipeline for logs",
+            feat: "Unsupported pipeline for trace",
         }
         .fail(),
     }
diff --git a/src/servers/src/otlp/trace/attributes.rs b/src/servers/src/otlp/trace/attributes.rs
index 2fce6225ed..b4c3a323f0 100644
--- a/src/servers/src/otlp/trace/attributes.rs
+++ b/src/servers/src/otlp/trace/attributes.rs
@@ -128,6 +128,10 @@ impl From<Attributes> for jsonb::Value<'static> {
 }
 
 impl Attributes {
+    pub fn take(self) -> Vec<KeyValue> {
+        self.0
+    }
+
     pub fn get_ref(&self) -> &Vec<KeyValue> {
         &self.0
     }
diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs
index a6d810d045..8d1864cf69 100644
--- a/src/servers/src/otlp/trace/span.rs
+++ b/src/servers/src/otlp/trace/span.rs
@@ -16,7 +16,8 @@ use std::fmt::Display;
 
 use common_time::timestamp::Timestamp;
 use itertools::Itertools;
-use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue};
+use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use opentelemetry_proto::tonic::common::v1::{any_value, InstrumentationScope, KeyValue};
 use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
 use opentelemetry_proto::tonic::trace::v1::{Span, Status};
 use serde::Serialize;
@@ -230,6 +231,51 @@ pub fn status_to_string(status: &Option<Status>) -> (String, String) {
     }
 }
 
+/// Convert OpenTelemetry traces to SpanTraces.
+///
+/// See the OTLP trace protobuf definition for the data structure of OTLP
+/// traces.
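+///
+/// As a rough sketch (field names come from the OTLP protobuf; the nesting
+/// below is illustrative, not exhaustive), the request this function walks
+/// looks like:
+///
+/// ```text
+/// ExportTraceServiceRequest
+/// └── resource_spans
+///     ├── resource.attributes        (e.g. `service.name`)
+///     └── scope_spans
+///         ├── scope                  (InstrumentationScope)
+///         └── spans                  (each Span becomes one TraceSpan)
+/// ```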
+pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
+    let span_size = request
+        .resource_spans
+        .iter()
+        .flat_map(|res| res.scope_spans.iter())
+        .flat_map(|scope| scope.spans.iter())
+        .count();
+    let mut spans = Vec::with_capacity(span_size);
+    for resource_spans in request.resource_spans {
+        let resource_attrs = resource_spans
+            .resource
+            .map(|r| r.attributes)
+            .unwrap_or_default();
+        let service_name = resource_attrs
+            .iter()
+            .find_or_first(|kv| kv.key == "service.name")
+            .and_then(|kv| kv.value.clone())
+            .and_then(|v| match v.value {
+                Some(any_value::Value::StringValue(s)) => Some(s),
+                Some(any_value::Value::BytesValue(b)) => {
+                    Some(String::from_utf8_lossy(&b).to_string())
+                }
+                _ => None,
+            });
+
+        for scope_spans in resource_spans.scope_spans {
+            let scope = scope_spans.scope.unwrap_or_default();
+            for span in scope_spans.spans {
+                spans.push(parse_span(
+                    service_name.clone(),
+                    &resource_attrs,
+                    &scope,
+                    span,
+                ));
+            }
+        }
+    }
+    spans
+}
+
 #[cfg(test)]
 mod tests {
     use opentelemetry_proto::tonic::trace::v1::Status;
diff --git a/src/servers/src/otlp/trace/v0.rs b/src/servers/src/otlp/trace/v0.rs
index 5783d5f241..3e7a609515 100644
--- a/src/servers/src/otlp/trace/v0.rs
+++ b/src/servers/src/otlp/trace/v0.rs
@@ -15,13 +15,15 @@
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, RowInsertRequests};
 use common_grpc::precision::Precision;
-use itertools::Itertools;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use opentelemetry_proto::tonic::common::v1::any_value;
 use pipeline::{GreptimePipelineParams, PipelineWay};
 use session::context::QueryContextRef;
 
-use super::span::{parse_span, TraceSpan, TraceSpans};
+use super::span::{parse, TraceSpan};
+use super::{
+    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
+    SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
+};
 use crate::error::Result;
 use crate::otlp::utils::{make_column_data, make_string_column_data};
 use crate::query_handler::PipelineHandlerRef;
@@ -29,64 +31,6 @@ use crate::row_writer::{self, MultiTableData, TableData};
 
 const APPROXIMATE_COLUMN_COUNT: usize = 24;
 
-pub const SERVICE_NAME_COLUMN: &str = "service_name";
-pub const TRACE_ID_COLUMN: &str = "trace_id";
-pub const TIMESTAMP_COLUMN: &str = "timestamp";
-pub const DURATION_NANO_COLUMN: &str = "duration_nano";
-pub const SPAN_ID_COLUMN: &str = "span_id";
-pub const SPAN_NAME_COLUMN: &str = "span_name";
-pub const SPAN_KIND_COLUMN: &str = "span_kind";
-pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
-
-/// The span kind prefix in the database.
-/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
-pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
-
-/// Convert OpenTelemetry traces to SpanTraces.
-///
-/// See the OTLP trace protobuf definition for the data structure of OTLP
-/// traces.
-pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { - let span_size = request - .resource_spans - .iter() - .flat_map(|res| res.scope_spans.iter()) - .flat_map(|scope| scope.spans.iter()) - .count(); - let mut spans = Vec::with_capacity(span_size); - for resource_spans in request.resource_spans { - let resource_attrs = resource_spans - .resource - .map(|r| r.attributes) - .unwrap_or_default(); - let service_name = resource_attrs - .iter() - .find_or_first(|kv| kv.key == "service.name") - .and_then(|kv| kv.value.clone()) - .and_then(|v| match v.value { - Some(any_value::Value::StringValue(s)) => Some(s), - Some(any_value::Value::BytesValue(b)) => { - Some(String::from_utf8_lossy(&b).to_string()) - } - _ => None, - }); - - for scope_spans in resource_spans.scope_spans { - let scope = scope_spans.scope.unwrap_or_default(); - for span in scope_spans.spans { - spans.push(parse_span( - service_name.clone(), - &resource_attrs, - &scope, - span, - )); - } - } - } - spans -} - /// Convert SpanTraces to GreptimeDB row insert requests. /// Returns `InsertRequests` and total number of rows to ingest pub fn v0_to_grpc_insert_requests( @@ -118,7 +62,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> // write ts row_writer::write_ts_to_nanos( writer, - "timestamp", + TIMESTAMP_COLUMN, Some(span.start_in_nanosecond as i64), Precision::Nanosecond, &mut row, @@ -131,7 +75,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64), ), make_column_data( - "duration_nano", + DURATION_NANO_COLUMN, ColumnDataType::Uint64, ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond), ), @@ -139,14 +83,14 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> row_writer::write_fields(writer, fields.into_iter(), &mut row)?; if let Some(service_name) = span.service_name { - row_writer::write_tag(writer, "service_name", service_name, &mut row)?; + row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?; } // tags let iter = vec![ - ("trace_id", span.trace_id), - ("span_id", span.span_id), - ("parent_span_id", span.parent_span_id), + (TRACE_ID_COLUMN, span.trace_id), + (SPAN_ID_COLUMN, span.span_id), + (PARENT_SPAN_ID_COLUMN, span.parent_span_id), ] .into_iter() .map(|(col, val)| (col.to_string(), val)); @@ -154,8 +98,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> // write fields let fields = vec![ - make_string_column_data("span_kind", span.span_kind), - make_string_column_data("span_name", span.span_name), + make_string_column_data(SPAN_KIND_COLUMN, span.span_kind), + make_string_column_data(SPAN_NAME_COLUMN, span.span_name), make_string_column_data("span_status_code", span.span_status_code), make_string_column_data("span_status_message", span.span_status_message), make_string_column_data("trace_state", span.trace_state), @@ -164,7 +108,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> row_writer::write_json( writer, - "span_attributes", + SPAN_ATTRIBUTES_COLUMN, span.span_attributes.into(), &mut row, )?; diff --git a/src/servers/src/otlp/trace/v1.rs b/src/servers/src/otlp/trace/v1.rs new file mode 100644 index 0000000000..04732d9d73 --- /dev/null +++ b/src/servers/src/otlp/trace/v1.rs @@ -0,0 +1,226 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in 
compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, RowInsertRequests, Value};
+use common_grpc::precision::Precision;
+use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
+use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
+use pipeline::{GreptimePipelineParams, PipelineWay};
+use session::context::QueryContextRef;
+
+use super::attributes::Attributes;
+use super::span::{parse, TraceSpan};
+use super::{
+    DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ID_COLUMN,
+    SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
+};
+use crate::error::Result;
+use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
+use crate::query_handler::PipelineHandlerRef;
+use crate::row_writer::{self, MultiTableData, TableData};
+
+const APPROXIMATE_COLUMN_COUNT: usize = 30;
+
+/// Convert SpanTraces to GreptimeDB row insert requests.
+/// Returns `InsertRequests` and the total number of rows to ingest.
+///
+/// Compared with v0, this v1 implementation:
+/// 1. flattens all attribute data into columns;
+/// 2. treats `parent_span_id` as a field, while `span_id` joins `trace_id` in
+///    the primary key for potential dedup;
+/// 3. moves the special resource attribute `service.name` to a top-level
+///    `service_name` field column.
+///
+/// Other compound data structures such as `span_events` and `span_links` are
+/// still stored as `json`.
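+///
+/// As an illustration only (a sketch of the layout exercised by the
+/// integration test in this patch, not an exhaustive schema), a span carrying
+/// a `net.peer.ip` attribute lands in a table shaped roughly like:
+///
+/// ```text
+/// timestamp | timestamp_end | duration_nano | trace_id | span_id
+///   | parent_span_id | span_kind | span_name | ... | service_name
+///   | span_attributes.net.peer.ip | span_events | span_links
+/// ```
+///
+/// with `PRIMARY KEY (trace_id, span_id)` and default partition rules on
+/// `trace_id`.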
+pub fn v1_to_grpc_insert_requests(
+    request: ExportTraceServiceRequest,
+    _pipeline: PipelineWay,
+    _pipeline_params: GreptimePipelineParams,
+    table_name: String,
+    _query_ctx: &QueryContextRef,
+    _pipeline_handler: PipelineHandlerRef,
+) -> Result<(RowInsertRequests, usize)> {
+    let spans = parse(request);
+    let mut multi_table_writer = MultiTableData::default();
+
+    let one_table_writer = multi_table_writer.get_or_default_table_data(
+        table_name,
+        APPROXIMATE_COLUMN_COUNT,
+        spans.len(),
+    );
+
+    for span in spans {
+        write_span_to_row(one_table_writer, span)?;
+    }
+
+    Ok(multi_table_writer.into_row_insert_requests())
+}
+
+pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
+    let mut row = writer.alloc_one_row();
+
+    // write ts
+    row_writer::write_ts_to_nanos(
+        writer,
+        TIMESTAMP_COLUMN,
+        Some(span.start_in_nanosecond as i64),
+        Precision::Nanosecond,
+        &mut row,
+    )?;
+    // write ts fields
+    let fields = vec![
+        make_column_data(
+            "timestamp_end",
+            ColumnDataType::TimestampNanosecond,
+            ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
+        ),
+        make_column_data(
+            DURATION_NANO_COLUMN,
+            ColumnDataType::Uint64,
+            ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
+        ),
+    ];
+    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
+
+    // tags
+    let tags = vec![
+        (TRACE_ID_COLUMN.to_string(), span.trace_id),
+        (SPAN_ID_COLUMN.to_string(), span.span_id),
+    ];
+    row_writer::write_tags(writer, tags.into_iter(), &mut row)?;
+
+    // write fields
+    let fields = vec![
+        make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
+        make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
+        make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
+        make_string_column_data("span_status_code", span.span_status_code),
+        make_string_column_data("span_status_message", span.span_status_message),
+        make_string_column_data("trace_state", span.trace_state),
+        make_string_column_data("scope_name", span.scope_name),
+        make_string_column_data("scope_version", span.scope_version),
+    ];
+    row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
+
+    if let Some(service_name) = span.service_name {
+        row_writer::write_fields(
+            writer,
+            std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
+            &mut row,
+        )?;
+    }
+
+    write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
+    write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
+    write_attributes(
+        writer,
+        "resource_attributes",
+        span.resource_attributes,
+        &mut row,
+    )?;
+
+    row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
+    row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
+
+    writer.add_row(row);
+
+    Ok(())
+}
+
+fn write_attributes(
+    writer: &mut TableData,
+    prefix: &str,
+    attributes: Attributes,
+    row: &mut Vec<Value>,
+) -> Result<()> {
+    for attr in attributes.take().into_iter() {
+        let key_suffix = attr.key;
+        // skip resource_attributes.service.name because it's already copied to
+        // the top level as `SERVICE_NAME_COLUMN`
+        if prefix == "resource_attributes" && key_suffix == "service.name" {
+            continue;
+        }
+
+        let key = format!("{}.{}", prefix, key_suffix);
+        match attr.value.and_then(|v| v.value) {
+            Some(OtlpValue::StringValue(v)) => {
+                row_writer::write_fields(
+                    writer,
+                    std::iter::once(make_string_column_data(&key, v)),
+                    row,
+                )?;
+            }
+            Some(OtlpValue::BoolValue(v)) => {
+                row_writer::write_fields(
+                    writer,
+
std::iter::once(make_column_data( + &key, + ColumnDataType::Boolean, + ValueData::BoolValue(v), + )), + row, + )?; + } + Some(OtlpValue::IntValue(v)) => { + row_writer::write_fields( + writer, + std::iter::once(make_column_data( + &key, + ColumnDataType::Int64, + ValueData::I64Value(v), + )), + row, + )?; + } + Some(OtlpValue::DoubleValue(v)) => { + row_writer::write_fields( + writer, + std::iter::once(make_column_data( + &key, + ColumnDataType::Float64, + ValueData::F64Value(v), + )), + row, + )?; + } + Some(OtlpValue::ArrayValue(v)) => row_writer::write_json( + writer, + key, + any_value_to_jsonb(OtlpValue::ArrayValue(v)), + row, + )?, + Some(OtlpValue::KvlistValue(v)) => row_writer::write_json( + writer, + key, + any_value_to_jsonb(OtlpValue::KvlistValue(v)), + row, + )?, + Some(OtlpValue::BytesValue(v)) => { + row_writer::write_fields( + writer, + std::iter::once(make_column_data( + &key, + ColumnDataType::Binary, + ValueData::BinaryValue(v), + )), + row, + )?; + } + None => {} + } + } + + Ok(()) +} diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 8f2ca7fc4f..90501ece82 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -22,6 +22,7 @@ pub mod dialect; pub mod error; pub mod parser; pub mod parsers; +pub mod partition; pub mod statements; pub mod util; diff --git a/src/sql/src/partition.rs b/src/sql/src/partition.rs new file mode 100644 index 0000000000..4979bf702f --- /dev/null +++ b/src/sql/src/partition.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use sqlparser::ast::{BinaryOperator, Expr, Ident, Value}; + +use crate::statements::create::Partitions; + +macro_rules! between_string { + ($col: expr, $left_incl: expr, $right_excl: expr) => { + Expr::BinaryOp { + op: BinaryOperator::And, + left: Box::new(Expr::BinaryOp { + op: BinaryOperator::GtEq, + left: Box::new($col.clone()), + right: Box::new(Expr::Value(Value::SingleQuotedString( + $left_incl.to_string(), + ))), + }), + right: Box::new(Expr::BinaryOp { + op: BinaryOperator::Lt, + left: Box::new($col.clone()), + right: Box::new(Expr::Value(Value::SingleQuotedString( + $right_excl.to_string(), + ))), + }), + } + }; +} + +macro_rules! 
or { + ($left: expr, $right: expr) => { + Expr::BinaryOp { + op: BinaryOperator::Or, + left: Box::new($left), + right: Box::new($right), + } + }; +} + +pub fn partition_rule_for_hexstring(ident: &str) -> Partitions { + let ident = Ident::new(ident); + let ident_expr = Expr::Identifier(ident.clone()); + + // rules are like: + // + // "trace_id < '1'", + // "trace_id >= '1' AND trace_id < '2'", + // "trace_id >= '2' AND trace_id < '3'", + // "trace_id >= '3' AND trace_id < '4'", + // "trace_id >= '4' AND trace_id < '5'", + // "trace_id >= '5' AND trace_id < '6'", + // "trace_id >= '6' AND trace_id < '7'", + // "trace_id >= '7' AND trace_id < '8'", + // "trace_id >= '8' AND trace_id < '9'", + // "trace_id >= '9' AND trace_id < 'A'", + // "trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'", + // "trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'", + // "trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'", + // "trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'", + // "trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'", + // "trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'", + let rules = vec![ + Expr::BinaryOp { + left: Box::new(ident_expr.clone()), + op: BinaryOperator::Lt, + right: Box::new(Expr::Value(Value::SingleQuotedString("1".to_string()))), + }, + // [left, right) + between_string!(ident_expr, "1", "2"), + between_string!(ident_expr, "2", "3"), + between_string!(ident_expr, "3", "4"), + between_string!(ident_expr, "4", "5"), + between_string!(ident_expr, "5", "6"), + between_string!(ident_expr, "6", "7"), + between_string!(ident_expr, "7", "8"), + between_string!(ident_expr, "8", "9"), + between_string!(ident_expr, "9", "A"), + or!( + between_string!(ident_expr, "A", "B"), + between_string!(ident_expr, "a", "b") + ), + or!( + between_string!(ident_expr, "B", "C"), + between_string!(ident_expr, "b", "c") + ), + or!( + between_string!(ident_expr, "C", "D"), + between_string!(ident_expr, "c", "d") + ), + or!( + between_string!(ident_expr, "D", "E"), + between_string!(ident_expr, "d", "e") + ), + or!( + between_string!(ident_expr, "E", "F"), + between_string!(ident_expr, "e", "f") + ), + or!( + between_string!(ident_expr, "F", "a"), + Expr::BinaryOp { + left: Box::new(ident_expr.clone()), + op: BinaryOperator::GtEq, + right: Box::new(Expr::Value(Value::SingleQuotedString("f".to_string()))), + } + ), + ]; + + Partitions { + column_list: vec![ident], + exprs: rules, + } +} + +#[cfg(test)] +mod tests { + use sqlparser::ast::Expr; + use sqlparser::dialect::GenericDialect; + use sqlparser::parser::Parser; + + use super::*; + + #[test] + fn test_rules() { + let expr = vec![ + "trace_id < '1'", + "trace_id >= '1' AND trace_id < '2'", + "trace_id >= '2' AND trace_id < '3'", + "trace_id >= '3' AND trace_id < '4'", + "trace_id >= '4' AND trace_id < '5'", + "trace_id >= '5' AND trace_id < '6'", + "trace_id >= '6' AND trace_id < '7'", + "trace_id >= '7' AND trace_id < '8'", + "trace_id >= '8' AND trace_id < '9'", + "trace_id >= '9' AND trace_id < 'A'", + "trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'", + "trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'", + "trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'", + "trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'", + "trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'", + "trace_id >= 'F' AND trace_id < 'a' OR 
trace_id >= 'f'", + ]; + + let dialect = GenericDialect {}; + let results = expr + .into_iter() + .map(|s| { + let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap(); + parser.parse_expr().unwrap() + }) + .collect::>(); + + assert_eq!(results, partition_rule_for_hexstring("trace_id").exprs); + } +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 4053c179ac..0db1e35826 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -96,7 +96,8 @@ macro_rules! http_tests { test_pipeline_dispatcher, test_otlp_metrics, - test_otlp_traces, + test_otlp_traces_v0, + test_otlp_traces_v1, test_otlp_logs, test_loki_pb_logs, test_loki_json_logs, @@ -1320,7 +1321,7 @@ transform: // 3. check schema - let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '0', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; validate_data( "pipeline_schema", &client, @@ -2053,7 +2054,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_otlp_traces(store_type: StorageType) { +pub async fn test_otlp_traces_v0(store_type: StorageType) { // init common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; @@ -2125,6 +2126,99 @@ pub async fn test_otlp_traces(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_otlp_traces_v1(store_type: StorageType) { + // init + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + const TRACE_V1: &str = "greptime_trace_v1"; + + let content = r#" 
+{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} +"#; + + let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app).await; + + // write traces data + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-log-pipeline-name"), + HeaderValue::from_static(TRACE_V1), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static("mytable"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // select traces data + let expected = 
r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#; + validate_data("otlp_traces", &client, "select * from mytable;", expected).await; + + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#; + validate_data( + "otlp_traces", + &client, + "show create table mytable;", + expected_ddl, + ) + .await; + + // drop table + let res = client.get("/v1/sql?sql=drop table mytable;").send().await; + assert_eq!(res.status(), StatusCode::OK); + + // write traces data with gzip + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-log-pipeline-name"), + HeaderValue::from_static(TRACE_V1), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static("mytable"), + ), + ], + 
"/v1/otlp/v1/traces", + body.clone(), + true, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // select traces data again + validate_data( + "otlp_traces_with_gzip", + &client, + "select * from mytable;", + expected, + ) + .await; + + guard.remove_all().await; +} + pub async fn test_otlp_logs(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_logs").await;