diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index aecca50e09..622c3e4b25 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -62,7 +62,7 @@ use table::TableRef; use table::metadata::TableInfo; use table::requests::{ AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL, - TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS, + TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS, }; use table::table_reference::TableReference; @@ -623,6 +623,23 @@ impl Inserter { .extension(TRACE_TABLE_NAME_SESSION_KEY) .unwrap_or(TRACE_TABLE_NAME); + let trace_table_partitions = if let Some(trace_table_partitions) = + ctx.extension(TRACE_TABLE_PARTITIONS_HINT_KEY) + { + let p = trace_table_partitions.parse::<u32>().map_err(|_| { + InvalidInsertRequestSnafu { + reason: format!( + "Failed to parse trace_table_partitions: {}", + trace_table_partitions + ), + } + .build() + })?; + Some(p) + } else { + None + }; + // note that auto create table shouldn't be ttl instant table // for it's a very unexpected behavior and should be set by user explicitly for mut create_table in create_tables { @@ -647,8 +664,18 @@ impl Inserter { } else { // prebuilt partition rules for uuid data: see the function // for more information - let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN) + let partitions = if matches!(trace_table_partitions, Some(0) | Some(1)) { + // disable partitions + None + } else { + let p = partition_rule_for_hexstring( + TRACE_ID_COLUMN, + trace_table_partitions, + ) .context(CreatePartitionRulesSnafu)?; + Some(p) + }; + // add skip index to // - trace_id: when searching by trace id // - parent_span_id: when searching root span @@ -681,7 +708,7 @@ impl Inserter { let table = self .create_physical_table( create_table, - Some(partitions), + partitions, ctx, statement_executor, ) diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 3d6057f046..ba0007bc06
100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use axum::Extension; use axum::extract::State; -use axum::http::header; +use axum::http::{StatusCode, header}; use axum::response::IntoResponse; use axum_extra::TypedHeader; use bytes::Bytes; @@ -44,7 +44,15 @@ use crate::http::extractor::{ }; use crate::http::header::{CONTENT_TYPE_PROTOBUF, write_cost_header_map}; use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED; -use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler}; +use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler, TraceIngestOutcome}; + +#[derive(Clone, prost::Message)] +pub struct GoogleRpcStatus { + #[prost(int32, tag = "1")] + pub code: i32, + #[prost(string, tag = "2")] + pub message: String, +} fn is_json_content_type(content_type: Option<&ContentType>) -> bool { match content_type { @@ -129,7 +137,7 @@ pub async fn traces( Extension(mut query_ctx): Extension<QueryContext>, content_type: Option<TypedHeader<ContentType>>, bytes: Bytes, -) -> Result<OtlpResponse<ExportTraceServiceResponse>> { +) -> Result<OtlpTraceResponse> { if is_json_content_type(content_type.as_ref().map(|h| &h.0)) { return error::UnsupportedJsonContentTypeSnafu {}.fail(); } @@ -175,16 +183,14 @@ pub async fn traces( query_ctx, ) .await - .map(|outcome| OtlpResponse { - resp_body: ExportTraceServiceResponse { - partial_success: outcome.error_message.map(|error_message| { - ExportTracePartialSuccess { - rejected_spans: outcome.rejected_spans as i64, - error_message, - } - }), - }, - write_cost: outcome.write_cost, + .map(|outcome| { + if outcome.accepted_spans == 0 && outcome.rejected_spans > 0 { + OtlpTraceResponse::Failure(outcome) + } else if outcome.rejected_spans > 0 || outcome.error_message.is_some() { + OtlpTraceResponse::PartialSuccess(outcome) + } else { + OtlpTraceResponse::FullSuccess(outcome) + } }) } @@ -260,3 +266,49 @@ impl IntoResponse for OtlpResponse { (header_map, self.resp_body.encode_to_vec()).into_response() } } + +pub enum OtlpTraceResponse
{ + FullSuccess(TraceIngestOutcome), + PartialSuccess(TraceIngestOutcome), + Failure(TraceIngestOutcome), +} + +impl IntoResponse for OtlpTraceResponse { + fn into_response(self) -> axum::response::Response { + match self { + OtlpTraceResponse::FullSuccess(outcome) => { + let mut header_map = write_cost_header_map(outcome.write_cost); + header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); + let body = ExportTraceServiceResponse { + partial_success: None, + }; + (header_map, body.encode_to_vec()).into_response() + } + OtlpTraceResponse::PartialSuccess(outcome) => { + let mut header_map = write_cost_header_map(outcome.write_cost); + header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); + let body = ExportTraceServiceResponse { + partial_success: outcome.error_message.map(|error_message| { + ExportTracePartialSuccess { + rejected_spans: outcome.rejected_spans as i64, + error_message, + } + }), + }; + (header_map, body.encode_to_vec()).into_response() + } + OtlpTraceResponse::Failure(outcome) => { + let status = GoogleRpcStatus { + code: 0, + message: outcome.error_message.unwrap_or_default(), + }; + ( + StatusCode::BAD_REQUEST, + [(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.as_ref())], + status.encode_to_vec(), + ) + .into_response() + } + } + } +} diff --git a/src/sql/src/partition.rs b/src/sql/src/partition.rs index 0f1a7526dc..f562fabee2 100644 --- a/src/sql/src/partition.rs +++ b/src/sql/src/partition.rs @@ -46,10 +46,13 @@ macro_rules! 
between_string { }; } -pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> { +pub fn partition_rule_for_hexstring(ident: &str, partition_num: Option<u32>) -> Result<Partitions> { Ok(Partitions { column_list: vec![Ident::new(ident)], - exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?, + exprs: partition_rules_for_uuid( + partition_num.unwrap_or(DEFAULT_PARTITION_NUM_FOR_TRACES), + ident, + )?, }) } @@ -173,7 +176,30 @@ mod tests { assert_eq!( results, - partition_rule_for_hexstring("trace_id").unwrap().exprs + partition_rule_for_hexstring("trace_id", None) + .unwrap() + .exprs + ); + + // custom partition number + let expr = vec![ + "trace_id < '4'", + "trace_id >= '4' AND trace_id < '8'", + "trace_id >= '8' AND trace_id < 'c'", + "trace_id >= 'c'", + ]; + let results = expr + .into_iter() + .map(|s| { + let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap(); + parser.parse_expr().unwrap() + }) + .collect::<Vec<_>>(); + assert_eq!( + results, + partition_rule_for_hexstring("trace_id", Some(4)) + .unwrap() + .exprs ); } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 15b4278f51..7e56026d93 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -145,6 +145,7 @@ pub const STORAGE_KEY: &str = "storage"; pub const COMMENT_KEY: &str = "comment"; pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table"; pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY; +pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions"; impl TableOptions { pub fn try_from_iter>( diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 29d4256864..a195e85e35 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -52,6 +52,7 @@ use servers::http::header::constants::{ GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME, }; use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME}; +use
servers::http::otlp::GoogleRpcStatus; use servers::http::prometheus::{Column, PrometheusJsonResponse, PrometheusResponse}; use servers::http::result::error_result::ErrorResponse; use servers::http::result::greptime_result_v1::GreptimedbV1Response; @@ -5265,6 +5266,146 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { let res = client.get("/v1/sql?sql=drop table mytable;").send().await; assert_eq!(res.status(), StatusCode::OK); + // test trace table with custom partitions: 1 + let trace_table_part1 = "trace_table_part1"; + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static(trace_table_part1), + ), + ( + HeaderName::from_static("x-greptime-hints"), + HeaderValue::from_static("trace_table_partitions=1"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n 
\"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + validate_data( + "otlp_traces", + &client, + "show create table trace_table_part1;", + expected_ddl, + ) + .await; + + // test trace table with custom partitions: 4 + let trace_table_part4 = "trace_table_part4"; + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static(trace_table_part4), + ), + ( + HeaderName::from_static("x-greptime-hints"), + HeaderValue::from_static("trace_table_partitions=4"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', 
type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + validate_data( + "otlp_traces", + &client, + "show create table trace_table_part4;", + expected_ddl, + ) + .await; + + // test trace table with custom partitions: 32 + let trace_table_part32 = "trace_table_part32"; + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static(trace_table_part32), + ), + ( + HeaderName::from_static("x-greptime-hints"), + HeaderValue::from_static("trace_table_partitions=32"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" 
STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id >= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + validate_data( + "otlp_traces", + &client, + "show create table trace_table_part32;", + expected_ddl, + ) + .await; + + // invalid partition count + let 
trace_table_part_abc = "trace_table_part_abc"; + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), + ), + ( + HeaderName::from_static("x-greptime-trace-table-name"), + HeaderValue::from_static(trace_table_part_abc), + ), + ( + HeaderName::from_static("x-greptime-hints"), + HeaderValue::from_static("trace_table_partitions=abc"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + // write traces data with gzip let res = send_req( &client, @@ -5630,15 +5771,14 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { false, ) .await; - assert_eq!(StatusCode::OK, res.status()); - let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap(); - let partial_success = body.partial_success.as_ref().unwrap(); - assert_eq!(partial_success.rejected_spans, 1); + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + let body = res.bytes().await; + let status = GoogleRpcStatus::decode(body.as_ref()).unwrap(); assert!( - partial_success - .error_message + status + .message .contains("Accepted 0 spans, rejected 1 spans"), - "unexpected partial success body: {body:?}" + "unexpected error body: {status:?}" ); validate_data( @@ -5840,24 +5980,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { false, ) .await; - assert_eq!(StatusCode::OK, res.status()); - let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap(); - let partial_success = body.partial_success.as_ref().unwrap(); - assert_eq!(partial_success.rejected_spans, 2); + assert_eq!(StatusCode::BAD_REQUEST, res.status()); + let body = GoogleRpcStatus::decode(res.bytes().await.as_ref()).unwrap(); assert!( - partial_success - .error_message - .contains("Accepted 0 spans, rejected 2 spans"), 
- "unexpected partial success body: {body:?}" + body.message.contains("Accepted 0 spans, rejected 2 spans"), + "unexpected error body: {body:?}" ); assert!( - partial_success - .error_message - .contains("Chunk fallback triggered by") - || partial_success - .error_message + body.message.contains("Chunk fallback triggered by") + || body + .message .contains("Discarded 2 spans after ambiguous chunk failure"), - "unexpected partial success body: {body:?}" + "unexpected error body: {body:?}" ); guard.remove_all().await;