feat: Prometheus remote write with pipeline (#5981)

* chore: update nightly version

* chore: sort lint lines

* chore: minor fix

* chore: update nix

* chore: update toolchain to 2024-04-14

* chore: update toolchain to 2024-04-15

* chore: remove unnecessary test

* chore: do not assert oid in sqlness test

* chore: fix margin issue

* chore: fix cr issues

* chore: fix cr issues

* chore: add pipeline handler to prom state

* chore: add prom series processor to merge function

* chore: add run pipeline in decode

* chore: add channel to pipeline ctx

* chore: add pipeline info to remote write handler

* chore: minor update

* chore: minor update

* chore: add test

* chore: add comment

* refactor: simplify identity pipeline params

* fix: test

* refactor: remove is_prometheus

---------

Co-authored-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
shuiyisong
2025-05-19 16:00:59 +08:00
committed by GitHub
parent 3a5534722c
commit a0d89c9ed1
13 changed files with 357 additions and 60 deletions

View File

@@ -102,6 +102,7 @@ where
builder = builder
.with_prom_handler(
self.instance.clone(),
Some(self.instance.clone()),
opts.prom_store.with_metric_engine,
opts.http.is_strict_mode,
)

View File

@@ -23,10 +23,12 @@ use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use serde_json::Number;
use session::context::Channel;
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
@@ -38,7 +40,7 @@ use crate::etl::transform::index::Index;
use crate::etl::transform::{Transform, Transforms};
use crate::etl::value::{Timestamp, Value};
use crate::etl::PipelineMap;
use crate::{IdentityTimeIndex, PipelineContext};
use crate::PipelineContext;
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
@@ -332,61 +334,87 @@ fn resolve_number_schema(
)
}
/// Computes the time-index value for a single row, depending on the ingestion channel.
///
/// * `Channel::Prometheus`: reads the `GREPTIME_TIMESTAMP` entry from `values` as an
///   `i64` and wraps it as a millisecond timestamp. A missing or non-integer entry
///   falls back to the integer default.
/// * Any other channel: if the pipeline definition carries a custom time index, the
///   timestamp is extracted from the corresponding column (and extraction errors are
///   propagated); otherwise the current wall-clock time in nanoseconds is used.
fn calc_ts(p_ctx: &PipelineContext, values: &PipelineMap) -> Result<Option<ValueData>> {
    if p_ctx.channel == Channel::Prometheus {
        let millis = values
            .get(GREPTIME_TIMESTAMP)
            .and_then(|v| v.as_i64())
            .unwrap_or_default();
        return Ok(Some(ValueData::TimestampMillisecondValue(millis)));
    }

    // No custom time index configured: stamp the row with "now".
    let Some(time_index) = p_ctx.pipeline_definition.get_custom_ts() else {
        let now_nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default();
        return Ok(Some(ValueData::TimestampNanosecondValue(now_nanos)));
    };

    let raw_ts = values.get(time_index.get_column_name());
    time_index.get_timestamp(raw_ts).map(Some)
}
fn values_to_row(
schema_info: &mut SchemaInfo,
values: PipelineMap,
custom_ts: Option<&IdentityTimeIndex>,
pipeline_ctx: &PipelineContext<'_>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// set time index value
let value_data = match custom_ts {
Some(ts) => {
let ts_field = values.get(ts.get_column_name());
Some(ts.get_timestamp(ts_field)?)
}
None => Some(ValueData::TimestampNanosecondValue(
chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(),
)),
};
// calculate timestamp value based on the channel
let ts = calc_ts(pipeline_ctx, &values)?;
row.push(GreptimeValue { value_data });
row.push(GreptimeValue { value_data: ts });
for _ in 1..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
}
// skip ts column
let ts_column_name = custom_ts
.as_ref()
.map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
for (column_name, value) in values {
// skip ts column
let ts_column = custom_ts
.as_ref()
.map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name());
if column_name == ts_column {
if column_name == ts_column_name {
continue;
}
let index = schema_info.index.get(&column_name).copied();
resolve_value(index, value, column_name, &mut row, schema_info)?;
resolve_value(value, column_name, &mut row, schema_info, pipeline_ctx)?;
}
Ok(Row { values: row })
}
/// Picks the semantic type for a non-timestamp column.
///
/// On the Prometheus channel every column except `GREPTIME_VALUE` is treated as a
/// tag; in all other cases the column is a field.
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
    match p_ctx.channel {
        Channel::Prometheus if column_name != GREPTIME_VALUE => SemanticType::Tag as i32,
        _ => SemanticType::Field as i32,
    }
}
fn resolve_value(
index: Option<usize>,
value: Value,
column_name: String,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
p_ctx: &PipelineContext,
) -> Result<()> {
let index = schema_info.index.get(&column_name).copied();
let mut resolve_simple_type =
|value_data: ValueData, column_name: String, data_type: ColumnDataType| {
let semantic_type = decide_semantic(p_ctx, &column_name);
resolve_schema(
index,
value_data,
ColumnSchema {
column_name,
datatype: data_type as i32,
semantic_type: SemanticType::Field as i32,
semantic_type,
datatype_extension: None,
options: None,
},
@@ -499,16 +527,20 @@ fn identity_pipeline_inner(
column_name: custom_ts
.map(|ts| ts.get_column_name().clone())
.unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()),
datatype: custom_ts
.map(|c| c.get_datatype())
.unwrap_or(ColumnDataType::TimestampNanosecond) as i32,
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ColumnDataType::TimestampMillisecond
} else {
ColumnDataType::TimestampNanosecond
}
}) as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
});
for values in pipeline_maps {
let row = values_to_row(&mut schema_info, values, custom_ts)?;
let row = values_to_row(&mut schema_info, values, pipeline_ctx)?;
rows.push(row);
}
@@ -622,8 +654,11 @@ mod tests {
#[test]
fn test_identify_pipeline() {
let params = GreptimePipelineParams::default();
let pipeline_ctx =
PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), &params);
let pipeline_ctx = PipelineContext::new(
&PipelineDefinition::GreptimeIdentityPipeline(None),
&params,
Channel::Unknown,
);
{
let array = vec![
serde_json::json!({

View File

@@ -20,6 +20,7 @@ use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use session::context::Channel;
use snafu::ensure;
use util::to_pipeline_version;
@@ -119,16 +120,19 @@ impl PipelineDefinition {
/// Borrowed bundle of everything a pipeline run needs to know about its invocation:
/// which pipeline to run, its parameters, and the channel the data arrived on.
pub struct PipelineContext<'a> {
/// The pipeline to execute.
pub pipeline_definition: &'a PipelineDefinition,
/// Parameters for this pipeline run.
pub pipeline_param: &'a GreptimePipelineParams,
/// Ingestion channel of the request; callers typically pass `query_ctx.channel()`.
pub channel: Channel,
}
impl<'a> PipelineContext<'a> {
/// Builds a context from its three parts; the arguments are stored as-is.
pub fn new(
pipeline_definition: &'a PipelineDefinition,
pipeline_param: &'a GreptimePipelineParams,
channel: Channel,
) -> Self {
Self {
pipeline_definition,
pipeline_param,
channel,
}
}
}

View File

@@ -19,7 +19,7 @@ use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;
use servers::proto::{PromSeriesProcessor, PromWriteRequest};
fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -31,6 +31,8 @@ fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = false;
let mut p = PromSeriesProcessor::default_processor();
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
@@ -44,7 +46,7 @@ fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data, is_strict_mode).unwrap();
prom_request.merge(data, is_strict_mode, &mut p).unwrap();
prom_request.as_row_insert_requests();
});
});
@@ -60,6 +62,8 @@ fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) {
let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
let is_strict_mode = true;
let mut p = PromSeriesProcessor::default_processor();
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
@@ -73,7 +77,7 @@ fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) {
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data, is_strict_mode).unwrap();
prom_request.merge(data, is_strict_mode, &mut p).unwrap();
prom_request.as_row_insert_requests();
});
});

View File

@@ -554,11 +554,13 @@ impl HttpServerBuilder {
pub fn with_prom_handler(
self,
handler: PromStoreProtocolHandlerRef,
pipeline_handler: Option<PipelineHandlerRef>,
prom_store_with_metric_engine: bool,
is_strict_mode: bool,
) -> Self {
let state = PromStoreState {
prom_store_handler: handler,
pipeline_handler,
prom_store_with_metric_engine,
is_strict_mode,
};

View File

@@ -326,7 +326,7 @@ async fn dryrun_pipeline_inner(
let params = GreptimePipelineParams::default();
let pipeline_def = PipelineDefinition::Resolved(pipeline);
let pipeline_ctx = PipelineContext::new(&pipeline_def, &params);
let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
let results = run_pipeline(
&pipeline_handler,
&pipeline_ctx,
@@ -693,7 +693,7 @@ pub(crate) async fn ingest_logs_inner(
.and_then(|v| v.to_str().ok()),
);
let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params);
let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
for pipeline_req in log_ingest_requests {
let requests =
run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

View File

@@ -28,16 +28,19 @@ use common_telemetry::tracing;
use hyper::HeaderMap;
use lazy_static::lazy_static;
use object_pool::Pool;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineDefinition;
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
lazy_static! {
@@ -52,6 +55,7 @@ pub const VM_PROTO_VERSION: &str = "1";
/// Shared state handed to the Prometheus store HTTP handlers.
#[derive(Clone)]
pub struct PromStoreState {
/// Handler that performs the actual Prometheus store writes.
pub prom_store_handler: PromStoreProtocolHandlerRef,
/// Handler used to run a pipeline over remote-write data. Optional: when a request
/// names a pipeline but this is `None`, `remote_write` fails with an internal error.
pub pipeline_handler: Option<PipelineHandlerRef>,
/// Forwarded to the store handler's `write` call.
pub prom_store_with_metric_engine: bool,
/// Forwarded to the remote-write request decoder.
pub is_strict_mode: bool,
}
@@ -85,11 +89,13 @@ pub async fn remote_write(
State(state): State<PromStoreState>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
pipeline_info: PipelineInfo,
content_encoding: TypedHeader<headers::ContentEncoding>,
body: Bytes,
) -> Result<impl IntoResponse> {
let PromStoreState {
prom_store_handler,
pipeline_handler,
prom_store_with_metric_engine,
is_strict_mode,
} = state;
@@ -100,17 +106,34 @@ pub async fn remote_write(
let db = params.db.clone().unwrap_or_default();
query_ctx.set_channel(Channel::Prometheus);
if let Some(physical_table) = params.physical_table {
query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
}
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, is_strict_mode).await?;
if let Some(physical_table) = params.physical_table {
query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
let mut processor = PromSeriesProcessor::default_processor();
if let Some(pipeline_name) = pipeline_info.pipeline_name {
let pipeline_def = PipelineDefinition::from_name(
&pipeline_name,
to_pipeline_version(pipeline_info.pipeline_version.as_deref())
.context(PipelineSnafu)?,
None,
)
.context(PipelineSnafu)?;
let pipeline_handler = pipeline_handler.context(InternalSnafu {
err_msg: "pipeline handler is not set".to_string(),
})?;
processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
}
let query_ctx = Arc::new(query_ctx);
let (request, samples) =
decode_remote_write_request(is_zstd, body, is_strict_mode, &mut processor).await?;
let output = prom_store_handler
.write(request, query_ctx, prom_store_with_metric_engine)
@@ -177,6 +200,7 @@ async fn decode_remote_write_request(
is_zstd: bool,
body: Bytes,
is_strict_mode: bool,
processor: &mut PromSeriesProcessor,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
@@ -194,10 +218,16 @@ async fn decode_remote_write_request(
};
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf, is_strict_mode)
.merge(buf, is_strict_mode, processor)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
if processor.use_pipeline {
processor.exec_pipeline().await
} else {
Ok(request.as_row_insert_requests())
}
}
async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {

View File

@@ -75,7 +75,8 @@ pub async fn to_grpc_insert_requests(
let data = parse_export_logs_service_request(request);
let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;
let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_params);
let pipeline_ctx =
PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
let inserts = run_pipeline(
&pipeline_handler,
&pipeline_ctx,

View File

@@ -23,7 +23,7 @@ use pipeline::{
DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext, PipelineDefinition,
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::QueryContextRef;
use session::context::{Channel, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
@@ -84,10 +84,14 @@ async fn run_identity_pipeline(
table: table_name,
values: data_array,
} = pipeline_req;
let table = handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
let table = if pipeline_ctx.channel == Channel::Prometheus {
None
} else {
handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?
};
pipeline::identity_pipeline(data_array, table, pipeline_ctx)
.map(|rows| {
vec![RowInsertRequest {
@@ -187,7 +191,8 @@ async fn run_custom_pipeline(
let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
let next_pipeline_ctx = PipelineContext::new(&new_def, pipeline_ctx.pipeline_param);
let next_pipeline_ctx =
PipelineContext::new(&new_def, pipeline_ctx.pipeline_param, pipeline_ctx.channel);
let reqs = run_identity_pipeline(
handler,
@@ -218,8 +223,11 @@ async fn run_custom_pipeline(
// run pipeline recursively.
let next_pipeline_def =
PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
let next_pipeline_ctx =
PipelineContext::new(&next_pipeline_def, pipeline_ctx.pipeline_param);
let next_pipeline_ctx = PipelineContext::new(
&next_pipeline_def,
pipeline_ctx.pipeline_param,
pipeline_ctx.channel,
);
let requests = Box::pin(run_pipeline(
handler,
&next_pipeline_ctx,

View File

@@ -12,18 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::ops::Deref;
use std::slice;
use api::prom_store::remote::Sample;
use api::v1::RowInsertRequests;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType};
use prost::DecodeError;
use session::context::QueryContextRef;
use snafu::OptionExt;
use crate::error::InternalSnafu;
use crate::http::event::PipelineIngestRequest;
use crate::pipeline::run_pipeline;
use crate::prom_row_builder::TablesBuilder;
use crate::prom_store::METRIC_NAME_LABEL_BYTES;
use crate::query_handler::PipelineHandlerRef;
use crate::repeated_field::{Clear, RepeatedField};
impl Clear for Sample {
@@ -222,8 +232,6 @@ impl PromTimeSeries {
self.samples.as_slice(),
is_strict_mode,
)?;
self.labels.clear();
self.samples.clear();
Ok(())
}
@@ -247,7 +255,12 @@ impl PromWriteRequest {
}
// todo(hl): maybe use &[u8] can reduce the overhead introduced with Bytes.
pub fn merge(&mut self, mut buf: Bytes, is_strict_mode: bool) -> Result<(), DecodeError> {
pub fn merge(
&mut self,
mut buf: Bytes,
is_strict_mode: bool,
processor: &mut PromSeriesProcessor,
) -> Result<(), DecodeError> {
const STRUCT_NAME: &str = "PromWriteRequest";
while buf.has_remaining() {
let (tag, wire_type) = decode_key(&mut buf)?;
@@ -273,8 +286,17 @@ impl PromWriteRequest {
if buf.remaining() != limit {
return Err(DecodeError::new("delimited length exceeded"));
}
self.series
.add_to_table_data(&mut self.table_data, is_strict_mode)?;
if processor.use_pipeline {
processor.consume_series_to_pipeline_map(&mut self.series)?;
} else {
self.series
.add_to_table_data(&mut self.table_data, is_strict_mode)?;
}
// clear state
self.series.labels.clear();
self.series.samples.clear();
}
3u32 => {
// todo(hl): metadata are skipped.
@@ -283,10 +305,133 @@ impl PromWriteRequest {
_ => prost::encoding::skip_field(wire_type, tag, &mut buf, Default::default())?,
}
}
Ok(())
}
}
/// A hook to be injected into the PromWriteRequest decoding process.
/// It was originally designed with two usages:
/// 1. consume one series into the desired type, in this case, the pipeline map
/// 2. convert itself to RowInsertRequests
///
/// Since the original conversion is coupled with PromWriteRequest,
/// let's keep it that way for now.
pub struct PromSeriesProcessor {
// whether decoded series are routed through a pipeline; turned on by `set_pipeline`
pub(crate) use_pipeline: bool,
// pipeline input rows accumulated during decoding, keyed by table name
pub(crate) table_values: BTreeMap<String, Vec<PipelineMap>>,
// optional fields for pipeline
pub(crate) pipeline_handler: Option<PipelineHandlerRef>,
pub(crate) query_ctx: Option<QueryContextRef>,
pub(crate) pipeline_def: Option<PipelineDefinition>,
}
impl PromSeriesProcessor {
    /// Creates a processor with the pipeline disabled.
    ///
    /// In this state `use_pipeline` is `false`, no rows are buffered, and none of the
    /// pipeline-related fields are set.
    pub fn default_processor() -> Self {
        Self {
            use_pipeline: false,
            table_values: BTreeMap::new(),
            pipeline_handler: None,
            query_ctx: None,
            pipeline_def: None,
        }
    }

    /// Switches the processor into pipeline mode and stores everything
    /// [`Self::exec_pipeline`] needs later: the pipeline handler, the query context
    /// and the pipeline definition.
    pub fn set_pipeline(
        &mut self,
        handler: PipelineHandlerRef,
        query_ctx: QueryContextRef,
        pipeline_def: PipelineDefinition,
    ) {
        self.use_pipeline = true;
        self.pipeline_handler = Some(handler);
        self.query_ctx = Some(query_ctx);
        self.pipeline_def = Some(pipeline_def);
    }

    /// Converts one decoded series into pipeline maps and buffers them under the
    /// series' table name.
    ///
    /// Every sample becomes one map that contains all labels of the series (as string
    /// values) plus `GREPTIME_TIMESTAMP` (the sample timestamp as `Int64`) and
    /// `GREPTIME_VALUE` (the sample value as `Float64`).
    ///
    /// `series.table_name` is moved out and left empty; labels and samples are only
    /// read here and are left for the caller to clear.
    ///
    /// # Errors
    ///
    /// Returns a [`DecodeError`] if a label name or value is not valid UTF-8.
    pub(crate) fn consume_series_to_pipeline_map(
        &mut self,
        series: &mut PromTimeSeries,
    ) -> Result<(), DecodeError> {
        // Labels are shared by every sample of the series, so build them once.
        let mut pipeline_map = PipelineMap::new();
        for l in series.labels.iter() {
            let name = String::from_utf8(l.name.to_vec())
                .map_err(|_| DecodeError::new("invalid utf-8"))?;
            let value = String::from_utf8(l.value.to_vec())
                .map_err(|_| DecodeError::new("invalid utf-8"))?;
            pipeline_map.insert(name, Value::String(value));
        }

        let sample_count = series.samples.len();
        let mut vec_pipeline_map: Vec<PipelineMap> = Vec::with_capacity(sample_count);
        for (i, s) in series.samples.iter().enumerate() {
            pipeline_map.insert(GREPTIME_TIMESTAMP.to_string(), Value::Int64(s.timestamp));
            pipeline_map.insert(GREPTIME_VALUE.to_string(), Value::Float64(s.value));

            if i + 1 == sample_count {
                // Last sample: hand over the map itself instead of cloning it again.
                vec_pipeline_map.push(pipeline_map);
                break;
            } else {
                vec_pipeline_map.push(pipeline_map.clone());
            }
        }

        let table_name = std::mem::take(&mut series.table_name);
        match self.table_values.entry(table_name) {
            Entry::Occupied(mut occupied_entry) => {
                occupied_entry.get_mut().append(&mut vec_pipeline_map);
            }
            Entry::Vacant(vacant_entry) => {
                vacant_entry.insert(vec_pipeline_map);
            }
        }

        Ok(())
    }

    /// Runs the configured pipeline over all buffered rows, one request per table,
    /// and returns the resulting insert requests together with the total row count.
    ///
    /// The buffered rows are drained: they are moved into the pipeline requests
    /// rather than cloned, so `table_values` is empty afterwards (also when the
    /// pipeline fails part-way through).
    ///
    /// # Errors
    ///
    /// Returns an internal error if the pipeline handler, pipeline definition or query
    /// context has not been set, and propagates any error from `run_pipeline`.
    pub(crate) async fn exec_pipeline(
        &mut self,
    ) -> crate::error::Result<(RowInsertRequests, usize)> {
        // prepare params
        let handler = self.pipeline_handler.as_ref().context(InternalSnafu {
            err_msg: "pipeline handler is not set",
        })?;
        let pipeline_def = self.pipeline_def.as_ref().context(InternalSnafu {
            err_msg: "pipeline definition is not set",
        })?;
        let pipeline_param = GreptimePipelineParams::default();
        let query_ctx = self.query_ctx.as_ref().context(InternalSnafu {
            err_msg: "query context is not set",
        })?;

        let pipeline_ctx = PipelineContext::new(pipeline_def, &pipeline_param, query_ctx.channel());

        // Take ownership of the buffered rows so they can be moved into the requests.
        let table_values = std::mem::take(&mut self.table_values);

        let mut size = 0;

        // run pipeline
        let mut inserts = Vec::with_capacity(table_values.len());
        for (table, values) in table_values {
            let pipeline_req = PipelineIngestRequest { table, values };
            let row_req =
                run_pipeline(handler, &pipeline_ctx, pipeline_req, query_ctx, true).await?;
            size += row_req
                .iter()
                .map(|rq| rq.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
                .sum::<usize>();
            inserts.extend(row_req);
        }

        let row_insert_requests = RowInsertRequests { inserts };

        Ok((row_insert_requests, size))
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -297,7 +442,7 @@ mod tests {
use prost::Message;
use crate::prom_store::to_grpc_row_insert_requests;
use crate::proto::PromWriteRequest;
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::repeated_field::Clear;
fn sort_rows(rows: Rows) -> Rows {
@@ -321,8 +466,11 @@ mod tests {
expected_samples: usize,
expected_rows: &RowInsertRequests,
) {
let mut p = PromSeriesProcessor::default_processor();
prom_write_request.clear();
prom_write_request.merge(data.clone(), true).unwrap();
prom_write_request
.merge(data.clone(), true, &mut p)
.unwrap();
let (prom_rows, samples) = prom_write_request.as_row_insert_requests();
assert_eq!(expected_samples, samples);

View File

@@ -124,7 +124,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone())
.with_prom_handler(instance, true, is_strict_mode)
.with_prom_handler(instance, None, true, is_strict_mode)
.build();
server.build(server.make_app()).unwrap()
}

View File

@@ -537,7 +537,12 @@ pub async fn setup_test_prom_app_with_frontend(
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_logs_handler(instance.fe_instance().clone())
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prom_handler(
frontend_ref.clone(),
Some(frontend_ref.clone()),
true,
is_strict_mode,
)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();

View File

@@ -91,6 +91,7 @@ macro_rules! http_tests {
test_config_api,
test_dashboard_path,
test_prometheus_remote_write,
test_prometheus_remote_write_with_pipeline,
test_vm_proto_remote_write,
test_pipeline_api,
@@ -1256,6 +1257,64 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);
let expected = "[[\"demo\"],[\"demo_metrics\"],[\"demo_metrics_with_nanos\"],[\"greptime_physical_table\"],[\"metric1\"],[\"metric2\"],[\"metric3\"],[\"mito\"],[\"multi_labels\"],[\"numbers\"],[\"phy\"],[\"phy2\"],[\"phy_ns\"]]";
validate_data("prometheus_remote_write", &client, "show tables;", expected).await;
let table_val = "[[1000,3.0,\"z001\",\"test_host1\"],[2000,4.0,\"z001\",\"test_host1\"]]";
validate_data(
"prometheus_remote_write",
&client,
"select * from metric2",
table_val,
)
.await;
guard.remove_all().await;
}
/// End-to-end check of Prometheus remote write routed through the `greptime_identity`
/// pipeline: posts a snappy-compressed write request with the pipeline-name header,
/// then verifies the visible tables and the rows stored in `metric2`.
pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType) {
    // Label passed to `validate_data` for both checks below.
    const CASE: &str = "prometheus_remote_write_pipeline";

    common_telemetry::init_default_ut_logging();
    let (router, mut guard) =
        setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write_with_pipeline")
            .await;
    let client = TestClient::new(router).await;

    // Build the snappy-encoded remote-write payload.
    let payload = {
        let request = WriteRequest {
            timeseries: prom_store::mock_timeseries(),
            ..Default::default()
        };
        let encoded = request.encode_to_vec();
        prom_store::snappy_compress(&encoded).expect("failed to encode snappy")
    };

    let response = client
        .post("/v1/prometheus/write")
        .header("Content-Encoding", "snappy")
        .header("x-greptime-log-pipeline-name", "greptime_identity")
        .body(payload)
        .send()
        .await;
    assert_eq!(response.status(), StatusCode::NO_CONTENT);

    // The metric tables must show up next to the pre-existing ones.
    validate_data(
        CASE,
        &client,
        "show tables;",
        "[[\"demo\"],[\"demo_metrics\"],[\"demo_metrics_with_nanos\"],[\"greptime_physical_table\"],[\"metric1\"],[\"metric2\"],[\"metric3\"],[\"mito\"],[\"multi_labels\"],[\"numbers\"],[\"phy\"],[\"phy2\"],[\"phy_ns\"]]",
    )
    .await;

    // The samples of `metric2` must have been written with their labels.
    validate_data(
        CASE,
        &client,
        "select * from metric2",
        "[[1000,3.0,\"z001\",\"test_host1\"],[2000,4.0,\"z001\",\"test_host1\"]]",
    )
    .await;

    guard.remove_all().await;
}