feat: opentelemetry trace new data modeling (#5622)

* feat: include trace v1 encoding

* feat: add trace ingestion in inserter

* feat: add partition rules and index for trace_id

* chore: format

* chore: fmt

* fix: issue introduced with merge

* feat: adjust index and add integration test for v1

* refactor: remove comment key

* fix: update default value of skip index granularity

* fix: update default value of skip index granularity

* refactor: rename some functions

* feat: remove skipping index from span_id

* refactor: made span_id part of primary key for potential dedup purpose

* feat: move the special attribute resource_attribute.service.name to top level

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Author: Ning Sun
Date: 2025-03-04 20:08:52 -08:00
Committed by: GitHub
Parent: b90ef10523
Commit: 37f8341963
16 changed files with 714 additions and 97 deletions

View File

@@ -130,3 +130,10 @@ pub const SEMANTIC_TYPE_TIME_INDEX: &str = "TIMESTAMP";
pub fn is_readonly_schema(schema: &str) -> bool {
matches!(schema, INFORMATION_SCHEMA_NAME)
}
// ---- special table and fields ----
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----

View File

@@ -597,7 +597,7 @@ impl fmt::Display for FulltextAnalyzer {
}
/// Skipping options for a column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct SkippingIndexOptions {
/// The granularity of the skip index.
@@ -607,6 +607,15 @@ pub struct SkippingIndexOptions {
pub index_type: SkippingIndexType,
}
impl Default for SkippingIndexOptions {
fn default() -> Self {
Self {
granularity: DEFAULT_GRANULARITY,
index_type: SkippingIndexType::default(),
}
}
}
impl fmt::Display for SkippingIndexOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "granularity={}", self.granularity)?;

View File

@@ -36,11 +36,11 @@ use servers::error::{
TableNotFoundSnafu,
};
use servers::http::jaeger::QueryTraceParams;
use servers::otlp::trace::v0::{
use servers::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use servers::otlp::trace::TRACE_TABLE_NAME;
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

View File

@@ -127,4 +127,26 @@ impl Instance {
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
}
pub async fn handle_trace_inserts(
&self,
rows: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&rows);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};
self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
}
}

View File

@@ -101,17 +101,7 @@ impl OpenTelemetryProtocolHandler for Instance {
OTLP_TRACES_ROWS.inc_by(rows as u64);
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
} else {
None
};
self.handle_log_inserts(requests, ctx)
self.handle_trace_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -799,6 +799,14 @@ pub enum Error {
#[snafu(display("A cursor named {name} already exists"))]
CursorExists { name: String },
#[snafu(display("Column options error"))]
ColumnOptions {
#[snafu(source)]
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -950,6 +958,8 @@ impl ErrorExt for Error {
Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::ColumnOptions { source, .. } => source.status_code(),
}
}

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_skipping;
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
RegionRequestHeader,
@@ -26,7 +27,9 @@ use api::v1::{
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::default_engine;
use common_catalog::consts::{
default_engine, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
@@ -34,13 +37,16 @@ use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use datatypes::schema::SkippingIndexOptions;
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
@@ -53,8 +59,8 @@ use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu,
RequestInsertsSnafu, Result, TableNotFoundSnafu,
CatalogSnafu, ColumnOptionsSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu,
JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_helper;
use crate::region_req_factory::RegionRequestFactory;
@@ -84,6 +90,8 @@ enum AutoCreateTableType {
Log,
/// A table that merges rows by `last_non_null` strategy.
LastNonNull,
/// A table that builds a skipping index and default partition rules on `trace_id`.
Trace,
}
impl AutoCreateTableType {
@@ -93,6 +101,7 @@ impl AutoCreateTableType {
AutoCreateTableType::Physical => "physical",
AutoCreateTableType::Log => "log",
AutoCreateTableType::LastNonNull => "last_non_null",
AutoCreateTableType::Trace => "trace",
}
}
}
@@ -171,6 +180,21 @@ impl Inserter {
.await
}
pub async fn handle_trace_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Trace,
)
.await
}
/// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
pub async fn handle_last_non_null_inserts(
&self,
@@ -528,7 +552,56 @@ impl Inserter {
// since that would be very unexpected behavior; TTL should be set by the user explicitly
for create_table in create_tables {
let table = self
.create_physical_table(create_table, ctx, statement_executor)
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
.await?;
}
}
AutoCreateTableType::Trace => {
// note that an auto-created table shouldn't be a TTL instant table,
// since that would be very unexpected behavior; TTL should be set by the user explicitly
for mut create_table in create_tables {
// prebuilt partition rules for hex-string data such as trace ids: see the
// function documentation for more information
let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN);
// add a skipping index to:
// - trace_id: for searching by trace id
// - parent_span_id: for searching the root span
// - span_name: for searching certain types of spans
let index_columns = [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SPAN_NAME_COLUMN];
for index_column in index_columns {
if let Some(col) = create_table
.column_defs
.iter_mut()
.find(|c| c.name == index_column)
{
col.options = options_from_skipping(&SkippingIndexOptions::default())
.context(ColumnOptionsSnafu)?;
} else {
warn!(
"Column {} not found when creating index for trace table: {}.",
index_column, create_table.table_name
);
}
}
let table = self
.create_physical_table(
create_table,
Some(partitions),
ctx,
statement_executor,
)
.await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
@@ -658,6 +731,9 @@ impl Inserter {
AutoCreateTableType::LastNonNull => {
table_options.push((MERGE_MODE_KEY, "last_non_null"));
}
AutoCreateTableType::Trace => {
table_options.push((APPEND_MODE_KEY, "true"));
}
}
let schema = ctx.current_schema();
@@ -666,6 +742,7 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr =
build_create_table_expr(&table_ref, request_schema, engine_name)?;
info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in table_options {
create_table_expr
@@ -707,6 +784,7 @@ impl Inserter {
async fn create_physical_table(
&self,
mut create_table_expr: CreateTableExpr,
partitions: Option<Partitions>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
@@ -720,7 +798,7 @@ impl Inserter {
info!("Table `{table_ref}` does not exist, try creating table");
}
let res = statement_executor
.create_table_inner(&mut create_table_expr, None, ctx.clone())
.create_table_inner(&mut create_table_expr, partitions, ctx.clone())
.await;
let table_ref = TableReference::full(

View File

@@ -34,11 +34,11 @@ use crate::error::{
};
use crate::http::HttpRecordsOutput;
use crate::metrics::METRIC_JAEGER_QUERY_ELAPSED;
use crate::otlp::trace::v0::{
use crate::otlp::trace::{
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
TRACE_TABLE_NAME,
};
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::JaegerQueryHandlerRef;
/// JaegerAPIResponse is the response of Jaeger HTTP API.

View File

@@ -15,8 +15,12 @@
pub mod attributes;
pub mod span;
pub mod v0;
pub mod v1;
use api::v1::RowInsertRequests;
pub use common_catalog::consts::{
PARENT_SPAN_ID_COLUMN, SPAN_ID_COLUMN, SPAN_NAME_COLUMN, TRACE_ID_COLUMN,
};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
@@ -26,6 +30,15 @@ use crate::query_handler::PipelineHandlerRef;
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
@@ -45,8 +58,16 @@ pub fn to_grpc_insert_requests(
query_ctx,
pipeline_handler,
),
PipelineWay::OtlpTraceDirectV1 => v1::v1_to_grpc_insert_requests(
request,
pipeline,
pipeline_params,
table_name,
query_ctx,
pipeline_handler,
),
_ => NotSupportedSnafu {
feat: "Unsupported pipeline for logs",
feat: "Unsupported pipeline for trace",
}
.fail(),
}

View File

@@ -128,6 +128,10 @@ impl From<Attributes> for jsonb::Value<'static> {
}
impl Attributes {
pub fn take(self) -> Vec<KeyValue> {
self.0
}
pub fn get_ref(&self) -> &Vec<KeyValue> {
&self.0
}

View File

@@ -16,7 +16,8 @@ use std::fmt::Display;
use common_time::timestamp::Timestamp;
use itertools::Itertools;
use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, InstrumentationScope, KeyValue};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde::Serialize;
@@ -230,6 +231,51 @@ pub fn status_to_string(status: &Option<Status>) -> (String, String) {
}
}
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
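A minimal sketch (not part of the diff) of what `parse` does with the resource-level `service.name` attribute: it is lifted onto every span under that resource. Type paths follow opentelemetry-proto's prost-generated API; the sketch assumes the in-crate `parse` above and the `TraceSpan::service_name: Option<String>` field used by the row writers.

```rust
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue};
use opentelemetry_proto::tonic::resource::v1::Resource;
use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};

// One resource carrying `service.name`, one scope, one (default) span.
let request = ExportTraceServiceRequest {
    resource_spans: vec![ResourceSpans {
        resource: Some(Resource {
            attributes: vec![KeyValue {
                key: "service.name".to_string(),
                value: Some(AnyValue {
                    value: Some(any_value::Value::StringValue("telemetrygen".to_string())),
                }),
            }],
            ..Default::default()
        }),
        scope_spans: vec![ScopeSpans {
            spans: vec![Span::default()],
            ..Default::default()
        }],
        ..Default::default()
    }],
};

let spans = parse(request);
assert_eq!(spans.len(), 1);
// The resource attribute is copied onto the span as its service name.
assert_eq!(spans[0].service_name.as_deref(), Some("telemetrygen"));
```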
#[cfg(test)]
mod tests {
use opentelemetry_proto::tonic::trace::v1::Status;

View File

@@ -15,13 +15,15 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_grpc::precision::Precision;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
use super::span::{parse_span, TraceSpan, TraceSpans};
use super::span::{parse, TraceSpan};
use super::{
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use crate::error::Result;
use crate::otlp::utils::{make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
@@ -29,64 +31,6 @@ use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const TIMESTAMP_COLUMN: &str = "timestamp";
pub const DURATION_NANO_COLUMN: &str = "duration_nano";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SPAN_KIND_COLUMN: &str = "span_kind";
pub const SPAN_ATTRIBUTES_COLUMN: &str = "span_attributes";
/// The span kind prefix in the database.
/// If the span kind is `server`, it will be stored as `SPAN_KIND_SERVER` in the database.
pub const SPAN_KIND_PREFIX: &str = "SPAN_KIND_";
/// Convert OpenTelemetry traces to SpanTraces
///
/// See
/// <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto>
/// for data structure of OTLP traces.
pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans {
let span_size = request
.resource_spans
.iter()
.flat_map(|res| res.scope_spans.iter())
.flat_map(|scope| scope.spans.iter())
.count();
let mut spans = Vec::with_capacity(span_size);
for resource_spans in request.resource_spans {
let resource_attrs = resource_spans
.resource
.map(|r| r.attributes)
.unwrap_or_default();
let service_name = resource_attrs
.iter()
.find_or_first(|kv| kv.key == "service.name")
.and_then(|kv| kv.value.clone())
.and_then(|v| match v.value {
Some(any_value::Value::StringValue(s)) => Some(s),
Some(any_value::Value::BytesValue(b)) => {
Some(String::from_utf8_lossy(&b).to_string())
}
_ => None,
});
for scope_spans in resource_spans.scope_spans {
let scope = scope_spans.scope.unwrap_or_default();
for span in scope_spans.spans {
spans.push(parse_span(
service_name.clone(),
&resource_attrs,
&scope,
span,
));
}
}
}
spans
}
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn v0_to_grpc_insert_requests(
@@ -118,7 +62,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
// write ts
row_writer::write_ts_to_nanos(
writer,
"timestamp",
TIMESTAMP_COLUMN,
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
@@ -131,7 +75,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
),
make_column_data(
"duration_nano",
DURATION_NANO_COLUMN,
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
@@ -139,14 +83,14 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_tag(writer, "service_name", service_name, &mut row)?;
row_writer::write_tag(writer, SERVICE_NAME_COLUMN, service_name, &mut row)?;
}
// tags
let iter = vec![
("trace_id", span.trace_id),
("span_id", span.span_id),
("parent_span_id", span.parent_span_id),
(TRACE_ID_COLUMN, span.trace_id),
(SPAN_ID_COLUMN, span.span_id),
(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
]
.into_iter()
.map(|(col, val)| (col.to_string(), val));
@@ -154,8 +98,8 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
// write fields
let fields = vec![
make_string_column_data("span_kind", span.span_kind),
make_string_column_data("span_name", span.span_name),
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
@@ -164,7 +108,7 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
row_writer::write_json(
writer,
"span_attributes",
SPAN_ATTRIBUTES_COLUMN,
span.span_attributes.into(),
&mut row,
)?;

View File

@@ -0,0 +1,226 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, Value};
use common_grpc::precision::Precision;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
use pipeline::{GreptimePipelineParams, PipelineWay};
use session::context::QueryContextRef;
use super::attributes::Attributes;
use super::span::{parse, TraceSpan};
use super::{
DURATION_NANO_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, SPAN_ID_COLUMN,
SPAN_KIND_COLUMN, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use crate::error::Result;
use crate::otlp::utils::{any_value_to_jsonb, make_column_data, make_string_column_data};
use crate::query_handler::PipelineHandlerRef;
use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 30;
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
///
/// Compared with v0, this v1 implementation:
/// 1. flattens all attribute data into columns.
/// 2. makes `span_id` part of the primary key (for potential deduplication)
///    and treats `parent_span_id` as a field.
/// 3. moves the special attribute `resource_attributes.service.name` to the
///    top-level `service_name` column.
///
/// Other compound data structures such as `span_links` and `span_events` are
/// still stored using the `json` data type.
pub fn v1_to_grpc_insert_requests(
request: ExportTraceServiceRequest,
_pipeline: PipelineWay,
_pipeline_params: GreptimePipelineParams,
table_name: String,
_query_ctx: &QueryContextRef,
_pipeline_handler: PipelineHandlerRef,
) -> Result<(RowInsertRequests, usize)> {
let spans = parse(request);
let mut multi_table_writer = MultiTableData::default();
let one_table_writer = multi_table_writer.get_or_default_table_data(
table_name,
APPROXIMATE_COLUMN_COUNT,
spans.len(),
);
for span in spans {
write_span_to_row(one_table_writer, span)?;
}
Ok(multi_table_writer.into_row_insert_requests())
}
pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> {
let mut row = writer.alloc_one_row();
// write ts
row_writer::write_ts_to_nanos(
writer,
TIMESTAMP_COLUMN,
Some(span.start_in_nanosecond as i64),
Precision::Nanosecond,
&mut row,
)?;
// write ts fields
let fields = vec![
make_column_data(
"timestamp_end",
ColumnDataType::TimestampNanosecond,
ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64),
),
make_column_data(
DURATION_NANO_COLUMN,
ColumnDataType::Uint64,
ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond),
),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
// tags
let tags = vec![
(TRACE_ID_COLUMN.to_string(), span.trace_id),
(SPAN_ID_COLUMN.to_string(), span.span_id),
];
row_writer::write_tags(writer, tags.into_iter(), &mut row)?;
// write fields
let fields = vec![
make_string_column_data(PARENT_SPAN_ID_COLUMN, span.parent_span_id),
make_string_column_data(SPAN_KIND_COLUMN, span.span_kind),
make_string_column_data(SPAN_NAME_COLUMN, span.span_name),
make_string_column_data("span_status_code", span.span_status_code),
make_string_column_data("span_status_message", span.span_status_message),
make_string_column_data("trace_state", span.trace_state),
make_string_column_data("scope_name", span.scope_name),
make_string_column_data("scope_version", span.scope_version),
];
row_writer::write_fields(writer, fields.into_iter(), &mut row)?;
if let Some(service_name) = span.service_name {
row_writer::write_fields(
writer,
std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
&mut row,
)?;
}
write_attributes(writer, "span_attributes", span.span_attributes, &mut row)?;
write_attributes(writer, "scope_attributes", span.scope_attributes, &mut row)?;
write_attributes(
writer,
"resource_attributes",
span.resource_attributes,
&mut row,
)?;
row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?;
row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?;
writer.add_row(row);
Ok(())
}
fn write_attributes(
writer: &mut TableData,
prefix: &str,
attributes: Attributes,
row: &mut Vec<Value>,
) -> Result<()> {
for attr in attributes.take().into_iter() {
let key_suffix = attr.key;
// skip resource_attributes.service.name because it's already copied to the
// top level as `SERVICE_NAME_COLUMN`
if prefix == "resource_attributes" && key_suffix == "service.name" {
continue;
}
let key = format!("{}.{}", prefix, key_suffix);
match attr.value.and_then(|v| v.value) {
Some(OtlpValue::StringValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_string_column_data(&key, v)),
row,
)?;
}
Some(OtlpValue::BoolValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Boolean,
ValueData::BoolValue(v),
)),
row,
)?;
}
Some(OtlpValue::IntValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Int64,
ValueData::I64Value(v),
)),
row,
)?;
}
Some(OtlpValue::DoubleValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Float64,
ValueData::F64Value(v),
)),
row,
)?;
}
Some(OtlpValue::ArrayValue(v)) => row_writer::write_json(
writer,
key,
any_value_to_jsonb(OtlpValue::ArrayValue(v)),
row,
)?,
Some(OtlpValue::KvlistValue(v)) => row_writer::write_json(
writer,
key,
any_value_to_jsonb(OtlpValue::KvlistValue(v)),
row,
)?,
Some(OtlpValue::BytesValue(v)) => {
row_writer::write_fields(
writer,
std::iter::once(make_column_data(
&key,
ColumnDataType::Binary,
ValueData::BinaryValue(v),
)),
row,
)?;
}
None => {}
}
}
Ok(())
}
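The flattening rule above is simply `prefix.key`, with the resource-level service name hoisted out; a hypothetical helper (illustration only, not in the diff) capturing it, which is why the integration test below expects columns like `span_attributes.net.peer.ip`:

```rust
// Hypothetical helper mirroring the `format!("{}.{}", prefix, key_suffix)`
// naming scheme used in `write_attributes` above.
fn attribute_column_name(prefix: &str, key: &str) -> Option<String> {
    // resource_attributes.service.name is hoisted to the top-level
    // `service_name` column instead of being flattened.
    if prefix == "resource_attributes" && key == "service.name" {
        return None;
    }
    Some(format!("{}.{}", prefix, key))
}

assert_eq!(
    attribute_column_name("span_attributes", "net.peer.ip").as_deref(),
    Some("span_attributes.net.peer.ip")
);
assert_eq!(attribute_column_name("resource_attributes", "service.name"), None);
```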

View File

@@ -22,6 +22,7 @@ pub mod dialect;
pub mod error;
pub mod parser;
pub mod parsers;
pub mod partition;
pub mod statements;
pub mod util;

src/sql/src/partition.rs (new file, 165 lines)
View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
use crate::statements::create::Partitions;
macro_rules! between_string {
($col: expr, $left_incl: expr, $right_excl: expr) => {
Expr::BinaryOp {
op: BinaryOperator::And,
left: Box::new(Expr::BinaryOp {
op: BinaryOperator::GtEq,
left: Box::new($col.clone()),
right: Box::new(Expr::Value(Value::SingleQuotedString(
$left_incl.to_string(),
))),
}),
right: Box::new(Expr::BinaryOp {
op: BinaryOperator::Lt,
left: Box::new($col.clone()),
right: Box::new(Expr::Value(Value::SingleQuotedString(
$right_excl.to_string(),
))),
}),
}
};
}
macro_rules! or {
($left: expr, $right: expr) => {
Expr::BinaryOp {
op: BinaryOperator::Or,
left: Box::new($left),
right: Box::new($right),
}
};
}
pub fn partition_rule_for_hexstring(ident: &str) -> Partitions {
let ident = Ident::new(ident);
let ident_expr = Expr::Identifier(ident.clone());
// rules are like:
//
// "trace_id < '1'",
// "trace_id >= '1' AND trace_id < '2'",
// "trace_id >= '2' AND trace_id < '3'",
// "trace_id >= '3' AND trace_id < '4'",
// "trace_id >= '4' AND trace_id < '5'",
// "trace_id >= '5' AND trace_id < '6'",
// "trace_id >= '6' AND trace_id < '7'",
// "trace_id >= '7' AND trace_id < '8'",
// "trace_id >= '8' AND trace_id < '9'",
// "trace_id >= '9' AND trace_id < 'A'",
// "trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'",
// "trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'",
// "trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'",
// "trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'",
// "trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'",
// "trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'",
let rules = vec![
Expr::BinaryOp {
left: Box::new(ident_expr.clone()),
op: BinaryOperator::Lt,
right: Box::new(Expr::Value(Value::SingleQuotedString("1".to_string()))),
},
// [left, right)
between_string!(ident_expr, "1", "2"),
between_string!(ident_expr, "2", "3"),
between_string!(ident_expr, "3", "4"),
between_string!(ident_expr, "4", "5"),
between_string!(ident_expr, "5", "6"),
between_string!(ident_expr, "6", "7"),
between_string!(ident_expr, "7", "8"),
between_string!(ident_expr, "8", "9"),
between_string!(ident_expr, "9", "A"),
or!(
between_string!(ident_expr, "A", "B"),
between_string!(ident_expr, "a", "b")
),
or!(
between_string!(ident_expr, "B", "C"),
between_string!(ident_expr, "b", "c")
),
or!(
between_string!(ident_expr, "C", "D"),
between_string!(ident_expr, "c", "d")
),
or!(
between_string!(ident_expr, "D", "E"),
between_string!(ident_expr, "d", "e")
),
or!(
between_string!(ident_expr, "E", "F"),
between_string!(ident_expr, "e", "f")
),
or!(
between_string!(ident_expr, "F", "a"),
Expr::BinaryOp {
left: Box::new(ident_expr.clone()),
op: BinaryOperator::GtEq,
right: Box::new(Expr::Value(Value::SingleQuotedString("f".to_string()))),
}
),
];
Partitions {
column_list: vec![ident],
exprs: rules,
}
}
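A quick usage sketch (not part of the diff): there are 16 rules in total, one per leading hex digit with upper and lower case folded together, and sqlparser's `Expr` implements `Display`, so each rule renders back to the SQL text shown in the comment above.

```rust
let partitions = partition_rule_for_hexstring("trace_id");
// One rule per leading hex digit, with 'A'..'F' and 'a'..'f' folded together.
assert_eq!(partitions.column_list.len(), 1);
assert_eq!(partitions.exprs.len(), 16);
assert_eq!(partitions.exprs[0].to_string(), "trace_id < '1'");
assert_eq!(
    partitions.exprs[1].to_string(),
    "trace_id >= '1' AND trace_id < '2'"
);
```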
#[cfg(test)]
mod tests {
use sqlparser::ast::Expr;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use super::*;
#[test]
fn test_rules() {
let expr = vec![
"trace_id < '1'",
"trace_id >= '1' AND trace_id < '2'",
"trace_id >= '2' AND trace_id < '3'",
"trace_id >= '3' AND trace_id < '4'",
"trace_id >= '4' AND trace_id < '5'",
"trace_id >= '5' AND trace_id < '6'",
"trace_id >= '6' AND trace_id < '7'",
"trace_id >= '7' AND trace_id < '8'",
"trace_id >= '8' AND trace_id < '9'",
"trace_id >= '9' AND trace_id < 'A'",
"trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'",
"trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'",
"trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'",
"trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'",
"trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'",
"trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'",
];
let dialect = GenericDialect {};
let results = expr
.into_iter()
.map(|s| {
let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
parser.parse_expr().unwrap()
})
.collect::<Vec<Expr>>();
assert_eq!(results, partition_rule_for_hexstring("trace_id").exprs);
}
}

View File

@@ -96,7 +96,8 @@ macro_rules! http_tests {
test_pipeline_dispatcher,
test_otlp_metrics,
test_otlp_traces,
test_otlp_traces_v0,
test_otlp_traces_v1,
test_otlp_logs,
test_loki_pb_logs,
test_loki_json_logs,
@@ -1320,7 +1321,7 @@ transform:
// 3. check schema
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '0', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"pipeline_schema",
&client,
@@ -2053,7 +2054,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_otlp_traces(store_type: StorageType) {
pub async fn test_otlp_traces_v0(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
@@ -2125,6 +2126,99 @@ pub async fn test_otlp_traces(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_otlp_traces_v1(store_type: StorageType) {
// init
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;
const TRACE_V1: &str = "greptime_trace_v1";
let content = r#"
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
"#;
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
// handshake
let client = TestClient::new(app).await;
// write traces data
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static("mytable"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL,\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"trace_id\", \"span_id\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
"show create table mytable;",
expected_ddl,
)
.await;
// drop table
let res = client.get("/v1/sql?sql=drop table mytable;").send().await;
assert_eq!(res.status(), StatusCode::OK);
// write traces data with gzip
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderValue::from_static(TRACE_V1),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static("mytable"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
true,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// select traces data again
validate_data(
"otlp_traces_with_gzip",
&client,
"select * from mytable;",
expected,
)
.await;
guard.remove_all().await;
}
pub async fn test_otlp_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_logs").await;