feat: support convert prometheus write request to InsertRequest (#433)

* support convert prometheus to InsertRequest.
This commit is contained in:
fys
2022-11-10 10:48:41 +08:00
committed by GitHub
parent 16d1132733
commit 056d7cb911
2 changed files with 159 additions and 16 deletions

View File

@@ -240,19 +240,19 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let columns = &insert_req.columns_values;
let host = columns.get("host").unwrap();
let expetcd: Vec<Value> = vec!["host1".into(), "host2".into()];
assert_vector(&expetcd, host);
let expected: Vec<Value> = vec!["host1".into(), "host2".into()];
assert_vector(&expected, host);
let cpu = columns.get("cpu").unwrap();
let expetcd: Vec<Value> = vec![66.6.into(), Value::Null];
assert_vector(&expetcd, cpu);
let expected: Vec<Value> = vec![66.6.into(), Value::Null];
assert_vector(&expected, cpu);
let memory = columns.get("memory").unwrap();
let expetcd: Vec<Value> = vec![1024.0.into(), 1027.0.into()];
assert_vector(&expetcd, memory);
let expected: Vec<Value> = vec![1024.0.into(), 1027.0.into()];
assert_vector(&expected, memory);
let ts = columns.get("ts").unwrap();
let expetcd: Vec<Value> = vec![
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(
1663840496100,
TimeUnit::Millisecond,
@@ -262,7 +262,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
TimeUnit::Millisecond,
)),
];
assert_vector(&expetcd, ts);
assert_vector(&expected, ts);
}
fn assert_table_2(insert_req: &InsertRequest) {
@@ -272,19 +272,19 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let columns = &insert_req.columns_values;
let host = columns.get("host").unwrap();
let expetcd: Vec<Value> = vec!["host3".into(), "host4".into()];
assert_vector(&expetcd, host);
let expected: Vec<Value> = vec!["host3".into(), "host4".into()];
assert_vector(&expected, host);
let cpu = columns.get("cpu").unwrap();
let expetcd: Vec<Value> = vec![66.5.into(), 66.3.into()];
assert_vector(&expetcd, cpu);
let expected: Vec<Value> = vec![66.5.into(), 66.3.into()];
assert_vector(&expected, cpu);
let memory = columns.get("memory").unwrap();
let expetcd: Vec<Value> = vec![Value::Null, 1029.0.into()];
assert_vector(&expetcd, memory);
let expected: Vec<Value> = vec![Value::Null, 1029.0.into()];
assert_vector(&expected, memory);
let ts = columns.get("ts").unwrap();
let expetcd: Vec<Value> = vec![
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(
1663840496100,
TimeUnit::Millisecond,
@@ -294,7 +294,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
TimeUnit::Millisecond,
)),
];
assert_vector(&expetcd, ts);
assert_vector(&expected, ts);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {

View File

@@ -11,11 +11,14 @@ use api::v1::{
codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType,
InsertExpr,
};
use common_grpc::writer::Precision::MILLISECOND;
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use table::requests::InsertRequest;
use crate::error::{self, Result};
use crate::line_writer::LineWriter;
const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
const VALUE_COLUMN_NAME: &str = "greptime_value";
@@ -275,6 +278,55 @@ pub fn select_result_to_timeseries(
Ok(timeseries_map.into_values().collect())
}
/// Cast a remote write request into InsertRequest
pub fn write_request_to_insert_reqs(mut request: WriteRequest) -> Result<Vec<InsertRequest>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(timeseries_to_insert_request)
.collect()
}
fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result<InsertRequest> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
let mut table_name = None;
for label in &labels {
// The metric name is a special label
if label.name == METRIC_NAME_LABEL {
table_name = Some(&label.value);
}
}
let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?;
let row_count = samples.len();
let mut line_writer = LineWriter::with_lines(table_name, row_count);
for sample in samples {
let ts_millis = sample.timestamp;
let val = sample.value;
line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, MILLISECOND));
line_writer.write_f64(VALUE_COLUMN_NAME, val);
labels
.iter()
.filter(|label| label.name != METRIC_NAME_LABEL)
.for_each(|label| {
line_writer.write_tag(&label.name, &label.value);
});
line_writer.commit();
}
Ok(line_writer.finish())
}
// TODO(fys): it will remove in the future.
/// Cast a remote write request into gRPC's InsertExpr.
pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result<Vec<InsertExpr>> {
let timeseries = std::mem::take(&mut request.timeseries);
@@ -285,6 +337,7 @@ pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result<Vec<In
.collect()
}
// TODO(fys): it will remove in the future.
fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
@@ -439,7 +492,12 @@ pub fn mock_timeseries() -> Vec<TimeSeries> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::prometheus::remote::LabelMatcher;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::{value::Value, vectors::Vector};
use super::*;
@@ -499,6 +557,91 @@ mod tests {
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
}
#[test]
fn test_write_request_to_insert_reqs() {
let write_request = WriteRequest {
timeseries: mock_timeseries(),
..Default::default()
};
let reqs = write_request_to_insert_reqs(write_request).unwrap();
assert_eq!(3, reqs.len());
let req1 = reqs.get(0).unwrap();
assert_eq!("metric1", req1.table_name);
let columns = &req1.columns_values;
let job = columns.get("job").unwrap();
let expected: Vec<Value> = vec!["spark".into(), "spark".into()];
assert_vector(&expected, job);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![1.0_f64.into(), 2.0_f64.into()];
assert_vector(&expected, val);
let req2 = reqs.get(1).unwrap();
assert_eq!("metric2", req2.table_name);
let columns = &req2.columns_values;
let instance = columns.get("instance").unwrap();
let expected: Vec<Value> = vec!["test_host1".into(), "test_host1".into()];
assert_vector(&expected, instance);
let idc = columns.get("idc").unwrap();
let expected: Vec<Value> = vec!["z001".into(), "z001".into()];
assert_vector(&expected, idc);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![3.0_f64.into(), 4.0_f64.into()];
assert_vector(&expected, val);
let req3 = reqs.get(2).unwrap();
assert_eq!("metric3", req3.table_name);
let columns = &req3.columns_values;
let idc = columns.get("idc").unwrap();
let expected: Vec<Value> = vec!["z002".into(), "z002".into(), "z002".into()];
assert_vector(&expected, idc);
let app = columns.get("app").unwrap();
let expected: Vec<Value> = vec!["biz".into(), "biz".into(), "biz".into()];
assert_vector(&expected, app);
let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![
datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)),
datatypes::prelude::Value::Timestamp(Timestamp::new(3000, TimeUnit::Millisecond)),
];
assert_vector(&expected, ts);
let val = columns.get(VALUE_COLUMN_NAME).unwrap();
let expected: Vec<Value> = vec![5.0_f64.into(), 6.0_f64.into(), 7.0_f64.into()];
assert_vector(&expected, val);
}
fn assert_vector(expected: &[Value], vector: &Arc<dyn Vector>) {
for (idx, expected) in expected.iter().enumerate() {
let val = vector.get(idx);
assert_eq!(*expected, val);
}
}
#[test]
fn test_write_request_to_insert_exprs() {
let write_request = WriteRequest {