mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
feat: support __schema__ and __database__ in Prom Remote Read (#6610)
* feat: support __schema__ and __database__ in Prom remote R/W Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix integration test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * revert remote write changes Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * check matcher type Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -38,7 +38,7 @@ use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
|
||||
use crate::http::extractor::PipelineInfo;
|
||||
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::prom_store::{snappy_decompress, zstd_decompress};
|
||||
use crate::prom_store::{extract_schema_from_read_request, snappy_decompress, zstd_decompress};
|
||||
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
|
||||
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
|
||||
|
||||
@@ -117,6 +117,7 @@ pub async fn remote_write(
|
||||
let is_zstd = content_encoding.contains(VM_ENCODING);
|
||||
|
||||
let mut processor = PromSeriesProcessor::default_processor();
|
||||
|
||||
if let Some(pipeline_name) = pipeline_info.pipeline_name {
|
||||
let pipeline_def = PipelineDefinition::from_name(
|
||||
&pipeline_name,
|
||||
@@ -184,13 +185,19 @@ pub async fn remote_read(
|
||||
) -> Result<PromStoreResponse> {
|
||||
let db = params.db.clone().unwrap_or_default();
|
||||
query_ctx.set_channel(Channel::Prometheus);
|
||||
|
||||
let request = decode_remote_read_request(body).await?;
|
||||
|
||||
// Extract schema from special labels and set it in query context
|
||||
if let Some(schema) = extract_schema_from_read_request(&request) {
|
||||
query_ctx.set_current_schema(&schema);
|
||||
}
|
||||
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
|
||||
.with_label_values(&[db.as_str()])
|
||||
.start_timer();
|
||||
|
||||
let request = decode_remote_read_request(body).await?;
|
||||
|
||||
state.prom_store_handler.read(request, query_ctx).await
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ use std::collections::BTreeMap;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use api::prom_store::remote::label_matcher::Type as MatcherType;
|
||||
use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
|
||||
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
|
||||
use api::v1::RowInsertRequests;
|
||||
use common_grpc::precision::Precision;
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
@@ -44,6 +44,9 @@ pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__";
|
||||
pub const DATABASE_LABEL: &str = "__database__";
|
||||
pub const DATABASE_LABEL_BYTES: &[u8] = b"__database__";
|
||||
|
||||
pub const SCHEMA_LABEL: &str = "__schema__";
|
||||
pub const SCHEMA_LABEL_BYTES: &[u8] = b"__schema__";
|
||||
|
||||
pub const PHYSICAL_TABLE_LABEL: &str = "__physical_table__";
|
||||
pub const PHYSICAL_TABLE_LABEL_BYTES: &[u8] = b"__physical_table__";
|
||||
|
||||
@@ -73,6 +76,29 @@ pub fn table_name(q: &Query) -> Result<String> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Extract schema from remote read request. Returns the first schema found from any query's matchers.
|
||||
/// Prioritizes __schema__ over __database__ labels.
|
||||
pub fn extract_schema_from_read_request(request: &ReadRequest) -> Option<String> {
|
||||
for query in &request.queries {
|
||||
for matcher in &query.matchers {
|
||||
if matcher.name == SCHEMA_LABEL && matcher.r#type == MatcherType::Eq as i32 {
|
||||
return Some(matcher.value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no __schema__ found, look for __database__
|
||||
for query in &request.queries {
|
||||
for matcher in &query.matchers {
|
||||
if matcher.name == DATABASE_LABEL && matcher.r#type == MatcherType::Eq as i32 {
|
||||
return Some(matcher.value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Create a DataFrame from a remote Query
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
|
||||
@@ -91,7 +117,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
|
||||
for m in label_matches {
|
||||
let name = &m.name;
|
||||
|
||||
if name == METRIC_NAME_LABEL {
|
||||
if name == METRIC_NAME_LABEL || name == SCHEMA_LABEL || name == DATABASE_LABEL {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::http::PromValidationMode;
|
||||
use crate::pipeline::run_pipeline;
|
||||
use crate::prom_row_builder::{PromCtx, TablesBuilder};
|
||||
use crate::prom_store::{
|
||||
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES,
|
||||
DATABASE_LABEL_BYTES, METRIC_NAME_LABEL_BYTES, PHYSICAL_TABLE_LABEL_BYTES, SCHEMA_LABEL_BYTES,
|
||||
};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
use crate::repeated_field::{Clear, RepeatedField};
|
||||
@@ -199,10 +199,17 @@ impl PromTimeSeries {
|
||||
self.table_name = decode_string(&label.value, prom_validation_mode)?;
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
DATABASE_LABEL_BYTES => {
|
||||
SCHEMA_LABEL_BYTES => {
|
||||
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
DATABASE_LABEL_BYTES => {
|
||||
// Only set schema from __database__ if __schema__ hasn't been set yet
|
||||
if self.schema.is_none() {
|
||||
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
|
||||
}
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
PHYSICAL_TABLE_LABEL_BYTES => {
|
||||
self.physical_table =
|
||||
Some(decode_string(&label.value, prom_validation_mode)?);
|
||||
|
||||
Reference in New Issue
Block a user