Compare commits

...

4 Commits

Author SHA1 Message Date
Ruihang Xia
038bc4fe6e revert toml format
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:46:00 +08:00
Ruihang Xia
6d07c422d8 Merge branch 'main' into fix-proto-clear
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:36:28 +08:00
Ruihang Xia
6c14ece23f accomplish test assertion
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 00:32:49 +08:00
Ruihang Xia
89c51d9b87 reset Sample
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-13 23:32:22 +08:00
4 changed files with 39 additions and 31 deletions

7
Cargo.lock generated
View File

@@ -6632,6 +6632,12 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permutation"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7"
[[package]]
name = "pest"
version = "2.7.5"
@@ -9212,6 +9218,7 @@ dependencies = [
"opensrv-mysql",
"opentelemetry-proto 0.3.0",
"parking_lot 0.12.1",
"permutation",
"pgwire",
"pin-project",
"postgres-types",

View File

@@ -120,6 +120,7 @@ criterion = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
permutation = "0.4"
rand.workspace = true
script = { workspace = true, features = ["python"] }
serde_json.workspace = true

View File

@@ -28,7 +28,7 @@ use crate::proto::PromLabel;
use crate::repeated_field::Clear;
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
#[derive(Default)]
#[derive(Default, Debug)]
pub(crate) struct TablesBuilder {
tables: HashMap<String, TableBuilder>,
}
@@ -68,6 +68,7 @@ impl TablesBuilder {
}
/// Builder for one table.
#[derive(Debug)]
pub(crate) struct TableBuilder {
/// Column schemas.
schema: Vec<ColumnSchema>,

View File

@@ -27,10 +27,13 @@ use crate::prom_store::METRIC_NAME_LABEL_BYTES;
use crate::repeated_field::{Clear, RepeatedField};
impl Clear for Sample {
fn clear(&mut self) {}
fn clear(&mut self) {
self.timestamp = 0;
self.value = 0.0;
}
}
#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
pub struct PromLabel {
pub name: Bytes,
pub value: Bytes,
@@ -123,7 +126,7 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
Ok(())
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PromTimeSeries {
pub table_name: String,
pub labels: RepeatedField<PromLabel>,
@@ -206,7 +209,7 @@ impl PromTimeSeries {
}
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PromWriteRequest {
table_data: TablesBuilder,
series: PromTimeSeries,
@@ -264,10 +267,10 @@ impl PromWriteRequest {
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use api::prom_store::remote::WriteRequest;
use api::v1::RowInsertRequests;
use api::v1::{Row, RowInsertRequests, Rows};
use bytes::Bytes;
use prost::Message;
@@ -275,6 +278,21 @@ mod tests {
use crate::proto::PromWriteRequest;
use crate::repeated_field::Clear;
fn sort_rows(rows: Rows) -> Rows {
let permutation =
permutation::sort_by_key(&rows.schema, |schema| schema.column_name.clone());
let schema = permutation.apply_slice(&rows.schema);
let mut inner_rows = vec![];
for row in rows.rows {
let values = permutation.apply_slice(&row.values);
inner_rows.push(Row { values });
}
Rows {
schema,
rows: inner_rows,
}
}
fn check_deserialized(
prom_write_request: &mut PromWriteRequest,
data: &Bytes,
@@ -288,35 +306,16 @@ mod tests {
assert_eq!(expected_samples, samples);
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
let schemas = expected_rows
let expected_rows_map = expected_rows
.inserts
.iter()
.map(|r| {
(
r.table_name.clone(),
r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| (c.column_name.clone(), c.datatype, c.semantic_type))
.collect::<HashSet<_>>(),
)
})
.map(|insert| (insert.table_name.clone(), insert.rows.clone().unwrap()))
.collect::<HashMap<_, _>>();
for r in &prom_rows.inserts {
let expected = schemas.get(&r.table_name).unwrap();
assert_eq!(
expected,
&r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) })
.collect()
);
// check value
let expected_rows = expected_rows_map.get(&r.table_name).unwrap().clone();
assert_eq!(sort_rows(expected_rows), sort_rows(r.rows.clone().unwrap()));
}
}