diff --git a/Cargo.lock b/Cargo.lock index a8ca9d963b..fdf835ce31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6632,6 +6632,12 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "permutation" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7" + [[package]] name = "pest" version = "2.7.5" @@ -9212,6 +9218,7 @@ dependencies = [ "opensrv-mysql", "opentelemetry-proto 0.3.0", "parking_lot 0.12.1", + "permutation", "pgwire", "pin-project", "postgres-types", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 04e226e3cd..d1eb805312 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -120,6 +120,7 @@ criterion = "0.4" mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } +permutation = "0.4" rand.workspace = true script = { workspace = true, features = ["python"] } serde_json.workspace = true diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 0829973092..3629aff4fd 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -28,7 +28,7 @@ use crate::proto::PromLabel; use crate::repeated_field::Clear; /// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct TablesBuilder { tables: HashMap, } @@ -68,6 +68,7 @@ impl TablesBuilder { } /// Builder for one table. +#[derive(Debug)] pub(crate) struct TableBuilder { /// Column schemas. schema: Vec, diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index adc9cc0ad3..9ea907306c 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -27,10 +27,13 @@ use crate::prom_store::METRIC_NAME_LABEL_BYTES; use crate::repeated_field::{Clear, RepeatedField}; impl Clear for Sample { - fn clear(&mut self) {} + fn clear(&mut self) { + self.timestamp = 0; + self.value = 0.0; + } } -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct PromLabel { pub name: Bytes, pub value: Bytes, @@ -123,7 +126,7 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> { Ok(()) } -#[derive(Default)] +#[derive(Default, Debug)] pub struct PromTimeSeries { pub table_name: String, pub labels: RepeatedField, @@ -206,7 +209,7 @@ impl PromTimeSeries { } } -#[derive(Default)] +#[derive(Default, Debug)] pub struct PromWriteRequest { table_data: TablesBuilder, series: PromTimeSeries, @@ -264,10 +267,10 @@ impl PromWriteRequest { #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; use api::prom_store::remote::WriteRequest; - use api::v1::RowInsertRequests; + use api::v1::{Row, RowInsertRequests, Rows}; use bytes::Bytes; use prost::Message; @@ -275,6 +278,21 @@ mod tests { use crate::proto::PromWriteRequest; use crate::repeated_field::Clear; + fn sort_rows(rows: Rows) -> Rows { + let permutation = + permutation::sort_by_key(&rows.schema, |schema| schema.column_name.clone()); + let schema = permutation.apply_slice(&rows.schema); + let mut inner_rows = vec![]; + for row in rows.rows { + let values = permutation.apply_slice(&row.values); + inner_rows.push(Row { values }); + } + Rows { + schema, + rows: inner_rows, + } + } + fn check_deserialized( prom_write_request: &mut PromWriteRequest, data: &Bytes, @@ -288,35 +306,16 @@ mod tests { assert_eq!(expected_samples, samples); assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); - let schemas = expected_rows + let expected_rows_map = expected_rows .inserts .iter() - .map(|r| { - ( - r.table_name.clone(), - r.rows - .as_ref() - .unwrap() - .schema - .iter() - .map(|c| (c.column_name.clone(), c.datatype, c.semantic_type)) - .collect::>(), - ) - }) + .map(|insert| (insert.table_name.clone(), insert.rows.clone().unwrap())) .collect::>(); for r in &prom_rows.inserts { - let expected = schemas.get(&r.table_name).unwrap(); - assert_eq!( - expected, - &r.rows - .as_ref() - .unwrap() - .schema - .iter() - .map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) }) - .collect() - ); + // check value + let expected_rows = expected_rows_map.get(&r.table_name).unwrap().clone(); + assert_eq!(sort_rows(expected_rows), sort_rows(r.rows.clone().unwrap())); } }