feat: allow customizing trace table partitions (#7944)

* feat: allow customizing trace table partitions

* feat: add hint

* feat: return error on invalid partition number

* feat: add full failure/partial success/full success

* chore: format

* fix: address review suggestion
This commit is contained in:
Ning Sun
2026-04-13 22:10:55 +08:00
committed by GitHub
parent 3750819f93
commit 32a2990802
5 changed files with 280 additions and 40 deletions

View File

@@ -62,7 +62,7 @@ use table::TableRef;
use table::metadata::TableInfo;
use table::requests::{
AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
};
use table::table_reference::TableReference;
@@ -623,6 +623,23 @@ impl Inserter {
.extension(TRACE_TABLE_NAME_SESSION_KEY)
.unwrap_or(TRACE_TABLE_NAME);
// Optional partition-count hint taken from the query context. A missing hint yields `None`;
// a hint that is present but not a valid `u32` is rejected as an invalid insert request.
let trace_table_partitions = ctx
    .extension(TRACE_TABLE_PARTITIONS_HINT_KEY)
    .map(|raw_hint| {
        raw_hint.parse::<u32>().map_err(|_| {
            InvalidInsertRequestSnafu {
                reason: format!("Failed to parse trace_table_partitions: {}", raw_hint),
            }
            .build()
        })
    })
    .transpose()?;
// note that auto create table shouldn't be ttl instant table
// for it's a very unexpected behavior and should be set by user explicitly
for mut create_table in create_tables {
@@ -647,8 +664,18 @@ impl Inserter {
} else {
// prebuilt partition rules for uuid data: see the function
// for more information
let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
let partitions = if matches!(trace_table_partitions, Some(0) | Some(1)) {
// disable partitions
None
} else {
let p = partition_rule_for_hexstring(
TRACE_ID_COLUMN,
trace_table_partitions,
)
.context(CreatePartitionRulesSnafu)?;
Some(p)
};
// add skip index to
// - trace_id: when searching by trace id
// - parent_span_id: when searching root span
@@ -681,7 +708,7 @@ impl Inserter {
let table = self
.create_physical_table(
create_table,
Some(partitions),
partitions,
ctx,
statement_executor,
)

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use axum::Extension;
use axum::extract::State;
use axum::http::header;
use axum::http::{StatusCode, header};
use axum::response::IntoResponse;
use axum_extra::TypedHeader;
use bytes::Bytes;
@@ -44,7 +44,15 @@ use crate::http::extractor::{
};
use crate::http::header::{CONTENT_TYPE_PROTOBUF, write_cost_header_map};
use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};
use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler, TraceIngestOutcome};
// NOTE(review): the name and the field tags suggest this mirrors `google.rpc.Status` without
// its `details` field — confirm against the upstream proto definition.
/// Protobuf error payload made of a numeric `code` and a text `message`.
///
/// In this module it is encoded as the body of the `400 Bad Request` response produced by
/// `OtlpTraceResponse::Failure`.
#[derive(Clone, prost::Message)]
pub struct GoogleRpcStatus {
/// Numeric status code (protobuf field 1).
#[prost(int32, tag = "1")]
pub code: i32,
/// Error description (protobuf field 2).
#[prost(string, tag = "2")]
pub message: String,
}
fn is_json_content_type(content_type: Option<&ContentType>) -> bool {
match content_type {
@@ -129,7 +137,7 @@ pub async fn traces(
Extension(mut query_ctx): Extension<QueryContext>,
content_type: Option<TypedHeader<ContentType>>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
) -> Result<OtlpTraceResponse> {
if is_json_content_type(content_type.as_ref().map(|h| &h.0)) {
return error::UnsupportedJsonContentTypeSnafu {}.fail();
}
@@ -175,16 +183,14 @@ pub async fn traces(
query_ctx,
)
.await
.map(|outcome| OtlpResponse {
resp_body: ExportTraceServiceResponse {
partial_success: outcome.error_message.map(|error_message| {
ExportTracePartialSuccess {
rejected_spans: outcome.rejected_spans as i64,
error_message,
}
}),
},
write_cost: outcome.write_cost,
.map(|outcome| {
if outcome.accepted_spans == 0 && outcome.rejected_spans > 0 {
OtlpTraceResponse::Failure(outcome)
} else if outcome.rejected_spans > 0 || outcome.error_message.is_some() {
OtlpTraceResponse::PartialSuccess(outcome)
} else {
OtlpTraceResponse::FullSuccess(outcome)
}
})
}
@@ -260,3 +266,49 @@ impl<T: Message> IntoResponse for OtlpResponse<T> {
(header_map, self.resp_body.encode_to_vec()).into_response()
}
}
/// Result of an OTLP trace ingestion request, classified by how much of it was accepted.
///
/// The `traces` handler picks the variant from the `TraceIngestOutcome` counters; every variant
/// carries that outcome so the `IntoResponse` implementation can render it.
pub enum OtlpTraceResponse {
/// No span was rejected and no error message was reported.
FullSuccess(TraceIngestOutcome),
/// Spans were rejected or an error message was reported, but the request was not a full
/// failure.
PartialSuccess(TraceIngestOutcome),
/// No span was accepted and at least one span was rejected.
Failure(TraceIngestOutcome),
}
impl IntoResponse for OtlpTraceResponse {
    /// Renders the ingestion outcome as an HTTP response.
    ///
    /// - `FullSuccess`: protobuf-encoded `ExportTraceServiceResponse` with no
    ///   `partial_success`, sent with the write-cost headers.
    /// - `PartialSuccess`: same as above, but `partial_success` is always populated with the
    ///   rejected span count and the error message (empty when the outcome carries none).
    /// - `Failure`: `400 Bad Request` whose body is a protobuf-encoded `GoogleRpcStatus`
    ///   holding the error message; no write-cost headers are attached.
    fn into_response(self) -> axum::response::Response {
        let (write_cost, partial_success) = match self {
            OtlpTraceResponse::FullSuccess(outcome) => (outcome.write_cost, None),
            OtlpTraceResponse::PartialSuccess(outcome) => {
                let write_cost = outcome.write_cost;
                // Always report the rejection, even without an error message: dropping
                // `partial_success` here would make the reply indistinguishable from a full
                // success and the client would never learn that spans were rejected.
                let partial_success = ExportTracePartialSuccess {
                    rejected_spans: outcome.rejected_spans as i64,
                    error_message: outcome.error_message.unwrap_or_default(),
                };
                (write_cost, Some(partial_success))
            }
            OtlpTraceResponse::Failure(outcome) => {
                // `code` stays at its default: the OTLP/HTTP specification does not use
                // `Status.code`, only the developer-facing `message`.
                let status = GoogleRpcStatus {
                    code: 0,
                    message: outcome.error_message.unwrap_or_default(),
                };
                return (
                    StatusCode::BAD_REQUEST,
                    [(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.as_ref())],
                    status.encode_to_vec(),
                )
                    .into_response();
            }
        };

        // Both success flavours share the same envelope: write-cost headers, protobuf
        // content type and an encoded `ExportTraceServiceResponse`.
        let mut header_map = write_cost_header_map(write_cost);
        header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
        let body = ExportTraceServiceResponse { partial_success };
        (header_map, body.encode_to_vec()).into_response()
    }
}

View File

@@ -46,10 +46,13 @@ macro_rules! between_string {
};
}
pub fn partition_rule_for_hexstring(ident: &str) -> Result<Partitions> {
pub fn partition_rule_for_hexstring(ident: &str, partition_num: Option<u32>) -> Result<Partitions> {
Ok(Partitions {
column_list: vec![Ident::new(ident)],
exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?,
exprs: partition_rules_for_uuid(
partition_num.unwrap_or(DEFAULT_PARTITION_NUM_FOR_TRACES),
ident,
)?,
})
}
@@ -173,7 +176,30 @@ mod tests {
assert_eq!(
results,
partition_rule_for_hexstring("trace_id").unwrap().exprs
partition_rule_for_hexstring("trace_id", None)
.unwrap()
.exprs
);
// custom partition number
let expr = vec![
"trace_id < '4'",
"trace_id >= '4' AND trace_id < '8'",
"trace_id >= '8' AND trace_id < 'c'",
"trace_id >= 'c'",
];
let results = expr
.into_iter()
.map(|s| {
let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
parser.parse_expr().unwrap()
})
.collect::<Vec<Expr>>();
assert_eq!(
results,
partition_rule_for_hexstring("trace_id", Some(4))
.unwrap()
.exprs
);
}

View File

@@ -145,6 +145,7 @@ pub const STORAGE_KEY: &str = "storage";
pub const COMMENT_KEY: &str = "comment";
pub const AUTO_CREATE_TABLE_KEY: &str = "auto_create_table";
pub const SKIP_WAL_KEY: &str = store_api::mito_engine_options::SKIP_WAL_KEY;
pub const TRACE_TABLE_PARTITIONS_HINT_KEY: &str = "trace_table_partitions";
impl TableOptions {
pub fn try_from_iter<T: ToString, U: IntoIterator<Item = (T, T)>>(

View File

@@ -52,6 +52,7 @@ use servers::http::header::constants::{
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
};
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::otlp::GoogleRpcStatus;
use servers::http::prometheus::{Column, PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
@@ -5265,6 +5266,146 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
let res = client.get("/v1/sql?sql=drop table mytable;").send().await;
assert_eq!(res.status(), StatusCode::OK);
// test trace table with custom partitions: 1
let trace_table_part1 = "trace_table_part1";
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static(trace_table_part1),
),
(
HeaderName::from_static("x-greptime-hints"),
HeaderValue::from_static("trace_table_partitions=1"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part1","CREATE TABLE IF NOT EXISTS \"trace_table_part1\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
"show create table trace_table_part1;",
expected_ddl,
)
.await;
// test trace table with custom partitions: 4
let trace_table_part4 = "trace_table_part4";
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static(trace_table_part4),
),
(
HeaderName::from_static("x-greptime-hints"),
HeaderValue::from_static("trace_table_partitions=4"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part4","CREATE TABLE IF NOT EXISTS \"trace_table_part4\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '4',\n trace_id >= '4' AND trace_id < '8',\n trace_id >= '8' AND trace_id < 'c',\n trace_id >= 'c'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
"show create table trace_table_part4;",
expected_ddl,
)
.await;
// test trace table with custom partitions: 32
let trace_table_part32 = "trace_table_part32";
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static(trace_table_part32),
),
(
HeaderName::from_static("x-greptime-hints"),
HeaderValue::from_static("trace_table_partitions=32"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected_ddl = r#"[["trace_table_part32","CREATE TABLE IF NOT EXISTS \"trace_table_part32\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '08',\n trace_id >= '08' AND trace_id < '10',\n trace_id >= '10' AND trace_id < '18',\n trace_id >= '18' AND trace_id < '20',\n trace_id >= '20' AND trace_id < '28',\n trace_id >= '28' AND trace_id < '30',\n trace_id >= '30' AND trace_id < '38',\n trace_id >= '38' AND trace_id < '40',\n trace_id >= '40' AND trace_id < '48',\n trace_id >= '48' AND trace_id < '50',\n trace_id >= '50' AND trace_id < '58',\n trace_id >= '58' AND trace_id < '60',\n trace_id >= '60' AND trace_id < '68',\n trace_id >= '68' AND trace_id < '70',\n trace_id >= '70' AND trace_id < '78',\n trace_id >= '78' AND trace_id < '80',\n trace_id >= '80' AND trace_id < '88',\n trace_id >= '88' AND trace_id < '90',\n trace_id >= '90' AND trace_id < '98',\n trace_id >= '98' AND trace_id < 'a0',\n trace_id >= 'a0' AND trace_id < 'a8',\n trace_id >= 'a8' AND trace_id < 'b0',\n trace_id >= 'b0' AND trace_id < 'b8',\n trace_id 
>= 'b8' AND trace_id < 'c0',\n trace_id >= 'c0' AND trace_id < 'c8',\n trace_id >= 'c8' AND trace_id < 'd0',\n trace_id >= 'd0' AND trace_id < 'd8',\n trace_id >= 'd8' AND trace_id < 'e0',\n trace_id >= 'e0' AND trace_id < 'e8',\n trace_id >= 'e8' AND trace_id < 'f0',\n trace_id >= 'f0' AND trace_id < 'f8',\n trace_id >= 'f8'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
"show create table trace_table_part32;",
expected_ddl,
)
.await;
// invalid partition count
let trace_table_part_abc = "trace_table_part_abc";
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
HeaderName::from_static("x-greptime-trace-table-name"),
HeaderValue::from_static(trace_table_part_abc),
),
(
HeaderName::from_static("x-greptime-hints"),
HeaderValue::from_static("trace_table_partitions=abc"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::BAD_REQUEST, res.status());
// write traces data with gzip
let res = send_req(
&client,
@@ -5630,15 +5771,14 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap();
let partial_success = body.partial_success.as_ref().unwrap();
assert_eq!(partial_success.rejected_spans, 1);
assert_eq!(StatusCode::BAD_REQUEST, res.status());
let body = res.bytes().await;
let status = GoogleRpcStatus::decode(body.as_ref()).unwrap();
assert!(
partial_success
.error_message
status
.message
.contains("Accepted 0 spans, rejected 1 spans"),
"unexpected partial success body: {body:?}"
"unexpected error body: {status:?}"
);
validate_data(
@@ -5840,24 +5980,18 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let body = ExportTraceServiceResponse::decode(res.bytes().await).unwrap();
let partial_success = body.partial_success.as_ref().unwrap();
assert_eq!(partial_success.rejected_spans, 2);
assert_eq!(StatusCode::BAD_REQUEST, res.status());
let body = GoogleRpcStatus::decode(res.bytes().await.as_ref()).unwrap();
assert!(
partial_success
.error_message
.contains("Accepted 0 spans, rejected 2 spans"),
"unexpected partial success body: {body:?}"
body.message.contains("Accepted 0 spans, rejected 2 spans"),
"unexpected error body: {body:?}"
);
assert!(
partial_success
.error_message
.contains("Chunk fallback triggered by")
|| partial_success
.error_message
body.message.contains("Chunk fallback triggered by")
|| body
.message
.contains("Discarded 2 spans after ambiguous chunk failure"),
"unexpected partial success body: {body:?}"
"unexpected error body: {body:?}"
);
guard.remove_all().await;