feat: decode prom requests to grpc (#3425)

* hack: inline decode

* move to servers

* fix: samples lost

* add bench

* remove useless functions

* wip

* feat: remove object pools

* fix: minor issues

* fix: remove useless dep

* chore: rebase main

* format

* finish

* fix: format

* feat: introduce request pool

* try to fix license issue

* fix: clippy

* resolve comments

* fix:typo

* remove useless comments
This commit is contained in:
Lei, HUANG
2024-03-05 17:47:32 +08:00
committed by GitHub
parent 7b1c3503d0
commit 02b18fbca1
14 changed files with 1318 additions and 25 deletions

11
Cargo.lock generated
View File

@@ -5921,6 +5921,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "object-pool"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee9a3e7196d09ec86002b939f1576e8e446d58def8fd48fe578e2c72d5328d68"
dependencies = [
"parking_lot 0.11.2",
]
[[package]]
name = "object-store"
version = "0.6.0"
@@ -9054,6 +9063,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-version",
"criterion",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -9073,6 +9083,7 @@ dependencies = [
"mime_guess",
"mysql_async",
"notify",
"object-pool",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_catalog::format_full_table_name;
@@ -174,7 +175,7 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
@@ -197,6 +198,38 @@ impl PromStoreProtocolHandler for Instance {
Ok(())
}
/// Writes already-decoded Prometheus remote-write rows.
///
/// Checks the `PromStoreWrite` permission for the current user, then forwards `request`
/// either to the metric engine (when `with_metric_engine` is true) or to the regular
/// row-insert path. Unlike `write`, no pre-write interceptor is invoked here.
///
/// # Errors
/// Returns an auth error when the permission check fails, or an
/// `ExecuteGrpcQuery` error wrapping any failure of the underlying insert.
async fn write_fast(
    &self,
    request: RowInsertRequests,
    ctx: QueryContextRef,
    with_metric_engine: bool,
) -> ServerResult<()> {
    self.plugins
        .get::<PermissionCheckerRef>()
        .as_ref()
        .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
        .context(AuthSnafu)?;

    if with_metric_engine {
        // Fall back to the default physical table when the request did not name one.
        let physical_table = ctx
            .extension(PHYSICAL_TABLE_PARAM)
            .unwrap_or(GREPTIME_PHYSICAL_TABLE)
            .to_string();
        // `physical_table` is already an owned `String` and `ctx` is not used afterwards,
        // so both are moved instead of being cloned again.
        let _ = self
            .handle_metric_row_inserts(request, ctx, physical_table)
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;
    } else {
        let _ = self
            .handle_row_inserts(request, ctx)
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;
    }
    Ok(())
}
async fn read(
&self,
request: ReadRequest,
@@ -276,7 +309,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
ctx: QueryContextRef,
_: bool,
) -> ServerResult<()> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
requests,
@@ -290,6 +323,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
Ok(())
}
/// Fast-path write taking already-decoded row inserts.
///
/// Not supported by this handler: all arguments are ignored.
///
/// # Panics
/// Always panics through `unimplemented!()`.
async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<()> {
unimplemented!()
}
async fn read(
&self,
_request: ReadRequest,

View File

@@ -60,6 +60,7 @@ itertools.workspace = true
lazy_static.workspace = true
mime_guess = "2.0"
notify = "6.1"
object-pool = "0.5"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.7.0"
@@ -114,6 +115,7 @@ catalog = { workspace = true, features = ["testing"] }
client.workspace = true
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
@@ -129,3 +131,7 @@ tokio-test = "0.4"
[build-dependencies]
common-version.workspace = true
[[bench]]
name = "bench_prom"
harness = false

View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::criterion_main;
mod prom_decode;
criterion_main! {
prom_decode::benches
}

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::prom_store::remote::WriteRequest;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;
/// Benchmarks decoding the recorded remote-write payload in `benches/write_request.pb.data`.
///
/// Two paths are measured under the `decode` group:
/// - `write_request`: prost-decode into [`WriteRequest`], then convert with
///   [`to_grpc_row_insert_requests`];
/// - `prom_write_request`: decode directly with [`PromWriteRequest`].
fn bench_decode_prom_request(c: &mut Criterion) {
    let data_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("benches")
        .join("write_request.pb.data");
    let data = Bytes::from(std::fs::read(data_path).unwrap());

    // Both request objects live outside the measured closures and are reused across iterations.
    let mut request = WriteRequest::default();
    let mut prom_request = PromWriteRequest::default();

    let mut group = c.benchmark_group("decode");
    group.measurement_time(Duration::from_secs(3));

    group.bench_function("write_request", |bencher| {
        bencher.iter(|| {
            request.clear();
            let payload = data.clone();
            request.merge(payload).unwrap();
            to_grpc_row_insert_requests(&request).unwrap();
        });
    });

    group.bench_function("prom_write_request", |bencher| {
        bencher.iter(|| {
            let payload = data.clone();
            prom_request.merge(payload).unwrap();
            prom_request.as_row_insert_requests();
        });
    });
}
criterion_group!(benches, bench_decode_prom_request);
criterion_main!(benches);

Binary file not shown.

View File

@@ -15,14 +15,18 @@
use std::sync::Arc;
use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use bytes::Bytes;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use hyper::Body;
use lazy_static::lazy_static;
use object_pool::Pool;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -31,9 +35,14 @@ use snafu::prelude::*;
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::prom_store::snappy_decompress;
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
// Process-wide pool of reusable `PromWriteRequest` decoders, built with a size argument of 256
// and `PromWriteRequest::default` as the initializer.
// NOTE(review): a request pulled from this pool may carry state left by its previous user;
// confirm every user leaves it clean, including on decode failure.
lazy_static! {
static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
Pool::new(256, PromWriteRequest::default);
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
@@ -91,14 +100,15 @@ pub async fn remote_write(
.with_label_values(&[db.as_str()])
.start_timer();
let request = decode_remote_write_request(body).await?;
let request = decode_remote_write_request_to_row_inserts(body).await?;
if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}
handler.write(request, query_ctx, true).await?;
handler.write_fast(request, query_ctx, true).await?;
Ok((StatusCode::NO_CONTENT, ()))
}
@@ -136,6 +146,23 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}
/// Reads the HTTP body, snappy-decompresses it and decodes it straight into
/// [`RowInsertRequests`] using a pooled [`PromWriteRequest`].
///
/// The number of decoded samples is reported to
/// `METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES`.
///
/// # Errors
/// Fails when the body cannot be read, decompressed, or decoded.
async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
    // Kept alive until the function returns so the whole decode is timed.
    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();

    let compressed = hyper::body::to_bytes(body)
        .await
        .context(error::HyperSnafu)?;
    let decompressed = Bytes::from(snappy_decompress(&compressed[..])?);

    let mut pooled = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
    pooled
        .merge(decompressed)
        .context(error::DecodePromRemoteRequestSnafu)?;

    let (row_inserts, num_samples) = pooled.as_row_insert_requests();
    crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(num_samples as f64);
    Ok(row_inserts)
}
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)

View File

@@ -36,9 +36,13 @@ pub mod mysql;
pub mod opentsdb;
pub mod otlp;
pub mod postgres;
mod prom_row_builder;
pub mod prom_store;
pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;
#[allow(clippy::all)]
mod repeated_field;
mod row_writer;
pub mod server;
mod shutdown;

View File

@@ -0,0 +1,272 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::string::ToString;
use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value,
};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::proto::PromLabel;
use crate::repeated_field::Clear;
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
///
/// It keeps one [TableBuilder] per table name.
#[derive(Default)]
pub(crate) struct TablesBuilder {
/// Table builders keyed by table name.
tables: HashMap<String, TableBuilder>,
}
impl Clear for TablesBuilder {
/// Drops every buffered table builder.
fn clear(&mut self) {
self.tables.clear();
}
}
impl TablesBuilder {
/// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
pub(crate) fn get_or_create_table_builder(
&mut self,
table_name: String,
label_num: usize,
row_num: usize,
) -> &mut TableBuilder {
self.tables
.entry(table_name)
.or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
}
/// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
pub(crate) fn as_insert_requests(&mut self) -> (RowInsertRequests, usize) {
let mut total_rows = 0;
let inserts = self
.tables
.drain()
.map(|(name, mut table)| {
total_rows += table.num_rows();
table.as_row_insert_request(name)
})
.collect();
(RowInsertRequests { inserts }, total_rows)
}
}
/// Builder for one table.
///
/// Accumulates the column schema and the rows of a single table until they are
/// converted into a [RowInsertRequest].
pub(crate) struct TableBuilder {
/// Column schemas.
schema: Vec<ColumnSchema>,
/// Rows written.
rows: Vec<Row>,
/// Indices of columns inside `schema`, keyed by column name.
col_indexes: HashMap<String, usize>,
}
impl Default for TableBuilder {
/// Creates a builder holding only the two fixed columns and no pre-allocated rows.
fn default() -> Self {
Self::with_capacity(2, 0)
}
}
impl TableBuilder {
    /// Creates a builder with room reserved for `cols` columns and `rows` rows.
    ///
    /// The timestamp and value columns are always registered first, at column
    /// indices 0 and 1 respectively.
    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
        let mut col_indexes = HashMap::with_capacity(cols);
        col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
        col_indexes.insert(GREPTIME_VALUE.to_string(), 1);

        let mut schema = Vec::with_capacity(cols);
        schema.push(ColumnSchema {
            column_name: GREPTIME_TIMESTAMP.to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Timestamp as i32,
            datatype_extension: None,
        });
        schema.push(ColumnSchema {
            column_name: GREPTIME_VALUE.to_string(),
            datatype: ColumnDataType::Float64 as i32,
            semantic_type: SemanticType::Field as i32,
            datatype_extension: None,
        });

        Self {
            schema,
            rows: Vec::with_capacity(rows),
            col_indexes,
        }
    }

    /// Total number of rows inside table builder.
    fn num_rows(&self) -> usize {
        self.rows.len()
    }

    /// Adds a set of labels and samples to table builder.
    ///
    /// One row is appended per sample; every row carries the same label values.
    /// Labels not seen before are appended to the schema as string tag columns.
    /// Nothing is appended when `samples` is empty (new tag columns are still registered).
    pub(crate) fn add_labels_and_samples(&mut self, labels: &[PromLabel], samples: &[Sample]) {
        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];

        for PromLabel { name, value } in labels {
            let tag_name = Self::label_bytes_to_string(name);
            let tag_value = Some(ValueData::StringValue(Self::label_bytes_to_string(value)));
            // Index the column would get if it is new: columns are only ever appended.
            let tag_num = self.col_indexes.len();
            match self.col_indexes.entry(tag_name) {
                Entry::Occupied(e) => {
                    row[*e.get()].value_data = tag_value;
                }
                Entry::Vacant(e) => {
                    let column_name = e.key().clone();
                    e.insert(tag_num);
                    self.schema.push(ColumnSchema {
                        column_name,
                        datatype: ColumnDataType::String as i32,
                        semantic_type: SemanticType::Tag as i32,
                        datatype_extension: None,
                    });
                    row.push(Value {
                        value_data: tag_value,
                    });
                }
            }
        }

        // Clone the template row for all samples but the last, which takes ownership of it.
        if let Some((last, rest)) = samples.split_last() {
            for sample in rest {
                row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
                row[1].value_data = Some(ValueData::F64Value(sample.value));
                self.rows.push(Row {
                    values: row.clone(),
                });
            }
            row[0].value_data = Some(ValueData::TimestampMillisecondValue(last.timestamp));
            row[1].value_data = Some(ValueData::F64Value(last.value));
            self.rows.push(Row { values: row });
        }
    }

    /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data.
    ///
    /// Rows written before later columns were discovered are padded with null values so
    /// that every row has one value per schema column.
    ///
    /// Only `rows` and `schema` are taken; the column index map is left untouched, so the
    /// builder must not be reused for further writes after this call.
    pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
        let mut rows = std::mem::take(&mut self.rows);
        let schema = std::mem::take(&mut self.schema);
        let col_num = schema.len();
        for row in &mut rows {
            if row.values.len() < col_num {
                row.values.resize(col_num, Value { value_data: None });
            }
        }

        RowInsertRequest {
            table_name,
            rows: Some(Rows { schema, rows }),
        }
    }

    /// Converts raw label bytes into an owned `String`.
    ///
    /// Label bytes come from the remote peer and are not guaranteed to be valid UTF-8, so
    /// they are validated instead of being trusted. Valid input is converted with a single
    /// copy; invalid sequences are replaced with U+FFFD.
    fn label_bytes_to_string(raw: &[u8]) -> String {
        String::from_utf8(raw.to_vec())
            .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned())
    }
}
#[cfg(test)]
mod tests {
use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::Value;
use bytes::Bytes;
use crate::prom_row_builder::TableBuilder;
use crate::proto::PromLabel;
// Writes two series with partially overlapping label sets and checks that every row is
// aligned to the final schema, with nulls for labels the row did not carry.
#[test]
fn test_table_builder() {
let mut builder = TableBuilder::default();
// First series: introduces columns tag0 and tag1.
builder.add_labels_and_samples(
&[
PromLabel {
name: Bytes::from("tag0"),
value: Bytes::from("v0"),
},
PromLabel {
name: Bytes::from("tag1"),
value: Bytes::from("v1"),
},
],
&[Sample {
value: 0.0,
timestamp: 0,
}],
);
// Second series: reuses tag0 and introduces tag2, leaving tag1 unset.
builder.add_labels_and_samples(
&[
PromLabel {
name: Bytes::from("tag0"),
value: Bytes::from("v0"),
},
PromLabel {
name: Bytes::from("tag2"),
value: Bytes::from("v2"),
},
],
&[Sample {
value: 0.1,
timestamp: 1,
}],
);
let request = builder.as_row_insert_request("test".to_string());
let rows = request.rows.unwrap().rows;
assert_eq!(2, rows.len());
// Row 0 was written before tag2 existed, so it must be padded with a trailing null.
assert_eq!(
vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0))
},
Value {
value_data: Some(ValueData::F64Value(0.0))
},
Value {
value_data: Some(ValueData::StringValue("v0".to_string()))
},
Value {
value_data: Some(ValueData::StringValue("v1".to_string()))
},
Value { value_data: None },
],
rows[0].values
);
// Row 1 has no tag1 value, so that column stays null.
assert_eq!(
vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(1))
},
Value {
value_data: Some(ValueData::F64Value(0.1))
},
Value {
value_data: Some(ValueData::StringValue("v0".to_string()))
},
Value { value_data: None },
Value {
value_data: Some(ValueData::StringValue("v2".to_string()))
},
],
rows[1].values
);
}
}

View File

@@ -39,6 +39,8 @@ use crate::row_writer::{self, MultiTableData};
pub const METRIC_NAME_LABEL: &str = "__name__";
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
/// Metrics for push gateway protocol
pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
@@ -300,12 +302,12 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
Ok(timeseries_map.into_values().collect())
}
pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRequests, usize)> {
pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
let mut multi_table_data = MultiTableData::new();
for series in request.timeseries {
for series in &request.timeseries {
let table_name = &series
.labels
.iter()
@@ -329,11 +331,11 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe
);
// labels
let kvs = series.labels.into_iter().filter_map(|label| {
let kvs = series.labels.iter().filter_map(|label| {
if label.name == METRIC_NAME_LABEL {
None
} else {
Some((label.name, label.value))
Some((label.name.clone(), label.value.clone()))
}
});
@@ -649,7 +651,7 @@ mod tests {
..Default::default()
};
let mut exprs = to_grpc_row_insert_requests(write_request)
let mut exprs = to_grpc_row_insert_requests(&write_request)
.unwrap()
.0
.inserts;

304
src/servers/src/proto.rs Normal file
View File

@@ -0,0 +1,304 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use api::prom_store::remote::Sample;
use api::v1::RowInsertRequests;
use bytes::{Buf, Bytes};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, DecodeContext, WireType};
use prost::DecodeError;
use crate::prom_row_builder::TablesBuilder;
use crate::prom_store::METRIC_NAME_LABEL_BYTES;
use crate::repeated_field::{Clear, RepeatedField};
impl Clear for Sample {
    /// Resets the sample to its default (all-zero) state.
    ///
    /// Samples are recycled by `RepeatedField::push_default`, which calls `clear` on a
    /// reused element before it is decoded into. Decoding only overwrites fields present
    /// on the wire, and protobuf omits scalar fields holding their default value. Without
    /// an explicit reset, a sample whose value is `0.0` or whose timestamp is `0` would
    /// silently keep the data of the sample previously stored in the same slot.
    fn clear(&mut self) {
        self.timestamp = 0;
        self.value = 0.0;
    }
}
/// A Prometheus label whose name and value are kept as raw bytes.
#[derive(Default, Clone)]
pub struct PromLabel {
/// Label name bytes.
pub name: Bytes,
/// Label value bytes.
pub value: Bytes,
}
impl Clear for PromLabel {
/// Empties both the name and the value so the label can be reused.
fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
}
impl PromLabel {
pub fn merge_field<B>(
&mut self,
tag: u32,
wire_type: WireType,
buf: &mut B,
ctx: DecodeContext,
) -> Result<(), DecodeError>
where
B: Buf,
{
const STRUCT_NAME: &str = "PromLabel";
match tag {
1u32 => {
// decode label name
let value = &mut self.name;
prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| {
error.push(STRUCT_NAME, "name");
error
})
}
2u32 => {
// decode label value
let value = &mut self.value;
prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| {
error.push(STRUCT_NAME, "value");
error
})
}
_ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
}
}
}
/// Scratch representation of one decoded time series.
#[derive(Default)]
pub struct PromTimeSeries {
/// Table name, taken from the value of the metric-name label.
pub table_name: String,
/// Decoded labels, excluding the metric-name label.
pub labels: RepeatedField<PromLabel>,
/// Decoded samples.
pub samples: RepeatedField<Sample>,
}
impl Clear for PromTimeSeries {
/// Resets the table name and the logical contents of labels and samples.
fn clear(&mut self) {
self.table_name.clear();
self.labels.clear();
self.samples.clear();
}
}
impl PromTimeSeries {
pub fn merge_field<B>(
&mut self,
tag: u32,
wire_type: WireType,
buf: &mut B,
ctx: DecodeContext,
) -> Result<(), DecodeError>
where
B: Buf,
{
const STRUCT_NAME: &str = "PromTimeSeries";
match tag {
1u32 => {
// decode labels
let label = self.labels.push_default();
let len = decode_varint(buf).map_err(|mut error| {
error.push(STRUCT_NAME, "labels");
error
})?;
let remaining = buf.remaining();
if len > remaining as u64 {
return Err(DecodeError::new("buffer underflow"));
}
let limit = remaining - len as usize;
while buf.remaining() > limit {
let (tag, wire_type) = decode_key(buf)?;
label.merge_field(tag, wire_type, buf, ctx.clone())?;
}
if buf.remaining() != limit {
return Err(DecodeError::new("delimited length exceeded"));
}
if label.name.deref() == METRIC_NAME_LABEL_BYTES {
// safety: we expect all labels are UTF-8 encoded strings.
let table_name = unsafe { String::from_utf8_unchecked(label.value.to_vec()) };
self.table_name = table_name;
self.labels.truncate(self.labels.len() - 1); // remove last label
}
Ok(())
}
2u32 => {
let sample = self.samples.push_default();
merge(WireType::LengthDelimited, sample, buf, ctx).map_err(|mut error| {
error.push(STRUCT_NAME, "samples");
error
})?;
Ok(())
}
// skip exemplars
3u32 => prost::encoding::skip_field(wire_type, tag, buf, ctx),
_ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
}
}
fn add_to_table_data(&mut self, table_builders: &mut TablesBuilder) {
let label_num = self.labels.len();
let row_num = self.samples.len();
let table_data = table_builders.get_or_create_table_builder(
std::mem::take(&mut self.table_name),
label_num,
row_num,
);
table_data.add_labels_and_samples(self.labels.as_slice(), self.samples.as_slice());
self.labels.clear();
self.samples.clear();
}
}
/// Decoder for a Prometheus remote-write request that builds row inserts while decoding.
#[derive(Default)]
pub struct PromWriteRequest {
/// Rows accumulated so far, grouped by table.
table_data: TablesBuilder,
/// Scratch time series reused for every series in the request.
series: PromTimeSeries,
}
impl Clear for PromWriteRequest {
    /// Resets the request to an empty state.
    ///
    /// Both the accumulated tables and the scratch series are cleared: a decode that
    /// failed midway can leave labels and samples in `series`, and these would otherwise
    /// be attributed to the first series of the next request.
    fn clear(&mut self) {
        self.table_data.clear();
        self.series.clear();
    }
}
impl PromWriteRequest {
pub fn as_row_insert_requests(&mut self) -> (RowInsertRequests, usize) {
self.table_data.as_insert_requests()
}
pub fn merge<B>(&mut self, mut buf: B) -> Result<(), DecodeError>
where
B: Buf,
Self: Sized,
{
const STRUCT_NAME: &str = "PromWriteRequest";
let ctx = DecodeContext::default();
while buf.has_remaining() {
let (tag, wire_type) = decode_key(&mut buf)?;
assert_eq!(WireType::LengthDelimited, wire_type);
match tag {
1u32 => {
// decode TimeSeries
let len = decode_varint(&mut buf).map_err(|mut e| {
e.push(STRUCT_NAME, "timeseries");
e
})?;
let remaining = buf.remaining();
if len > remaining as u64 {
return Err(DecodeError::new("buffer underflow"));
}
let limit = remaining - len as usize;
while buf.remaining() > limit {
let (tag, wire_type) = decode_key(&mut buf)?;
self.series
.merge_field(tag, wire_type, &mut buf, ctx.clone())?;
}
if buf.remaining() != limit {
return Err(DecodeError::new("delimited length exceeded"));
}
self.series.add_to_table_data(&mut self.table_data);
}
3u32 => {
// we can ignore metadata for now.
prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?;
}
_ => prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?,
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use api::prom_store::remote::WriteRequest;
use api::v1::RowInsertRequests;
use bytes::Bytes;
use prost::Message;
use crate::prom_store::to_grpc_row_insert_requests;
use crate::proto::PromWriteRequest;
use crate::repeated_field::Clear;
// Decodes `data` with `prom_write_request` and compares the outcome with the expected
// requests. Only the sample count, the number of inserts and each table's schema
// (as an unordered set of columns) are compared; row contents are not checked.
fn check_deserialized(
prom_write_request: &mut PromWriteRequest,
data: &Bytes,
expected_samples: usize,
expected_rows: &RowInsertRequests,
) {
prom_write_request.clear();
prom_write_request.merge(data.clone()).unwrap();
let (prom_rows, samples) = prom_write_request.as_row_insert_requests();
assert_eq!(expected_samples, samples);
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
// Expected schema per table name, as a set of (column name, datatype, semantic type).
let schemas = expected_rows
.inserts
.iter()
.map(|r| {
(
r.table_name.clone(),
r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| (c.column_name.clone(), c.datatype, c.semantic_type))
.collect::<HashSet<_>>(),
)
})
.collect::<HashMap<_, _>>();
for r in &prom_rows.inserts {
let expected = schemas.get(&r.table_name).unwrap();
assert_eq!(
expected,
&r.rows
.as_ref()
.unwrap()
.schema
.iter()
.map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) })
.collect()
);
}
}
// Ensures `WriteRequest` and `PromWriteRequest` produce the same gRPC request.
#[test]
fn test_decode_write_request() {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");
let data = Bytes::from(std::fs::read(d).unwrap());
let (expected_rows, expected_samples) =
to_grpc_row_insert_requests(&WriteRequest::decode(data.clone()).unwrap()).unwrap();
let mut prom_write_request = PromWriteRequest::default();
// Decode repeatedly with the same instance to exercise reuse.
for _ in 0..3 {
check_deserialized(
&mut prom_write_request,
&data,
expected_samples,
&expected_rows,
);
}
}
}

View File

@@ -29,6 +29,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_query::Output;
use opentelemetry_proto::tonic::collector::metrics::v1::{
@@ -95,6 +96,15 @@ pub trait PromStoreProtocolHandler {
ctx: QueryContextRef,
with_metric_engine: bool,
) -> Result<()>;
/// Handling prometheus remote write requests that have already been decoded into
/// [RowInsertRequests].
async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
/// Handling push gateway requests

View File

@@ -0,0 +1,540 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright (c) 2019 Stepan Koltsov
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
// OR OTHER DEALINGS IN THE SOFTWARE.
//! The [Clear] trait and [RepeatedField] are taken from [rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost)
//! to leverage the pooling mechanism to avoid frequent heap allocation/deallocation when decoding deeply nested structs.
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::default::Default;
use std::hash::{Hash, Hasher};
use std::iter::{FromIterator, IntoIterator};
use std::ops::{Deref, DerefMut, Index, IndexMut};
use std::{fmt, slice, vec};
use bytes::Bytes;
/// Anything that can be cleared.
pub trait Clear {
/// Clear this value, making it equivalent to a newly created object.
fn clear(&mut self);
}
impl<T> Clear for Option<T> {
    /// Resets the option to `None`, dropping any contained value.
    fn clear(&mut self) {
        *self = None;
    }
}
impl Clear for String {
    /// Removes all contents while keeping the allocated capacity.
    fn clear(&mut self) {
        self.truncate(0);
    }
}
impl<T> Clear for Vec<T> {
    /// Drops all elements while keeping the allocated capacity.
    fn clear(&mut self) {
        self.truncate(0);
    }
}
impl Clear for Bytes {
    /// Shortens the buffer to zero length.
    fn clear(&mut self) {
        self.truncate(0);
    }
}
/// Wrapper around vector to avoid deallocations on clear.
///
/// Only the first `len` elements of `vec` are logically part of the container; elements
/// past `len` are kept around so they can be reused.
pub struct RepeatedField<T> {
/// Backing storage; may hold more elements than `len`.
vec: Vec<T>,
/// Number of logically present elements.
len: usize,
}
impl<T> RepeatedField<T> {
/// Return number of elements in this container.
#[inline]
pub fn len(&self) -> usize {
self.len
}
/// Clear.
///
/// Only the logical length is reset; the elements stay in the backing vector
/// so that they can be reused later.
#[inline]
pub fn clear(&mut self) {
self.len = 0;
}
}
impl<T> Default for RepeatedField<T> {
    /// Creates an empty container without allocating.
    #[inline]
    fn default() -> RepeatedField<T> {
        let vec = Vec::new();
        RepeatedField { vec, len: 0 }
    }
}
impl<T> RepeatedField<T> {
/// Create new empty container.
#[inline]
pub fn new() -> RepeatedField<T> {
Default::default()
}
/// Create a contained with data from given vec.
#[inline]
pub fn from_vec(vec: Vec<T>) -> RepeatedField<T> {
let len = vec.len();
RepeatedField { vec, len }
}
/// Convert data into vec.
#[inline]
pub fn into_vec(self) -> Vec<T> {
let mut vec = self.vec;
vec.truncate(self.len);
vec
}
/// Return current capacity.
#[inline]
pub fn capacity(&self) -> usize {
self.vec.capacity()
}
/// View data as slice.
#[inline]
pub fn as_slice<'a>(&'a self) -> &'a [T] {
&self.vec[..self.len]
}
/// View data as mutable slice.
#[inline]
pub fn as_mut_slice<'a>(&'a mut self) -> &'a mut [T] {
&mut self.vec[..self.len]
}
/// Get subslice of this container.
#[inline]
pub fn slice(&self, start: usize, end: usize) -> &[T] {
&self.as_ref()[start..end]
}
/// Get mutable subslice of this container.
#[inline]
pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] {
&mut self.as_mut_slice()[start..end]
}
/// Get slice from given index.
#[inline]
pub fn slice_from(&self, start: usize) -> &[T] {
&self.as_ref()[start..]
}
/// Get mutable slice from given index.
#[inline]
pub fn slice_from_mut(&mut self, start: usize) -> &mut [T] {
&mut self.as_mut_slice()[start..]
}
/// Get slice to given index.
#[inline]
pub fn slice_to(&self, end: usize) -> &[T] {
&self.as_ref()[..end]
}
/// Get mutable slice to given index.
#[inline]
pub fn slice_to_mut(&mut self, end: usize) -> &mut [T] {
&mut self.as_mut_slice()[..end]
}
/// View this container as two slices split at given index.
#[inline]
pub fn split_at<'a>(&'a self, mid: usize) -> (&'a [T], &'a [T]) {
self.as_ref().split_at(mid)
}
/// View this container as two mutable slices split at given index.
#[inline]
pub fn split_at_mut<'a>(&'a mut self, mid: usize) -> (&'a mut [T], &'a mut [T]) {
self.as_mut_slice().split_at_mut(mid)
}
/// View all but first elements of this container.
#[inline]
pub fn tail(&self) -> &[T] {
&self.as_ref()[1..]
}
/// Last element of this container.
#[inline]
pub fn last(&self) -> Option<&T> {
self.as_ref().last()
}
/// Mutable last element of this container.
#[inline]
pub fn last_mut<'a>(&'a mut self) -> Option<&'a mut T> {
self.as_mut_slice().last_mut()
}
/// View all but last elements of this container.
#[inline]
pub fn init<'a>(&'a self) -> &'a [T] {
let s = self.as_ref();
&s[0..s.len() - 1]
}
/// Push an element to the end.
#[inline]
pub fn push(&mut self, value: T) {
if self.len == self.vec.len() {
self.vec.push(value);
} else {
self.vec[self.len] = value;
}
self.len += 1;
}
/// Pop last element.
#[inline]
pub fn pop(&mut self) -> Option<T> {
if self.len == 0 {
None
} else {
self.vec.truncate(self.len);
self.len -= 1;
self.vec.pop()
}
}
/// Insert an element at specified position.
#[inline]
pub fn insert(&mut self, index: usize, value: T) {
assert!(index <= self.len);
self.vec.insert(index, value);
self.len += 1;
}
/// Remove an element from specified position.
#[inline]
pub fn remove(&mut self, index: usize) -> T {
assert!(index < self.len);
self.len -= 1;
self.vec.remove(index)
}
/// Keep only the elements for which the predicate returns `true`.
///
/// Every element `e` with `f(&e) == false` is removed. The container is
/// modified in place, each element is visited exactly once in its original
/// order, and the relative order of the kept elements is unchanged.
///
/// # Examples
///
/// ```
/// # use protobuf::RepeatedField;
///
/// let mut vec = RepeatedField::from(vec![1, 2, 3, 4]);
/// vec.retain(|&x| x % 2 == 0);
/// assert_eq!(vec, RepeatedField::from(vec![2, 4]));
/// ```
pub fn retain<F>(&mut self, f: F)
where
    F: FnMut(&T) -> bool,
{
    // Suboptimal: elements kept past the logical length are dropped first
    // so the predicate only ever sees live elements.
    let live = self.len;
    self.vec.truncate(live);
    self.vec.retain(f);
    self.len = self.vec.len();
}
/// Shorten the logical length to at most `len`.
///
/// Only the length counter changes; the backing vector is left untouched.
/// A `len` that is not smaller than the current length has no effect.
#[inline]
pub fn truncate(&mut self, len: usize) {
    self.len = self.len.min(len);
}
/// Reverse the order of the elements returned by `as_mut_slice`, in place.
#[inline]
pub fn reverse(&mut self) {
    let live = self.as_mut_slice();
    live.reverse()
}
/// Consume the container and iterate over its live elements by value.
#[inline]
pub fn into_iter(mut self) -> vec::IntoIter<T> {
    // Elements kept past the logical length must not be yielded.
    let live = self.len;
    self.vec.truncate(live);
    self.vec.into_iter()
}
/// Iterator over shared references to the live elements.
#[inline]
pub fn iter<'a>(&'a self) -> slice::Iter<'a, T> {
    let live: &'a [T] = self.as_ref();
    live.iter()
}
/// Iterator over mutable references to the elements of `as_mut_slice`.
#[inline]
pub fn iter_mut<'a>(&'a mut self) -> slice::IterMut<'a, T> {
    let live = self.as_mut_slice();
    live.iter_mut()
}
/// Sort the elements in place using the given comparator.
///
/// Sorting is delegated to the slice `sort_by`. The comparator bound is
/// `FnMut`, the same bound the slice method uses, so stateful closures
/// (for example ones that count comparisons) are accepted in addition to
/// every closure that satisfied the previous `Fn` bound.
#[inline]
pub fn sort_by<F>(&mut self, compare: F)
where
    F: FnMut(&T, &T) -> Ordering,
{
    self.as_mut_slice().sort_by(compare)
}
/// Raw const pointer to the backing vector's buffer.
#[inline]
pub fn as_ptr(&self) -> *const T {
    let backing = &self.vec;
    backing.as_ptr()
}
/// Raw mutable pointer to the backing vector's buffer.
#[inline]
pub fn as_mut_ptr(&mut self) -> *mut T {
    let backing = &mut self.vec;
    backing.as_mut_ptr()
}
}
impl<T: Default + Clear> RepeatedField<T> {
    /// Append a default-valued element and return a mutable reference to it.
    ///
    /// This can be cheaper than `rf.push(Default::default())`: when the
    /// backing vector still holds an element past the logical length, that
    /// element is cleared and reused instead of constructing a new one.
    pub fn push_default<'a>(&'a mut self) -> &'a mut T {
        let slot = self.len;
        if slot != self.vec.len() {
            // Reuse the spare element kept past the logical length.
            self.vec[slot].clear();
        } else {
            self.vec.push(Default::default());
        }
        self.len = slot + 1;
        self.last_mut().unwrap()
    }
}
impl<T> From<Vec<T>> for RepeatedField<T> {
#[inline]
fn from(values: Vec<T>) -> RepeatedField<T> {
RepeatedField::from_vec(values)
}
}
impl<'a, T: Clone> From<&'a [T]> for RepeatedField<T> {
#[inline]
fn from(values: &'a [T]) -> RepeatedField<T> {
RepeatedField::from_slice(values)
}
}
impl<T> Into<Vec<T>> for RepeatedField<T> {
#[inline]
fn into(self) -> Vec<T> {
self.into_vec()
}
}
impl<T: Clone> RepeatedField<T> {
    /// Create a `RepeatedField` holding clones of the elements of `values`.
    #[inline]
    pub fn from_slice(values: &[T]) -> RepeatedField<T> {
        let owned = values.to_vec();
        RepeatedField::from_vec(owned)
    }
    /// Create a `RepeatedField` holding clones of the elements of anything
    /// that can be viewed as a slice of `T`.
    #[inline]
    pub fn from_ref<X: AsRef<[T]>>(values: X) -> RepeatedField<T> {
        let borrowed: &[T] = values.as_ref();
        RepeatedField::from_slice(borrowed)
    }
    /// Clone the live elements of this container into a new vector.
    #[inline]
    pub fn to_vec(&self) -> Vec<T> {
        let live: &[T] = self.as_ref();
        live.to_vec()
    }
}
// Cloning goes through `to_vec` and `len()` rather than cloning the backing
// vector directly.
impl<T: Clone> Clone for RepeatedField<T> {
    #[inline]
    fn clone(&self) -> RepeatedField<T> {
        let vec = self.to_vec();
        let len = self.len();
        RepeatedField { vec, len }
    }
}
// Collect an iterator into a vector first, then wrap it with `from_vec`.
impl<T> FromIterator<T> for RepeatedField<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> RepeatedField<T> {
        let collected: Vec<T> = iter.into_iter().collect();
        RepeatedField::from_vec(collected)
    }
}
impl<'a, T> IntoIterator for &'a RepeatedField<T> {
type Item = &'a T;
type IntoIter = slice::Iter<'a, T>;
fn into_iter(self) -> slice::Iter<'a, T> {
self.iter()
}
}
impl<'a, T> IntoIterator for &'a mut RepeatedField<T> {
type Item = &'a mut T;
type IntoIter = slice::IterMut<'a, T>;
fn into_iter(self) -> slice::IterMut<'a, T> {
self.iter_mut()
}
}
impl<'a, T> IntoIterator for RepeatedField<T> {
type Item = T;
type IntoIter = vec::IntoIter<T>;
fn into_iter(self) -> vec::IntoIter<T> {
self.into_iter()
}
}
// Two containers are equal when their `as_ref` slices are equal.
impl<T: PartialEq> PartialEq for RepeatedField<T> {
    #[inline]
    fn eq(&self, other: &RepeatedField<T>) -> bool {
        let lhs: &[T] = self.as_ref();
        let rhs: &[T] = other.as_ref();
        lhs == rhs
    }
}
// Marker impl: `RepeatedField<T>` is `Eq` whenever its element type is `Eq`.
impl<T: Eq> Eq for RepeatedField<T> {}
// Allows `repeated_field == slice` comparisons.
impl<T: PartialEq> PartialEq<[T]> for RepeatedField<T> {
    fn eq(&self, other: &[T]) -> bool {
        let mine = self.as_slice();
        mine == other
    }
}
// Allows `slice == repeated_field` comparisons.
impl<T: PartialEq> PartialEq<RepeatedField<T>> for [T] {
    fn eq(&self, other: &RepeatedField<T>) -> bool {
        let theirs = other.as_slice();
        self == theirs
    }
}
impl<T: PartialEq> RepeatedField<T> {
    /// Returns `true` if some live element is equal to `value`.
    #[inline]
    pub fn contains(&self, value: &T) -> bool {
        let live: &[T] = self.as_ref();
        live.contains(value)
    }
}
// Hashes the `as_ref` slice, i.e. exactly what a `[T]` with the same
// elements would hash.
impl<T: Hash> Hash for RepeatedField<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let live: &[T] = self.as_ref();
        Hash::hash(live, state);
    }
}
// Exposes the first `len` elements of the backing vector as a slice.
impl<T> AsRef<[T]> for RepeatedField<T> {
    #[inline]
    fn as_ref<'a>(&'a self) -> &'a [T] {
        let live = self.len;
        &self.vec[..live]
    }
}
// Borrows the first `len` elements of the backing vector as a slice.
impl<T> Borrow<[T]> for RepeatedField<T> {
    #[inline]
    fn borrow(&self) -> &[T] {
        let live = self.len;
        &self.vec[..live]
    }
}
// Dereferences to the slice of the first `len` elements, which makes the
// slice methods callable on a `RepeatedField`.
impl<T> Deref for RepeatedField<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &[T] {
        let live = self.len;
        &self.vec[..live]
    }
}
// Mutable counterpart of `Deref`: the first `len` elements as `&mut [T]`.
impl<T> DerefMut for RepeatedField<T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut [T] {
        let live = self.len;
        &mut self.vec[..live]
    }
}
// `repeated_field[i]` indexes into the `as_ref` slice, so it panics when `i`
// is outside that slice.
impl<T> Index<usize> for RepeatedField<T> {
    type Output = T;

    #[inline]
    fn index<'a>(&'a self, index: usize) -> &'a T {
        let live: &'a [T] = self.as_ref();
        &live[index]
    }
}
// `repeated_field[i] = x` indexes into the `as_mut_slice` slice, so it panics
// when `i` is outside that slice.
impl<T> IndexMut<usize> for RepeatedField<T> {
    #[inline]
    fn index_mut<'a>(&'a mut self, index: usize) -> &'a mut T {
        let live = self.as_mut_slice();
        &mut live[index]
    }
}
// Appends owned items after the last live element.
impl<T> Extend<T> for RepeatedField<T> {
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        // Drop elements kept past the logical length so the new items land
        // directly after the live ones.
        let live = self.len;
        self.vec.truncate(live);
        self.vec.extend(iter);
        self.len = self.vec.len();
    }
}
// Appends copies of borrowed items after the last live element.
impl<'a, T: Copy + 'a> Extend<&'a T> for RepeatedField<T> {
    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
        // Drop elements kept past the logical length so the new items land
        // directly after the live ones.
        let live = self.len;
        self.vec.truncate(live);
        self.vec.extend(iter);
        self.len = self.vec.len();
    }
}
// Formats exactly like the slice of live elements would.
impl<T: fmt::Debug> fmt::Debug for RepeatedField<T> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let live: &[T] = self.as_ref();
        fmt::Debug::fmt(live, f)
    }
}

View File

@@ -18,6 +18,7 @@ use api::prom_store::remote::{
LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
};
use api::v1::greptime_request::Request;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
@@ -64,6 +65,16 @@ impl PromStoreProtocolHandler for DummyInstance {
Ok(())
}
// Test stub: ignores the request, the query context and the metric-engine
// flag, and always reports success.
async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> Result<()> {
Ok(())
}
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
let _ = self
.tx
@@ -141,6 +152,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
#[tokio::test]
async fn test_prometheus_remote_write_read() {
common_telemetry::init_default_ut_logging();
let (tx, mut rx) = mpsc::channel(100);
let app = make_test_app(tx);
@@ -219,28 +231,17 @@ async fn test_prometheus_remote_write_read() {
requests.push(s);
}
assert_eq!(4, requests.len());
assert_eq!(2, requests.len());
assert_eq!("public", requests[0].0);
assert_eq!("prometheus", requests[1].0);
assert_eq!("prometheus", requests[2].0);
assert_eq!("public", requests[3].0);
assert_eq!(
write_request,
WriteRequest::decode(&(requests[0].1)[..]).unwrap()
);
assert_eq!(
write_request,
WriteRequest::decode(&(requests[1].1)[..]).unwrap()
);
assert_eq!("prometheus", requests[0].0);
assert_eq!("public", requests[1].0);
assert_eq!(
read_request,
ReadRequest::decode(&(requests[2].1)[..]).unwrap()
ReadRequest::decode(&(requests[0].1)[..]).unwrap()
);
assert_eq!(
read_request,
ReadRequest::decode(&(requests[3].1)[..]).unwrap()
ReadRequest::decode(&(requests[1].1)[..]).unwrap()
);
}