From 466f258266ecf7b06c4281fb5971e5dee04561dd Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 3 Jun 2023 17:17:52 +0800 Subject: [PATCH] feat(servers): collect samples by metric (#1706) --- src/servers/src/error.rs | 11 +- src/servers/src/prometheus.rs | 182 ++++++++++++++++------------------ 2 files changed, 97 insertions(+), 96 deletions(-) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 247dbd3fcb..57d3789c01 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -122,6 +122,12 @@ pub enum Error { source: common_grpc::error::Error, }, + #[snafu(display("Failed to write prometheus series, source: {}", source))] + PromSeriesWrite { + #[snafu(backtrace)] + source: common_grpc::error::Error, + }, + #[snafu(display("Failed to convert time precision, name: {}", name))] TimePrecision { name: String, location: Location }, @@ -300,7 +306,9 @@ impl ErrorExt for Error { | InvalidPrepareStatement { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, - InfluxdbLinesWrite { source, .. } => source.status_code(), + InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } => { + source.status_code() + } Hyper { .. } => StatusCode::Unknown, TlsRequired { .. } => StatusCode::Unknown, @@ -405,6 +413,7 @@ impl IntoResponse for Error { let (status, error_message) = match self { Error::InfluxdbLineProtocol { .. } | Error::InfluxdbLinesWrite { .. } + | Error::PromSeriesWrite { .. } | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } | Error::DecodePromRemoteRequest { .. } diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 17ab69053e..d285b4075f 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -15,13 +15,13 @@ //! prometheus protocol supportings //! handles prometheus remote_write, remote_read logic use std::cmp::Ordering; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::hash::{Hash, Hasher}; use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; -use api::v1::column::SemanticType; -use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests}; +use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; +use common_grpc::writer::{LinesWriter, Precision}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; use datatypes::prelude::{ConcreteDataType, Value}; @@ -284,83 +284,68 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(InsertRequests, usize)> { - let (inserts, samples_counts) = itertools::process_results( - request.timeseries.into_iter().map(to_grpc_insert_request), - |x| x.unzip::<_, _, Vec<_>, Vec<_>>(), - )?; - Ok(( - InsertRequests { inserts }, - samples_counts.into_iter().sum::(), - )) -} + let mut writers: HashMap = HashMap::new(); + for timeseries in &request.timeseries { + let table_name = timeseries + .labels + .iter() + .find(|label| { + // The metric name is a special label + label.name == METRIC_NAME_LABEL + }) + .context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in timeseries", + })? + .value + .clone(); -fn to_grpc_insert_request(timeseries: TimeSeries) -> Result<(GrpcInsertRequest, usize)> { - let samples_count = timeseries.samples.len(); + let writer = writers + .entry(table_name) + .or_insert_with(|| LinesWriter::with_lines(16)); + // For each sample + for sample in ×eries.samples { + // Insert labels first. + for label in ×eries.labels { + // The metric name is a special label + if label.name == METRIC_NAME_LABEL { + continue; + } - // TODO(dennis): save exemplars into a column - let labels = timeseries.labels; - let samples = timeseries.samples; + writer + .write_tag(&label.name, &label.value) + .context(error::PromSeriesWriteSnafu)?; + } + // Insert sample timestamp. + writer + .write_ts( + TIMESTAMP_COLUMN_NAME, + (sample.timestamp, Precision::Millisecond), + ) + .context(error::PromSeriesWriteSnafu)?; + // Insert sample value. + writer + .write_f64(FIELD_COLUMN_NAME, sample.value) + .context(error::PromSeriesWriteSnafu)?; - let row_count = samples.len(); - let mut columns = Vec::with_capacity(2 + labels.len()); - - let ts_column = Column { - column_name: TIMESTAMP_COLUMN_NAME.to_string(), - values: Some(column::Values { - ts_millisecond_values: samples.iter().map(|x| x.timestamp).collect(), - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }; - columns.push(ts_column); - - let field_column = Column { - column_name: FIELD_COLUMN_NAME.to_string(), - values: Some(column::Values { - f64_values: samples.iter().map(|x| x.value).collect(), - ..Default::default() - }), - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float64 as i32, - ..Default::default() - }; - columns.push(field_column); - - let mut table_name = None; - - for label in labels { - let tagk = label.name; - let tagv = label.value; - - // The metric name is a special label - if tagk == METRIC_NAME_LABEL { - table_name = Some(tagv); - continue; + writer.commit(); } - - columns.push(Column { - column_name: tagk.to_string(), - values: Some(column::Values { - string_values: std::iter::repeat(tagv).take(row_count).collect(), - ..Default::default() - }), - semantic_type: SemanticType::Tag as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }); } - let request = GrpcInsertRequest { - table_name: table_name.context(error::InvalidPromRemoteRequestSnafu { - msg: "missing '__name__' label in timeseries", - })?, - region_number: 0, - columns, - row_count: row_count as u32, - }; - Ok((request, samples_count)) + let mut sample_counts = 0; + let inserts = writers + .into_iter() + .map(|(table_name, writer)| { + let (columns, row_count) = writer.finish(); + sample_counts += row_count as usize; + GrpcInsertRequest { + table_name, + region_number: 0, + columns, + row_count, + } + }) + .collect(); + Ok((InsertRequests { inserts }, sample_counts)) } #[inline] @@ -516,13 +501,16 @@ mod tests { ..Default::default() }; - let exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts; + let mut exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts; + exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name)); assert_eq!(3, exprs.len()); assert_eq!("metric1", exprs[0].table_name); assert_eq!("metric2", exprs[1].table_name); assert_eq!("metric3", exprs[2].table_name); - let expr = exprs.get(0).unwrap(); + let expr = exprs.get_mut(0).unwrap(); + expr.columns + .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); let columns = &expr.columns; let row_count = expr.row_count; @@ -548,7 +536,9 @@ mod tests { vec!["spark", "spark"] ); - let expr = exprs.get(1).unwrap(); + let expr = exprs.get_mut(1).unwrap(); + expr.columns + .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); let columns = &expr.columns; let row_count = expr.row_count; @@ -568,18 +558,20 @@ mod tests { vec![3.0, 4.0] ); - assert_eq!(columns[2].column_name, "instance"); + assert_eq!(columns[2].column_name, "idc"); assert_eq!( columns[2].values.as_ref().unwrap().string_values, - vec!["test_host1", "test_host1"] - ); - assert_eq!(columns[3].column_name, "idc"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, vec!["z001", "z001"] ); + assert_eq!(columns[3].column_name, "instance"); + assert_eq!( + columns[3].values.as_ref().unwrap().string_values, + vec!["test_host1", "test_host1"] + ); - let expr = exprs.get(2).unwrap(); + let expr = exprs.get_mut(2).unwrap(); + expr.columns + .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); let columns = &expr.columns; let row_count = expr.row_count; @@ -587,27 +579,27 @@ mod tests { assert_eq!(3, row_count); assert_eq!(columns.len(), 4); - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); + assert_eq!(columns[0].column_name, "app"); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, + columns[0].values.as_ref().unwrap().string_values, + vec!["biz", "biz", "biz"] + ); + assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[1].values.as_ref().unwrap().ts_millisecond_values, vec![1000, 2000, 3000] ); - assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME); + assert_eq!(columns[2].column_name, FIELD_COLUMN_NAME); assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, + columns[2].values.as_ref().unwrap().f64_values, vec![5.0, 6.0, 7.0] ); - assert_eq!(columns[2].column_name, "idc"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["z002", "z002", "z002"] - ); - assert_eq!(columns[3].column_name, "app"); + assert_eq!(columns[3].column_name, "idc"); assert_eq!( columns[3].values.as_ref().unwrap().string_values, - vec!["biz", "biz", "biz"] + vec!["z002", "z002", "z002"] ); }