diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index bada913cae..65adc03265 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -38,7 +38,7 @@ use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::http::extractor::PipelineInfo; use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; use crate::http::PromValidationMode; -use crate::prom_store::{snappy_decompress, zstd_decompress}; +use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress}; use crate::proto::{PromSeriesProcessor, PromWriteRequest}; use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse}; @@ -117,6 +117,7 @@ pub async fn remote_write( let is_zstd = content_encoding.contains(VM_ENCODING); let mut processor = PromSeriesProcessor::default_processor(); + if let Some(pipeline_name) = pipeline_info.pipeline_name { let pipeline_def = PipelineDefinition::from_name( &pipeline_name, @@ -184,13 +185,19 @@ pub async fn remote_read( ) -> Result { let db = params.db.clone().unwrap_or_default(); query_ctx.set_channel(Channel::Prometheus); + + let request = decode_remote_read_request(body).await?; + + // Extract schema from special labels and set it in query context + if let Some(schema) = extract_schema_from_read_request(&request) { + query_ctx.set_current_schema(&schema); + } + let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); - let request = decode_remote_read_request(body).await?; - state.prom_store_handler.read(request, query_ctx).await } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 4f2634c7d9..b413ba7605 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -19,7 +19,7 @@ use std::collections::BTreeMap; use std::hash::{Hash, Hasher}; use api::prom_store::remote::label_matcher::Type as MatcherType; -use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; +use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest}; use api::v1::RowInsertRequests; use common_grpc::precision::Precision; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; @@ -44,6 +44,9 @@ pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; pub const DATABASE_LABEL: &str = "__database__"; pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__"; +pub const SCHEMA_LABEL: &str = "__schema__"; +pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__"; + pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__"; pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__"; @@ -73,6 +76,29 @@ pub fn table_name(q: &Query) -> Result { }) } +/// Extract schema from remote read request. Returns the first schema found from any query's matchers. +/// Prioritizes __schema__ over __database__ labels. +pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option { + for query in &request.queries { + for matcher in &query.matchers { + if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 { + return Some(matcher.value.clone()); + } + } + } + + // If no __schema__ found, look for __database__ + for query in &request.queries { + for matcher in &query.matchers { + if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 { + return Some(matcher.value.clone()); + } + } + } + + None +} + /// Create a DataFrame from a remote Query #[tracing::instrument(skip_all)] pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { @@ -91,7 +117,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result { for m in label_matches { let name = &m.name; - if name == METRIC_NAME_LABEL { + if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL { continue; } diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 9a77f27433..3908feee55 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -34,7 +34,7 @@ use crate::http::PromValidationMode; use crate::pipeline::run_pipeline; use crate::prom_row_builder::{PromCtx, TablesBuilder}; use crate::prom_store::{ - DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, + DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES, }; use crate::query_handler::PipelineHandlerRef; use crate::repeated_field::{Clear, RepeatedField}; @@ -199,10 +199,17 @@ impl PromTimeSeries { self.table_name = decode_string(&label.value, prom_validation_mode)?; self.labels.truncate(self.labels.len() - 1); // remove last label } - DATABASE_LABEL_BYTES => { + SCHEMA_LABEL_BYTES => { self.schema = Some(decode_string(&label.value, prom_validation_mode)?); self.labels.truncate(self.labels.len() - 1); // remove last label } + DATABASE_LABEL_BYTES => { + // Only set schema from __database__ if __schema__ hasn't been set yet + if self.schema.is_none() { + self.schema = Some(decode_string(&label.value, prom_validation_mode)?); + } + self.labels.truncate(self.labels.len() - 1); // remove last label + } PHYSICAL_TABLE_LABEL_BYTES => { self.physical_table = Some(decode_string(&label.value, prom_validation_mode)?); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index aabfafa9c1..1304c50f8e 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -16,7 +16,10 @@ use std::collections::BTreeMap; use std::io::Write; use std::str::FromStr; -use api::prom_store::remote::WriteRequest; +use api::prom_store::remote::label_matcher::Type as MatcherType; +use api::prom_store::remote::{ + Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest, +}; use auth::user_provider_from_option; use axum::http::{HeaderName, HeaderValue, StatusCode}; use chrono::Utc; @@ -94,6 +97,7 @@ macro_rules! http_tests { test_dashboard_path, test_prometheus_remote_write, test_prometheus_remote_special_labels, + test_prometheus_remote_schema_labels, test_prometheus_remote_write_with_pipeline, test_vm_proto_remote_write, @@ -1465,6 +1469,188 @@ pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType) guard.remove_all().await; } +pub async fn test_prometheus_remote_schema_labels(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_schema_labels").await; + let client = TestClient::new(app).await; + + // Create test schemas + let res = client + .post("/v1/sql?sql=create database test_schema_1") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + let res = client + .post("/v1/sql?sql=create database test_schema_2") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // Write data with __schema__ label + let schema_series = TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "metric_with_schema".to_string(), + }, + Label { + name: "__schema__".to_string(), + value: "test_schema_1".to_string(), + }, + Label { + name: "instance".to_string(), + value: "host1".to_string(), + }, + ], + samples: vec![Sample { + value: 100.0, + timestamp: 1000, + }], + ..Default::default() + }; + + let write_request = WriteRequest { + timeseries: vec![schema_series], + ..Default::default() + }; + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Read data from test_schema_1 using __schema__ matcher + let read_request = ReadRequest { + queries: vec![Query { + start_timestamp_ms: 500, + end_timestamp_ms: 1500, + matchers: vec![ + LabelMatcher { + name: "__name__".to_string(), + value: "metric_with_schema".to_string(), + r#type: MatcherType::Eq as i32, + }, + LabelMatcher { + name: "__schema__".to_string(), + value: "test_schema_1".to_string(), + r#type: MatcherType::Eq as i32, + }, + ], + ..Default::default() + }], + ..Default::default() + }; + + let serialized_read_request = read_request.encode_to_vec(); + let compressed_read_request = + prom_store::snappy_compress(&serialized_read_request).expect("failed to encode snappy"); + + let mut result = client + .post("/v1/prometheus/read") + .body(compressed_read_request) + .send() + .await; + assert_eq!(result.status(), StatusCode::OK); + + let response_body = result.chunk().await.unwrap(); + let decompressed_response = prom_store::snappy_decompress(&response_body).unwrap(); + let read_response = ReadResponse::decode(&decompressed_response[..]).unwrap(); + + assert_eq!(read_response.results.len(), 1); + assert_eq!(read_response.results[0].timeseries.len(), 1); + + let timeseries = &read_response.results[0].timeseries[0]; + assert_eq!(timeseries.samples.len(), 1); + assert_eq!(timeseries.samples[0].value, 100.0); + assert_eq!(timeseries.samples[0].timestamp, 1000); + + // write data to unknown schema + let unknown_schema_series = TimeSeries { + labels: vec![ + Label { + name: "__name__".to_string(), + value: "metric_unknown_schema".to_string(), + }, + Label { + name: "__schema__".to_string(), + value: "unknown_schema".to_string(), + }, + Label { + name: "instance".to_string(), + value: "host2".to_string(), + }, + ], + samples: vec![Sample { + value: 200.0, + timestamp: 2000, + }], + ..Default::default() + }; + + let unknown_write_request = WriteRequest { + timeseries: vec![unknown_schema_series], + ..Default::default() + }; + let serialized_unknown_request = unknown_write_request.encode_to_vec(); + let compressed_unknown_request = + prom_store::snappy_compress(&serialized_unknown_request).expect("failed to encode snappy"); + + // Write data to unknown schema + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_unknown_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::BAD_REQUEST); + + // Read data from unknown schema + let unknown_read_request = ReadRequest { + queries: vec![Query { + start_timestamp_ms: 1500, + end_timestamp_ms: 2500, + matchers: vec![ + LabelMatcher { + name: "__name__".to_string(), + value: "metric_unknown_schema".to_string(), + r#type: MatcherType::Eq as i32, + }, + LabelMatcher { + name: "__schema__".to_string(), + value: "unknown_schema".to_string(), + r#type: MatcherType::Eq as i32, + }, + ], + ..Default::default() + }], + ..Default::default() + }; + + let serialized_unknown_read_request = unknown_read_request.encode_to_vec(); + let compressed_unknown_read_request = + prom_store::snappy_compress(&serialized_unknown_read_request) + .expect("failed to encode snappy"); + + let unknown_result = client + .post("/v1/prometheus/read") + .body(compressed_unknown_read_request) + .send() + .await; + assert_eq!(unknown_result.status(), StatusCode::BAD_REQUEST); + + guard.remove_all().await; +} + pub async fn test_vm_proto_remote_write(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =