mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 20:00:36 +00:00
refactor: improve performance for Jaeger APIs (#5838)
* refactor: improve jaeger '/api/services' performance by adding the trace services table * chore: refine some logic * chore: compatible v0 * test: add integration test * chore: expand default limit from 100 to 2000 * test: fix integration test * refactor: make trace service table configurable * refactor: use a timestamp(2100-01-01 00:00:00) as large as possible * refactor: use '<trace_table>_services' as trace services table name
This commit is contained in:
@@ -137,4 +137,12 @@ pub const SPAN_ID_COLUMN: &str = "span_id";
|
||||
pub const SPAN_NAME_COLUMN: &str = "span_name";
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
|
||||
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
|
||||
pub const TRACE_TABLE_NAME_SESSION_KEY: &str = "trace_table_name";
|
||||
// ---- End of special table and fields ----
|
||||
|
||||
/// Generate the trace services table name from the trace table name by adding `_services` suffix.
|
||||
pub fn trace_services_table_name(trace_table_name: &str) -> String {
|
||||
format!("{}_services", trace_table_name)
|
||||
}
|
||||
// ---- End of special table and fields ----
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
|
||||
use common_function::function::{Function, FunctionRef};
|
||||
use common_function::scalars::json::json_get::{
|
||||
JsonGetBool, JsonGetFloat, JsonGetInt, JsonGetString,
|
||||
@@ -38,7 +39,7 @@ use servers::error::{
|
||||
use servers::http::jaeger::{QueryTraceParams, JAEGER_QUERY_TABLE_NAME_KEY};
|
||||
use servers::otlp::trace::{
|
||||
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
|
||||
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
|
||||
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use servers::query_handler::JaegerQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -48,7 +49,7 @@ use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
use super::Instance;
|
||||
|
||||
const DEFAULT_LIMIT: usize = 100;
|
||||
const DEFAULT_LIMIT: usize = 2000;
|
||||
|
||||
#[async_trait]
|
||||
impl JaegerQueryHandler for Instance {
|
||||
@@ -61,7 +62,7 @@ impl JaegerQueryHandler for Instance {
|
||||
vec![col(SERVICE_NAME_COLUMN)],
|
||||
vec![],
|
||||
vec![],
|
||||
Some(DEFAULT_LIMIT),
|
||||
None,
|
||||
None,
|
||||
vec![col(SERVICE_NAME_COLUMN)],
|
||||
)
|
||||
@@ -216,10 +217,19 @@ async fn query_trace_table(
|
||||
tags: Option<HashMap<String, JsonValue>>,
|
||||
distincts: Vec<Expr>,
|
||||
) -> ServerResult<Output> {
|
||||
let table_name = ctx
|
||||
let trace_table_name = ctx
|
||||
.extension(JAEGER_QUERY_TABLE_NAME_KEY)
|
||||
.unwrap_or(TRACE_TABLE_NAME);
|
||||
|
||||
// If only select services, use the trace services table.
|
||||
let table_name = {
|
||||
if selects.len() == 1 && selects[0] == col(SERVICE_NAME_COLUMN) {
|
||||
&trace_services_table_name(trace_table_name)
|
||||
} else {
|
||||
trace_table_name
|
||||
}
|
||||
};
|
||||
|
||||
let table = catalog_manager
|
||||
.table(
|
||||
ctx.current_catalog(),
|
||||
|
||||
@@ -28,7 +28,8 @@ use api::v1::{
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::{OutputData, OutputMeta};
|
||||
use common_catalog::consts::{
|
||||
default_engine, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN,
|
||||
default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
|
||||
TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
|
||||
};
|
||||
use common_grpc_expr::util::ColumnExpr;
|
||||
use common_meta::cache::TableFlownodeSetCacheRef;
|
||||
@@ -571,53 +572,69 @@ impl Inserter {
|
||||
}
|
||||
|
||||
AutoCreateTableType::Trace => {
|
||||
let trace_table_name = ctx
|
||||
.extension(TRACE_TABLE_NAME_SESSION_KEY)
|
||||
.unwrap_or(TRACE_TABLE_NAME);
|
||||
|
||||
// note that auto create table shouldn't be ttl instant table
|
||||
// for it's a very unexpected behavior and should be set by user explicitly
|
||||
for mut create_table in create_tables {
|
||||
// prebuilt partition rules for uuid data: see the function
|
||||
// for more information
|
||||
let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN);
|
||||
// add skip index to
|
||||
// - trace_id: when searching by trace id
|
||||
// - parent_span_id: when searching root span
|
||||
// - span_name: when searching certain types of span
|
||||
let index_columns =
|
||||
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
|
||||
for index_column in index_columns {
|
||||
if let Some(col) = create_table
|
||||
.column_defs
|
||||
.iter_mut()
|
||||
.find(|c| c.name == index_column)
|
||||
{
|
||||
col.options = options_from_skipping(&SkippingIndexOptions::default())
|
||||
.context(ColumnOptionsSnafu)?;
|
||||
} else {
|
||||
warn!(
|
||||
"Column {} not found when creating index for trace table: {}.",
|
||||
index_column, create_table.table_name
|
||||
);
|
||||
if create_table.table_name == trace_services_table_name(trace_table_name) {
|
||||
let table = self
|
||||
.create_physical_table(create_table, None, ctx, statement_executor)
|
||||
.await?;
|
||||
let table_info = table.table_info();
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
} else {
|
||||
// prebuilt partition rules for uuid data: see the function
|
||||
// for more information
|
||||
let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN);
|
||||
// add skip index to
|
||||
// - trace_id: when searching by trace id
|
||||
// - parent_span_id: when searching root span
|
||||
// - span_name: when searching certain types of span
|
||||
let index_columns =
|
||||
[TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
|
||||
for index_column in index_columns {
|
||||
if let Some(col) = create_table
|
||||
.column_defs
|
||||
.iter_mut()
|
||||
.find(|c| c.name == index_column)
|
||||
{
|
||||
col.options =
|
||||
options_from_skipping(&SkippingIndexOptions::default())
|
||||
.context(ColumnOptionsSnafu)?;
|
||||
} else {
|
||||
warn!(
|
||||
"Column {} not found when creating index for trace table: {}.",
|
||||
index_column, create_table.table_name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// use table_options to mark table model version
|
||||
create_table.table_options.insert(
|
||||
TABLE_DATA_MODEL.to_string(),
|
||||
TABLE_DATA_MODEL_TRACE_V1.to_string(),
|
||||
);
|
||||
// use table_options to mark table model version
|
||||
create_table.table_options.insert(
|
||||
TABLE_DATA_MODEL.to_string(),
|
||||
TABLE_DATA_MODEL_TRACE_V1.to_string(),
|
||||
);
|
||||
|
||||
let table = self
|
||||
.create_physical_table(
|
||||
create_table,
|
||||
Some(partitions),
|
||||
ctx,
|
||||
statement_executor,
|
||||
)
|
||||
.await?;
|
||||
let table_info = table.table_info();
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
let table = self
|
||||
.create_physical_table(
|
||||
create_table,
|
||||
Some(partitions),
|
||||
ctx,
|
||||
statement_executor,
|
||||
)
|
||||
.await?;
|
||||
let table_info = table.table_info();
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
}
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
}
|
||||
for alter_expr in alter_tables.into_iter() {
|
||||
statement_executor
|
||||
|
||||
@@ -58,6 +58,7 @@ pub mod constants {
|
||||
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
|
||||
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
|
||||
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
|
||||
|
||||
/// The header key that contains the pipeline params.
|
||||
pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params";
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ use axum::http::{HeaderMap, StatusCode as HttpStatusCode};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Extension;
|
||||
use chrono::Utc;
|
||||
use common_catalog::consts::PARENT_SPAN_ID_COLUMN;
|
||||
use common_catalog::consts::{PARENT_SPAN_ID_COLUMN, TRACE_TABLE_NAME};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_query::{Output, OutputData};
|
||||
@@ -42,7 +42,7 @@ use crate::otlp::trace::{
|
||||
KEY_SERVICE_NAME, KEY_SPAN_KIND, RESOURCE_ATTRIBUTES_COLUMN, SCOPE_NAME_COLUMN,
|
||||
SCOPE_VERSION_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_EVENTS_COLUMN,
|
||||
SPAN_ID_COLUMN, SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE,
|
||||
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
|
||||
SPAN_STATUS_PREFIX, SPAN_STATUS_UNSET, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
|
||||
};
|
||||
use crate::query_handler::JaegerQueryHandlerRef;
|
||||
|
||||
@@ -337,7 +337,11 @@ pub async fn handle_get_services(
|
||||
query_params, query_ctx
|
||||
);
|
||||
|
||||
update_query_context(&mut query_ctx, table_name);
|
||||
query_ctx.set_channel(Channel::Jaeger);
|
||||
if let Some(table) = table_name {
|
||||
query_ctx.set_extension(JAEGER_QUERY_TABLE_NAME_KEY, table);
|
||||
}
|
||||
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let db = query_ctx.get_db_string();
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ use axum::http::header;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Extension;
|
||||
use bytes::Bytes;
|
||||
use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY};
|
||||
use common_telemetry::tracing;
|
||||
use opentelemetry_proto::tonic::collector::logs::v1::{
|
||||
ExportLogsServiceRequest, ExportLogsServiceResponse,
|
||||
@@ -38,7 +39,6 @@ use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
|
||||
use crate::error::{self, PipelineSnafu, Result};
|
||||
use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
|
||||
use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
|
||||
use crate::otlp::trace::TRACE_TABLE_NAME;
|
||||
use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
@@ -82,6 +82,8 @@ pub async fn traces(
|
||||
let table_name = table_name.unwrap_or_else(|| TRACE_TABLE_NAME.to_string());
|
||||
|
||||
query_ctx.set_channel(Channel::Otlp);
|
||||
query_ctx.set_extension(TRACE_TABLE_NAME_SESSION_KEY, &table_name);
|
||||
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
|
||||
.with_label_values(&[db.as_str()])
|
||||
|
||||
@@ -28,8 +28,6 @@ use session::context::QueryContextRef;
|
||||
use crate::error::{NotSupportedSnafu, Result};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
|
||||
pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces";
|
||||
|
||||
// column names
|
||||
pub const SERVICE_NAME_COLUMN: &str = "service_name";
|
||||
pub const TIMESTAMP_COLUMN: &str = "timestamp";
|
||||
|
||||
@@ -12,8 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests};
|
||||
use common_catalog::consts::trace_services_table_name;
|
||||
use common_grpc::precision::Precision;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::{GreptimePipelineParams, PipelineWay};
|
||||
@@ -44,15 +47,26 @@ pub fn v0_to_grpc_insert_requests(
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let spans = parse(request);
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let one_table_writer = multi_table_writer.get_or_default_table_data(
|
||||
table_name,
|
||||
APPROXIMATE_COLUMN_COUNT,
|
||||
spans.len(),
|
||||
);
|
||||
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
|
||||
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
|
||||
let mut services = HashSet::new();
|
||||
for span in spans {
|
||||
write_span_to_row(one_table_writer, span)?;
|
||||
if let Some(service_name) = &span.service_name {
|
||||
// Only insert the service name if it's not already in the set.
|
||||
if !services.contains(service_name) {
|
||||
services.insert(service_name.clone());
|
||||
}
|
||||
}
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
write_trace_services_to_row(&mut trace_services_writer, services)?;
|
||||
|
||||
multi_table_writer.add_table_data(
|
||||
trace_services_table_name(&table_name),
|
||||
trace_services_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
@@ -142,3 +156,27 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
|
||||
for service_name in services {
|
||||
let mut row = writer.alloc_one_row();
|
||||
// Write the timestamp as 0.
|
||||
row_writer::write_ts_to_nanos(
|
||||
writer,
|
||||
TIMESTAMP_COLUMN,
|
||||
Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
|
||||
Precision::Nanosecond,
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
// Write the `service_name` column.
|
||||
row_writer::write_fields(
|
||||
writer,
|
||||
std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
|
||||
&mut row,
|
||||
)?;
|
||||
writer.add_row(row);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -12,8 +12,11 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, RowInsertRequests, Value};
|
||||
use common_catalog::consts::trace_services_table_name;
|
||||
use common_grpc::precision::Precision;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
|
||||
@@ -55,16 +58,26 @@ pub fn v1_to_grpc_insert_requests(
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let spans = parse(request);
|
||||
let mut multi_table_writer = MultiTableData::default();
|
||||
let mut trace_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, spans.len());
|
||||
let mut trace_services_writer = TableData::new(APPROXIMATE_COLUMN_COUNT, 1);
|
||||
|
||||
let one_table_writer = multi_table_writer.get_or_default_table_data(
|
||||
table_name,
|
||||
APPROXIMATE_COLUMN_COUNT,
|
||||
spans.len(),
|
||||
);
|
||||
|
||||
let mut services = HashSet::new();
|
||||
for span in spans {
|
||||
write_span_to_row(one_table_writer, span)?;
|
||||
if let Some(service_name) = &span.service_name {
|
||||
// Only insert the service name if it's not already in the set.
|
||||
if !services.contains(service_name) {
|
||||
services.insert(service_name.clone());
|
||||
}
|
||||
}
|
||||
write_span_to_row(&mut trace_writer, span)?;
|
||||
}
|
||||
write_trace_services_to_row(&mut trace_services_writer, services)?;
|
||||
|
||||
multi_table_writer.add_table_data(
|
||||
trace_services_table_name(&table_name),
|
||||
trace_services_writer,
|
||||
);
|
||||
multi_table_writer.add_table_data(table_name, trace_writer);
|
||||
|
||||
Ok(multi_table_writer.into_row_insert_requests())
|
||||
}
|
||||
@@ -150,6 +163,30 @@ pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_trace_services_to_row(writer: &mut TableData, services: HashSet<String>) -> Result<()> {
|
||||
for service_name in services {
|
||||
let mut row = writer.alloc_one_row();
|
||||
// Write the timestamp as 0.
|
||||
row_writer::write_ts_to_nanos(
|
||||
writer,
|
||||
TIMESTAMP_COLUMN,
|
||||
Some(4102444800000000000), // Use a timestamp(2100-01-01 00:00:00) as large as possible.
|
||||
Precision::Nanosecond,
|
||||
&mut row,
|
||||
)?;
|
||||
|
||||
// Write the `service_name` column.
|
||||
row_writer::write_fields(
|
||||
writer,
|
||||
std::iter::once(make_string_column_data(SERVICE_NAME_COLUMN, service_name)),
|
||||
&mut row,
|
||||
)?;
|
||||
writer.add_row(row);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_attributes(
|
||||
writer: &mut TableData,
|
||||
prefix: &str,
|
||||
|
||||
@@ -20,6 +20,7 @@ use api::prom_store::remote::WriteRequest;
|
||||
use auth::user_provider_from_option;
|
||||
use axum::http::{HeaderName, HeaderValue, StatusCode};
|
||||
use chrono::Utc;
|
||||
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
|
||||
use common_error::status_code::StatusCode as ErrorCode;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
@@ -2356,6 +2357,18 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
let expected = r#"[["telemetrygen"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
&format!(
|
||||
"select service_name from {};",
|
||||
trace_services_table_name(TRACE_TABLE_NAME)
|
||||
),
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// select traces data
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444376000,1736480942444499000,123000,"telemetrygen","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","d24f921c75f68e23","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1736480942444589000,1736480942444712000,123000,"telemetrygen","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","eba7be77e3558179","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#;
|
||||
validate_data(
|
||||
@@ -2428,6 +2441,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
{"resourceSpans":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"telemetrygen"}}],"droppedAttributesCount":0},"scopeSpans":[{"scope":{"name":"telemetrygen","version":"","attributes":[],"droppedAttributesCount":0},"spans":[{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"9630f2916e2f7909","traceState":"","parentSpanId":"d24f921c75f68e23","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"c05d7a4ec8e1f231f02ed6e8da8655b4","spanId":"d24f921c75f68e23","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444376000","endTimeUnixNano":"1736480942444499000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"8f847259b0f6e1ab","traceState":"","parentSpanId":"eba7be77e3558179","flags":256,"name":"okey-dokey-0","kind":2,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-client"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}},{"traceId":"cc9e0991a2e63d274984bd44ee669203","spanId":"eba7be77e3558179","traceState":"","parentSpanId":"","flags":256,"name":"lets-go","kind":3,"startTimeUnixNano":"1736480942444589000","endTimeUnixNano":"1736480942444712000","attributes":[{"key":"net.peer.ip","value":{"stringValue":"1.2.3.4"}},{"key":"peer.service","value":{"stringValue":"telemetrygen-server"}}],"droppedAttributesCount":0,"events":[],"droppedEventsCount":0,"links":[],"droppedLinksCount":0,"status":{"message":"","code":0}}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
|
||||
"#;
|
||||
|
||||
let trace_table_name = "mytable";
|
||||
let req: ExportTraceServiceRequest = serde_json::from_str(content).unwrap();
|
||||
let body = req.encode_to_vec();
|
||||
|
||||
@@ -2448,7 +2462,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_static("mytable"),
|
||||
HeaderValue::from_static(trace_table_name),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
@@ -2458,6 +2472,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
let expected = r#"[["telemetrygen"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
&format!(
|
||||
"select service_name from {};",
|
||||
trace_services_table_name(trace_table_name)
|
||||
),
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// select traces data
|
||||
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
|
||||
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
|
||||
@@ -2471,6 +2497,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\")\n)\n\nENGINE=mito\nWITH(\n append_mode = 'true'\n)"]]"#;
|
||||
validate_data(
|
||||
"otlp_traces",
|
||||
&client,
|
||||
&format!(
|
||||
"show create table {};",
|
||||
trace_services_table_name(trace_table_name)
|
||||
),
|
||||
expected_ddl,
|
||||
)
|
||||
.await;
|
||||
|
||||
// drop table
|
||||
let res = client.get("/v1/sql?sql=drop table mytable;").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
@@ -2489,7 +2527,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_static("mytable"),
|
||||
HeaderValue::from_static(trace_table_name),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
@@ -3361,11 +3399,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
// Test empty response for `/api/services` API before writing any traces.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/services")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.send()
|
||||
.await;
|
||||
let res = client.get("/v1/jaeger/api/services").send().await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
let expected = r#"
|
||||
{
|
||||
@@ -3526,6 +3560,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
}
|
||||
let body = req.encode_to_vec();
|
||||
|
||||
let trace_table_name = "mytable";
|
||||
// write traces data.
|
||||
let res = send_req(
|
||||
&client,
|
||||
@@ -3540,7 +3575,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
),
|
||||
(
|
||||
HeaderName::from_static("x-greptime-trace-table-name"),
|
||||
HeaderValue::from_static("mytable"),
|
||||
HeaderValue::from_static(trace_table_name),
|
||||
),
|
||||
],
|
||||
"/v1/otlp/v1/traces",
|
||||
@@ -3553,7 +3588,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
// Test `/api/services` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/services")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
@@ -3575,7 +3610,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
// Test `/api/operations` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/operations?service=test-jaeger-query-api")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.header(JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER, "3 days")
|
||||
.send()
|
||||
.await;
|
||||
@@ -3601,7 +3636,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
// Test `/api/services/{service_name}/operations` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/services/test-jaeger-query-api/operations?start=1738726754492421&end=1738726754642422")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
@@ -3624,7 +3659,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
// Test `/api/traces/{trace_id}` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/traces/5611dce1bc9ebed65352d99a027b08ea")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
@@ -3740,7 +3775,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
|
||||
// Test `/api/traces` API.
|
||||
let res = client
|
||||
.get("/v1/jaeger/api/traces?service=test-jaeger-query-api&operation=access-mysql&start=1738726754492421&end=1738726754642422&tags=%7B%22operation.type%22%3A%22access-mysql%22%7D")
|
||||
.header("x-greptime-trace-table-name", "mytable")
|
||||
.header("x-greptime-trace-table-name", trace_table_name)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(StatusCode::OK, res.status());
|
||||
|
||||
Reference in New Issue
Block a user