diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index a8262e4bef..784a4bc72e 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -240,19 +240,19 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let columns = &insert_req.columns_values; let host = columns.get("host").unwrap(); - let expetcd: Vec = vec!["host1".into(), "host2".into()]; - assert_vector(&expetcd, host); + let expected: Vec = vec!["host1".into(), "host2".into()]; + assert_vector(&expected, host); let cpu = columns.get("cpu").unwrap(); - let expetcd: Vec = vec![66.6.into(), Value::Null]; - assert_vector(&expetcd, cpu); + let expected: Vec = vec![66.6.into(), Value::Null]; + assert_vector(&expected, cpu); let memory = columns.get("memory").unwrap(); - let expetcd: Vec = vec![1024.0.into(), 1027.0.into()]; - assert_vector(&expetcd, memory); + let expected: Vec = vec![1024.0.into(), 1027.0.into()]; + assert_vector(&expected, memory); let ts = columns.get("ts").unwrap(); - let expetcd: Vec = vec![ + let expected: Vec = vec![ datatypes::prelude::Value::Timestamp(Timestamp::new( 1663840496100, TimeUnit::Millisecond, @@ -262,7 +262,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; TimeUnit::Millisecond, )), ]; - assert_vector(&expetcd, ts); + assert_vector(&expected, ts); } fn assert_table_2(insert_req: &InsertRequest) { @@ -272,19 +272,19 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let columns = &insert_req.columns_values; let host = columns.get("host").unwrap(); - let expetcd: Vec = vec!["host3".into(), "host4".into()]; - assert_vector(&expetcd, host); + let expected: Vec = vec!["host3".into(), "host4".into()]; + assert_vector(&expected, host); let cpu = columns.get("cpu").unwrap(); - let expetcd: Vec = vec![66.5.into(), 66.3.into()]; - assert_vector(&expetcd, cpu); + let expected: Vec = vec![66.5.into(), 66.3.into()]; + assert_vector(&expected, cpu); let memory = columns.get("memory").unwrap(); - let expetcd: Vec = vec![Value::Null, 1029.0.into()]; - assert_vector(&expetcd, memory); + let expected: Vec = vec![Value::Null, 1029.0.into()]; + assert_vector(&expected, memory); let ts = columns.get("ts").unwrap(); - let expetcd: Vec = vec![ + let expected: Vec = vec![ datatypes::prelude::Value::Timestamp(Timestamp::new( 1663840496100, TimeUnit::Millisecond, @@ -294,7 +294,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; TimeUnit::Millisecond, )), ]; - assert_vector(&expetcd, ts); + assert_vector(&expected, ts); } fn assert_vector(expected: &[Value], vector: &Arc) { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 953ea822d9..8a908a962c 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -11,11 +11,14 @@ use api::v1::{ codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr, }; +use common_grpc::writer::Precision::MILLISECOND; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; use snap::raw::{Decoder, Encoder}; +use table::requests::InsertRequest; use crate::error::{self, Result}; +use crate::line_writer::LineWriter; const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; const VALUE_COLUMN_NAME: &str = "greptime_value"; @@ -275,6 +278,55 @@ pub fn select_result_to_timeseries( Ok(timeseries_map.into_values().collect()) } +/// Cast a remote write request into InsertRequest +pub fn write_request_to_insert_reqs(mut request: WriteRequest) -> Result> { + let timeseries = std::mem::take(&mut request.timeseries); + + timeseries + .into_iter() + .map(timeseries_to_insert_request) + .collect() +} + +fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result { + // TODO(dennis): save exemplars into a column + let labels = std::mem::take(&mut timeseries.labels); + let samples = std::mem::take(&mut timeseries.samples); + + let mut table_name = None; + for label in &labels { + // The metric name is a special label + if label.name == METRIC_NAME_LABEL { + table_name = Some(&label.value); + } + } + let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in timeseries", + })?; + + let row_count = samples.len(); + let mut line_writer = LineWriter::with_lines(table_name, row_count); + + for sample in samples { + let ts_millis = sample.timestamp; + let val = sample.value; + + line_writer.write_ts(TIMESTAMP_COLUMN_NAME, (ts_millis, MILLISECOND)); + line_writer.write_f64(VALUE_COLUMN_NAME, val); + + labels + .iter() + .filter(|label| label.name != METRIC_NAME_LABEL) + .for_each(|label| { + line_writer.write_tag(&label.name, &label.value); + }); + + line_writer.commit(); + } + Ok(line_writer.finish()) +} + +// TODO(fys): it will remove in the future. /// Cast a remote write request into gRPC's InsertExpr. pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result> { let timeseries = std::mem::take(&mut request.timeseries); @@ -285,6 +337,7 @@ pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result Result { // TODO(dennis): save exemplars into a column let labels = std::mem::take(&mut timeseries.labels); @@ -439,7 +492,12 @@ pub fn mock_timeseries() -> Vec { #[cfg(test)] mod tests { + use std::sync::Arc; + use api::prometheus::remote::LabelMatcher; + use common_time::timestamp::TimeUnit; + use common_time::Timestamp; + use datatypes::{value::Value, vectors::Vector}; use super::*; @@ -499,6 +557,91 @@ mod tests { assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql); } + #[test] + fn test_write_request_to_insert_reqs() { + let write_request = WriteRequest { + timeseries: mock_timeseries(), + ..Default::default() + }; + + let reqs = write_request_to_insert_reqs(write_request).unwrap(); + + assert_eq!(3, reqs.len()); + + let req1 = reqs.get(0).unwrap(); + assert_eq!("metric1", req1.table_name); + + let columns = &req1.columns_values; + let job = columns.get("job").unwrap(); + let expected: Vec = vec!["spark".into(), "spark".into()]; + assert_vector(&expected, job); + + let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); + let expected: Vec = vec![ + datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), + datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), + ]; + assert_vector(&expected, ts); + + let val = columns.get(VALUE_COLUMN_NAME).unwrap(); + let expected: Vec = vec![1.0_f64.into(), 2.0_f64.into()]; + assert_vector(&expected, val); + + let req2 = reqs.get(1).unwrap(); + assert_eq!("metric2", req2.table_name); + + let columns = &req2.columns_values; + let instance = columns.get("instance").unwrap(); + let expected: Vec = vec!["test_host1".into(), "test_host1".into()]; + assert_vector(&expected, instance); + + let idc = columns.get("idc").unwrap(); + let expected: Vec = vec!["z001".into(), "z001".into()]; + assert_vector(&expected, idc); + + let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); + let expected: Vec = vec![ + datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), + datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), + ]; + assert_vector(&expected, ts); + + let val = columns.get(VALUE_COLUMN_NAME).unwrap(); + let expected: Vec = vec![3.0_f64.into(), 4.0_f64.into()]; + assert_vector(&expected, val); + + let req3 = reqs.get(2).unwrap(); + assert_eq!("metric3", req3.table_name); + + let columns = &req3.columns_values; + let idc = columns.get("idc").unwrap(); + let expected: Vec = vec!["z002".into(), "z002".into(), "z002".into()]; + assert_vector(&expected, idc); + + let app = columns.get("app").unwrap(); + let expected: Vec = vec!["biz".into(), "biz".into(), "biz".into()]; + assert_vector(&expected, app); + + let ts = columns.get(TIMESTAMP_COLUMN_NAME).unwrap(); + let expected: Vec = vec![ + datatypes::prelude::Value::Timestamp(Timestamp::new(1000, TimeUnit::Millisecond)), + datatypes::prelude::Value::Timestamp(Timestamp::new(2000, TimeUnit::Millisecond)), + datatypes::prelude::Value::Timestamp(Timestamp::new(3000, TimeUnit::Millisecond)), + ]; + assert_vector(&expected, ts); + + let val = columns.get(VALUE_COLUMN_NAME).unwrap(); + let expected: Vec = vec![5.0_f64.into(), 6.0_f64.into(), 7.0_f64.into()]; + assert_vector(&expected, val); + } + + fn assert_vector(expected: &[Value], vector: &Arc) { + for (idx, expected) in expected.iter().enumerate() { + let val = vector.get(idx); + assert_eq!(*expected, val); + } + } + #[test] fn test_write_request_to_insert_exprs() { let write_request = WriteRequest {