refactor!: add an `opentelemetry_traces_operations` table to aggregate (service_name, span_name, span_kind) to improve query performance (#7144)

refactor: add a `*_operations` table to aggregate `(service_name, span_name, span_kind)` to improve query performance

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-10-27 11:36:22 +08:00
committed by GitHub
parent d7ed6a69ab
commit 0a3961927d
8 changed files with 169 additions and 90 deletions

View File

@@ -150,4 +150,9 @@ pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name";
/// Derive the name of the auxiliary trace services table by appending the
/// `_services` suffix to the base trace table name.
pub fn trace_services_table_name(trace_table_name: &str) -> String {
    // Pre-size the buffer so the concatenation performs a single allocation.
    let mut table_name = String::with_capacity(trace_table_name.len() + "_services".len());
    table_name.push_str(trace_table_name);
    table_name.push_str("_services");
    table_name
}
/// Derive the name of the auxiliary trace operations table by appending the
/// `_operations` suffix to the base trace table name.
pub fn trace_operations_table_name(trace_table_name: &str) -> String {
    [trace_table_name, "_operations"].concat()
}
// ---- End of special table and fields ----

View File

@@ -17,7 +17,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{TRACE_TABLE_NAME, trace_services_table_name};
use common_catalog::consts::{
TRACE_TABLE_NAME, trace_operations_table_name, trace_services_table_name,
};
use common_function::function::FunctionRef;
use common_function::scalars::json::json_get::{
JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
@@ -76,8 +78,6 @@ impl JaegerQueryHandler for Instance {
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
) -> ServerResult<Output> {
let mut filters = vec![col(SERVICE_NAME_COLUMN).eq(lit(service_name))];
@@ -89,16 +89,6 @@ impl JaegerQueryHandler for Instance {
))));
}
if let Some(start_time) = start_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).gt_eq(lit_timestamp_nano(start_time * 1_000)));
}
if let Some(end_time) = end_time {
// Microseconds to nanoseconds.
filters.push(col(TIMESTAMP_COLUMN).lt_eq(lit_timestamp_nano(end_time * 1_000)));
}
// It's equivalent to the following SQL query:
//
// ```
@@ -107,8 +97,6 @@ impl JaegerQueryHandler for Instance {
// {db}.{trace_table}
// WHERE
// service_name = '{service_name}' AND
// timestamp >= {start_time} AND
// timestamp <= {end_time} AND
// span_kind = '{span_kind}'
// ORDER BY
// span_name ASC
@@ -301,12 +289,18 @@ async fn query_trace_table(
.unwrap_or(TRACE_TABLE_NAME);
// If only select services, use the trace services table.
// If querying operations (distinct by span_name and span_kind), use the trace operations table.
let table_name = {
if match selects.as_slice() {
[SelectExpr::Expression(x)] => x == &col(SERVICE_NAME_COLUMN),
_ => false,
} {
&trace_services_table_name(trace_table_name)
} else if !distincts.is_empty()
&& distincts.contains(&col(SPAN_NAME_COLUMN))
&& distincts.contains(&col(SPAN_KIND_COLUMN))
{
&trace_operations_table_name(trace_table_name)
} else {
trace_table_name
}

View File

@@ -29,7 +29,8 @@ use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_services_table_name,
TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
trace_services_table_name,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
@@ -618,8 +619,10 @@ impl Inserter {
// note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly
for mut create_table in create_tables {
if create_table.table_name == trace_services_table_name(trace_table_name) {
// Disable append mode for trace services table since it requires upsert behavior.
if create_table.table_name == trace_services_table_name(trace_table_name)
|| create_table.table_name == trace_operations_table_name(trace_table_name)
{
// Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
create_table
.table_options
.insert(APPEND_MODE_KEY.to_string(), "false".to_string());

View File

@@ -21,7 +21,6 @@ use axum::Extension;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
use axum::response::IntoResponse;
use chrono::Utc;
use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -52,7 +51,6 @@ pub const JAEGER_QUERY_TABLE_NAME_KEY: &str = "jaeger_query_table_name";
const REF_TYPE_CHILD_OF: &str = "CHILD_OF";
const SPAN_KIND_TIME_FMTS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.6f%z", "%Y-%m-%d %H:%M:%S%.9f%z"];
pub const JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER: &str = "x-greptime-jaeger-query-time-range";
/// JaegerAPIResponse is the response of Jaeger HTTP API.
/// The original version is `structuredResponse` which is defined in https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/http_handler.go.
@@ -528,13 +526,6 @@ pub async fn handle_get_operations(
query_params, query_ctx, headers
);
let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
Ok((start, end)) => (start, end),
Err(e) => return error_response(e),
};
debug!("Get operations with start: {:?}, end: {:?}", start, end);
if let Some(service_name) = &query_params.service_name {
update_query_context(&mut query_ctx, table_name);
let query_ctx = Arc::new(query_ctx);
@@ -546,13 +537,7 @@ pub async fn handle_get_operations(
.start_timer();
match handler
.get_operations(
query_ctx,
service_name,
query_params.span_kind.as_deref(),
start,
end,
)
.get_operations(query_ctx, service_name, query_params.span_kind.as_deref())
.await
{
Ok(output) => match covert_to_records(output).await {
@@ -625,15 +610,7 @@ pub async fn handle_get_operations_by_service(
.with_label_values(&[&db, "/api/services"])
.start_timer();
let (start, end) = match parse_jaeger_time_range_for_operations(&headers, &query_params) {
Ok((start, end)) => (start, end),
Err(e) => return error_response(e),
};
match handler
.get_operations(query_ctx, &service_name, None, start, end)
.await
{
match handler.get_operations(query_ctx, &service_name, None).await {
Ok(output) => match covert_to_records(output).await {
Ok(Some(records)) => match operations_from_records(records, false) {
Ok(operations) => {
@@ -1117,42 +1094,6 @@ fn convert_string_to_boolean(input: &serde_json::Value) -> Option<serde_json::Va
None
}
/// Resolves the `(start, end)` time range, in microseconds, used when querying
/// operations.
///
/// If the `x-greptime-jaeger-query-time-range` header is present, it is parsed as a
/// human-readable duration (e.g. "3 days") and the range becomes
/// `(now - duration, now)`; otherwise the `start`/`end` values from the query
/// parameters are returned unchanged (either may be `None`).
///
/// # Errors
///
/// Returns an `InvalidJaegerQuery` error when the header value is not valid UTF-8
/// or cannot be parsed as a duration.
fn parse_jaeger_time_range_for_operations(
    headers: &HeaderMap,
    query_params: &JaegerQueryParams,
) -> Result<(Option<i64>, Option<i64>)> {
    if let Some(time_range) = headers.get(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER) {
        match time_range.to_str() {
            Ok(time_range) => match humantime::parse_duration(time_range) {
                Ok(duration) => {
                    debug!(
                        "Get operations with time range: {:?}, duration: {:?}",
                        time_range, duration
                    );
                    // Window is anchored to the current wall-clock time; microseconds
                    // to match the unit of the query-parameter timestamps.
                    let now = Utc::now().timestamp_micros();
                    Ok((Some(now - duration.as_micros() as i64), Some(now)))
                }
                Err(e) => {
                    // Header present but not a parsable duration.
                    error!("Failed to parse time range header: {:?}", e);
                    Err(InvalidJaegerQuerySnafu {
                        reason: format!("invalid time range header: {:?}", time_range),
                    }
                    .build())
                }
            },
            Err(e) => {
                // Header present but not valid UTF-8.
                error!("Failed to convert time range header to string: {:?}", e);
                Err(InvalidJaegerQuerySnafu {
                    reason: format!("invalid time range header: {:?}", time_range),
                }
                .build())
            }
        }
    } else {
        // No header: fall back to the caller-supplied query parameters.
        Ok((query_params.start, query_params.end))
    }
}
#[cfg(test)]
mod tests {
use serde_json::{Number, Value as JsonValue, json};

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests};
use common_catalog::consts::trace_services_table_name;
use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
use common_grpc::precision::Precision;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
@@ -35,6 +35,9 @@ use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 24;
// Use a timestamp(2100-01-01 00:00:00) as large as possible.
const MAX_TIMESTAMP: i64 = 4102444800000000000;
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
pub fn v0_to_grpc_insert_requests(
@@ -49,23 +52,40 @@ pub fn v0_to_grpc_insert_requests(
let mut multi_table_writer = MultiTableData::default();
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
let mut services = HashSet::new();
let mut operations = HashSet::new();
for span in spans {
if let Some(service_name) = &span.service_name {
// Only insert the service name if it's not already in the set.
if !services.contains(service_name) {
services.insert(service_name.clone());
}
// Collect operations (service_name + span_name + span_kind).
let operation = (
service_name.clone(),
span.span_name.clone(),
span.span_kind.clone(),
);
if !operations.contains(&operation) {
operations.insert(operation);
}
}
write_span_to_row(&mut trace_writer, span)?;
}
write_trace_services_to_row(&mut trace_services_writer, services)?;
write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
multi_table_writer.add_table_data(
trace_services_table_name(&table_name),
trace_services_writer,
);
multi_table_writer.add_table_data(
trace_operations_table_name(&table_name),
trace_operations_writer,
);
multi_table_writer.add_table_data(table_name, trace_writer);
Ok(multi_table_writer.into_row_insert_requests())
@@ -161,7 +181,7 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>
row_writer::write_ts_to_nanos(
writer,
TIMESTAMP_COLUMN,
Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
Some(MAX_TIMESTAMP),
Precision::Nanosecond,
&mut row,
)?;
@@ -180,3 +200,35 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>
Ok(())
}
/// Writes one row per distinct `(service_name, span_name, span_kind)` tuple into
/// the trace operations table writer.
///
/// The inserter disables append mode for this table, so re-ingesting the same
/// operation is expected to overwrite the existing row rather than accumulate
/// duplicates.
fn write_trace_operations_to_row(
    writer: &mut TableData,
    operations: HashSet<(String, String, String)>,
) -> Result<()> {
    for (service_name, span_name, span_kind) in operations {
        let mut row = writer.alloc_one_row();
        // Use the far-future MAX_TIMESTAMP (2100-01-01 00:00:00) so every row for a
        // given operation shares the same time-index value.
        row_writer::write_ts_to_nanos(
            writer,
            TIMESTAMP_COLUMN,
            Some(MAX_TIMESTAMP),
            Precision::Nanosecond,
            &mut row,
        )?;
        // Write the `service_name`, `span_name`, and `span_kind` columns as fields.
        // NOTE(review): assumes the table schema keys rows so that identical
        // operations collide for upsert — confirm against the v0 trace table schema.
        row_writer::write_fields(
            writer,
            vec![
                make_string_column_data(SERVICE_NAME_COLUMN, Some(service_name)),
                make_string_column_data(SPAN_NAME_COLUMN, Some(span_name)),
                make_string_column_data(SPAN_KIND_COLUMN, Some(span_kind)),
            ]
            .into_iter(),
            &mut row,
        )?;
        writer.add_row(row);
    }
    Ok(())
}

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, Value};
use common_catalog::consts::trace_services_table_name;
use common_catalog::consts::{trace_operations_table_name, trace_services_table_name};
use common_grpc::precision::Precision;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
@@ -37,6 +37,9 @@ use crate::row_writer::{self, MultiTableData, TableData};
const APPROXIMATE_COLUMN_COUNT: usize = 30;
// Use a timestamp(2100-01-01 00:00:00) as large as possible.
const MAX_TIMESTAMP: i64 = 4102444800000000000;
/// Convert SpanTraces to GreptimeDB row insert requests.
/// Returns `InsertRequests` and total number of rows to ingest
///
@@ -60,23 +63,40 @@ pub fn v1_to_grpc_insert_requests(
let mut multi_table_writer = MultiTableData::default();
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
let mut trace_operations_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
let mut services = HashSet::new();
let mut operations = HashSet::new();
for span in spans {
if let Some(service_name) = &span.service_name {
// Only insert the service name if it's not already in the set.
if !services.contains(service_name) {
services.insert(service_name.clone());
}
// Only insert the operation if it's not already in the set.
let operation = (
service_name.clone(),
span.span_name.clone(),
span.span_kind.clone(),
);
if !operations.contains(&operation) {
operations.insert(operation);
}
}
write_span_to_row(&mut trace_writer, span)?;
}
write_trace_services_to_row(&mut trace_services_writer, services)?;
write_trace_operations_to_row(&mut trace_operations_writer, operations)?;
multi_table_writer.add_table_data(
trace_services_table_name(&table_name),
trace_services_writer,
);
multi_table_writer.add_table_data(
trace_operations_table_name(&table_name),
trace_operations_writer,
);
multi_table_writer.add_table_data(table_name, trace_writer);
Ok(multi_table_writer.into_row_insert_requests())
@@ -160,7 +180,7 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>
row_writer::write_ts_to_nanos(
writer,
TIMESTAMP_COLUMN,
Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
Some(MAX_TIMESTAMP),
Precision::Nanosecond,
&mut row,
)?;
@@ -177,6 +197,38 @@ fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>
Ok(())
}
/// Writes one row per distinct `(service_name, span_name, span_kind)` tuple into
/// the trace operations table writer.
///
/// The inserter disables append mode for this table, so re-ingesting the same
/// operation is expected to overwrite the existing row rather than accumulate
/// duplicates.
fn write_trace_operations_to_row(
    writer: &mut TableData,
    operations: HashSet<(String, String, String)>,
) -> Result<()> {
    for (service_name, span_name, span_kind) in operations {
        let mut row = writer.alloc_one_row();
        // Use the far-future MAX_TIMESTAMP (2100-01-01 00:00:00) so every row for a
        // given operation shares the same time-index value and is replaced in place.
        row_writer::write_ts_to_nanos(
            writer,
            TIMESTAMP_COLUMN,
            Some(MAX_TIMESTAMP),
            Precision::Nanosecond,
            &mut row,
        )?;
        // Write the `service_name`, `span_name`, and `span_kind` columns as tags.
        row_writer::write_tags(
            writer,
            vec![
                (SERVICE_NAME_COLUMN.to_string(), service_name),
                (SPAN_NAME_COLUMN.to_string(), span_name),
                (SPAN_KIND_COLUMN.to_string(), span_kind),
            ]
            .into_iter(),
            &mut row,
        )?;
        writer.add_row(row);
    }
    Ok(())
}
fn write_attributes(
writer: &mut TableData,
prefix: &str,

View File

@@ -198,8 +198,6 @@ pub trait JaegerQueryHandler {
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
) -> Result<Output>;
/// Retrieves a trace by its unique identifier.

View File

@@ -25,7 +25,8 @@ use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
use common_catalog::consts::{
DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, trace_services_table_name,
DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, trace_operations_table_name,
trace_services_table_name,
};
use common_error::status_code::StatusCode as ErrorCode;
use common_frontend::slow_query_event::{
@@ -48,7 +49,6 @@ use servers::http::header::constants::{
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
};
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER;
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
@@ -4496,6 +4496,19 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
)
.await;
// Validate operations table
let expected = r#"[["telemetrygen","SPAN_KIND_CLIENT","lets-go"],["telemetrygen","SPAN_KIND_SERVER","okey-dokey-0"]]"#;
validate_data(
"otlp_traces_operations",
&client,
&format!(
"select service_name, span_kind, span_name from {} order by span_kind, span_name;",
trace_operations_table_name(TRACE_TABLE_NAME)
),
expected,
)
.await;
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179",null,"SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
validate_data(
@@ -4611,6 +4624,19 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
// Validate operations table
let expected = r#"[["telemetrygen","SPAN_KIND_CLIENT","lets-go"],["telemetrygen","SPAN_KIND_SERVER","okey-dokey-0"]]"#;
validate_data(
"otlp_traces_operations_v1",
&client,
&format!(
"select service_name, span_kind, span_name from {} order by span_kind, span_name;",
trace_operations_table_name(trace_table_name)
),
expected,
)
.await;
// select traces data
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
@@ -6017,19 +6043,26 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
let res = client
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
.header("x-greptime-trace-table-name", trace_table_name)
.header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days")
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = r#"
{
"data": [
{
"name": "access-mysql",
"spanKind": "server"
},
{
"name": "access-pg",
"spanKind": "server"
},
{
"name": "access-redis",
"spanKind": "server"
}
],
"total": 1,
"total": 3,
"limit": 0,
"offset": 0,
"errors": []
@@ -6050,9 +6083,10 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
{
"data": [
"access-mysql",
"access-pg",
"access-redis"
],
"total": 2,
"total": 3,
"limit": 0,
"offset": 0,
"errors": []