mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: support special labels parsing in prom remote write (#6302)
* feat: support special labels parsing in prom remote write * chore: change __schema__ to __database__ * fix: test Signed-off-by: shuiyisong <xixing.sys@gmail.com> * fix: remove the missing type alias Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: update cr issue Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com> Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
@@ -67,11 +67,24 @@ pub struct ContextOpt {
|
||||
physical_table: Option<String>,
|
||||
skip_wal: Option<String>,
|
||||
|
||||
// reset the schema in query context
|
||||
schema: Option<String>,
|
||||
|
||||
// pipeline options, not set in query context
|
||||
// can be removed before the end of the pipeline execution
|
||||
table_suffix: Option<String>,
|
||||
}
|
||||
|
||||
impl ContextOpt {
|
||||
pub fn set_physical_table(&mut self, physical_table: String) {
|
||||
self.physical_table = Some(physical_table);
|
||||
}
|
||||
|
||||
pub fn set_schema(&mut self, schema: String) {
|
||||
self.schema = Some(schema);
|
||||
}
|
||||
}
|
||||
|
||||
impl ContextOpt {
|
||||
pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result<Self> {
|
||||
let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?;
|
||||
@@ -177,10 +190,14 @@ impl ContextReq {
|
||||
Self { req: req_map }
|
||||
}
|
||||
|
||||
pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) {
|
||||
pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) {
|
||||
self.req.entry(opt).or_default().push(req);
|
||||
}
|
||||
|
||||
pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator<Item = RowInsertRequest>) {
|
||||
self.req.entry(opt).or_default().extend(reqs);
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, other: Self) {
|
||||
for (opt, req) in other.req {
|
||||
self.req.entry(opt).or_default().extend(req);
|
||||
@@ -218,8 +235,11 @@ impl Iterator for ContextReqIter {
|
||||
type Item = (QueryContextRef, RowInsertRequests);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (opt, req_vec) = self.opt_req.next()?;
|
||||
let (mut opt, req_vec) = self.opt_req.next()?;
|
||||
let mut ctx = self.ctx_template.clone();
|
||||
if let Some(schema) = opt.schema.take() {
|
||||
ctx.set_current_schema(&schema);
|
||||
}
|
||||
opt.set_query_context(&mut ctx);
|
||||
|
||||
Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec }))
|
||||
|
||||
@@ -36,8 +36,6 @@ use crate::error::{
|
||||
ValueYamlKeyMustBeStringSnafu,
|
||||
};
|
||||
|
||||
pub type PipelineMap = Value;
|
||||
|
||||
/// Value can be used as type
|
||||
/// acts as value: the enclosed value is the actual value
|
||||
/// acts as type: the enclosed value is the default value
|
||||
|
||||
@@ -232,8 +232,7 @@ async fn decode_remote_write_request(
|
||||
if processor.use_pipeline {
|
||||
processor.exec_pipeline().await
|
||||
} else {
|
||||
let reqs = request.as_row_insert_requests();
|
||||
Ok(ContextReq::default_opt_with_reqs(reqs))
|
||||
Ok(request.as_row_insert_requests())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ async fn run_custom_pipeline(
|
||||
// `RowInsertRequest` and append to results. If the pipeline doesn't
|
||||
// have dispatch, this will be only output of the pipeline.
|
||||
for ((opt, table_name), rows) in transformed_map {
|
||||
results.add_rows(
|
||||
results.add_row(
|
||||
opt,
|
||||
RowInsertRequest {
|
||||
rows: Some(Rows {
|
||||
|
||||
@@ -20,16 +20,25 @@ use api::prom_store::remote::Sample;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use pipeline::{ContextOpt, ContextReq};
|
||||
use prost::DecodeError;
|
||||
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::proto::{decode_string, PromLabel};
|
||||
use crate::repeated_field::Clear;
|
||||
|
||||
// Prometheus remote write context
|
||||
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub struct PromCtx {
|
||||
pub schema: Option<String>,
|
||||
pub physical_table: Option<String>,
|
||||
}
|
||||
|
||||
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
|
||||
#[derive(Default, Debug)]
|
||||
pub(crate) struct TablesBuilder {
|
||||
tables: HashMap<String, TableBuilder>,
|
||||
// schema -> table -> table_builder
|
||||
tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
|
||||
}
|
||||
|
||||
impl Clear for TablesBuilder {
|
||||
@@ -42,21 +51,45 @@ impl TablesBuilder {
|
||||
/// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
|
||||
pub(crate) fn get_or_create_table_builder(
|
||||
&mut self,
|
||||
prom_ctx: PromCtx,
|
||||
table_name: String,
|
||||
label_num: usize,
|
||||
row_num: usize,
|
||||
) -> &mut TableBuilder {
|
||||
self.tables
|
||||
.entry(prom_ctx)
|
||||
.or_default()
|
||||
.entry(table_name)
|
||||
.or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
|
||||
}
|
||||
|
||||
/// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
|
||||
pub(crate) fn as_insert_requests(&mut self) -> Vec<RowInsertRequest> {
|
||||
pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
|
||||
self.tables
|
||||
.drain()
|
||||
.map(|(name, mut table)| table.as_row_insert_request(name))
|
||||
.collect()
|
||||
.map(|(prom, mut tables)| {
|
||||
// create context opt
|
||||
let mut opt = ContextOpt::default();
|
||||
if let Some(physical_table) = prom.physical_table {
|
||||
opt.set_physical_table(physical_table);
|
||||
}
|
||||
if let Some(schema) = prom.schema {
|
||||
opt.set_schema(schema);
|
||||
}
|
||||
|
||||
// create and set context req
|
||||
let mut ctx_req = ContextReq::default();
|
||||
let reqs = tables
|
||||
.drain()
|
||||
.map(|(table_name, mut table)| table.as_row_insert_request(table_name));
|
||||
ctx_req.add_rows(opt, reqs);
|
||||
|
||||
ctx_req
|
||||
})
|
||||
.fold(ContextReq::default(), |mut req, reqs| {
|
||||
req.merge(reqs);
|
||||
req
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,9 +39,14 @@ use crate::error::{self, Result};
|
||||
use crate::row_writer::{self, MultiTableData};
|
||||
|
||||
pub const METRIC_NAME_LABEL: &str = "__name__";
|
||||
|
||||
pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
|
||||
|
||||
pub const DATABASE_LABEL: &str = "__database__";
|
||||
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";
|
||||
|
||||
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
|
||||
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";
|
||||
|
||||
/// The same as `FIELD_COLUMN_MATCHER` in `promql` crate
|
||||
pub const FIELD_NAME_LABEL: &str = "__field__";
|
||||
|
||||
@@ -507,6 +512,39 @@ pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
|
||||
vec![ts_demo_metrics, ts_multi_labels]
|
||||
}
|
||||
|
||||
/// Add new labels to the mock timeseries.
|
||||
pub fn mock_timeseries_special_labels() -> Vec<TimeSeries> {
|
||||
let idc3_schema = TimeSeries {
|
||||
labels: vec![
|
||||
new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()),
|
||||
new_label("__database__".to_string(), "idc3".to_string()),
|
||||
new_label("__physical_table__".to_string(), "f1".to_string()),
|
||||
],
|
||||
samples: vec![Sample {
|
||||
value: 42.0,
|
||||
timestamp: 3000,
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
let idc4_schema = TimeSeries {
|
||||
labels: vec![
|
||||
new_label(
|
||||
METRIC_NAME_LABEL.to_string(),
|
||||
"idc4_local_table".to_string(),
|
||||
),
|
||||
new_label("__database__".to_string(), "idc4".to_string()),
|
||||
new_label("__physical_table__".to_string(), "f2".to_string()),
|
||||
],
|
||||
samples: vec![Sample {
|
||||
value: 99.0,
|
||||
timestamp: 4000,
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
vec![idc3_schema, idc4_schema]
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -18,7 +18,6 @@ use std::ops::Deref;
|
||||
use std::slice;
|
||||
|
||||
use api::prom_store::remote::Sample;
|
||||
use api::v1::RowInsertRequest;
|
||||
use bytes::{Buf, Bytes};
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use common_telemetry::debug;
|
||||
@@ -33,8 +32,10 @@ use crate::error::InternalSnafu;
|
||||
use crate::http::event::PipelineIngestRequest;
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::pipeline::run_pipeline;
|
||||
use crate::prom_row_builder::TablesBuilder;
|
||||
use crate::prom_store::METRIC_NAME_LABEL_BYTES;
|
||||
use crate::prom_row_builder::{PromCtx, TablesBuilder};
|
||||
use crate::prom_store::{
|
||||
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
|
||||
};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
use crate::repeated_field::{Clear, RepeatedField};
|
||||
|
||||
@@ -144,6 +145,11 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
|
||||
#[derive(Default, Debug)]
|
||||
pub struct PromTimeSeries {
|
||||
pub table_name: String,
|
||||
// specified using `__database__` label
|
||||
pub schema: Option<String>,
|
||||
// specified using `__physical_table__` label
|
||||
pub physical_table: Option<String>,
|
||||
|
||||
pub labels: RepeatedField<PromLabel>,
|
||||
pub samples: RepeatedField<Sample>,
|
||||
}
|
||||
@@ -187,10 +193,24 @@ impl PromTimeSeries {
|
||||
if buf.remaining() != limit {
|
||||
return Err(DecodeError::new("delimited length exceeded"));
|
||||
}
|
||||
if label.name.deref() == METRIC_NAME_LABEL_BYTES {
|
||||
self.table_name = decode_string(&label.value, prom_validation_mode)?;
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
|
||||
match label.name.deref() {
|
||||
METRIC_NAME_LABEL_BYTES => {
|
||||
self.table_name = decode_string(&label.value, prom_validation_mode)?;
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
DATABASE_LABEL_BYTES => {
|
||||
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
PHYSICAL_TABLE_LABEL_BYTES => {
|
||||
self.physical_table =
|
||||
Some(decode_string(&label.value, prom_validation_mode)?);
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
2u32 => {
|
||||
@@ -216,7 +236,14 @@ impl PromTimeSeries {
|
||||
) -> Result<(), DecodeError> {
|
||||
let label_num = self.labels.len();
|
||||
let row_num = self.samples.len();
|
||||
|
||||
let prom_ctx = PromCtx {
|
||||
schema: self.schema.take(),
|
||||
physical_table: self.physical_table.take(),
|
||||
};
|
||||
|
||||
let table_data = table_builders.get_or_create_table_builder(
|
||||
prom_ctx,
|
||||
std::mem::take(&mut self.table_name),
|
||||
label_num,
|
||||
row_num,
|
||||
@@ -267,7 +294,7 @@ impl Clear for PromWriteRequest {
|
||||
}
|
||||
|
||||
impl PromWriteRequest {
|
||||
pub fn as_row_insert_requests(&mut self) -> Vec<RowInsertRequest> {
|
||||
pub fn as_row_insert_requests(&mut self) -> ContextReq {
|
||||
self.table_data.as_insert_requests()
|
||||
}
|
||||
|
||||
@@ -482,11 +509,14 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let req = prom_write_request.as_row_insert_requests();
|
||||
|
||||
let samples = req
|
||||
.iter()
|
||||
.ref_all_req()
|
||||
.filter_map(|r| r.rows.as_ref().map(|r| r.rows.len()))
|
||||
.sum::<usize>();
|
||||
let prom_rows = RowInsertRequests { inserts: req };
|
||||
let prom_rows = RowInsertRequests {
|
||||
inserts: req.all_req().collect::<Vec<_>>(),
|
||||
};
|
||||
|
||||
assert_eq!(expected_samples, samples);
|
||||
assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len());
|
||||
|
||||
@@ -91,6 +91,7 @@ macro_rules! http_tests {
|
||||
test_config_api,
|
||||
test_dashboard_path,
|
||||
test_prometheus_remote_write,
|
||||
test_prometheus_remote_special_labels,
|
||||
test_prometheus_remote_write_with_pipeline,
|
||||
test_vm_proto_remote_write,
|
||||
|
||||
@@ -1295,6 +1296,100 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_special_labels")
|
||||
.await;
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
// write snappy encoded data
|
||||
let write_request = WriteRequest {
|
||||
timeseries: prom_store::mock_timeseries_special_labels(),
|
||||
..Default::default()
|
||||
};
|
||||
let serialized_request = write_request.encode_to_vec();
|
||||
let compressed_request =
|
||||
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
|
||||
|
||||
// create databases
|
||||
let res = client
|
||||
.post("/v1/sql?sql=create database idc3")
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let res = client
|
||||
.post("/v1/sql?sql=create database idc4")
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// write data
|
||||
let res = client
|
||||
.post("/v1/prometheus/write")
|
||||
.header("Content-Encoding", "snappy")
|
||||
.body(compressed_request)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// test idc3
|
||||
let expected = "[[\"f1\"],[\"idc3_lo_table\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc3",
|
||||
&client,
|
||||
"show tables from idc3;",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f1'\\n)\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc3_show_create_table",
|
||||
&client,
|
||||
"show create table idc3.idc3_lo_table",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[3000,42.0]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc3_select",
|
||||
&client,
|
||||
"select * from idc3.idc3_lo_table",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
// test idc4
|
||||
let expected = "[[\"f2\"],[\"idc4_local_table\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc4",
|
||||
&client,
|
||||
"show tables from idc4;",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f2'\\n)\"]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc4_show_create_table",
|
||||
&client,
|
||||
"show create table idc4.idc4_local_table",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
let expected = "[[4000,99.0]]";
|
||||
validate_data(
|
||||
"test_prometheus_remote_special_labels_idc4_select",
|
||||
&client,
|
||||
"select * from idc4.idc4_local_table",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
|
||||
Reference in New Issue
Block a user