From 079daf5db96527c529814941f0369cf581fdb91c Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 16 Jun 2025 20:20:33 -0700 Subject: [PATCH] feat: support special labels parsing in prom remote write (#6302) * feat: support special labels parsing in prom remote write * chore: change __schema__ to __database__ * fix: test Signed-off-by: shuiyisong * fix: remove the missing type alias Signed-off-by: shuiyisong * chore: update cr issue Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --------- Signed-off-by: shuiyisong Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- src/pipeline/src/etl/ctx_req.rs | 24 +++++++- src/pipeline/src/etl/value.rs | 2 - src/servers/src/http/prom_store.rs | 3 +- src/servers/src/pipeline.rs | 2 +- src/servers/src/prom_row_builder.rs | 41 +++++++++++-- src/servers/src/prom_store.rs | 40 +++++++++++- src/servers/src/proto.rs | 48 ++++++++++++--- tests-integration/tests/http.rs | 95 +++++++++++++++++++++++++++++ 8 files changed, 234 insertions(+), 21 deletions(-) diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs index d0e86ff629..ebf8c3b539 100644 --- a/src/pipeline/src/etl/ctx_req.rs +++ b/src/pipeline/src/etl/ctx_req.rs @@ -67,11 +67,24 @@ pub struct ContextOpt { physical_table: Option, skip_wal: Option, + // reset the schema in query context + schema: Option, + // pipeline options, not set in query context // can be removed before the end of the pipeline execution table_suffix: Option, } +impl ContextOpt { + pub fn set_physical_table(&mut self, physical_table: String) { + self.physical_table = Some(physical_table); + } + + pub fn set_schema(&mut self, schema: String) { + self.schema = Some(schema); + } +} + impl ContextOpt { pub fn from_pipeline_map_to_opt(pipeline_map: &mut Value) -> Result { let pipeline_map = pipeline_map.as_map_mut().context(ValueMustBeMapSnafu)?; @@ -177,10 +190,14 @@ impl ContextReq { Self { req: req_map } } - pub fn add_rows(&mut self, opt: ContextOpt, req: RowInsertRequest) { + pub fn add_row(&mut self, opt: ContextOpt, req: RowInsertRequest) { self.req.entry(opt).or_default().push(req); } + pub fn add_rows(&mut self, opt: ContextOpt, reqs: impl IntoIterator) { + self.req.entry(opt).or_default().extend(reqs); + } + pub fn merge(&mut self, other: Self) { for (opt, req) in other.req { self.req.entry(opt).or_default().extend(req); @@ -218,8 +235,11 @@ impl Iterator for ContextReqIter { type Item = (QueryContextRef, RowInsertRequests); fn next(&mut self) -> Option { - let (opt, req_vec) = self.opt_req.next()?; + let (mut opt, req_vec) = self.opt_req.next()?; let mut ctx = self.ctx_template.clone(); + if let Some(schema) = opt.schema.take() { + ctx.set_current_schema(&schema); + } opt.set_query_context(&mut ctx); Some((Arc::new(ctx), RowInsertRequests { inserts: req_vec })) diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 3eb7294720..ccc07e929f 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -36,8 +36,6 @@ use crate::error::{ ValueYamlKeyMustBeStringSnafu, }; -pub type PipelineMap = Value; - /// Value can be used as type /// acts as value: the enclosed value is the actual value /// acts as type: the enclosed value is the default value diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 1c23a6722b..bada913cae 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -232,8 +232,7 @@ async fn decode_remote_write_request( if processor.use_pipeline { processor.exec_pipeline().await } else { - let reqs = request.as_row_insert_requests(); - Ok(ContextReq::default_opt_with_reqs(reqs)) + Ok(request.as_row_insert_requests()) } } diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 1a14441a0f..7e9c62558c 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -167,7 +167,7 @@ async fn run_custom_pipeline( // `RowInsertRequest` and append to results. If the pipeline doesn't // have dispatch, this will be only output of the pipeline. for ((opt, table_name), rows) in transformed_map { - results.add_rows( + results.add_row( opt, RowInsertRequest { rows: Some(Rows { diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 27e4cdf661..95f9f4ee83 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -20,16 +20,25 @@ use api::prom_store::remote::Sample; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use pipeline::{ContextOpt, ContextReq}; use prost::DecodeError; use crate::http::PromValidationMode; use crate::proto::{decode_string, PromLabel}; use crate::repeated_field::Clear; +// Prometheus remote write context +#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct PromCtx { + pub schema: Option, + pub physical_table: Option, +} + /// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. #[derive(Default, Debug)] pub(crate) struct TablesBuilder { - tables: HashMap, + // schema -> table -> table_builder + tables: HashMap>, } impl Clear for TablesBuilder { @@ -42,21 +51,45 @@ impl TablesBuilder { /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist. pub(crate) fn get_or_create_table_builder( &mut self, + prom_ctx: PromCtx, table_name: String, label_num: usize, row_num: usize, ) -> &mut TableBuilder { self.tables + .entry(prom_ctx) + .or_default() .entry(table_name) .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num)) } /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states. - pub(crate) fn as_insert_requests(&mut self) -> Vec { + pub(crate) fn as_insert_requests(&mut self) -> ContextReq { self.tables .drain() - .map(|(name, mut table)| table.as_row_insert_request(name)) - .collect() + .map(|(prom, mut tables)| { + // create context opt + let mut opt = ContextOpt::default(); + if let Some(physical_table) = prom.physical_table { + opt.set_physical_table(physical_table); + } + if let Some(schema) = prom.schema { + opt.set_schema(schema); + } + + // create and set context req + let mut ctx_req = ContextReq::default(); + let reqs = tables + .drain() + .map(|(table_name, mut table)| table.as_row_insert_request(table_name)); + ctx_req.add_rows(opt, reqs); + + ctx_req + }) + .fold(ContextReq::default(), |mut req, reqs| { + req.merge(reqs); + req + }) } } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index ee1586f94e..4f2634c7d9 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -39,9 +39,14 @@ use crate::error::{self, Result}; use crate::row_writer::{self, MultiTableData}; pub const METRIC_NAME_LABEL: &str = "__name__"; - pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; +pub const DATABASE_LABEL: &str = "__database__"; +pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__"; + +pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__"; +pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__"; + /// The same as `FIELD_COLUMN_MATCHER` in `promql` crate pub const FIELD_NAME_LABEL: &str = "__field__"; @@ -507,6 +512,39 @@ pub fn mock_timeseries_new_label() -> Vec { vec![ts_demo_metrics, ts_multi_labels] } +/// Add new labels to the mock timeseries. +pub fn mock_timeseries_special_labels() -> Vec { + let idc3_schema = TimeSeries { + labels: vec![ + new_label(METRIC_NAME_LABEL.to_string(), "idc3_lo_table".to_string()), + new_label("__database__".to_string(), "idc3".to_string()), + new_label("__physical_table__".to_string(), "f1".to_string()), + ], + samples: vec![Sample { + value: 42.0, + timestamp: 3000, + }], + ..Default::default() + }; + let idc4_schema = TimeSeries { + labels: vec![ + new_label( + METRIC_NAME_LABEL.to_string(), + "idc4_local_table".to_string(), + ), + new_label("__database__".to_string(), "idc4".to_string()), + new_label("__physical_table__".to_string(), "f2".to_string()), + ], + samples: vec![Sample { + value: 99.0, + timestamp: 4000, + }], + ..Default::default() + }; + + vec![idc3_schema, idc4_schema] +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index c4bc924f0b..8b2a73461f 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -18,7 +18,6 @@ use std::ops::Deref; use std::slice; use api::prom_store::remote::Sample; -use api::v1::RowInsertRequest; use bytes::{Buf, Bytes}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_telemetry::debug; @@ -33,8 +32,10 @@ use crate::error::InternalSnafu; use crate::http::event::PipelineIngestRequest; use crate::http::PromValidationMode; use crate::pipeline::run_pipeline; -use crate::prom_row_builder::TablesBuilder; -use crate::prom_store::METRIC_NAME_LABEL_BYTES; +use crate::prom_row_builder::{PromCtx, TablesBuilder}; +use crate::prom_store::{ + DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, +}; use crate::query_handler::PipelineHandlerRef; use crate::repeated_field::{Clear, RepeatedField}; @@ -144,6 +145,11 @@ fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> { #[derive(Default, Debug)] pub struct PromTimeSeries { pub table_name: String, + // specified using `__database__` label + pub schema: Option, + // specified using `__physical_table__` label + pub physical_table: Option, + pub labels: RepeatedField, pub samples: RepeatedField, } @@ -187,10 +193,24 @@ impl PromTimeSeries { if buf.remaining() != limit { return Err(DecodeError::new("delimited length exceeded")); } - if label.name.deref() == METRIC_NAME_LABEL_BYTES { - self.table_name = decode_string(&label.value, prom_validation_mode)?; - self.labels.truncate(self.labels.len() - 1); // remove last label + + match label.name.deref() { + METRIC_NAME_LABEL_BYTES => { + self.table_name = decode_string(&label.value, prom_validation_mode)?; + self.labels.truncate(self.labels.len() - 1); // remove last label + } + DATABASE_LABEL_BYTES => { + self.schema = Some(decode_string(&label.value, prom_validation_mode)?); + self.labels.truncate(self.labels.len() - 1); // remove last label + } + PHYSICAL_TABLE_LABEL_BYTES => { + self.physical_table = + Some(decode_string(&label.value, prom_validation_mode)?); + self.labels.truncate(self.labels.len() - 1); // remove last label + } + _ => {} } + Ok(()) } 2u32 => { @@ -216,7 +236,14 @@ impl PromTimeSeries { ) -> Result<(), DecodeError> { let label_num = self.labels.len(); let row_num = self.samples.len(); + + let prom_ctx = PromCtx { + schema: self.schema.take(), + physical_table: self.physical_table.take(), + }; + let table_data = table_builders.get_or_create_table_builder( + prom_ctx, std::mem::take(&mut self.table_name), label_num, row_num, @@ -267,7 +294,7 @@ impl Clear for PromWriteRequest { } impl PromWriteRequest { - pub fn as_row_insert_requests(&mut self) -> Vec { + pub fn as_row_insert_requests(&mut self) -> ContextReq { self.table_data.as_insert_requests() } @@ -482,11 +509,14 @@ mod tests { .unwrap(); let req = prom_write_request.as_row_insert_requests(); + let samples = req - .iter() + .ref_all_req() .filter_map(|r| r.rows.as_ref().map(|r| r.rows.len())) .sum::(); - let prom_rows = RowInsertRequests { inserts: req }; + let prom_rows = RowInsertRequests { + inserts: req.all_req().collect::>(), + }; assert_eq!(expected_samples, samples); assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a42496d7d2..a67dad223b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -91,6 +91,7 @@ macro_rules! http_tests { test_config_api, test_dashboard_path, test_prometheus_remote_write, + test_prometheus_remote_special_labels, test_prometheus_remote_write_with_pipeline, test_vm_proto_remote_write, @@ -1295,6 +1296,100 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_special_labels") + .await; + let client = TestClient::new(app).await; + + // write snappy encoded data + let write_request = WriteRequest { + timeseries: prom_store::mock_timeseries_special_labels(), + ..Default::default() + }; + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + // create databases + let res = client + .post("/v1/sql?sql=create database idc3") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .post("/v1/sql?sql=create database idc4") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // write data + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // test idc3 + let expected = "[[\"f1\"],[\"idc3_lo_table\"]]"; + validate_data( + "test_prometheus_remote_special_labels_idc3", + &client, + "show tables from idc3;", + expected, + ) + .await; + let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f1'\\n)\"]]"; + validate_data( + "test_prometheus_remote_special_labels_idc3_show_create_table", + &client, + "show create table idc3.idc3_lo_table", + expected, + ) + .await; + let expected = "[[3000,42.0]]"; + validate_data( + "test_prometheus_remote_special_labels_idc3_select", + &client, + "select * from idc3.idc3_lo_table", + expected, + ) + .await; + + // test idc4 + let expected = "[[\"f2\"],[\"idc4_local_table\"]]"; + validate_data( + "test_prometheus_remote_special_labels_idc4", + &client, + "show tables from idc4;", + expected, + ) + .await; + let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f2'\\n)\"]]"; + validate_data( + "test_prometheus_remote_special_labels_idc4_show_create_table", + &client, + "show create table idc4.idc4_local_table", + expected, + ) + .await; + let expected = "[[4000,99.0]]"; + validate_data( + "test_prometheus_remote_special_labels_idc4_select", + &client, + "select * from idc4.idc4_local_table", + expected, + ) + .await; + + guard.remove_all().await; +} + pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =