mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
4 Commits
4650935fc0
...
fix-proto-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
038bc4fe6e | ||
|
|
6d07c422d8 | ||
|
|
6c14ece23f | ||
|
|
89c51d9b87 |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -6632,6 +6632,12 @@ version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||
|
||||
[[package]]
|
||||
name = "permutation"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df202b0b0f5b8e389955afd5f27b007b00fb948162953f1db9c70d2c7e3157d7"
|
||||
|
||||
[[package]]
|
||||
name = "pest"
|
||||
version = "2.7.5"
|
||||
@@ -9212,6 +9218,7 @@ dependencies = [
|
||||
"opensrv-mysql",
|
||||
"opentelemetry-proto 0.3.0",
|
||||
"parking_lot 0.12.1",
|
||||
"permutation",
|
||||
"pgwire",
|
||||
"pin-project",
|
||||
"postgres-types",
|
||||
|
||||
@@ -120,6 +120,7 @@ criterion = "0.4"
|
||||
mysql_async = { version = "0.33", default-features = false, features = [
|
||||
"default-rustls",
|
||||
] }
|
||||
permutation = "0.4"
|
||||
rand.workspace = true
|
||||
script = { workspace = true, features = ["python"] }
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::proto::PromLabel;
|
||||
use crate::repeated_field::Clear;
|
||||
|
||||
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub(crate) struct TablesBuilder {
|
||||
tables: HashMap<String, TableBuilder>,
|
||||
}
|
||||
@@ -68,6 +68,7 @@ impl TablesBuilder {
|
||||
}
|
||||
|
||||
/// Builder for one table.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TableBuilder {
|
||||
/// Column schemas.
|
||||
schema: Vec<ColumnSchema>,
|
||||
|
||||
@@ -27,10 +27,13 @@ use crate::prom_store::METRIC_NAME_LABEL_BYTES;
|
||||
use crate::repeated_field::{Clear, RepeatedField};
|
||||
|
||||
impl Clear for Sample {
|
||||
fn clear(&mut self) {}
|
||||
fn clear(&mut self) {
|
||||
self.timestamp = 0;
|
||||
self.value = 0.0;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub struct PromLabel {
|
||||
pub name: Bytes,
|
||||
pub value: Bytes,
|
||||
@@ -123,7 +126,7 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct PromTimeSeries {
|
||||
pub table_name: String,
|
||||
pub labels: RepeatedField<PromLabel>,
|
||||
@@ -206,7 +209,7 @@ impl PromTimeSeries {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct PromWriteRequest {
|
||||
table_data: TablesBuilder,
|
||||
series: PromTimeSeries,
|
||||
@@ -264,10 +267,10 @@ impl PromWriteRequest {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::prom_store::remote::WriteRequest;
|
||||
use api::v1::RowInsertRequests;
|
||||
use api::v1::{Row, RowInsertRequests, Rows};
|
||||
use bytes::Bytes;
|
||||
use prost::Message;
|
||||
|
||||
@@ -275,6 +278,21 @@ mod tests {
|
||||
use crate::proto::PromWriteRequest;
|
||||
use crate::repeated_field::Clear;
|
||||
|
||||
fn sort_rows(rows: Rows) -> Rows {
|
||||
let permutation =
|
||||
permutation::sort_by_key(&rows.schema, |schema| schema.column_name.clone());
|
||||
let schema = permutation.apply_slice(&rows.schema);
|
||||
let mut inner_rows = vec![];
|
||||
for row in rows.rows {
|
||||
let values = permutation.apply_slice(&row.values);
|
||||
inner_rows.push(Row { values });
|
||||
}
|
||||
Rows {
|
||||
schema,
|
||||
rows: inner_rows,
|
||||
}
|
||||
}
|
||||
|
||||
fn check_deserialized(
|
||||
prom_write_request: &mut PromWriteRequest,
|
||||
data: &Bytes,
|
||||
@@ -288,35 +306,16 @@ mod tests {
|
||||
assert_eq!(expected_samples, samples);
|
||||
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
|
||||
|
||||
let schemas = expected_rows
|
||||
let expected_rows_map = expected_rows
|
||||
.inserts
|
||||
.iter()
|
||||
.map(|r| {
|
||||
(
|
||||
r.table_name.clone(),
|
||||
r.rows
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.schema
|
||||
.iter()
|
||||
.map(|c| (c.column_name.clone(), c.datatype, c.semantic_type))
|
||||
.collect::<HashSet<_>>(),
|
||||
)
|
||||
})
|
||||
.map(|insert| (insert.table_name.clone(), insert.rows.clone().unwrap()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
for r in &prom_rows.inserts {
|
||||
let expected = schemas.get(&r.table_name).unwrap();
|
||||
assert_eq!(
|
||||
expected,
|
||||
&r.rows
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.schema
|
||||
.iter()
|
||||
.map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) })
|
||||
.collect()
|
||||
);
|
||||
// check value
|
||||
let expected_rows = expected_rows_map.get(&r.table_name).unwrap().clone();
|
||||
assert_eq!(sort_rows(expected_rows), sort_rows(r.rows.clone().unwrap()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user