Compare commits

...

4 Commits

Author SHA1 Message Date
Ruihang Xia
038bc4fe6e revert toml format
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:46:00 +08:00
Ruihang Xia
6d07c422d8 Merge branch 'main' into fix-proto-clear
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:36:28 +08:00
Ruihang Xia
6c14ece23f accomplish test assertion
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:32:49 +08:00
Ruihang Xia
89c51d9b87 reset Sample
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-13 23:32:22 +08:00
4 changed files with 39 additions and 31 deletions

7
Cargo.lock generated
View File

@@ -6632,6 +6632,12 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permutation"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7"
[[package]] [[package]]
name = "pest" name = "pest"
version = "2.7.5" version = "2.7.5"
@@ -9212,6 +9218,7 @@ dependencies = [
"opensrv-mysql", "opensrv-mysql",
"opentelemetry-proto 0.3.0", "opentelemetry-proto 0.3.0",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"permutation",
"pgwire", "pgwire",
"pin-project", "pin-project",
"postgres-types", "postgres-types",

View File

@@ -120,6 +120,7 @@ criterion = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [ mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls", "default-rustls",
] } ] }
permutation = "0.4"
rand.workspace = true rand.workspace = true
script = { workspace = true, features = ["python"] } script = { workspace = true, features = ["python"] }
serde_json.workspace = true serde_json.workspace = true

View File

@@ -28,7 +28,7 @@ use crate::proto::PromLabel;
use crate::repeated_field::Clear; use crate::repeated_field::Clear;
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. /// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
#[derive(Default)] #[derive(Default, Debug)]
pub(crate) struct TablesBuilder { pub(crate) struct TablesBuilder {
tables: HashMap<String, TableBuilder>, tables: HashMap<String, TableBuilder>,
} }
@@ -68,6 +68,7 @@ impl TablesBuilder {
} }
/// Builder for one table. /// Builder for one table.
#[derive(Debug)]
pub(crate) struct TableBuilder { pub(crate) struct TableBuilder {
/// Column schemas. /// Column schemas.
schema: Vec<ColumnSchema>, schema: Vec<ColumnSchema>,

View File

@@ -27,10 +27,13 @@ use crate::prom_store::METRIC_NAME_LABEL_BYTES;
use crate::repeated_field::{Clear, RepeatedField}; use crate::repeated_field::{Clear, RepeatedField};
impl Clear for Sample { impl Clear for Sample {
fn clear(&mut self) {} fn clear(&mut self) {
self.timestamp = 0;
self.value = 0.0;
}
} }
#[derive(Default, Clone)] #[derive(Default, Clone, Debug)]
pub struct PromLabel { pub struct PromLabel {
pub name: Bytes, pub name: Bytes,
pub value: Bytes, pub value: Bytes,
@@ -123,7 +126,7 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
Ok(()) Ok(())
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct PromTimeSeries { pub struct PromTimeSeries {
pub table_name: String, pub table_name: String,
pub labels: RepeatedField<PromLabel>, pub labels: RepeatedField<PromLabel>,
@@ -206,7 +209,7 @@ impl PromTimeSeries {
} }
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct PromWriteRequest { pub struct PromWriteRequest {
table_data: TablesBuilder, table_data: TablesBuilder,
series: PromTimeSeries, series: PromTimeSeries,
@@ -264,10 +267,10 @@ impl PromWriteRequest {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use api::prom_store::remote::WriteRequest; use api::prom_store::remote::WriteRequest;
use api::v1::RowInsertRequests; use api::v1::{Row, RowInsertRequests, Rows};
use bytes::Bytes; use bytes::Bytes;
use prost::Message; use prost::Message;
@@ -275,6 +278,21 @@ mod tests {
use crate::proto::PromWriteRequest; use crate::proto::PromWriteRequest;
use crate::repeated_field::Clear; use crate::repeated_field::Clear;
fn sort_rows(rows: Rows) -> Rows {
let permutation =
permutation::sort_by_key(&rows.schema, |schema| schema.column_name.clone());
let schema = permutation.apply_slice(&rows.schema);
let mut inner_rows = vec![];
for row in rows.rows {
let values = permutation.apply_slice(&row.values);
inner_rows.push(Row { values });
}
Rows {
schema,
rows: inner_rows,
}
}
fn check_deserialized( fn check_deserialized(
prom_write_request: &mut PromWriteRequest, prom_write_request: &mut PromWriteRequest,
data: &Bytes, data: &Bytes,
@@ -288,35 +306,16 @@ mod tests {
assert_eq!(expected_samples, samples); assert_eq!(expected_samples, samples);
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
let schemas = expected_rows let expected_rows_map = expected_rows
.inserts .inserts
.iter() .iter()
.map(|r| { .map(|insert| (insert.table_name.clone(), insert.rows.clone().unwrap()))
(
r.table_name.clone(),
r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| (c.column_name.clone(), c.datatype, c.semantic_type))
.collect::<HashSet<_>>(),
)
})
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
for r in &prom_rows.inserts { for r in &prom_rows.inserts {
let expected = schemas.get(&r.table_name).unwrap(); // check value
assert_eq!( let expected_rows = expected_rows_map.get(&r.table_name).unwrap().clone();
expected, assert_eq!(sort_rows(expected_rows), sort_rows(r.rows.clone().unwrap()));
&r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) })
.collect()
);
} }
} }