mirror of https://github.com/GreptimeTeam/greptimedb.git
feat: support __schema__ and __database__ in Prom Remote Read (#6610)
* feat: support __schema__ and __database__ in Prom remote R/W
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* fix integration test
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* revert remote write changes
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
* check matcher type
  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
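The read-side behaviour added in this commit keys off ordinary equality matchers on two special label names: a remote read query that carries a matcher on __schema__ (or, as a fallback spelling, __database__) is routed to that schema instead of the one implied by the connection, and the matcher itself never reaches query planning. A minimal client-side sketch using the api::prom_store::remote protobuf types from the integration test at the end of this commit (the label values are illustrative):

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::LabelMatcher;

// Extra matcher a remote-read client attaches to target a specific schema.
// __schema__ is preferred; __database__ works as a fallback.
fn schema_matcher() -> LabelMatcher {
    LabelMatcher {
        name: "__schema__".to_string(),
        value: "test_schema_1".to_string(),
        r#type: MatcherType::Eq as i32,
    }
}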
@@ -39,7 +39,7 @@ use crate::http::extractor::PipelineInfo;
 use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
 use crate::http::PromValidationMode;
 use crate::prom_row_builder::TablesBuilder;
-use crate::prom_store::{snappy_decompress, zstd_decompress};
+use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress};
 use crate::proto::{PromSeriesProcessor, PromWriteRequest};
 use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
 
@@ -118,6 +118,7 @@ pub async fn remote_write(
     let is_zstd = content_encoding.contains(VM_ENCODING);
 
     let mut processor = PromSeriesProcessor::default_processor();
 
     if let Some(pipeline_name) = pipeline_info.pipeline_name {
         let pipeline_def = PipelineDefinition::from_name(
             &pipeline_name,
@@ -192,13 +193,19 @@ pub async fn remote_read(
 ) -> Result<PromStoreResponse> {
     let db = params.db.clone().unwrap_or_default();
     query_ctx.set_channel(Channel::Prometheus);
 
+    let request = decode_remote_read_request(body).await?;
+
+    // Extract schema from special labels and set it in query context
+    if let Some(schema) = extract_schema_from_read_request(&request) {
+        query_ctx.set_current_schema(&schema);
+    }
+
     let query_ctx = Arc::new(query_ctx);
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
         .with_label_values(&[db.as_str()])
         .start_timer();
 
-    let request = decode_remote_read_request(body).await?;
-
     state.prom_store_handler.read(request, query_ctx).await
 }
 
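The decode of the request body moves ahead of the Arc::new(query_ctx) wrap so that the schema extracted from the matchers can still be written into the mutable context. A small, self-contained illustration of that ordering constraint; the QueryCtx type here is a hypothetical stand-in, not GreptimeDB's QueryContext:

use std::sync::Arc;

// Hypothetical stand-in: per-request overrides (such as the schema taken from
// __schema__) must be applied while the context is still exclusively owned.
struct QueryCtx {
    schema: String,
}

fn main() {
    let mut ctx = QueryCtx {
        schema: "public".to_string(),
    };
    ctx.schema = "test_schema_1".to_string(); // OK: still exclusively owned

    let ctx = Arc::new(ctx);
    // ctx.schema = "other".to_string(); // would not compile: `ctx` is now shared
    println!("current schema: {}", ctx.schema);
}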
@@ -19,7 +19,7 @@ use std::collections::BTreeMap;
 use std::hash::{Hash, Hasher};
 
 use api::prom_store::remote::label_matcher::Type as MatcherType;
-use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
+use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
 use api::v1::RowInsertRequests;
 use common_grpc::precision::Precision;
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -44,6 +44,9 @@ pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
 pub const DATABASE_LABEL: &str = "__database__";
 pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";
 
+pub const SCHEMA_LABEL: &str = "__schema__";
+pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";
+
 pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
 pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";
 
@@ -73,6 +76,29 @@ pub fn table_name(q: &Query) -> Result<String> {
     })
 }
 
+/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
+/// Prioritizes __schema__ over __database__ labels.
+pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
+    for query in &request.queries {
+        for matcher in &query.matchers {
+            if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
+                return Some(matcher.value.clone());
+            }
+        }
+    }
+
+    // If no __schema__ found, look for __database__
+    for query in &request.queries {
+        for matcher in &query.matchers {
+            if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
+                return Some(matcher.value.clone());
+            }
+        }
+    }
+
+    None
+}
+
 /// Create a DataFrame from a remote Query
 #[tracing::instrument(skip_all)]
 pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
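A quick usage sketch of the new helper, showing the documented precedence: when a query carries both labels, __schema__ wins and __database__ is only consulted as a fallback, and only Eq matchers count. The use servers::prom_store::... path is an assumption about how the function is exposed outside the crate; within the crate it is crate::prom_store::extract_schema_from_read_request, as in the hunk above.

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{LabelMatcher, Query, ReadRequest};
// Assumed external path; inside the servers crate this is `crate::prom_store::...`.
use servers::prom_store::extract_schema_from_read_request;

fn main() {
    let request = ReadRequest {
        queries: vec![Query {
            matchers: vec![
                LabelMatcher {
                    name: "__database__".to_string(),
                    value: "fallback_db".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
                LabelMatcher {
                    name: "__schema__".to_string(),
                    value: "preferred_schema".to_string(),
                    r#type: MatcherType::Eq as i32,
                },
            ],
            ..Default::default()
        }],
        ..Default::default()
    };

    // __schema__ takes precedence over __database__.
    assert_eq!(
        extract_schema_from_read_request(&request).as_deref(),
        Some("preferred_schema")
    );
}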
@@ -91,7 +117,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
     for m in label_matches {
         let name = &m.name;
 
-        if name == METRIC_NAME_LABEL {
+        if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
             continue;
         }
 
@@ -36,7 +36,7 @@ use crate::http::PromValidationMode;
 use crate::pipeline::run_pipeline;
 use crate::prom_row_builder::{PromCtx, TablesBuilder};
 use crate::prom_store::{
-    DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
+    DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES,
 };
 use crate::query_handler::PipelineHandlerRef;
 use crate::repeated_field::{Clear, RepeatedField};
@@ -201,10 +201,17 @@ impl PromTimeSeries {
                     self.table_name = prom_validation_mode.decode_string(&label.value)?;
                     self.labels.truncate(self.labels.len() - 1); // remove last label
                 }
-                DATABASE_LABEL_BYTES => {
+                SCHEMA_LABEL_BYTES => {
                     self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
                     self.labels.truncate(self.labels.len() - 1); // remove last label
                 }
+                DATABASE_LABEL_BYTES => {
+                    // Only set schema from __database__ if __schema__ hasn't been set yet
+                    if self.schema.is_none() {
+                        self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
+                    }
+                    self.labels.truncate(self.labels.len() - 1); // remove last label
+                }
                 PHYSICAL_TABLE_LABEL_BYTES => {
                     self.physical_table =
                         Some(prom_validation_mode.decode_string(&label.value)?);
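The write path applies the same precedence while a series' labels are scanned: a __schema__ label always decides the target schema, __database__ only fills in when __schema__ is absent, and both labels are stripped from what gets stored. A hypothetical, simplified rendering of that rule (not the actual PromTimeSeries code):

// Hypothetical stand-in for the match arms above: resolve the target schema
// from a series' labels, giving __schema__ priority over __database__.
fn resolve_schema(labels: &[(&str, &str)]) -> Option<String> {
    let mut schema: Option<String> = None;
    for (name, value) in labels {
        match *name {
            // __schema__ always wins, regardless of label order.
            "__schema__" => schema = Some((*value).to_string()),
            // __database__ is only a fallback when __schema__ hasn't been seen.
            "__database__" if schema.is_none() => schema = Some((*value).to_string()),
            _ => {}
        }
    }
    schema
}

fn main() {
    let labels = [("__database__", "db_a"), ("__schema__", "schema_b")];
    assert_eq!(resolve_schema(&labels).as_deref(), Some("schema_b"));
}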
@@ -16,7 +16,10 @@ use std::collections::BTreeMap;
 use std::io::Write;
 use std::str::FromStr;
 
-use api::prom_store::remote::WriteRequest;
+use api::prom_store::remote::label_matcher::Type as MatcherType;
+use api::prom_store::remote::{
+    Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest,
+};
 use auth::user_provider_from_option;
 use axum::http::{HeaderName, HeaderValue, StatusCode};
 use chrono::Utc;
@@ -94,6 +97,7 @@ macro_rules! http_tests {
         test_dashboard_path,
         test_prometheus_remote_write,
         test_prometheus_remote_special_labels,
+        test_prometheus_remote_schema_labels,
        test_prometheus_remote_write_with_pipeline,
         test_vm_proto_remote_write,
 
@@ -1500,6 +1504,188 @@ pub async fn test_prometheus_remote_write_with_pipeline(store_type: StorageType)
     guard.remove_all().await;
 }
 
+pub async fn test_prometheus_remote_schema_labels(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_prom_app_with_frontend(store_type, "test_prometheus_remote_schema_labels").await;
+    let client = TestClient::new(app).await;
+
+    // Create test schemas
+    let res = client
+        .post("/v1/sql?sql=create database test_schema_1")
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let res = client
+        .post("/v1/sql?sql=create database test_schema_2")
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // Write data with __schema__ label
+    let schema_series = TimeSeries {
+        labels: vec![
+            Label {
+                name: "__name__".to_string(),
+                value: "metric_with_schema".to_string(),
+            },
+            Label {
+                name: "__schema__".to_string(),
+                value: "test_schema_1".to_string(),
+            },
+            Label {
+                name: "instance".to_string(),
+                value: "host1".to_string(),
+            },
+        ],
+        samples: vec![Sample {
+            value: 100.0,
+            timestamp: 1000,
+        }],
+        ..Default::default()
+    };
+
+    let write_request = WriteRequest {
+        timeseries: vec![schema_series],
+        ..Default::default()
+    };
+    let serialized_request = write_request.encode_to_vec();
+    let compressed_request =
+        prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
+
+    let res = client
+        .post("/v1/prometheus/write")
+        .header("Content-Encoding", "snappy")
+        .body(compressed_request)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+    // Read data from test_schema_1 using __schema__ matcher
+    let read_request = ReadRequest {
+        queries: vec![Query {
+            start_timestamp_ms: 500,
+            end_timestamp_ms: 1500,
+            matchers: vec![
+                LabelMatcher {
+                    name: "__name__".to_string(),
+                    value: "metric_with_schema".to_string(),
+                    r#type: MatcherType::Eq as i32,
+                },
+                LabelMatcher {
+                    name: "__schema__".to_string(),
+                    value: "test_schema_1".to_string(),
+                    r#type: MatcherType::Eq as i32,
+                },
+            ],
+            ..Default::default()
+        }],
+        ..Default::default()
+    };
+
+    let serialized_read_request = read_request.encode_to_vec();
+    let compressed_read_request =
+        prom_store::snappy_compress(&serialized_read_request).expect("failed to encode snappy");
+
+    let mut result = client
+        .post("/v1/prometheus/read")
+        .body(compressed_read_request)
+        .send()
+        .await;
+    assert_eq!(result.status(), StatusCode::OK);
+
+    let response_body = result.chunk().await.unwrap();
+    let decompressed_response = prom_store::snappy_decompress(&response_body).unwrap();
+    let read_response = ReadResponse::decode(&decompressed_response[..]).unwrap();
+
+    assert_eq!(read_response.results.len(), 1);
+    assert_eq!(read_response.results[0].timeseries.len(), 1);
+
+    let timeseries = &read_response.results[0].timeseries[0];
+    assert_eq!(timeseries.samples.len(), 1);
+    assert_eq!(timeseries.samples[0].value, 100.0);
+    assert_eq!(timeseries.samples[0].timestamp, 1000);
+
+    // write data to unknown schema
+    let unknown_schema_series = TimeSeries {
+        labels: vec![
+            Label {
+                name: "__name__".to_string(),
+                value: "metric_unknown_schema".to_string(),
+            },
+            Label {
+                name: "__schema__".to_string(),
+                value: "unknown_schema".to_string(),
+            },
+            Label {
+                name: "instance".to_string(),
+                value: "host2".to_string(),
+            },
+        ],
+        samples: vec![Sample {
+            value: 200.0,
+            timestamp: 2000,
+        }],
+        ..Default::default()
+    };
+
+    let unknown_write_request = WriteRequest {
+        timeseries: vec![unknown_schema_series],
+        ..Default::default()
+    };
+    let serialized_unknown_request = unknown_write_request.encode_to_vec();
+    let compressed_unknown_request =
+        prom_store::snappy_compress(&serialized_unknown_request).expect("failed to encode snappy");
+
+    // Write data to unknown schema
+    let res = client
+        .post("/v1/prometheus/write")
+        .header("Content-Encoding", "snappy")
+        .body(compressed_unknown_request)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+
+    // Read data from unknown schema
+    let unknown_read_request = ReadRequest {
+        queries: vec![Query {
+            start_timestamp_ms: 1500,
+            end_timestamp_ms: 2500,
+            matchers: vec![
+                LabelMatcher {
+                    name: "__name__".to_string(),
+                    value: "metric_unknown_schema".to_string(),
+                    r#type: MatcherType::Eq as i32,
+                },
+                LabelMatcher {
+                    name: "__schema__".to_string(),
+                    value: "unknown_schema".to_string(),
+                    r#type: MatcherType::Eq as i32,
+                },
+            ],
+            ..Default::default()
+        }],
+        ..Default::default()
+    };
+
+    let serialized_unknown_read_request = unknown_read_request.encode_to_vec();
+    let compressed_unknown_read_request =
+        prom_store::snappy_compress(&serialized_unknown_read_request)
+            .expect("failed to encode snappy");
+
+    let unknown_result = client
+        .post("/v1/prometheus/read")
+        .body(compressed_unknown_read_request)
+        .send()
+        .await;
+    assert_eq!(unknown_result.status(), StatusCode::BAD_REQUEST);
+
+    guard.remove_all().await;
+}
+
 pub async fn test_vm_proto_remote_write(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =