diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index a9363a0efe..e68ad232c5 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -16,14 +16,14 @@ workspace = true [dependencies] aide = { version = "0.9", features = ["axum"] } api.workspace = true -arrow.workspace = true arrow-flight.workspace = true arrow-ipc.workspace = true arrow-schema.workspace = true +arrow.workspace = true async-trait = "0.1" auth.workspace = true -axum.workspace = true axum-macros = "0.3.8" +axum.workspace = true base64.workspace = true bytes.workspace = true catalog.workspace = true @@ -31,8 +31,8 @@ chrono.workspace = true common-base.workspace = true common-catalog.workspace = true common-error.workspace = true -common-grpc.workspace = true common-grpc-expr.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-mem-prof = { workspace = true, optional = true } common-meta.workspace = true @@ -42,9 +42,9 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true +datafusion.workspace = true datatypes.workspace = true derive_builder.workspace = true digest = "0.10" @@ -96,11 +96,11 @@ snap = "1" sql.workspace = true strum.workspace = true table.workspace = true -tokio.workspace = true tokio-rustls = "0.25" tokio-stream = { workspace = true, features = ["net"] } -tonic.workspace = true +tokio.workspace = true tonic-reflection = "0.10" +tonic.workspace = true tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } urlencoding = "2.1" @@ -119,6 +119,7 @@ criterion = "0.4" mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } +permutation = "0.4" rand.workspace = true script = { workspace = true, features = ["python"] } serde_json.workspace = true diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 906a7ff52a..aeed7d623a 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -227,10 +227,10 @@ impl PromWriteRequest { #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; use api::prom_store::remote::WriteRequest; - use api::v1::RowInsertRequests; + use api::v1::{Row, RowInsertRequests, Rows}; use bytes::Bytes; use prost::Message; @@ -238,6 +238,21 @@ mod tests { use crate::proto::PromWriteRequest; use crate::repeated_field::Clear; + fn sort_rows(rows: Rows) -> Rows { + let permutation = + permutation::sort_by_key(&rows.schema, |schema| schema.column_name.clone()); + let schema = permutation.apply_slice(&rows.schema); + let mut inner_rows = vec![]; + for row in rows.rows { + let values = permutation.apply_slice(&row.values); + inner_rows.push(Row { values }); + } + Rows { + schema, + rows: inner_rows, + } + } + fn check_deserialized( prom_write_request: &mut PromWriteRequest, data: &Bytes, @@ -251,35 +266,16 @@ mod tests { assert_eq!(expected_samples, samples); assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); - let schemas = expected_rows + let expected_rows_map = expected_rows .inserts .iter() - .map(|r| { - ( - r.table_name.clone(), - r.rows - .as_ref() - .unwrap() - .schema - .iter() - .map(|c| (c.column_name.clone(), c.datatype, c.semantic_type)) - .collect::>(), - ) - }) + .map(|insert| (insert.table_name.clone(), insert.rows.clone().unwrap())) .collect::>(); for r in &prom_rows.inserts { - let expected = schemas.get(&r.table_name).unwrap(); - assert_eq!( - expected, - &r.rows - .as_ref() - .unwrap() - .schema - .iter() - .map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) }) - .collect() - ); + // check value + let expected_rows = expected_rows_map.get(&r.table_name).unwrap().clone(); + assert_eq!(sort_rows(expected_rows), sort_rows(r.rows.clone().unwrap())); } }