diff --git a/Cargo.lock b/Cargo.lock index 2ab3200029..f1bf4eb3d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11011,6 +11011,7 @@ dependencies = [ "sqlparser_derive 0.1.1", "store-api", "table", + "uuid", ] [[package]] diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index c0c102ceda..900a3b4310 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -799,6 +799,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to create partition rules"))] + CreatePartitionRules { + #[snafu(source)] + source: sql::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -840,7 +848,8 @@ impl ErrorExt for Error { | Error::PhysicalExpr { .. } | Error::InvalidJsonFormat { .. } | Error::CursorNotFound { .. } - | Error::CursorExists { .. } => StatusCode::InvalidArguments, + | Error::CursorExists { .. } + | Error::CreatePartitionRules { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => { StatusCode::TableAlreadyExists diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index b2742685fb..2f7c30ccd0 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -63,8 +63,8 @@ use table::table_reference::TableReference; use table::TableRef; use crate::error::{ - CatalogSnafu, ColumnOptionsSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu, - JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu, + CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu, + InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu, }; use crate::expr_helper; use crate::region_req_factory::RegionRequestFactory; @@ -591,7 +591,8 @@ impl Inserter { } else { // prebuilt partition rules for uuid data: see the function // for more information - let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN); + let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN) + .context(CreatePartitionRulesSnafu)?; // add skip index to // - trace_id: when searching by trace id // - parent_span_id: when searching root span diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 3cb81d6dd4..812fe42709 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -37,6 +37,7 @@ sqlparser.workspace = true sqlparser_derive = "0.1" store-api.workspace = true table.workspace = true +uuid.workspace = true [dev-dependencies] common-datasource.workspace = true diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index e7253d6c46..e07efdbe6c 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -345,6 +345,16 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Invalid partition number: {}, should be in range [2, 65536]", + partition_num + ))] + InvalidPartitionNumber { + partition_num: u32, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -380,6 +390,7 @@ impl ErrorExt for Error { | Simplification { .. } | InvalidInterval { .. } | InvalidUnaryOp { .. } + | InvalidPartitionNumber { .. } | UnsupportedUnaryOp { .. } => StatusCode::InvalidArguments, SerializeColumnDefaultConstraint { source, .. } => source.status_code(), diff --git a/src/sql/src/partition.rs b/src/sql/src/partition.rs index 4979bf702f..a1fd8e642e 100644 --- a/src/sql/src/partition.rs +++ b/src/sql/src/partition.rs @@ -12,10 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use snafu::ensure; use sqlparser::ast::{BinaryOperator, Expr, Ident, Value}; +use crate::error::{InvalidPartitionNumberSnafu, Result}; use crate::statements::create::Partitions; +/// The default number of partitions for OpenTelemetry traces. +const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16; + +/// The maximum number of partitions for OpenTelemetry traces. +const MAX_PARTITION_NUM_FOR_TRACES: u32 = 65536; + macro_rules! between_string { ($col: expr, $left_incl: expr, $right_excl: expr) => { Expr::BinaryOp { @@ -38,98 +46,105 @@ macro_rules! between_string { }; } -macro_rules! or { - ($left: expr, $right: expr) => { - Expr::BinaryOp { - op: BinaryOperator::Or, - left: Box::new($left), - right: Box::new($right), - } - }; +pub fn partition_rule_for_hexstring(ident: &str) -> Result { + Ok(Partitions { + column_list: vec![Ident::new(ident)], + exprs: partition_rules_for_uuid(DEFAULT_PARTITION_NUM_FOR_TRACES, ident)?, + }) } -pub fn partition_rule_for_hexstring(ident: &str) -> Partitions { - let ident = Ident::new(ident); - let ident_expr = Expr::Identifier(ident.clone()); +// partition_rules_for_uuid can creates partition rules up to 65536 partitions. +fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result> { + ensure!( + partition_num.is_power_of_two() && (2..=65536).contains(&partition_num), + InvalidPartitionNumberSnafu { partition_num } + ); - // rules are like: - // - // "trace_id < '1'", - // "trace_id >= '1' AND trace_id < '2'", - // "trace_id >= '2' AND trace_id < '3'", - // "trace_id >= '3' AND trace_id < '4'", - // "trace_id >= '4' AND trace_id < '5'", - // "trace_id >= '5' AND trace_id < '6'", - // "trace_id >= '6' AND trace_id < '7'", - // "trace_id >= '7' AND trace_id < '8'", - // "trace_id >= '8' AND trace_id < '9'", - // "trace_id >= '9' AND trace_id < 'A'", - // "trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'", - // "trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'", - // "trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'", - // "trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'", - // "trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'", - // "trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'", - let rules = vec![ - Expr::BinaryOp { - left: Box::new(ident_expr.clone()), - op: BinaryOperator::Lt, - right: Box::new(Expr::Value(Value::SingleQuotedString("1".to_string()))), - }, - // [left, right) - between_string!(ident_expr, "1", "2"), - between_string!(ident_expr, "2", "3"), - between_string!(ident_expr, "3", "4"), - between_string!(ident_expr, "4", "5"), - between_string!(ident_expr, "5", "6"), - between_string!(ident_expr, "6", "7"), - between_string!(ident_expr, "7", "8"), - between_string!(ident_expr, "8", "9"), - between_string!(ident_expr, "9", "A"), - or!( - between_string!(ident_expr, "A", "B"), - between_string!(ident_expr, "a", "b") - ), - or!( - between_string!(ident_expr, "B", "C"), - between_string!(ident_expr, "b", "c") - ), - or!( - between_string!(ident_expr, "C", "D"), - between_string!(ident_expr, "c", "d") - ), - or!( - between_string!(ident_expr, "D", "E"), - between_string!(ident_expr, "d", "e") - ), - or!( - between_string!(ident_expr, "E", "F"), - between_string!(ident_expr, "e", "f") - ), - or!( - between_string!(ident_expr, "F", "a"), - Expr::BinaryOp { + let ident_expr = Expr::Identifier(Ident::new(ident).clone()); + + let (total_partitions, hex_length) = { + match partition_num { + 2..=16 => (16, 1), + 17..=256 => (256, 2), + 257..=4096 => (4096, 3), + 4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4), + _ => unreachable!(), + } + }; + + let partition_size = total_partitions / partition_num; + let remainder = total_partitions % partition_num; + + let mut rules = Vec::new(); + let mut current_boundary = 0; + for i in 0..partition_num { + let mut size = partition_size; + if i < remainder { + size += 1; + } + let start = current_boundary; + let end = current_boundary + size; + + if i == 0 { + // Create the leftmost rule, for example: trace_id < '1'. + rules.push(Expr::BinaryOp { + left: Box::new(ident_expr.clone()), + op: BinaryOperator::Lt, + right: Box::new(Expr::Value(Value::SingleQuotedString(format!( + "{:0hex_length$x}", + end + )))), + }); + } else if i == partition_num - 1 { + // Create the rightmost rule, for example: trace_id >= 'f'. + rules.push(Expr::BinaryOp { left: Box::new(ident_expr.clone()), op: BinaryOperator::GtEq, - right: Box::new(Expr::Value(Value::SingleQuotedString("f".to_string()))), - } - ), - ]; + right: Box::new(Expr::Value(Value::SingleQuotedString(format!( + "{:0hex_length$x}", + start + )))), + }); + } else { + // Create the middle rules, for example: trace_id >= '1' AND trace_id < '2'. + rules.push(between_string!( + ident_expr, + format!("{:0hex_length$x}", start), + format!("{:0hex_length$x}", end) + )); + } - Partitions { - column_list: vec![ident], - exprs: rules, + current_boundary = end; } + + Ok(rules) } #[cfg(test)] mod tests { + use std::collections::HashMap; + use sqlparser::ast::Expr; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; + use uuid::Uuid; use super::*; + #[test] + fn test_partition_rules_for_uuid() { + // NOTE: We only test a subset of partitions to keep the test execution time reasonable. + // As the number of partitions increases, we need to increase the number of test samples to ensure uniform distribution. + assert!(check_distribution(2, 10_000)); // 2^1 + assert!(check_distribution(4, 10_000)); // 2^2 + assert!(check_distribution(8, 10_000)); // 2^3 + assert!(check_distribution(16, 10_000)); // 2^4 + assert!(check_distribution(32, 10_000)); // 2^5 + assert!(check_distribution(64, 100_000)); // 2^6 + assert!(check_distribution(128, 100_000)); // 2^7 + assert!(check_distribution(256, 100_000)); // 2^8 + } + #[test] fn test_rules() { let expr = vec![ @@ -142,13 +157,13 @@ mod tests { "trace_id >= '6' AND trace_id < '7'", "trace_id >= '7' AND trace_id < '8'", "trace_id >= '8' AND trace_id < '9'", - "trace_id >= '9' AND trace_id < 'A'", - "trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b'", - "trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c'", - "trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd'", - "trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e'", - "trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f'", - "trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'", + "trace_id >= '9' AND trace_id < 'a'", + "trace_id >= 'a' AND trace_id < 'b'", + "trace_id >= 'b' AND trace_id < 'c'", + "trace_id >= 'c' AND trace_id < 'd'", + "trace_id >= 'd' AND trace_id < 'e'", + "trace_id >= 'e' AND trace_id < 'f'", + "trace_id >= 'f'", ]; let dialect = GenericDialect {}; @@ -160,6 +175,93 @@ mod tests { }) .collect::>(); - assert_eq!(results, partition_rule_for_hexstring("trace_id").exprs); + assert_eq!( + results, + partition_rule_for_hexstring("trace_id").unwrap().exprs + ); + } + + fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool { + // Generate test_uuid_num random uuids. + let uuids = (0..test_uuid_num) + .map(|_| Uuid::new_v4().to_string().replace("-", "").to_lowercase()) + .collect::>(); + + // Generate the partition rules. + let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap(); + + // Collect the number of partitions for each uuid. + let mut stats = HashMap::new(); + for uuid in uuids { + let partition = allocate_partition_for_uuid(uuid.clone(), &rules); + // Count the number of uuids in each partition. + *stats.entry(partition).or_insert(0) += 1; + } + + // Check if the partition distribution is uniform. + let expected_ratio = 100.0 / test_partition as f64; + + // tolerance is the allowed deviation from the expected ratio. + let tolerance = 100.0 / test_partition as f64 * 0.30; + + // For each partition, its ratio should be as close as possible to the expected ratio. + for (_, count) in stats { + let ratio = (count as f64 / test_uuid_num as f64) * 100.0; + if (ratio - expected_ratio).abs() >= tolerance { + return false; + } + } + + true + } + + fn allocate_partition_for_uuid(uuid: String, rules: &[Expr]) -> usize { + for (i, rule) in rules.iter().enumerate() { + if let Expr::BinaryOp { left, op: _, right } = rule { + if i == 0 { + // Hit the leftmost rule. + if let Expr::Value(Value::SingleQuotedString(leftmost)) = *right.clone() { + if uuid < leftmost { + return i; + } + } + } else if i == rules.len() - 1 { + // Hit the rightmost rule. + if let Expr::Value(Value::SingleQuotedString(rightmost)) = *right.clone() { + if uuid >= rightmost { + return i; + } + } + } else { + // Hit the middle rules. + if let Expr::BinaryOp { + left: _, + op: _, + right: inner_right, + } = *left.clone() + { + if let Expr::Value(Value::SingleQuotedString(lower)) = *inner_right.clone() + { + if let Expr::BinaryOp { + left: _, + op: _, + right: inner_right, + } = *right.clone() + { + if let Expr::Value(Value::SingleQuotedString(upper)) = + *inner_right.clone() + { + if uuid >= lower && uuid < upper { + return i; + } + } + } + } + } + } + } + } + + panic!("No partition found for uuid: {}, rules: {:?}", uuid, rules); } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 6eb4d10562..d20fe50241 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -2488,7 +2488,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#; validate_data("otlp_traces", &client, "select * from mytable;", expected).await; - let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'A',\n trace_id >= 'A' AND trace_id < 'B' OR trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'B' AND trace_id < 'C' OR trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'C' AND trace_id < 'D' OR trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'D' AND trace_id < 'E' OR trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'E' AND trace_id < 'F' OR trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'F' AND trace_id < 'a' OR trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; + let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= 'f',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#; validate_data( "otlp_traces", &client,