feat(servers): collect samples by metric (#1706)

This commit is contained in:
Yingwen
2023-06-03 17:17:52 +08:00
committed by GitHub
parent 94228285a7
commit 466f258266
2 changed files with 97 additions and 96 deletions

View File

@@ -122,6 +122,12 @@ pub enum Error {
source: common_grpc::error::Error,
},
#[snafu(display("Failed to write prometheus series, source: {}", source))]
PromSeriesWrite {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
#[snafu(display("Failed to convert time precision, name: {}", name))]
TimePrecision { name: String, location: Location },
@@ -300,7 +306,9 @@ impl ErrorExt for Error {
| InvalidPrepareStatement { .. }
| TimePrecision { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. } => source.status_code(),
InfluxdbLinesWrite { source, .. } | PromSeriesWrite { source, .. } => {
source.status_code()
}
Hyper { .. } => StatusCode::Unknown,
TlsRequired { .. } => StatusCode::Unknown,
@@ -405,6 +413,7 @@ impl IntoResponse for Error {
let (status, error_message) = match self {
Error::InfluxdbLineProtocol { .. }
| Error::InfluxdbLinesWrite { .. }
| Error::PromSeriesWrite { .. }
| Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::DecodePromRemoteRequest { .. }

View File

@@ -15,13 +15,13 @@
//! prometheus protocol supportings
//! handles prometheus remote_write, remote_read logic
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests};
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
@@ -284,83 +284,68 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
}
pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
let (inserts, samples_counts) = itertools::process_results(
request.timeseries.into_iter().map(to_grpc_insert_request),
|x| x.unzip::<_, _, Vec<_>, Vec<_>>(),
)?;
Ok((
InsertRequests { inserts },
samples_counts.into_iter().sum::<usize>(),
))
}
let mut writers: HashMap<String, LinesWriter> = HashMap::new();
for timeseries in &request.timeseries {
let table_name = timeseries
.labels
.iter()
.find(|label| {
// The metric name is a special label
label.name == METRIC_NAME_LABEL
})
.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?
.value
.clone();
fn to_grpc_insert_request(timeseries: TimeSeries) -> Result<(GrpcInsertRequest, usize)> {
let samples_count = timeseries.samples.len();
let writer = writers
.entry(table_name)
.or_insert_with(|| LinesWriter::with_lines(16));
// For each sample
for sample in &timeseries.samples {
// Insert labels first.
for label in &timeseries.labels {
// The metric name is a special label
if label.name == METRIC_NAME_LABEL {
continue;
}
// TODO(dennis): save exemplars into a column
let labels = timeseries.labels;
let samples = timeseries.samples;
writer
.write_tag(&label.name, &label.value)
.context(error::PromSeriesWriteSnafu)?;
}
// Insert sample timestamp.
writer
.write_ts(
TIMESTAMP_COLUMN_NAME,
(sample.timestamp, Precision::Millisecond),
)
.context(error::PromSeriesWriteSnafu)?;
// Insert sample value.
writer
.write_f64(FIELD_COLUMN_NAME, sample.value)
.context(error::PromSeriesWriteSnafu)?;
let row_count = samples.len();
let mut columns = Vec::with_capacity(2 + labels.len());
let ts_column = Column {
column_name: TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millisecond_values: samples.iter().map(|x| x.timestamp).collect(),
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
};
columns.push(ts_column);
let field_column = Column {
column_name: FIELD_COLUMN_NAME.to_string(),
values: Some(column::Values {
f64_values: samples.iter().map(|x| x.value).collect(),
..Default::default()
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
..Default::default()
};
columns.push(field_column);
let mut table_name = None;
for label in labels {
let tagk = label.name;
let tagv = label.value;
// The metric name is a special label
if tagk == METRIC_NAME_LABEL {
table_name = Some(tagv);
continue;
writer.commit();
}
columns.push(Column {
column_name: tagk.to_string(),
values: Some(column::Values {
string_values: std::iter::repeat(tagv).take(row_count).collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
});
}
let request = GrpcInsertRequest {
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
region_number: 0,
columns,
row_count: row_count as u32,
};
Ok((request, samples_count))
let mut sample_counts = 0;
let inserts = writers
.into_iter()
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
sample_counts += row_count as usize;
GrpcInsertRequest {
table_name,
region_number: 0,
columns,
row_count,
}
})
.collect();
Ok((InsertRequests { inserts }, sample_counts))
}
#[inline]
@@ -516,13 +501,16 @@ mod tests {
..Default::default()
};
let exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
let mut exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts;
exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name));
assert_eq!(3, exprs.len());
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);
let expr = exprs.get(0).unwrap();
let expr = exprs.get_mut(0).unwrap();
expr.columns
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
let columns = &expr.columns;
let row_count = expr.row_count;
@@ -548,7 +536,9 @@ mod tests {
vec!["spark", "spark"]
);
let expr = exprs.get(1).unwrap();
let expr = exprs.get_mut(1).unwrap();
expr.columns
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
let columns = &expr.columns;
let row_count = expr.row_count;
@@ -568,18 +558,20 @@ mod tests {
vec![3.0, 4.0]
);
assert_eq!(columns[2].column_name, "instance");
assert_eq!(columns[2].column_name, "idc");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["test_host1", "test_host1"]
);
assert_eq!(columns[3].column_name, "idc");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["z001", "z001"]
);
assert_eq!(columns[3].column_name, "instance");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["test_host1", "test_host1"]
);
let expr = exprs.get(2).unwrap();
let expr = exprs.get_mut(2).unwrap();
expr.columns
.sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name));
let columns = &expr.columns;
let row_count = expr.row_count;
@@ -587,27 +579,27 @@ mod tests {
assert_eq!(3, row_count);
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(columns[0].column_name, "app");
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millisecond_values,
columns[0].values.as_ref().unwrap().string_values,
vec!["biz", "biz", "biz"]
);
assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().ts_millisecond_values,
vec![1000, 2000, 3000]
);
assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME);
assert_eq!(columns[2].column_name, FIELD_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
columns[2].values.as_ref().unwrap().f64_values,
vec![5.0, 6.0, 7.0]
);
assert_eq!(columns[2].column_name, "idc");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["z002", "z002", "z002"]
);
assert_eq!(columns[3].column_name, "app");
assert_eq!(columns[3].column_name, "idc");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["biz", "biz", "biz"]
vec!["z002", "z002", "z002"]
);
}