From 59d40c39b19b64cc6c8291952efccbab962d557d Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 29 Jun 2026 11:15:41 +0800 Subject: [PATCH] feat: `prw_v2` initial commit with sample ingestion (#8361) * chore: add v2 entrance Signed-off-by: shuiyisong * chore: decode request Signed-off-by: shuiyisong * chore: implement remote write v2 for samples Signed-off-by: shuiyisong * chore: add tests Signed-off-by: shuiyisong * chore: remove hand-written proto Signed-off-by: shuiyisong * chore: refactor Signed-off-by: shuiyisong * fix: CR issue Signed-off-by: shuiyisong * fix: CR issue Signed-off-by: shuiyisong * fix: merge tests Signed-off-by: shuiyisong * chore: add source version field Signed-off-by: shuiyisong * chore: fix CR issues Signed-off-by: shuiyisong * fix: test Signed-off-by: shuiyisong * fix: sqlness Signed-off-by: shuiyisong * chore: update proto Signed-off-by: shuiyisong * chore: update proto Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- Cargo.lock | 2 +- Cargo.toml | 2 +- docs/rfcs/2026-05-28-table-semantic-layer.md | 3 +- .../information_schema/table_semantics.rs | 28 +- src/operator/src/insert.rs | 9 +- src/servers/src/http/prom_store.rs | 525 +++++++++++++-- src/servers/src/prom_remote_write/mod.rs | 4 + src/servers/src/prom_remote_write/v2.rs | 633 ++++++++++++++++++ src/servers/src/row_writer.rs | 5 + src/servers/tests/http/prom_store_test.rs | 417 +++++++++++- src/table/src/requests/semantic.rs | 5 + tests-integration/tests/http.rs | 130 +++- .../information_schema/table_semantics.result | 16 +- .../information_schema/table_semantics.sql | 3 +- .../common/system/information_schema.result | 7 +- 15 files changed, 1720 insertions(+), 69 deletions(-) create mode 100644 src/servers/src/prom_remote_write/v2.rs diff --git a/Cargo.lock b/Cargo.lock index 38927a191a..5b06130cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5937,7 +5937,7 @@ dependencies = [ [[package]] name = "greptime-proto" 
version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e358318890606a2961e12dcdf9674f6763cec3a#5e358318890606a2961e12dcdf9674f6763cec3a" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6b665d8b6fce4f1f0da49c8c79fb150636df4ff#b6b665d8b6fce4f1f0da49c8c79fb150636df4ff" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", diff --git a/Cargo.toml b/Cargo.toml index bf29241c51..1197547bdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,7 @@ fs2 = "0.4" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5e358318890606a2961e12dcdf9674f6763cec3a" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6b665d8b6fce4f1f0da49c8c79fb150636df4ff" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/docs/rfcs/2026-05-28-table-semantic-layer.md b/docs/rfcs/2026-05-28-table-semantic-layer.md index 931bd6ce2d..4c646e5e51 100644 --- a/docs/rfcs/2026-05-28-table-semantic-layer.md +++ b/docs/rfcs/2026-05-28-table-semantic-layer.md @@ -67,6 +67,7 @@ concern, not query-time semantics). | --- | --- | | `greptime.semantic.signal_type` | `trace` / `log` / `metric` / `event` | | `greptime.semantic.source` | `opentelemetry` / `prometheus` / `influxdb` / `opentsdb` / `elasticsearch` / `loki` / `custom` | +| `greptime.semantic.source_version` | Source protocol version, e.g. Prometheus remote write `1.0` / `2.0` | | `greptime.semantic.pipeline` | `greptime_trace_v1` (the signal-agnostic successor to `table_data_model`) | **Trace**: `greptime.semantic.trace.conventions` (the OTel `schema_url` the rows conform to, or `mixed` / `unknown` when not single-valued). @@ -83,7 +84,7 @@ concern, not query-time semantics). `metadata_quality = inferred` is the load-bearing field for confidence-aware tooling: an inferred counter should be re-checked before betting on `rate()`-style semantics. 
-**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter cost an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern); `source_version` (no cheap non-constant value today — Prom RW is v1-only, OTel SDK identity is deferred). Some are reserved for follow-ups (see Future Work). +**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter cost an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern). Some are reserved for follow-ups (see Future Work). ## Conflict and update semantics diff --git a/src/catalog/src/system_schema/information_schema/table_semantics.rs b/src/catalog/src/system_schema/information_schema/table_semantics.rs index a203d18c08..b74885968e 100644 --- a/src/catalog/src/system_schema/information_schema/table_semantics.rs +++ b/src/catalog/src/system_schema/information_schema/table_semantics.rs @@ -19,9 +19,10 @@ //! table's `create_options`. //! //! The few signal-agnostic keys are promoted to their own columns -//! (`signal_type` / `source` / `pipeline` / `metadata_quality`); the remaining -//! signal-specific keys are folded into a `semantic_options` JSON string, keyed -//! by the option name with the `greptime.semantic.` prefix stripped. +//! 
(`signal_type` / `source` / `source_version` / `pipeline` / +//! `metadata_quality`); the remaining signal-specific keys are folded into a +//! `semantic_options` JSON string, keyed by the option name with the +//! `greptime.semantic.` prefix stripped. use std::collections::BTreeMap; use std::sync::{Arc, Weak}; @@ -45,7 +46,7 @@ use store_api::storage::{ScanRequest, TableId}; use table::metadata::TableInfo; use table::requests::{ SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_PIPELINE, SEMANTIC_PREFIX, SEMANTIC_SIGNAL_TYPE, - SEMANTIC_SOURCE, is_semantic_option_key, + SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, is_semantic_option_key, }; use crate::CatalogManager; @@ -60,6 +61,7 @@ pub const TABLE_NAME: &str = "table_name"; pub const TABLE_ID: &str = "table_id"; pub const SIGNAL_TYPE: &str = "signal_type"; pub const SOURCE: &str = "source"; +pub const SOURCE_VERSION: &str = "source_version"; pub const PIPELINE: &str = "pipeline"; pub const METADATA_QUALITY: &str = "metadata_quality"; pub const SEMANTIC_OPTIONS: &str = "semantic_options"; @@ -75,6 +77,7 @@ fn optional_value(v: Option<&str>) -> Value { struct SemanticRow<'a> { signal_type: Option<&'a str>, source: Option<&'a str>, + source_version: Option<&'a str>, pipeline: Option<&'a str>, metadata_quality: Option<&'a str>, options_json: Option, @@ -86,6 +89,7 @@ impl<'a> SemanticRow<'a> { fn extract(table_info: &'a TableInfo) -> Option { let mut signal_type = None; let mut source = None; + let mut source_version = None; let mut pipeline = None; let mut metadata_quality = None; let mut rest = BTreeMap::new(); @@ -97,6 +101,7 @@ impl<'a> SemanticRow<'a> { match key.as_str() { SEMANTIC_SIGNAL_TYPE => signal_type = Some(value.as_str()), SEMANTIC_SOURCE => source = Some(value.as_str()), + SEMANTIC_SOURCE_VERSION => source_version = Some(value.as_str()), SEMANTIC_PIPELINE => pipeline = Some(value.as_str()), SEMANTIC_METRIC_METADATA_QUALITY => metadata_quality = Some(value.as_str()), _ => { @@ -108,6 +113,7 @@ impl<'a> 
SemanticRow<'a> { let has_any = signal_type.is_some() || source.is_some() + || source_version.is_some() || pipeline.is_some() || metadata_quality.is_some() || !rest.is_empty(); @@ -125,6 +131,7 @@ impl<'a> SemanticRow<'a> { Some(Self { signal_type, source, + source_version, pipeline, metadata_quality, options_json, @@ -156,6 +163,7 @@ impl InformationSchemaTableSemantics { ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false), ColumnSchema::new(SIGNAL_TYPE, ConcreteDataType::string_datatype(), true), ColumnSchema::new(SOURCE, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(SOURCE_VERSION, ConcreteDataType::string_datatype(), true), ColumnSchema::new(PIPELINE, ConcreteDataType::string_datatype(), true), ColumnSchema::new(METADATA_QUALITY, ConcreteDataType::string_datatype(), true), ColumnSchema::new(SEMANTIC_OPTIONS, ConcreteDataType::string_datatype(), true), @@ -216,6 +224,7 @@ struct InformationSchemaSemanticTablesBuilder { table_ids: UInt32VectorBuilder, signal_types: StringVectorBuilder, sources: StringVectorBuilder, + source_versions: StringVectorBuilder, pipelines: StringVectorBuilder, metadata_qualities: StringVectorBuilder, semantic_options: StringVectorBuilder, @@ -237,6 +246,7 @@ impl InformationSchemaSemanticTablesBuilder { table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), signal_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), sources: StringVectorBuilder::with_capacity(INIT_CAPACITY), + source_versions: StringVectorBuilder::with_capacity(INIT_CAPACITY), pipelines: StringVectorBuilder::with_capacity(INIT_CAPACITY), metadata_qualities: StringVectorBuilder::with_capacity(INIT_CAPACITY), semantic_options: StringVectorBuilder::with_capacity(INIT_CAPACITY), @@ -279,6 +289,7 @@ impl InformationSchemaSemanticTablesBuilder { let name_v = Value::from(table_name); let signal_v = optional_value(row.signal_type); let source_v = optional_value(row.source); + let source_version_v = 
optional_value(row.source_version); let pipeline_v = optional_value(row.pipeline); let quality_v = optional_value(row.metadata_quality); let predicate_row = [ @@ -287,6 +298,7 @@ impl InformationSchemaSemanticTablesBuilder { (TABLE_NAME, &name_v), (SIGNAL_TYPE, &signal_v), (SOURCE, &source_v), + (SOURCE_VERSION, &source_version_v), (PIPELINE, &pipeline_v), (METADATA_QUALITY, &quality_v), ]; @@ -300,6 +312,7 @@ impl InformationSchemaSemanticTablesBuilder { self.table_ids.push(Some(table_info.table_id())); self.signal_types.push(row.signal_type); self.sources.push(row.source); + self.source_versions.push(row.source_version); self.pipelines.push(row.pipeline); self.metadata_qualities.push(row.metadata_quality); self.semantic_options.push(row.options_json.as_deref()); @@ -313,6 +326,7 @@ impl InformationSchemaSemanticTablesBuilder { Arc::new(self.table_ids.finish()), Arc::new(self.signal_types.finish()), Arc::new(self.sources.finish()), + Arc::new(self.source_versions.finish()), Arc::new(self.pipelines.finish()), Arc::new(self.metadata_qualities.finish()), Arc::new(self.semantic_options.finish()), @@ -349,7 +363,9 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use datatypes::schema::SchemaBuilder; use table::metadata::{TableInfoBuilder, TableMeta, TableType}; - use table::requests::{SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, TableOptions}; + use table::requests::{ + SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, SEMANTIC_SOURCE_VERSION, TableOptions, + }; use super::*; @@ -400,6 +416,7 @@ mod tests { let info = table_info(&[ (SEMANTIC_SIGNAL_TYPE, "metric"), (SEMANTIC_SOURCE, "opentelemetry"), + (SEMANTIC_SOURCE_VERSION, "2.0"), (SEMANTIC_PIPELINE, "greptime_metric_v1"), (SEMANTIC_METRIC_METADATA_QUALITY, "declared"), (SEMANTIC_METRIC_TYPE, "counter"), @@ -411,6 +428,7 @@ mod tests { let row = SemanticRow::extract(&info).unwrap(); assert_eq!(row.signal_type, Some("metric")); assert_eq!(row.source, 
Some("opentelemetry")); + assert_eq!(row.source_version, Some("2.0")); assert_eq!(row.pipeline, Some("greptime_metric_v1")); assert_eq!(row.metadata_quality, Some("declared")); // Promoted keys stay out of the JSON tail; remaining keys are sorted and diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 8a038b8013..080c270f92 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -1462,12 +1462,13 @@ mod tests { fn test_fill_table_options_copies_semantic_extensions() { use table::requests::{ SEMANTIC_METRIC_TYPE, SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE, - SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY, + SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY, }; let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY); + ctx.set_extension(SEMANTIC_SOURCE_VERSION, "2.0"); ctx.set_extension(SEMANTIC_METRIC_TYPE, "bogus"); // The internal transport key must NOT be copied into table options. 
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}"); @@ -1484,6 +1485,12 @@ mod tests { Some(SOURCE_OPENTELEMETRY), table_options.get(SEMANTIC_SOURCE).map(String::as_str) ); + assert_eq!( + Some("2.0"), + table_options + .get(SEMANTIC_SOURCE_VERSION) + .map(String::as_str) + ); assert!(!table_options.contains_key(SEMANTIC_METRIC_TYPE)); assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY)); } diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 280c0655d7..07cd1e19a9 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -18,30 +18,34 @@ use api::prom_store::remote::ReadRequest; use axum::Extension; use axum::body::Bytes; use axum::extract::{Query, State}; -use axum::http::{HeaderValue, StatusCode, header}; +use axum::http::{HeaderMap, HeaderValue, StatusCode, header}; use axum::response::IntoResponse; use axum_extra::TypedHeader; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_telemetry::tracing; -use hyper::HeaderMap; -use pipeline::PipelineDefinition; +use mime_guess::mime; use pipeline::util::to_pipeline_version; +use pipeline::{ContextReq, PipelineDefinition}; +use prometheus::HistogramTimer; use prost::Message; use serde::{Deserialize, Serialize}; use session::context::{Channel, QueryContext}; use snafu::prelude::*; use table::requests::{ METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE, - SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS, + SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS, }; use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::http::extractor::PipelineInfo; -use crate::http::header::{GREPTIME_DB_HEADER_METRICS, write_cost_header_map}; +use crate::http::header::{ + CONTENT_TYPE_PROTOBUF_STR, GREPTIME_DB_HEADER_METRICS, write_cost_header_map, +}; use crate::pending_rows_batcher::PendingRowsBatcher; use 
crate::prom_remote_write::decode::PromSeriesProcessor; use crate::prom_remote_write::decode_remote_write_request; +use crate::prom_remote_write::v2::{RemoteWriteV2RequestExt, decode_remote_write_v2_request}; use crate::prom_remote_write::validation::PromValidationMode; use crate::prom_store::{extract_schema_from_read_request, snappy_decompress}; use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse}; @@ -50,6 +54,16 @@ pub const PHYSICAL_TABLE_PARAM: &str = "physical_table"; pub const DEFAULT_ENCODING: &str = "snappy"; pub const VM_ENCODING: &str = "zstd"; pub const VM_PROTO_VERSION: &str = "1"; +const REMOTE_WRITE_V1_VERSION: &str = "1.0"; +const REMOTE_WRITE_V2_VERSION: &str = "2.0"; +const REMOTE_WRITE_V1_PROTO: &str = "prometheus.WriteRequest"; +const REMOTE_WRITE_V2_PROTO: &str = "io.prometheus.write.v2.Request"; +const CONTENT_TYPE_PROTO_PARAM: &str = "proto"; +const REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER: &str = "x-prometheus-remote-write-samples-written"; +const REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER: &str = + "x-prometheus-remote-write-histograms-written"; +const REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER: &str = + "x-prometheus-remote-write-exemplars-written"; #[derive(Clone)] pub struct PromStoreState { @@ -88,11 +102,41 @@ impl Default for RemoteWriteQuery { pub async fn remote_write( State(state): State, Query(params): Query, - Extension(mut query_ctx): Extension, + Extension(query_ctx): Extension, + content_type: Option>, pipeline_info: PipelineInfo, content_encoding: TypedHeader, body: Bytes, -) -> Result { +) -> Result { + let is_zstd = content_encoding.contains(VM_ENCODING); + + match remote_write_proto(content_type) { + RemoteWriteProto::V1 => { + remote_write_v1(state, params, query_ctx, pipeline_info, is_zstd, body).await + } + RemoteWriteProto::V2 => { + if let Some(response) = unsupported_remote_write_v2_encoding_response(&content_encoding) + { + return Ok(response); + } + remote_write_v2(state, params, 
query_ctx, pipeline_info, is_zstd, body).await + } + RemoteWriteProto::Unsupported(content_type) => Ok(( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!("unsupported prometheus remote write content type: {content_type}"), + ) + .into_response()), + } +} + +async fn remote_write_v1( + state: PromStoreState, + params: RemoteWriteQuery, + query_ctx: QueryContext, + pipeline_info: PipelineInfo, + is_zstd: bool, + body: Bytes, +) -> Result { let PromStoreState { prom_store_handler, pipeline_handler, @@ -101,30 +145,12 @@ pub async fn remote_write( pending_rows_batcher, } = state; - if let Some(_vm_handshake) = params.get_vm_proto_version { - return Ok(VM_PROTO_VERSION.into_response()); + if let Some(response) = vm_proto_version_response(&params) { + return Ok(response); } - let db = params.db.clone().unwrap_or_default(); - query_ctx.set_channel(Channel::Prometheus); - let physical_table = params - .physical_table - .clone() - .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string()); - query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table.clone()); - // Stamp the Prometheus metric identity here, before `as_req_iter` splits into the - // batched and direct write paths, so both inherit it (the batched path bypasses - // `PromStoreProtocolHandler::write`). Prom RW v1 metadata is weak, so the type is - // inferred from naming. 
- query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); - query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS); - query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED); - let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED - .with_label_values(&[db.as_str()]) - .start_timer(); - - let is_zstd = content_encoding.contains(VM_ENCODING); + let (db, query_ctx, _timer) = + prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V1_VERSION); let mut processor = PromSeriesProcessor::default_processor(); @@ -151,20 +177,189 @@ pub async fn remote_write( req.as_insert_requests() }; + let outcome = write_prometheus_rows( + prom_store_handler, + pending_rows_batcher, + prom_store_with_metric_engine, + &db, + query_ctx, + req, + ) + .await?; + + Ok(( + StatusCode::NO_CONTENT, + write_cost_header_map(outcome.write_cost), + ) + .into_response()) +} + +async fn remote_write_v2( + state: PromStoreState, + params: RemoteWriteQuery, + query_ctx: QueryContext, + pipeline_info: PipelineInfo, + is_zstd: bool, + body: Bytes, +) -> Result { + let PromStoreState { + prom_store_handler, + pipeline_handler: _, + prom_store_with_metric_engine, + prom_validation_mode: _, + pending_rows_batcher, + } = state; + + if let Some(response) = vm_proto_version_response(&params) { + return Ok(response); + } + + // Pipeline processing is not supported for remote write v2 yet. Ignore the + // optional pipeline parameter and ingest samples directly. 
+ let _ = pipeline_info; + + let (db, query_ctx, _timer) = + prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V2_VERSION); + + let request = match decode_remote_write_v2_request(is_zstd, body) { + Ok(request) => request, + Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)), + }; + let req = match request.into_context_req() { + Ok(req) => req, + Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)), + }; + + let outcome = match write_prometheus_rows_with_progress( + prom_store_handler, + pending_rows_batcher, + prom_store_with_metric_engine, + &db, + query_ctx, + req, + ) + .await + { + Ok(outcome) => outcome, + Err(error) => { + return Ok(remote_write_v2_error_response( + error.error, + error.rows_written, + 0, + 0, + )); + } + }; + + let mut headers = write_cost_header_map(outcome.write_cost); + append_remote_write_v2_written_headers(&mut headers, outcome.rows_written, 0, 0); + + Ok((StatusCode::NO_CONTENT, headers).into_response()) +} + +fn vm_proto_version_response(params: &RemoteWriteQuery) -> Option { + params + .get_vm_proto_version + .as_ref() + .map(|_| VM_PROTO_VERSION.into_response()) +} + +fn prepare_remote_write_context( + params: &RemoteWriteQuery, + mut query_ctx: QueryContext, + remote_write_version: &str, +) -> (String, Arc, HistogramTimer) { + let db = params.db.clone().unwrap_or_default(); + query_ctx.set_channel(Channel::Prometheus); + let physical_table = params + .physical_table + .clone() + .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string()); + query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table); + // Stamp the Prometheus metric identity here, before `as_req_iter` splits into the + // batched and direct write paths, so both inherit it (the batched path bypasses + // `PromStoreProtocolHandler::write`). Prometheus remote-write metadata is weak + // here, so the type is inferred from naming. 
+ query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC); + query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS); + query_ctx.set_extension(SEMANTIC_SOURCE_VERSION, remote_write_version); + query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED); + let query_ctx = Arc::new(query_ctx); + let timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED + .with_label_values(&[db.as_str()]) + .start_timer(); + + (db, query_ctx, timer) +} + +struct PromWriteOutcome { + write_cost: usize, + rows_written: u64, +} + +struct PromWriteError { + error: error::Error, + rows_written: u64, +} + +async fn write_prometheus_rows( + prom_store_handler: PromStoreProtocolHandlerRef, + pending_rows_batcher: Option>, + prom_store_with_metric_engine: bool, + db: &str, + query_ctx: Arc, + req: ContextReq, +) -> Result { + write_prometheus_rows_with_progress( + prom_store_handler, + pending_rows_batcher, + prom_store_with_metric_engine, + db, + query_ctx, + req, + ) + .await + .map_err(|error| error.error) +} + +async fn write_prometheus_rows_with_progress( + prom_store_handler: PromStoreProtocolHandlerRef, + pending_rows_batcher: Option>, + prom_store_with_metric_engine: bool, + db: &str, + query_ctx: Arc, + req: ContextReq, +) -> std::result::Result { if prom_store_with_metric_engine && let Some(batcher) = pending_rows_batcher { + let mut rows_written = 0; for (temp_ctx, reqs) in req.as_req_iter(query_ctx) { prom_store_handler .pre_write(&reqs, temp_ctx.clone()) - .await?; - let rows = batcher.submit(reqs, temp_ctx).await?; + .await + .map_err(|error| PromWriteError { + error, + rows_written, + })?; + let rows = batcher + .submit(reqs, temp_ctx) + .await + .map_err(|error| PromWriteError { + error, + rows_written, + })?; crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES - .with_label_values(&[db.as_str()]) + .with_label_values(&[db]) .inc_by(rows); + rows_written += rows; } - return Ok((StatusCode::NO_CONTENT, 
write_cost_header_map(0)).into_response()); + return Ok(PromWriteOutcome { + write_cost: 0, + rows_written, + }); } - let mut cost = 0; + let mut write_cost = 0; + let mut rows_written = 0; for (temp_ctx, reqs) in req.as_req_iter(query_ctx) { let cnt: u64 = reqs .inserts @@ -173,14 +368,104 @@ pub async fn remote_write( .sum(); let output = prom_store_handler .write(reqs, temp_ctx, prom_store_with_metric_engine) - .await?; + .await + .map_err(|error| PromWriteError { + error, + rows_written, + })?; crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES - .with_label_values(&[db.as_str()]) + .with_label_values(&[db]) .inc_by(cnt); - cost += output.meta.cost; + write_cost += output.meta.cost; + rows_written += cnt; } - Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response()) + Ok(PromWriteOutcome { + write_cost, + rows_written, + }) +} + +fn remote_write_v2_error_response( + error: error::Error, + samples: u64, + histograms: u64, + exemplars: u64, +) -> axum::response::Response { + let mut response = error.into_response(); + append_remote_write_v2_written_headers(response.headers_mut(), samples, histograms, exemplars); + response +} + +fn append_remote_write_v2_written_headers( + headers: &mut HeaderMap, + samples: u64, + histograms: u64, + exemplars: u64, +) { + headers.insert( + REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER, + HeaderValue::from_str(&samples.to_string()).expect("u64 header value is valid"), + ); + headers.insert( + REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER, + HeaderValue::from_str(&histograms.to_string()).expect("u64 header value is valid"), + ); + headers.insert( + REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER, + HeaderValue::from_str(&exemplars.to_string()).expect("u64 header value is valid"), + ); +} + +enum RemoteWriteProto { + V1, + V2, + Unsupported(mime::Mime), +} + +// ref: https://github.com/prometheus/client_golang/blob/74560058a7af7a695db8196c8e84a0754032c6af/exp/api/remote/remote_api.go#L544 +fn remote_write_proto(content_type: Option>) -> 
RemoteWriteProto { + let Some(TypedHeader(content_type)) = content_type else { + return RemoteWriteProto::V1; + }; + + let mime_type: mime::Mime = content_type.into(); + if !mime_type + .essence_str() + .eq_ignore_ascii_case(CONTENT_TYPE_PROTOBUF_STR) + { + return RemoteWriteProto::Unsupported(mime_type); + } + + for (name, value) in mime_type.params() { + if !name.as_str().eq_ignore_ascii_case(CONTENT_TYPE_PROTO_PARAM) { + continue; + } + + return match value.as_str() { + REMOTE_WRITE_V1_PROTO => RemoteWriteProto::V1, + REMOTE_WRITE_V2_PROTO => RemoteWriteProto::V2, + _ => RemoteWriteProto::Unsupported(mime_type.clone()), + }; + } + + RemoteWriteProto::V1 +} + +fn unsupported_remote_write_v2_encoding_response( + content_encoding: &headers::ContentEncoding, +) -> Option { + if content_encoding.contains(DEFAULT_ENCODING) || content_encoding.contains(VM_ENCODING) { + return None; + } + + Some(( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + format!( + "unsupported prometheus remote write content encoding: only {DEFAULT_ENCODING} and {VM_ENCODING} are supported" + ), + ) + .into_response()) } impl IntoResponse for PromStoreResponse { @@ -236,3 +521,167 @@ async fn decode_remote_read_request(body: Bytes) -> Result { ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu) } + +#[cfg(test)] +mod tests { + use api::prom_store::remote::ReadRequest; + use api::v1::RowInsertRequests; + use async_trait::async_trait; + use common_query::Output; + use pipeline::GreptimePipelineParams; + use session::context::{QueryContext, QueryContextRef}; + + use super::*; + use crate::prom_remote_write::validation::PromValidationMode; + use crate::prom_store::Metrics; + use crate::query_handler::PromStoreProtocolHandler; + + #[test] + fn test_remote_write_proto() { + assert!(matches!( + remote_write_proto(content_type( + "application/x-protobuf;proto=io.prometheus.write.v2.Request" + )), + RemoteWriteProto::V2 + )); + assert!(matches!( + remote_write_proto(content_type( + 
"application/x-protobuf; proto=\"io.prometheus.write.v2.Request\"" + )), + RemoteWriteProto::V2 + )); + assert!(matches!( + remote_write_proto(content_type( + "APPLICATION/X-PROTOBUF;proto=io.prometheus.write.v2.Request" + )), + RemoteWriteProto::V2 + )); + assert!(matches!( + remote_write_proto(content_type("application/x-protobuf")), + RemoteWriteProto::V1 + )); + assert!(matches!( + remote_write_proto(content_type( + "application/x-protobuf;proto=prometheus.WriteRequest" + )), + RemoteWriteProto::V1 + )); + assert!(matches!( + remote_write_proto(content_type( + "application/x-protobuf;proto=unknown.WriteRequest" + )), + RemoteWriteProto::Unsupported(_) + )); + assert!(matches!( + remote_write_proto(content_type( + "application/json;proto=io.prometheus.write.v2.Request" + )), + RemoteWriteProto::Unsupported(_) + )); + assert!(matches!(remote_write_proto(None), RemoteWriteProto::V1)); + } + + fn content_type(value: &str) -> Option> { + Some(TypedHeader(std::str::FromStr::from_str(value).unwrap())) + } + + #[test] + fn test_prepare_remote_write_context_stamps_semantics() { + let (_, query_ctx, _timer) = prepare_remote_write_context( + &RemoteWriteQuery::default(), + QueryContext::with("greptime", "public"), + REMOTE_WRITE_V2_VERSION, + ); + + assert_eq!( + query_ctx.extension(SEMANTIC_SIGNAL_TYPE), + Some(SIGNAL_TYPE_METRIC) + ); + assert_eq!( + query_ctx.extension(SEMANTIC_SOURCE), + Some(SOURCE_PROMETHEUS) + ); + assert_eq!( + query_ctx.extension(SEMANTIC_SOURCE_VERSION), + Some(REMOTE_WRITE_V2_VERSION) + ); + assert_eq!( + query_ctx.extension(SEMANTIC_METRIC_METADATA_QUALITY), + Some(METADATA_QUALITY_INFERRED) + ); + } + + #[tokio::test] + async fn test_remote_write_v2_ignores_pipeline() { + let request = api::greptime_proto::io::prometheus::write::v2::Request { + symbols: vec![String::new()], + timeseries: Vec::new(), + }; + let body = + Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap()); + + let response = remote_write_v2( + 
test_state(), + RemoteWriteQuery::default(), + QueryContext::with("greptime", "public"), + pipeline_info(Some("pipeline")), + false, + body, + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NO_CONTENT); + assert_eq!( + Some("0"), + response + .headers() + .get(REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER) + .map(|x| x.to_str().unwrap()) + ); + } + + fn test_state() -> PromStoreState { + PromStoreState { + prom_store_handler: Arc::new(NoopPromStoreHandler), + pipeline_handler: None, + prom_store_with_metric_engine: false, + prom_validation_mode: PromValidationMode::Strict, + pending_rows_batcher: None, + } + } + + fn pipeline_info(pipeline_name: Option<&str>) -> PipelineInfo { + PipelineInfo { + pipeline_name: pipeline_name.map(ToString::to_string), + pipeline_version: None, + pipeline_params: GreptimePipelineParams::default(), + } + } + + struct NoopPromStoreHandler; + + #[async_trait] + impl PromStoreProtocolHandler for NoopPromStoreHandler { + async fn write( + &self, + _request: RowInsertRequests, + _ctx: QueryContextRef, + _with_metric_engine: bool, + ) -> Result { + unreachable!("empty remote write v2 request should not write") + } + + async fn read( + &self, + _request: ReadRequest, + _ctx: QueryContextRef, + ) -> Result { + unimplemented!() + } + + async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> { + unimplemented!() + } + } +} diff --git a/src/servers/src/prom_remote_write/mod.rs b/src/servers/src/prom_remote_write/mod.rs index de943ae85a..1c6b972e3d 100644 --- a/src/servers/src/prom_remote_write/mod.rs +++ b/src/servers/src/prom_remote_write/mod.rs @@ -20,6 +20,10 @@ pub mod decode; pub(crate) mod row_builder; pub(crate) mod types; +#[cfg(any(test, feature = "testing"))] +pub mod v2; +#[cfg(not(any(test, feature = "testing")))] +pub(crate) mod v2; pub mod validation; use bytes::Bytes; diff --git a/src/servers/src/prom_remote_write/v2.rs b/src/servers/src/prom_remote_write/v2.rs new file mode 100644 index 
0000000000..8ee8455338 --- /dev/null +++ b/src/servers/src/prom_remote_write/v2.rs @@ -0,0 +1,633 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; + +#[cfg(test)] +use api::greptime_proto::io::prometheus::write::v2::{Exemplar, Histogram, Metadata, metadata}; +use api::greptime_proto::io::prometheus::write::v2::{Request, Sample, TimeSeries}; +use api::v1::{RowInsertRequest, Rows, Value}; +use bytes::Bytes; +use common_grpc::precision::Precision; +use common_query::prelude::{greptime_timestamp, greptime_value}; +use pipeline::{ContextOpt, ContextReq}; +use prost::Message; +use snafu::{OptionExt, ResultExt, ensure}; + +use crate::error::{self, Result}; +use crate::prom_remote_write::row_builder::PromCtx; +use crate::prom_remote_write::try_decompress; +#[allow(deprecated)] +use crate::prom_store::{ + DATABASE_LABEL, DATABASE_LABEL_ALT, METRIC_NAME_LABEL, PHYSICAL_TABLE_LABEL, + PHYSICAL_TABLE_LABEL_ALT, SCHEMA_LABEL, +}; +use crate::row_writer::{self, TableData}; + +type PromTags = Vec<(String, String)>; +type ResolvedSeriesLabels = (PromCtx, String, PromTags); + +pub(crate) fn decode_remote_write_v2_request(is_zstd: bool, body: Bytes) -> Result { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); + + // Match the v1 decoder's VictoriaMetrics fallback: some clients may send a + // mismatched content-encoding 
header, so try the other compression on failure. + let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) { + buf + } else { + try_decompress(!is_zstd, &body[..])? + }; + + Request::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu) +} + +pub(crate) trait RemoteWriteV2RequestExt { + fn into_context_req(self) -> Result; +} + +impl RemoteWriteV2RequestExt for Request { + fn into_context_req(self) -> Result { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer(); + let Request { + symbols, + timeseries, + } = self; + + ensure!( + symbols.first().map(|s| s.as_str()) == Some(""), + error::InvalidPromRemoteRequestSnafu { + msg: "remote write v2 symbols must start with an empty string".to_string(), + } + ); + + let mut tables = HashMap::>::new(); + + for series in timeseries { + // Native histograms and exemplars are intentionally ignored for now. + // They will be converted into Greptime rows once their ingestion path is implemented. + let sample_count = series.samples.len(); + if sample_count == 0 { + continue; + } + + let (prom_ctx, table_name, tags) = resolve_series_labels(&symbols, &series)?; + let table_data = match tables.entry(prom_ctx).or_default().entry(table_name) { + Entry::Occupied(entry) => { + let table_data = entry.into_mut(); + table_data.reserve_rows(sample_count); + table_data + } + Entry::Vacant(entry) => entry.insert(TableData::new(tags.len() + 2, sample_count)), + }; + + write_samples(table_data, series.samples, tags)?; + } + + Ok(into_context_req(tables)) + } +} + +fn write_samples( + table_data: &mut TableData, + mut samples: Vec, + tags: PromTags, +) -> Result<()> { + let Some(last_sample) = samples.pop() else { + return Ok(()); + }; + + for sample in &samples { + write_sample(table_data, sample, tags.iter().cloned())?; + } + + write_sample(table_data, &last_sample, tags.into_iter()) +} + +fn write_sample( + table_data: &mut TableData, + sample: &Sample, + tags: impl Iterator, +) -> Result<()> { + 
let mut row = table_data.alloc_one_row(); + row_writer::write_ts_to_millis( + table_data, + greptime_timestamp(), + Some(sample.timestamp), + Precision::Millisecond, + &mut row, + )?; + row_writer::write_f64(table_data, greptime_value(), sample.value, &mut row)?; + row_writer::write_tags(table_data, tags, &mut row)?; + table_data.add_row(row); + + Ok(()) +} + +fn resolve_series_labels(symbols: &[String], series: &TimeSeries) -> Result { + ensure!( + series.labels_refs.len().is_multiple_of(2), + error::InvalidPromRemoteRequestSnafu { + msg: "remote write v2 labels_refs must contain name/value pairs".to_string(), + } + ); + + let mut prom_ctx = PromCtx::default(); + let mut table_name = None; + let mut tags = Vec::with_capacity(series.labels_refs.len() / 2); + let mut label_names = HashSet::with_capacity(series.labels_refs.len() / 2); + + for pair in series.labels_refs.chunks_exact(2) { + let name = symbol_ref(symbols, pair[0], "label name")?; + let value = symbol_ref(symbols, pair[1], "label value")?; + validate_label(name, value)?; + ensure!( + label_names.insert(name), + error::InvalidPromRemoteRequestSnafu { + msg: format!("remote write v2 label name `{name}` is repeated"), + } + ); + + if name == METRIC_NAME_LABEL { + table_name = Some(value.to_string()); + continue; + } + if apply_remote_write_special_label(name, value, &mut prom_ctx) { + continue; + } + + tags.push((name.to_string(), value.to_string())); + } + + let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in time-series".to_string(), + })?; + + Ok((prom_ctx, table_name, tags)) +} + +fn validate_label(name: &str, value: &str) -> Result<()> { + ensure!( + !name.is_empty(), + error::InvalidPromRemoteRequestSnafu { + msg: "remote write v2 label names must not be empty".to_string(), + } + ); + ensure!( + !value.is_empty(), + error::InvalidPromRemoteRequestSnafu { + msg: format!("remote write v2 label `{name}` value must not be empty"), + } + ); + + 
Ok(()) +} + +fn symbol_ref<'a>(symbols: &'a [String], idx: u32, field: &str) -> Result<&'a str> { + symbols + .get(idx as usize) + .map(String::as_str) + .with_context(|| error::InvalidPromRemoteRequestSnafu { + msg: format!( + "remote write v2 {field} symbol reference {idx} is out of range, symbols len: {}", + symbols.len() + ), + }) +} + +#[allow(deprecated)] +fn apply_remote_write_special_label(name: &str, value: &str, prom_ctx: &mut PromCtx) -> bool { + match name { + SCHEMA_LABEL => { + prom_ctx.schema = Some(value.to_string()); + true + } + DATABASE_LABEL | DATABASE_LABEL_ALT => { + if prom_ctx.schema.is_none() { + prom_ctx.schema = Some(value.to_string()); + } + true + } + PHYSICAL_TABLE_LABEL | PHYSICAL_TABLE_LABEL_ALT => { + prom_ctx.physical_table = Some(value.to_string()); + true + } + _ => false, + } +} + +fn into_context_req(tables: HashMap>) -> ContextReq { + let mut ctx_req = ContextReq::default(); + for (prom_ctx, tables) in tables { + let mut opt = ContextOpt::default(); + if let Some(schema) = prom_ctx.schema { + opt.set_schema(schema); + } + if let Some(physical_table) = prom_ctx.physical_table { + opt.set_physical_table(physical_table); + } + + ctx_req.add_rows( + opt, + tables.into_iter().map(|(table_name, table_data)| { + table_data_to_row_insert_request(table_name, table_data) + }), + ); + } + ctx_req +} + +fn table_data_to_row_insert_request(table_name: String, table_data: TableData) -> RowInsertRequest { + let num_columns = table_data.num_columns(); + let (schema, mut rows) = table_data.into_schema_and_rows(); + for row in &mut rows { + if num_columns > row.values.len() { + row.values.resize(num_columns, Value { value_data: None }); + } + } + + RowInsertRequest { + table_name, + rows: Some(Rows { schema, rows }), + } +} + +#[cfg(any(test, feature = "testing"))] +pub mod test_util { + use api::greptime_proto::io::prometheus::write::v2::{Histogram, Request, Sample, TimeSeries}; + + pub fn request_with_labels_and_samples( + labels: Vec<(&str, 
&str)>, + samples: Vec, + ) -> Request { + request_with_labels(labels, samples, Vec::new()) + } + + pub fn request_with_labels_and_histograms( + labels: Vec<(&str, &str)>, + histograms: Vec, + ) -> Request { + request_with_labels(labels, Vec::new(), histograms) + } + + pub fn histogram(timestamp: i64) -> Histogram { + Histogram { + timestamp, + ..Default::default() + } + } + + fn request_with_labels( + labels: Vec<(&str, &str)>, + samples: Vec, + histograms: Vec, + ) -> Request { + let mut symbols = vec!["".to_string()]; + let mut labels_refs = Vec::with_capacity(labels.len() * 2); + for (name, value) in labels { + labels_refs.push(push_symbol(&mut symbols, name)); + labels_refs.push(push_symbol(&mut symbols, value)); + } + + Request { + symbols, + timeseries: vec![TimeSeries { + labels_refs, + samples, + histograms, + exemplars: Vec::new(), + metadata: None, + }], + } + } + + fn push_symbol(symbols: &mut Vec, symbol: &str) -> u32 { + if let Some(idx) = symbols.iter().position(|s| s == symbol) { + return idx as u32; + } + + let idx = symbols.len(); + symbols.push(symbol.to_string()); + idx as u32 + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::value::ValueData; + use common_query::prelude::{greptime_timestamp, greptime_value}; + use session::context::QueryContext; + + use super::*; + use crate::error; + use crate::http::prom_store::PHYSICAL_TABLE_PARAM; + use crate::prom_store::{DATABASE_LABEL, PHYSICAL_TABLE_LABEL}; + + #[test] + fn test_decode_remote_write_v2_request() { + let request = Request { + symbols: vec![ + "".to_string(), + "__name__".to_string(), + "http_requests_total".to_string(), + ], + timeseries: vec![TimeSeries { + labels_refs: vec![1, 2], + samples: vec![Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }], + histograms: Vec::new(), + exemplars: Vec::new(), + metadata: Some(Metadata { + r#type: metadata::MetricType::Counter as i32, + help_ref: 0, + unit_ref: 0, + }), + }], + }; + let body = + 
Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap()); + + let decoded = decode_remote_write_v2_request(false, body).unwrap(); + + assert_eq!(decoded.symbols, request.symbols); + assert_eq!(decoded.timeseries.len(), 1); + assert_eq!(decoded.timeseries[0].labels_refs, vec![1, 2]); + assert_eq!(decoded.timeseries[0].samples.len(), 1); + assert_eq!(decoded.timeseries[0].samples[0].value, 42.0); + assert_eq!(decoded.timeseries[0].metadata.as_ref().unwrap().r#type, 1); + } + + #[test] + fn test_into_context_req_samples() { + let ctx_req = test_util::request_with_labels_and_samples( + vec![ + (METRIC_NAME_LABEL, "http_requests_total"), + ("job", "api"), + ("instance", "localhost:9090"), + ], + vec![ + Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }, + Sample { + value: 43.0, + timestamp: 2000, + start_timestamp: 0, + }, + ], + ) + .into_context_req() + .unwrap(); + + let mut inserts = ctx_req.all_req().collect::>(); + assert_eq!(inserts.len(), 1); + + let request = inserts.pop().unwrap(); + assert_eq!(request.table_name, "http_requests_total"); + let rows = request.rows.unwrap(); + assert_eq!(rows.rows.len(), 2); + assert_eq!( + rows.schema + .iter() + .map(|col| col.column_name.as_str()) + .collect::>(), + vec![greptime_timestamp(), greptime_value(), "job", "instance"] + ); + assert_eq!( + rows.rows[0].values[0].value_data, + Some(ValueData::TimestampMillisecondValue(1000)) + ); + assert_eq!( + rows.rows[0].values[1].value_data, + Some(ValueData::F64Value(42.0)) + ); + assert_eq!( + rows.rows[0].values[2].value_data, + Some(ValueData::StringValue("api".to_string())) + ); + assert_eq!( + rows.rows[0].values[3].value_data, + Some(ValueData::StringValue("localhost:9090".to_string())) + ); + assert_eq!( + rows.rows[1].values[0].value_data, + Some(ValueData::TimestampMillisecondValue(2000)) + ); + assert_eq!( + rows.rows[1].values[1].value_data, + Some(ValueData::F64Value(43.0)) + ); + } + + #[test] + fn 
test_into_context_req_accepts_utf8_label_names() { + let ctx_req = test_util::request_with_labels_and_samples( + vec![ + (METRIC_NAME_LABEL, "http_requests_total"), + ("service.name", "api"), + ("区域", "华东"), + ], + vec![Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }], + ) + .into_context_req() + .unwrap(); + + let mut inserts = ctx_req.all_req().collect::>(); + assert_eq!(inserts.len(), 1); + let rows = inserts.pop().unwrap().rows.unwrap(); + assert_eq!( + rows.schema + .iter() + .map(|col| col.column_name.as_str()) + .collect::>(), + vec![ + greptime_timestamp(), + greptime_value(), + "service.name", + "区域" + ] + ); + assert_eq!( + rows.rows[0].values[2].value_data, + Some(ValueData::StringValue("api".to_string())) + ); + assert_eq!( + rows.rows[0].values[3].value_data, + Some(ValueData::StringValue("华东".to_string())) + ); + } + + #[test] + fn test_into_context_req_special_labels() { + let ctx_req = test_util::request_with_labels_and_samples( + vec![ + (METRIC_NAME_LABEL, "cpu_usage"), + (DATABASE_LABEL, "tenant_a"), + (PHYSICAL_TABLE_LABEL, "metrics_physical"), + ("job", "api"), + ], + vec![Sample { + value: 1.0, + timestamp: 1000, + start_timestamp: 0, + }], + ) + .into_context_req() + .unwrap(); + + let mut iter = ctx_req.as_req_iter(Arc::new(QueryContext::with("greptime", "public"))); + let (ctx, reqs) = iter.next().unwrap(); + assert!(iter.next().is_none()); + + assert_eq!(ctx.current_schema(), "tenant_a"); + assert_eq!( + ctx.extension(PHYSICAL_TABLE_PARAM), + Some("metrics_physical") + ); + assert_eq!(reqs.inserts.len(), 1); + + let rows = reqs.inserts[0].rows.as_ref().unwrap(); + assert_eq!( + rows.schema + .iter() + .map(|col| col.column_name.as_str()) + .collect::>(), + vec![greptime_timestamp(), greptime_value(), "job"] + ); + } + + #[test] + fn test_into_context_req_rejects_invalid_requests() { + let mut cases = Vec::new(); + + cases.push(( + "missing metric name", + request_with_sample(vec![("job", "api")]), + "missing 
'__name__'", + )); + + let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]); + request.timeseries[0].labels_refs.push(1); + cases.push(( + "odd label refs", + request, + "labels_refs must contain name/value pairs", + )); + + let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]); + request.timeseries[0].labels_refs[1] = 99; + cases.push(( + "out of range symbol ref", + request, + "symbol reference 99 is out of range", + )); + + let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]); + request.symbols[0] = "not-empty".to_string(); + cases.push(( + "non-empty first symbol", + request, + "symbols must start with an empty string", + )); + + cases.push(( + "repeated label name", + request_with_sample(vec![ + (METRIC_NAME_LABEL, "metric"), + ("job", "api"), + ("job", "worker"), + ]), + "label name `job` is repeated", + )); + + cases.push(( + "empty label name", + request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("", "api")]), + "label names must not be empty", + )); + + cases.push(( + "empty label value", + request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("job", "")]), + "label `job` value must not be empty", + )); + + for (name, request, expected) in cases { + assert_invalid(name, request, expected); + } + } + + #[test] + fn test_into_context_req_ignores_histograms_and_exemplars() { + let mut request = test_util::request_with_labels_and_samples( + vec![(METRIC_NAME_LABEL, "metric")], + vec![Sample { + value: 1.0, + timestamp: 1000, + start_timestamp: 0, + }], + ); + request.timeseries[0].histograms.push(Histogram::default()); + request.timeseries[0].exemplars.push(Exemplar::default()); + + let ctx_req = request.into_context_req().unwrap(); + + assert_eq!(ctx_req.all_req().count(), 1); + } + + #[test] + fn test_into_context_req_skips_histogram_only_series() { + let mut request = + test_util::request_with_labels_and_samples(vec![(METRIC_NAME_LABEL, "metric")], vec![]); + 
request.timeseries[0].histograms.push(Histogram::default()); + + let ctx_req = request.into_context_req().unwrap(); + + assert_eq!(ctx_req.all_req().count(), 0); + } + + fn request_with_sample(labels: Vec<(&str, &str)>) -> Request { + test_util::request_with_labels_and_samples( + labels, + vec![Sample { + value: 1.0, + timestamp: 1000, + start_timestamp: 0, + }], + ) + } + + fn assert_invalid(name: &str, request: Request, expected: &str) { + let err = request.into_context_req().unwrap_err(); + assert!( + matches!(err, error::Error::InvalidPromRemoteRequest { .. }), + "{name}: expected invalid request error, got {err}" + ); + assert!( + err.to_string().contains(expected), + "{name}: expected error containing {expected:?}, got {err}" + ); + } +} diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index fca2c21b41..d92ebc8367 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -69,6 +69,11 @@ impl TableData { self.rows.push(Row { values }) } + #[inline] + pub fn reserve_rows(&mut self, additional: usize) { + self.rows.reserve(additional); + } + #[allow(dead_code)] pub fn columns(&self) -> &Vec { &self.schema diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 0da9f70cb7..cd0d3e7822 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -13,23 +13,33 @@ // limitations under the License. 
use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use api::greptime_proto::io::prometheus::write::v2::{ + Request as RemoteWriteV2Request, Sample as RemoteWriteV2Sample, + TimeSeries as RemoteWriteV2TimeSeries, +}; use api::prom_store::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; -use api::v1::RowInsertRequests; +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, RowInsertRequests, SemanticType}; use async_trait::async_trait; use axum::Router; +use axum::http::HeaderMap; use common_query::Output; +use common_query::prelude::{greptime_timestamp, greptime_value}; use common_test_util::ports; use datafusion_expr::LogicalPlan; use prost::Message; use query::parser::PromQuery; use query::query_engine::DescribeResult; -use servers::error::Result; +use servers::error::{self, Result}; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; -use servers::http::test_helpers::TestClient; +use servers::http::prom_store::PHYSICAL_TABLE_PARAM; +use servers::http::test_helpers::{TestClient, TestResponse}; use servers::http::{HttpOptions, HttpServerBuilder}; +use servers::prom_remote_write::v2::test_util as remote_write_v2; use servers::prom_remote_write::validation::PromValidationMode; use servers::prom_store; use servers::prom_store::{Metrics, snappy_compress}; @@ -39,24 +49,53 @@ use session::context::QueryContextRef; use sql::statements::statement::Statement; use tokio::sync::mpsc; +const REMOTE_WRITE_V2_CONTENT_TYPE: &str = + "application/x-protobuf;proto=io.prometheus.write.v2.Request"; + struct DummyInstance { - tx: mpsc::Sender<(String, Vec)>, + read_tx: mpsc::Sender<(String, Vec)>, + write_tx: mpsc::Sender, + write_calls: Arc, + fail_write_call: Option, +} + +struct RemoteWriteCapture { + schema: String, + physical_table: Option, + request: RowInsertRequests, } #[async_trait] impl PromStoreProtocolHandler for DummyInstance { async fn write( &self, - _request: RowInsertRequests, 
- _ctx: QueryContextRef, + request: RowInsertRequests, + ctx: QueryContextRef, _with_metric_engine: bool, ) -> Result { + let write_call = self.write_calls.fetch_add(1, Ordering::SeqCst) + 1; + if self.fail_write_call == Some(write_call) { + return error::InvalidPromRemoteRequestSnafu { + msg: "injected prometheus remote write failure".to_string(), + } + .fail(); + } + + let _ = self + .write_tx + .send(RemoteWriteCapture { + schema: ctx.current_schema(), + physical_table: ctx.extension(PHYSICAL_TABLE_PARAM).map(ToString::to_string), + request, + }) + .await; + Ok(Output::new_with_affected_rows(0)) } async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result { let _ = self - .tx + .read_tx .send((ctx.current_schema(), request.encode_to_vec())) .await; @@ -112,12 +151,33 @@ impl SqlQueryHandler for DummyInstance { } fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { + let (write_tx, _write_rx) = mpsc::channel(100); + make_test_app_with_write_capture(tx, write_tx) +} + +fn make_test_app_with_write_capture( + read_tx: mpsc::Sender<(String, Vec)>, + write_tx: mpsc::Sender, +) -> Router { + make_test_app_with_write_failure(read_tx, write_tx, None) +} + +fn make_test_app_with_write_failure( + read_tx: mpsc::Sender<(String, Vec)>, + write_tx: mpsc::Sender, + fail_write_call: Option, +) -> Router { let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), ..Default::default() }; - let instance = Arc::new(DummyInstance { tx }); + let instance = Arc::new(DummyInstance { + read_tx, + write_tx, + write_calls: Arc::new(AtomicUsize::new(0)), + fail_write_call, + }); let server = HttpServerBuilder::new(http_opts) .with_sql_handler(instance.clone()) .with_prom_handler(instance, None, true, PromValidationMode::Unchecked, None) @@ -125,6 +185,144 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { server.build(server.make_app()).unwrap() } +async fn post_remote_write_v2(client: &TestClient, request: &RemoteWriteV2Request) 
-> TestResponse { + post_remote_write_v2_body(client, snappy_compress(&request.encode_to_vec()).unwrap()).await +} + +async fn post_remote_write_v2_body(client: &TestClient, body: Vec) -> TestResponse { + post_remote_write_with_content_type(client, REMOTE_WRITE_V2_CONTENT_TYPE, body).await +} + +async fn post_remote_write_with_content_type( + client: &TestClient, + content_type: &str, + body: Vec, +) -> TestResponse { + post_remote_write_with_content_type_and_encoding(client, content_type, "snappy", body).await +} + +async fn post_remote_write_with_content_type_and_encoding( + client: &TestClient, + content_type: &str, + content_encoding: &str, + body: Vec, +) -> TestResponse { + client + .post("/v1/prometheus/write") + .header("content-type", content_type) + .header("content-encoding", content_encoding) + .body(body) + .send() + .await +} + +#[tokio::test] +async fn test_prometheus_remote_write_v2_decode_error_has_written_headers() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let result = post_remote_write_v2_body(&client, vec![0xff, 0xff]).await; + + assert_eq!(result.status(), 400); + assert_remote_write_v2_written_headers(&result.headers(), "0"); + assert!(result.text().await.contains("error")); + assert!(write_rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_prometheus_remote_write_v2_convert_error_has_written_headers() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let write_request = remote_write_v2::request_with_labels_and_samples( + vec![("job", "api")], + vec![RemoteWriteV2Sample { + value: 42.0, + timestamp: 1000, 
+ start_timestamp: 0, + }], + ); + + let result = post_remote_write_v2(&client, &write_request).await; + + assert_eq!(result.status(), 400); + assert_remote_write_v2_written_headers(&result.headers(), "0"); + assert!(result.text().await.contains("missing '__name__'")); + assert!(write_rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_prometheus_remote_write_v2_write_error_has_partial_written_headers() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_failure(read_tx, write_tx, Some(2)); + let client = TestClient::new(app).await; + + let write_request = RemoteWriteV2Request { + symbols: vec![ + String::new(), + prom_store::METRIC_NAME_LABEL.to_string(), + "http_requests_total".to_string(), + prom_store::DATABASE_LABEL.to_string(), + "tenant_a".to_string(), + "tenant_b".to_string(), + ], + timeseries: vec![ + RemoteWriteV2TimeSeries { + labels_refs: vec![1, 2, 3, 4], + samples: vec![RemoteWriteV2Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }], + ..Default::default() + }, + RemoteWriteV2TimeSeries { + labels_refs: vec![1, 2, 3, 5], + samples: vec![RemoteWriteV2Sample { + value: 43.0, + timestamp: 2000, + start_timestamp: 0, + }], + ..Default::default() + }, + ], + }; + + let result = post_remote_write_v2(&client, &write_request).await; + + assert_eq!(result.status(), 400); + assert_remote_write_v2_written_headers(&result.headers(), "1"); + assert!( + result + .text() + .await + .contains("injected prometheus remote write failure") + ); + + let captured = write_rx.recv().await.unwrap(); + assert_eq!( + 1, + captured.request.inserts[0] + .rows + .as_ref() + .unwrap() + .rows + .len() + ); + assert!(write_rx.try_recv().is_err()); +} + #[tokio::test] async fn test_prometheus_remote_write_read() { common_telemetry::init_default_ut_logging(); @@ -220,3 +418,206 @@ async fn 
test_prometheus_remote_write_read() { ReadRequest::decode(&(requests[1].1)[..]).unwrap() ); } + +#[tokio::test] +async fn test_prometheus_remote_write_v2_samples() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let write_request = remote_write_v2::request_with_labels_and_samples( + vec![ + (prom_store::METRIC_NAME_LABEL, "http_requests_total"), + (prom_store::DATABASE_LABEL, "tenant_a"), + (prom_store::PHYSICAL_TABLE_LABEL, "metrics_physical"), + ("job", "api"), + ], + vec![ + RemoteWriteV2Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }, + RemoteWriteV2Sample { + value: 43.0, + timestamp: 2000, + start_timestamp: 0, + }, + ], + ); + + let result = post_remote_write_v2(&client, &write_request).await; + + assert_eq!(result.status(), 204); + assert_remote_write_v2_written_headers(&result.headers(), "2"); + assert!(result.text().await.is_empty()); + + let captured = write_rx.recv().await.unwrap(); + assert_eq!("tenant_a", captured.schema); + assert_eq!( + Some("metrics_physical".to_string()), + captured.physical_table + ); + assert_eq!(1, captured.request.inserts.len()); + + let insert = &captured.request.inserts[0]; + assert_eq!("http_requests_total", insert.table_name); + let rows = insert.rows.as_ref().unwrap(); + assert_eq!( + vec![greptime_timestamp(), greptime_value(), "job"], + rows.schema + .iter() + .map(|column| column.column_name.as_str()) + .collect::>() + ); + assert_eq!( + vec![ + ColumnDataType::TimestampMillisecond as i32, + ColumnDataType::Float64 as i32, + ColumnDataType::String as i32, + ], + rows.schema + .iter() + .map(|column| column.datatype) + .collect::>() + ); + assert_eq!( + vec![ + SemanticType::Timestamp as i32, + SemanticType::Field as i32, + SemanticType::Tag as i32, + ], + rows.schema + .iter() + 
.map(|column| column.semantic_type) + .collect::>() + ); + assert_eq!(2, rows.rows.len()); + assert_eq!( + Some(ValueData::TimestampMillisecondValue(1000)), + rows.rows[0].values[0].value_data + ); + assert_eq!( + Some(ValueData::F64Value(42.0)), + rows.rows[0].values[1].value_data + ); + assert_eq!( + Some(ValueData::StringValue("api".to_string())), + rows.rows[0].values[2].value_data + ); + assert_eq!( + Some(ValueData::TimestampMillisecondValue(2000)), + rows.rows[1].values[0].value_data + ); + assert_eq!( + Some(ValueData::F64Value(43.0)), + rows.rows[1].values[1].value_data + ); + assert_eq!( + Some(ValueData::StringValue("api".to_string())), + rows.rows[1].values[2].value_data + ); + assert!(write_rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_prometheus_remote_write_v2_ignores_histogram_only_series() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let write_request = remote_write_v2::request_with_labels_and_histograms( + vec![( + prom_store::METRIC_NAME_LABEL, + "http_request_duration_seconds", + )], + vec![remote_write_v2::histogram(1000)], + ); + + let result = post_remote_write_v2(&client, &write_request).await; + + assert_eq!(result.status(), 204); + assert_remote_write_v2_written_headers(&result.headers(), "0"); + assert!(result.text().await.is_empty()); + assert!(write_rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_prometheus_remote_write_rejects_unsupported_proto() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let result = post_remote_write_with_content_type( + &client, + 
"application/x-protobuf;proto=io.prometheus.write.v3.Request", + Vec::new(), + ) + .await; + + assert_eq!(result.status(), 415); + assert!( + result + .text() + .await + .contains("unsupported prometheus remote write content type") + ); + assert!(write_rx.try_recv().is_err()); +} + +#[tokio::test] +async fn test_prometheus_remote_write_v2_rejects_unsupported_content_encoding() { + common_telemetry::init_default_ut_logging(); + let (read_tx, _read_rx) = mpsc::channel(100); + let (write_tx, mut write_rx) = mpsc::channel(100); + + let app = make_test_app_with_write_capture(read_tx, write_tx); + let client = TestClient::new(app).await; + + let result = post_remote_write_with_content_type_and_encoding( + &client, + REMOTE_WRITE_V2_CONTENT_TYPE, + "gzip", + Vec::new(), + ) + .await; + + assert_eq!(result.status(), 415); + assert!( + result + .text() + .await + .contains("unsupported prometheus remote write content encoding") + ); + assert!(write_rx.try_recv().is_err()); +} + +fn assert_remote_write_v2_written_headers(headers: &HeaderMap, samples: &str) { + assert_eq!( + Some(samples), + headers + .get("X-Prometheus-Remote-Write-Samples-Written") + .map(|x| x.to_str().unwrap()) + ); + assert_eq!( + Some("0"), + headers + .get("X-Prometheus-Remote-Write-Histograms-Written") + .map(|x| x.to_str().unwrap()) + ); + assert_eq!( + Some("0"), + headers + .get("X-Prometheus-Remote-Write-Exemplars-Written") + .map(|x| x.to_str().unwrap()) + ); +} diff --git a/src/table/src/requests/semantic.rs b/src/table/src/requests/semantic.rs index 54ea5d0448..33c0d53576 100644 --- a/src/table/src/requests/semantic.rs +++ b/src/table/src/requests/semantic.rs @@ -46,6 +46,8 @@ pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_t pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type"; /// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`]. 
pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source"; +/// Source protocol version, e.g. Prometheus remote write `1.0` / `2.0`. +pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version"; /// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. The /// signal-agnostic successor to the engine-specific `table_data_model` option. pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline"; @@ -103,6 +105,7 @@ pub const SEMANTIC_VALUE_MIXED: &str = "mixed"; pub const SEMANTIC_OPTION_KEYS: &[&str] = &[ SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, + SEMANTIC_SOURCE_VERSION, SEMANTIC_PIPELINE, SEMANTIC_TRACE_CONVENTIONS, SEMANTIC_METRIC_TYPE, @@ -131,6 +134,7 @@ pub fn is_semantic_option_key(key: &str) -> bool { pub fn validate_semantic_option(key: &str, value: &str) -> bool { match key { SEMANTIC_PIPELINE + | SEMANTIC_SOURCE_VERSION | SEMANTIC_METRIC_UNIT | SEMANTIC_METRIC_ORIGINAL_NAME | SEMANTIC_TRACE_CONVENTIONS => !value.is_empty(), @@ -248,6 +252,7 @@ mod tests { SOURCE_OPENTELEMETRY )); assert!(validate_semantic_option(SEMANTIC_SOURCE, SOURCE_PROMETHEUS)); + assert!(validate_semantic_option(SEMANTIC_SOURCE_VERSION, "2.0")); assert!(validate_semantic_option( SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fa46c3a0d2..5a7308acbe 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -17,12 +17,13 @@ use std::io::Write; use std::str::FromStr; use std::time::Duration; +use api::greptime_proto::io::prometheus::write::v2::Sample as RemoteWriteV2Sample; use api::prom_store::remote::label_matcher::Type as MatcherType; use api::prom_store::remote::{ Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest, }; use auth::{UserProviderRef, user_provider_from_option}; -use axum::http::{HeaderName, HeaderValue, StatusCode}; +use axum::http::{HeaderMap, HeaderName, HeaderValue, 
StatusCode}; use base64::prelude::{BASE64_STANDARD, Engine as _}; use chrono::Utc; use cmd::options::GreptimeOptions; @@ -64,6 +65,7 @@ use servers::http::result::error_result::ErrorResponse; use servers::http::result::greptime_result_v1::GreptimedbV1Response; use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response}; use servers::http::test_helpers::{TestClient, TestResponse}; +use servers::prom_remote_write::v2::test_util as remote_write_v2; use servers::prom_store::{self, mock_timeseries_new_label}; use servers::request_memory_limiter::ServerMemoryLimiter; use standalone::options::StandaloneOptions; @@ -119,6 +121,7 @@ macro_rules! http_tests { test_dashboard_path, test_dashboard_api, test_prometheus_remote_write, + test_prometheus_remote_write_v2, test_prometheus_remote_write_batched, test_prometheus_remote_special_labels, test_prometheus_remote_schema_labels, @@ -274,6 +277,27 @@ fn basic_auth(username: &str, password: &str) -> String { ) } +fn assert_remote_write_v2_written_headers(headers: &HeaderMap, samples: &str) { + assert_eq!( + Some(samples), + headers + .get("X-Prometheus-Remote-Write-Samples-Written") + .map(|x| x.to_str().unwrap()) + ); + assert_eq!( + Some("0"), + headers + .get("X-Prometheus-Remote-Write-Histograms-Written") + .map(|x| x.to_str().unwrap()) + ); + assert_eq!( + Some("0"), + headers + .get("X-Prometheus-Remote-Write-Exemplars-Written") + .map(|x| x.to_str().unwrap()) + ); +} + pub async fn test_http_auth_from_standalone_user_provider_config() { common_telemetry::init_default_ut_logging(); @@ -2268,6 +2292,106 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_prometheus_remote_write_v2(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write_v2").await; + let client = TestClient::new(app).await; + + let write_request = 
remote_write_v2::request_with_labels_and_samples( + vec![ + (prom_store::METRIC_NAME_LABEL, "remote_write_v2_total"), + ("job", "api"), + ("instance", "localhost:9090"), + ], + vec![ + RemoteWriteV2Sample { + value: 42.0, + timestamp: 1000, + start_timestamp: 0, + }, + RemoteWriteV2Sample { + value: 43.0, + timestamp: 2000, + start_timestamp: 0, + }, + ], + ); + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .header( + "Content-Type", + "application/x-protobuf;proto=io.prometheus.write.v2.Request", + ) + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + let headers = res.headers(); + assert_remote_write_v2_written_headers(&headers, "2"); + assert!(res.text().await.is_empty()); + + validate_data( + "prometheus_remote_write_v2_rows", + &client, + "select greptime_timestamp, greptime_value, job, instance from remote_write_v2_total order by greptime_timestamp;", + "[[1000,42.0,\"api\",\"localhost:9090\"],[2000,43.0,\"api\",\"localhost:9090\"]]", + ) + .await; + + validate_data( + "prometheus_remote_write_v2_semantic_identity", + &client, + "select count(*) from information_schema.tables where table_name = 'remote_write_v2_total' \ + and create_options like '%greptime.semantic.signal_type=metric%' \ + and create_options like '%greptime.semantic.source=prometheus%' \ + and create_options like '%greptime.semantic.metric.metadata_quality=inferred%';", + "[[1]]", + ) + .await; + + let write_request = remote_write_v2::request_with_labels_and_histograms( + vec![( + prom_store::METRIC_NAME_LABEL, + "remote_write_v2_histogram_only", + )], + vec![remote_write_v2::histogram(3000)], + ); + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + 
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .header( + "Content-Type", + "application/x-protobuf;proto=io.prometheus.write.v2.Request", + ) + .body(compressed_request) + .send() + .await; + assert_eq!(res.status(), StatusCode::NO_CONTENT); + let headers = res.headers(); + assert_remote_write_v2_written_headers(&headers, "0"); + assert!(res.text().await.is_empty()); + + validate_data( + "prometheus_remote_write_v2_histogram_only", + &client, + "select count(*) from information_schema.tables where table_name = 'remote_write_v2_histogram_only';", + "[[0]]", + ) + .await; + + guard.remove_all().await; +} + /// Covers the batched (pending-rows-batcher) Prometheus remote write path, which /// bypasses `PromStoreProtocolHandler::write`. Verifies the metric table is created /// asynchronously and still carries the Prometheus semantic identity stamped on the @@ -2358,7 +2482,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f1'\\n)\"]]"; + let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 
'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n 'greptime.semantic.source_version' = '1.0',\\n on_physical_table = 'f1'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc3_show_create_table", &client, @@ -2384,7 +2508,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) { expected, ) .await; - let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f2'\\n)\"]]"; + let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n 'greptime.semantic.source_version' = '1.0',\\n on_physical_table = 'f2'\\n)\"]]"; validate_data( "test_prometheus_remote_special_labels_idc4_show_create_table", &client, diff --git a/tests/cases/standalone/common/information_schema/table_semantics.result b/tests/cases/standalone/common/information_schema/table_semantics.result index 152b5f0068..5aa39a91fb 100644 --- a/tests/cases/standalone/common/information_schema/table_semantics.result +++ b/tests/cases/standalone/common/information_schema/table_semantics.result @@ -9,6 +9,7 @@ DESC TABLE information_schema.table_semantics; | table_id | UInt32 | | NO | | FIELD | | signal_type | String | | YES | | 
FIELD | | source | String | | YES | | FIELD | +| source_version | String | | YES | | FIELD | | pipeline | String | | YES | | FIELD | | metadata_quality | String | | YES | | FIELD | | semantic_options | String | | YES | | FIELD | @@ -21,6 +22,7 @@ CREATE TABLE metrics_tagged ( WITH ( 'greptime.semantic.signal_type' = 'metric', 'greptime.semantic.source' = 'opentelemetry', + 'greptime.semantic.source_version' = '2.0', 'greptime.semantic.pipeline' = 'greptime_metric_v1', 'greptime.semantic.metric.metadata_quality' = 'declared', 'greptime.semantic.metric.type' = 'counter', @@ -49,16 +51,16 @@ CREATE TABLE plain_table ( Affected Rows: 0 -SELECT table_schema, table_name, signal_type, source, pipeline, metadata_quality, semantic_options +SELECT table_schema, table_name, signal_type, source, source_version, pipeline, metadata_quality, semantic_options FROM information_schema.table_semantics ORDER BY table_name; -+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+ -| table_schema | table_name | signal_type | source | pipeline | metadata_quality | semantic_options | -+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+ -| public | metrics_tagged | metric | opentelemetry | greptime_metric_v1 | declared | {"metric.type":"counter","metric.unit":"By"} | -| public | traces_tagged | trace | opentelemetry | | | {"trace.conventions":"https://opentelemetry.io/schemas/1.27.0"} | -+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+ ++--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+ +| table_schema | 
table_name | signal_type | source | source_version | pipeline | metadata_quality | semantic_options | ++--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+ +| public | metrics_tagged | metric | opentelemetry | 2.0 | greptime_metric_v1 | declared | {"metric.type":"counter","metric.unit":"By"} | +| public | traces_tagged | trace | opentelemetry | | | | {"trace.conventions":"https://opentelemetry.io/schemas/1.27.0"} | ++--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+ -- Predicate pushdown on a promoted column. SELECT table_name, signal_type diff --git a/tests/cases/standalone/common/information_schema/table_semantics.sql b/tests/cases/standalone/common/information_schema/table_semantics.sql index 3e42a5aa65..8fb7c78aef 100644 --- a/tests/cases/standalone/common/information_schema/table_semantics.sql +++ b/tests/cases/standalone/common/information_schema/table_semantics.sql @@ -7,6 +7,7 @@ CREATE TABLE metrics_tagged ( WITH ( 'greptime.semantic.signal_type' = 'metric', 'greptime.semantic.source' = 'opentelemetry', + 'greptime.semantic.source_version' = '2.0', 'greptime.semantic.pipeline' = 'greptime_metric_v1', 'greptime.semantic.metric.metadata_quality' = 'declared', 'greptime.semantic.metric.type' = 'counter', @@ -29,7 +30,7 @@ CREATE TABLE plain_table ( val DOUBLE, ); -SELECT table_schema, table_name, signal_type, source, pipeline, metadata_quality, semantic_options +SELECT table_schema, table_name, signal_type, source, source_version, pipeline, metadata_quality, semantic_options FROM information_schema.table_semantics ORDER BY table_name; diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 
6be6a7ffc8..cf141afe81 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -476,11 +476,12 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | table_privileges | table_catalog | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | | | greptime | information_schema | table_privileges | table_name | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | | | greptime | information_schema | table_privileges | table_schema | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | | -| greptime | information_schema | table_semantics | metadata_quality | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | -| greptime | information_schema | table_semantics | pipeline | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | -| greptime | information_schema | table_semantics | semantic_options | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | +| greptime | information_schema | table_semantics | metadata_quality | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | +| greptime | information_schema | table_semantics | pipeline | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | +| greptime | information_schema | table_semantics | semantic_options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | | 
greptime | information_schema | table_semantics | signal_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | | greptime | information_schema | table_semantics | source | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | +| greptime | information_schema | table_semantics | source_version | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | | | greptime | information_schema | table_semantics | table_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | | | greptime | information_schema | table_semantics | table_id | 4 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | NO | int unsigned | | | | greptime | information_schema | table_semantics | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |