feat: prw_v2 initial commit with sample ingestion (#8361)

* chore: add v2 entrance

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: decode request

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: implement remote write v2 for samples

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove hand-written proto

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: refactor

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: merge tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add source version field

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: sqlness

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update proto

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update proto

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-06-29 11:15:41 +08:00
committed by GitHub
parent f6e3aa4a07
commit 59d40c39b1
15 changed files with 1720 additions and 69 deletions

2
Cargo.lock generated
View File

@@ -5937,7 +5937,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5e358318890606a2961e12dcdf9674f6763cec3a#5e358318890606a2961e12dcdf9674f6763cec3a"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6b665d8b6fce4f1f0da49c8c79fb150636df4ff#b6b665d8b6fce4f1f0da49c8c79fb150636df4ff"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",

View File

@@ -158,7 +158,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5e358318890606a2961e12dcdf9674f6763cec3a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6b665d8b6fce4f1f0da49c8c79fb150636df4ff" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -67,6 +67,7 @@ concern, not query-time semantics).
| --- | --- |
| `greptime.semantic.signal_type` | `trace` / `log` / `metric` / `event` |
| `greptime.semantic.source` | `opentelemetry` / `prometheus` / `influxdb` / `opentsdb` / `elasticsearch` / `loki` / `custom` |
| `greptime.semantic.source_version` | Source protocol version, e.g. Prometheus remote write `1.0` / `2.0` |
| `greptime.semantic.pipeline` | `greptime_trace_v1` (the signal-agnostic successor to `table_data_model`) |
**Trace**: `greptime.semantic.trace.conventions` (the OTel `schema_url` the rows conform to, or `mixed` / `unknown` when not single-valued).
@@ -83,7 +84,7 @@ concern, not query-time semantics).
`metadata_quality = inferred` is the load-bearing field for confidence-aware tooling: an inferred counter should be re-checked before betting on `rate()`-style semantics.
**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter cost an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern); `source_version` (no cheap non-constant value today — Prom RW is v1-only, OTel SDK identity is deferred). Some are reserved for follow-ups (see Future Work).
**Deliberately omitted (and why)**: `metric.monotonic` (a function of `type`); `trace.has_events`/`has_links` (constant for the v1 model and derivable from the `span_events`/`span_links` columns); `log.severity_scheme`/`log.body_format` (constant / derivable by sampling, and the latter cost an O(rows) scan); `resource.attributes_preserved`/`attributes_dropped`/`scope.preserved` (the preserved set restates columns, the dropped flag is a contentless boolean, and lineage is a collector-config concern). Some are reserved for follow-ups (see Future Work).
## Conflict and update semantics

View File

@@ -19,9 +19,10 @@
//! table's `create_options`.
//!
//! The few signal-agnostic keys are promoted to their own columns
//! (`signal_type` / `source` / `pipeline` / `metadata_quality`); the remaining
//! signal-specific keys are folded into a `semantic_options` JSON string, keyed
//! by the option name with the `greptime.semantic.` prefix stripped.
//! (`signal_type` / `source` / `source_version` / `pipeline` /
//! `metadata_quality`); the remaining signal-specific keys are folded into a
//! `semantic_options` JSON string, keyed by the option name with the
//! `greptime.semantic.` prefix stripped.
use std::collections::BTreeMap;
use std::sync::{Arc, Weak};
@@ -45,7 +46,7 @@ use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableInfo;
use table::requests::{
SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_PIPELINE, SEMANTIC_PREFIX, SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE, is_semantic_option_key,
SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, is_semantic_option_key,
};
use crate::CatalogManager;
@@ -60,6 +61,7 @@ pub const TABLE_NAME: &str = "table_name";
pub const TABLE_ID: &str = "table_id";
pub const SIGNAL_TYPE: &str = "signal_type";
pub const SOURCE: &str = "source";
pub const SOURCE_VERSION: &str = "source_version";
pub const PIPELINE: &str = "pipeline";
pub const METADATA_QUALITY: &str = "metadata_quality";
pub const SEMANTIC_OPTIONS: &str = "semantic_options";
@@ -75,6 +77,7 @@ fn optional_value(v: Option<&str>) -> Value {
struct SemanticRow<'a> {
signal_type: Option<&'a str>,
source: Option<&'a str>,
source_version: Option<&'a str>,
pipeline: Option<&'a str>,
metadata_quality: Option<&'a str>,
options_json: Option<String>,
@@ -86,6 +89,7 @@ impl<'a> SemanticRow<'a> {
fn extract(table_info: &'a TableInfo) -> Option<Self> {
let mut signal_type = None;
let mut source = None;
let mut source_version = None;
let mut pipeline = None;
let mut metadata_quality = None;
let mut rest = BTreeMap::new();
@@ -97,6 +101,7 @@ impl<'a> SemanticRow<'a> {
match key.as_str() {
SEMANTIC_SIGNAL_TYPE => signal_type = Some(value.as_str()),
SEMANTIC_SOURCE => source = Some(value.as_str()),
SEMANTIC_SOURCE_VERSION => source_version = Some(value.as_str()),
SEMANTIC_PIPELINE => pipeline = Some(value.as_str()),
SEMANTIC_METRIC_METADATA_QUALITY => metadata_quality = Some(value.as_str()),
_ => {
@@ -108,6 +113,7 @@ impl<'a> SemanticRow<'a> {
let has_any = signal_type.is_some()
|| source.is_some()
|| source_version.is_some()
|| pipeline.is_some()
|| metadata_quality.is_some()
|| !rest.is_empty();
@@ -125,6 +131,7 @@ impl<'a> SemanticRow<'a> {
Some(Self {
signal_type,
source,
source_version,
pipeline,
metadata_quality,
options_json,
@@ -156,6 +163,7 @@ impl InformationSchemaTableSemantics {
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(SIGNAL_TYPE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(SOURCE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(SOURCE_VERSION, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(PIPELINE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(METADATA_QUALITY, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(SEMANTIC_OPTIONS, ConcreteDataType::string_datatype(), true),
@@ -216,6 +224,7 @@ struct InformationSchemaSemanticTablesBuilder {
table_ids: UInt32VectorBuilder,
signal_types: StringVectorBuilder,
sources: StringVectorBuilder,
source_versions: StringVectorBuilder,
pipelines: StringVectorBuilder,
metadata_qualities: StringVectorBuilder,
semantic_options: StringVectorBuilder,
@@ -237,6 +246,7 @@ impl InformationSchemaSemanticTablesBuilder {
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
signal_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
sources: StringVectorBuilder::with_capacity(INIT_CAPACITY),
source_versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
pipelines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
metadata_qualities: StringVectorBuilder::with_capacity(INIT_CAPACITY),
semantic_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -279,6 +289,7 @@ impl InformationSchemaSemanticTablesBuilder {
let name_v = Value::from(table_name);
let signal_v = optional_value(row.signal_type);
let source_v = optional_value(row.source);
let source_version_v = optional_value(row.source_version);
let pipeline_v = optional_value(row.pipeline);
let quality_v = optional_value(row.metadata_quality);
let predicate_row = [
@@ -287,6 +298,7 @@ impl InformationSchemaSemanticTablesBuilder {
(TABLE_NAME, &name_v),
(SIGNAL_TYPE, &signal_v),
(SOURCE, &source_v),
(SOURCE_VERSION, &source_version_v),
(PIPELINE, &pipeline_v),
(METADATA_QUALITY, &quality_v),
];
@@ -300,6 +312,7 @@ impl InformationSchemaSemanticTablesBuilder {
self.table_ids.push(Some(table_info.table_id()));
self.signal_types.push(row.signal_type);
self.sources.push(row.source);
self.source_versions.push(row.source_version);
self.pipelines.push(row.pipeline);
self.metadata_qualities.push(row.metadata_quality);
self.semantic_options.push(row.options_json.as_deref());
@@ -313,6 +326,7 @@ impl InformationSchemaSemanticTablesBuilder {
Arc::new(self.table_ids.finish()),
Arc::new(self.signal_types.finish()),
Arc::new(self.sources.finish()),
Arc::new(self.source_versions.finish()),
Arc::new(self.pipelines.finish()),
Arc::new(self.metadata_qualities.finish()),
Arc::new(self.semantic_options.finish()),
@@ -349,7 +363,9 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use datatypes::schema::SchemaBuilder;
use table::metadata::{TableInfoBuilder, TableMeta, TableType};
use table::requests::{SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, TableOptions};
use table::requests::{
SEMANTIC_METRIC_TYPE, SEMANTIC_METRIC_UNIT, SEMANTIC_SOURCE_VERSION, TableOptions,
};
use super::*;
@@ -400,6 +416,7 @@ mod tests {
let info = table_info(&[
(SEMANTIC_SIGNAL_TYPE, "metric"),
(SEMANTIC_SOURCE, "opentelemetry"),
(SEMANTIC_SOURCE_VERSION, "2.0"),
(SEMANTIC_PIPELINE, "greptime_metric_v1"),
(SEMANTIC_METRIC_METADATA_QUALITY, "declared"),
(SEMANTIC_METRIC_TYPE, "counter"),
@@ -411,6 +428,7 @@ mod tests {
let row = SemanticRow::extract(&info).unwrap();
assert_eq!(row.signal_type, Some("metric"));
assert_eq!(row.source, Some("opentelemetry"));
assert_eq!(row.source_version, Some("2.0"));
assert_eq!(row.pipeline, Some("greptime_metric_v1"));
assert_eq!(row.metadata_quality, Some("declared"));
// Promoted keys stay out of the JSON tail; remaining keys are sorted and

View File

@@ -1462,12 +1462,13 @@ mod tests {
fn test_fill_table_options_copies_semantic_extensions() {
use table::requests::{
SEMANTIC_METRIC_TYPE, SEMANTIC_PER_TABLE_INDEX_KEY, SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY,
SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_OPENTELEMETRY,
};
let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
ctx.set_extension(SEMANTIC_SOURCE, SOURCE_OPENTELEMETRY);
ctx.set_extension(SEMANTIC_SOURCE_VERSION, "2.0");
ctx.set_extension(SEMANTIC_METRIC_TYPE, "bogus");
// The internal transport key must NOT be copied into table options.
ctx.set_extension(SEMANTIC_PER_TABLE_INDEX_KEY, "{}");
@@ -1484,6 +1485,12 @@ mod tests {
Some(SOURCE_OPENTELEMETRY),
table_options.get(SEMANTIC_SOURCE).map(String::as_str)
);
assert_eq!(
Some("2.0"),
table_options
.get(SEMANTIC_SOURCE_VERSION)
.map(String::as_str)
);
assert!(!table_options.contains_key(SEMANTIC_METRIC_TYPE));
assert!(!table_options.contains_key(SEMANTIC_PER_TABLE_INDEX_KEY));
}

View File

@@ -18,30 +18,34 @@ use api::prom_store::remote::ReadRequest;
use axum::Extension;
use axum::body::Bytes;
use axum::extract::{Query, State};
use axum::http::{HeaderValue, StatusCode, header};
use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
use axum::response::IntoResponse;
use axum_extra::TypedHeader;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use hyper::HeaderMap;
use pipeline::PipelineDefinition;
use mime_guess::mime;
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, PipelineDefinition};
use prometheus::HistogramTimer;
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use table::requests::{
METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS,
SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS,
};
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{GREPTIME_DB_HEADER_METRICS, write_cost_header_map};
use crate::http::header::{
CONTENT_TYPE_PROTOBUF_STR, GREPTIME_DB_HEADER_METRICS, write_cost_header_map,
};
use crate::pending_rows_batcher::PendingRowsBatcher;
use crate::prom_remote_write::decode::PromSeriesProcessor;
use crate::prom_remote_write::decode_remote_write_request;
use crate::prom_remote_write::v2::{RemoteWriteV2RequestExt, decode_remote_write_v2_request};
use crate::prom_remote_write::validation::PromValidationMode;
use crate::prom_store::{extract_schema_from_read_request, snappy_decompress};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -50,6 +54,16 @@ pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";
const REMOTE_WRITE_V1_VERSION: &str = "1.0";
const REMOTE_WRITE_V2_VERSION: &str = "2.0";
const REMOTE_WRITE_V1_PROTO: &str = "prometheus.WriteRequest";
const REMOTE_WRITE_V2_PROTO: &str = "io.prometheus.write.v2.Request";
const CONTENT_TYPE_PROTO_PARAM: &str = "proto";
const REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER: &str = "x-prometheus-remote-write-samples-written";
const REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER: &str =
"x-prometheus-remote-write-histograms-written";
const REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER: &str =
"x-prometheus-remote-write-exemplars-written";
#[derive(Clone)]
pub struct PromStoreState {
@@ -88,11 +102,41 @@ impl Default for RemoteWriteQuery {
pub async fn remote_write(
State(state): State<PromStoreState>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
Extension(query_ctx): Extension<QueryContext>,
content_type: Option<TypedHeader<headers::ContentType>>,
pipeline_info: PipelineInfo,
content_encoding: TypedHeader<headers::ContentEncoding>,
body: Bytes,
) -> Result<impl IntoResponse> {
) -> Result<axum::response::Response> {
let is_zstd = content_encoding.contains(VM_ENCODING);
match remote_write_proto(content_type) {
RemoteWriteProto::V1 => {
remote_write_v1(state, params, query_ctx, pipeline_info, is_zstd, body).await
}
RemoteWriteProto::V2 => {
if let Some(response) = unsupported_remote_write_v2_encoding_response(&content_encoding)
{
return Ok(response);
}
remote_write_v2(state, params, query_ctx, pipeline_info, is_zstd, body).await
}
RemoteWriteProto::Unsupported(content_type) => Ok((
StatusCode::UNSUPPORTED_MEDIA_TYPE,
format!("unsupported prometheus remote write content type: {content_type}"),
)
.into_response()),
}
}
async fn remote_write_v1(
state: PromStoreState,
params: RemoteWriteQuery,
query_ctx: QueryContext,
pipeline_info: PipelineInfo,
is_zstd: bool,
body: Bytes,
) -> Result<axum::response::Response> {
let PromStoreState {
prom_store_handler,
pipeline_handler,
@@ -101,30 +145,12 @@ pub async fn remote_write(
pending_rows_batcher,
} = state;
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
if let Some(response) = vm_proto_version_response(&params) {
return Ok(response);
}
let db = params.db.clone().unwrap_or_default();
query_ctx.set_channel(Channel::Prometheus);
let physical_table = params
.physical_table
.clone()
.unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table.clone());
// Stamp the Prometheus metric identity here, before `as_req_iter` splits into the
// batched and direct write paths, so both inherit it (the batched path bypasses
// `PromStoreProtocolHandler::write`). Prom RW v1 metadata is weak, so the type is
// inferred from naming.
query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS);
query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let is_zstd = content_encoding.contains(VM_ENCODING);
let (db, query_ctx, _timer) =
prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V1_VERSION);
let mut processor = PromSeriesProcessor::default_processor();
@@ -151,20 +177,189 @@ pub async fn remote_write(
req.as_insert_requests()
};
let outcome = write_prometheus_rows(
prom_store_handler,
pending_rows_batcher,
prom_store_with_metric_engine,
&db,
query_ctx,
req,
)
.await?;
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(outcome.write_cost),
)
.into_response())
}
async fn remote_write_v2(
state: PromStoreState,
params: RemoteWriteQuery,
query_ctx: QueryContext,
pipeline_info: PipelineInfo,
is_zstd: bool,
body: Bytes,
) -> Result<axum::response::Response> {
let PromStoreState {
prom_store_handler,
pipeline_handler: _,
prom_store_with_metric_engine,
prom_validation_mode: _,
pending_rows_batcher,
} = state;
if let Some(response) = vm_proto_version_response(&params) {
return Ok(response);
}
// Pipeline processing is not supported for remote write v2 yet. Ignore the
// optional pipeline parameter and ingest samples directly.
let _ = pipeline_info;
let (db, query_ctx, _timer) =
prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V2_VERSION);
let request = match decode_remote_write_v2_request(is_zstd, body) {
Ok(request) => request,
Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
};
let req = match request.into_context_req() {
Ok(req) => req,
Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
};
let outcome = match write_prometheus_rows_with_progress(
prom_store_handler,
pending_rows_batcher,
prom_store_with_metric_engine,
&db,
query_ctx,
req,
)
.await
{
Ok(outcome) => outcome,
Err(error) => {
return Ok(remote_write_v2_error_response(
error.error,
error.rows_written,
0,
0,
));
}
};
let mut headers = write_cost_header_map(outcome.write_cost);
append_remote_write_v2_written_headers(&mut headers, outcome.rows_written, 0, 0);
Ok((StatusCode::NO_CONTENT, headers).into_response())
}
fn vm_proto_version_response(params: &RemoteWriteQuery) -> Option<axum::response::Response> {
params
.get_vm_proto_version
.as_ref()
.map(|_| VM_PROTO_VERSION.into_response())
}
fn prepare_remote_write_context(
params: &RemoteWriteQuery,
mut query_ctx: QueryContext,
remote_write_version: &str,
) -> (String, Arc<QueryContext>, HistogramTimer) {
let db = params.db.clone().unwrap_or_default();
query_ctx.set_channel(Channel::Prometheus);
let physical_table = params
.physical_table
.clone()
.unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
// Stamp the Prometheus metric identity here, before `as_req_iter` splits into the
// batched and direct write paths, so both inherit it (the batched path bypasses
// `PromStoreProtocolHandler::write`). Prometheus remote-write metadata is weak
// here, so the type is inferred from naming.
query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS);
query_ctx.set_extension(SEMANTIC_SOURCE_VERSION, remote_write_version);
query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED);
let query_ctx = Arc::new(query_ctx);
let timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
(db, query_ctx, timer)
}
struct PromWriteOutcome {
write_cost: usize,
rows_written: u64,
}
struct PromWriteError {
error: error::Error,
rows_written: u64,
}
async fn write_prometheus_rows(
prom_store_handler: PromStoreProtocolHandlerRef,
pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
prom_store_with_metric_engine: bool,
db: &str,
query_ctx: Arc<QueryContext>,
req: ContextReq,
) -> Result<PromWriteOutcome> {
write_prometheus_rows_with_progress(
prom_store_handler,
pending_rows_batcher,
prom_store_with_metric_engine,
db,
query_ctx,
req,
)
.await
.map_err(|error| error.error)
}
async fn write_prometheus_rows_with_progress(
prom_store_handler: PromStoreProtocolHandlerRef,
pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
prom_store_with_metric_engine: bool,
db: &str,
query_ctx: Arc<QueryContext>,
req: ContextReq,
) -> std::result::Result<PromWriteOutcome, PromWriteError> {
if prom_store_with_metric_engine && let Some(batcher) = pending_rows_batcher {
let mut rows_written = 0;
for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
prom_store_handler
.pre_write(&reqs, temp_ctx.clone())
.await?;
let rows = batcher.submit(reqs, temp_ctx).await?;
.await
.map_err(|error| PromWriteError {
error,
rows_written,
})?;
let rows = batcher
.submit(reqs, temp_ctx)
.await
.map_err(|error| PromWriteError {
error,
rows_written,
})?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
.with_label_values(&[db.as_str()])
.with_label_values(&[db])
.inc_by(rows);
rows_written += rows;
}
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
return Ok(PromWriteOutcome {
write_cost: 0,
rows_written,
});
}
let mut cost = 0;
let mut write_cost = 0;
let mut rows_written = 0;
for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
let cnt: u64 = reqs
.inserts
@@ -173,14 +368,104 @@ pub async fn remote_write(
.sum();
let output = prom_store_handler
.write(reqs, temp_ctx, prom_store_with_metric_engine)
.await?;
.await
.map_err(|error| PromWriteError {
error,
rows_written,
})?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
.with_label_values(&[db.as_str()])
.with_label_values(&[db])
.inc_by(cnt);
cost += output.meta.cost;
write_cost += output.meta.cost;
rows_written += cnt;
}
Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
Ok(PromWriteOutcome {
write_cost,
rows_written,
})
}
fn remote_write_v2_error_response(
error: error::Error,
samples: u64,
histograms: u64,
exemplars: u64,
) -> axum::response::Response {
let mut response = error.into_response();
append_remote_write_v2_written_headers(response.headers_mut(), samples, histograms, exemplars);
response
}
fn append_remote_write_v2_written_headers(
headers: &mut HeaderMap,
samples: u64,
histograms: u64,
exemplars: u64,
) {
headers.insert(
REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER,
HeaderValue::from_str(&samples.to_string()).expect("u64 header value is valid"),
);
headers.insert(
REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER,
HeaderValue::from_str(&histograms.to_string()).expect("u64 header value is valid"),
);
headers.insert(
REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER,
HeaderValue::from_str(&exemplars.to_string()).expect("u64 header value is valid"),
);
}
enum RemoteWriteProto {
V1,
V2,
Unsupported(mime::Mime),
}
// ref: https://github.com/prometheus/client_golang/blob/74560058a7af7a695db8196c8e84a0754032c6af/exp/api/remote/remote_api.go#L544
fn remote_write_proto(content_type: Option<TypedHeader<headers::ContentType>>) -> RemoteWriteProto {
let Some(TypedHeader(content_type)) = content_type else {
return RemoteWriteProto::V1;
};
let mime_type: mime::Mime = content_type.into();
if !mime_type
.essence_str()
.eq_ignore_ascii_case(CONTENT_TYPE_PROTOBUF_STR)
{
return RemoteWriteProto::Unsupported(mime_type);
}
for (name, value) in mime_type.params() {
if !name.as_str().eq_ignore_ascii_case(CONTENT_TYPE_PROTO_PARAM) {
continue;
}
return match value.as_str() {
REMOTE_WRITE_V1_PROTO => RemoteWriteProto::V1,
REMOTE_WRITE_V2_PROTO => RemoteWriteProto::V2,
_ => RemoteWriteProto::Unsupported(mime_type.clone()),
};
}
RemoteWriteProto::V1
}
fn unsupported_remote_write_v2_encoding_response(
content_encoding: &headers::ContentEncoding,
) -> Option<axum::response::Response> {
if content_encoding.contains(DEFAULT_ENCODING) || content_encoding.contains(VM_ENCODING) {
return None;
}
Some((
StatusCode::UNSUPPORTED_MEDIA_TYPE,
format!(
"unsupported prometheus remote write content encoding: only {DEFAULT_ENCODING} and {VM_ENCODING} are supported"
),
)
.into_response())
}
impl IntoResponse for PromStoreResponse {
@@ -236,3 +521,167 @@ async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
}
#[cfg(test)]
mod tests {
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_query::Output;
use pipeline::GreptimePipelineParams;
use session::context::{QueryContext, QueryContextRef};
use super::*;
use crate::prom_remote_write::validation::PromValidationMode;
use crate::prom_store::Metrics;
use crate::query_handler::PromStoreProtocolHandler;
#[test]
fn test_remote_write_proto() {
assert!(matches!(
remote_write_proto(content_type(
"application/x-protobuf;proto=io.prometheus.write.v2.Request"
)),
RemoteWriteProto::V2
));
assert!(matches!(
remote_write_proto(content_type(
"application/x-protobuf; proto=\"io.prometheus.write.v2.Request\""
)),
RemoteWriteProto::V2
));
assert!(matches!(
remote_write_proto(content_type(
"APPLICATION/X-PROTOBUF;proto=io.prometheus.write.v2.Request"
)),
RemoteWriteProto::V2
));
assert!(matches!(
remote_write_proto(content_type("application/x-protobuf")),
RemoteWriteProto::V1
));
assert!(matches!(
remote_write_proto(content_type(
"application/x-protobuf;proto=prometheus.WriteRequest"
)),
RemoteWriteProto::V1
));
assert!(matches!(
remote_write_proto(content_type(
"application/x-protobuf;proto=unknown.WriteRequest"
)),
RemoteWriteProto::Unsupported(_)
));
assert!(matches!(
remote_write_proto(content_type(
"application/json;proto=io.prometheus.write.v2.Request"
)),
RemoteWriteProto::Unsupported(_)
));
assert!(matches!(remote_write_proto(None), RemoteWriteProto::V1));
}
fn content_type(value: &str) -> Option<TypedHeader<headers::ContentType>> {
Some(TypedHeader(std::str::FromStr::from_str(value).unwrap()))
}
#[test]
fn test_prepare_remote_write_context_stamps_semantics() {
let (_, query_ctx, _timer) = prepare_remote_write_context(
&RemoteWriteQuery::default(),
QueryContext::with("greptime", "public"),
REMOTE_WRITE_V2_VERSION,
);
assert_eq!(
query_ctx.extension(SEMANTIC_SIGNAL_TYPE),
Some(SIGNAL_TYPE_METRIC)
);
assert_eq!(
query_ctx.extension(SEMANTIC_SOURCE),
Some(SOURCE_PROMETHEUS)
);
assert_eq!(
query_ctx.extension(SEMANTIC_SOURCE_VERSION),
Some(REMOTE_WRITE_V2_VERSION)
);
assert_eq!(
query_ctx.extension(SEMANTIC_METRIC_METADATA_QUALITY),
Some(METADATA_QUALITY_INFERRED)
);
}
#[tokio::test]
async fn test_remote_write_v2_ignores_pipeline() {
let request = api::greptime_proto::io::prometheus::write::v2::Request {
symbols: vec![String::new()],
timeseries: Vec::new(),
};
let body =
Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap());
let response = remote_write_v2(
test_state(),
RemoteWriteQuery::default(),
QueryContext::with("greptime", "public"),
pipeline_info(Some("pipeline")),
false,
body,
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
assert_eq!(
Some("0"),
response
.headers()
.get(REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER)
.map(|x| x.to_str().unwrap())
);
}
fn test_state() -> PromStoreState {
PromStoreState {
prom_store_handler: Arc::new(NoopPromStoreHandler),
pipeline_handler: None,
prom_store_with_metric_engine: false,
prom_validation_mode: PromValidationMode::Strict,
pending_rows_batcher: None,
}
}
fn pipeline_info(pipeline_name: Option<&str>) -> PipelineInfo {
PipelineInfo {
pipeline_name: pipeline_name.map(ToString::to_string),
pipeline_version: None,
pipeline_params: GreptimePipelineParams::default(),
}
}
struct NoopPromStoreHandler;
#[async_trait]
impl PromStoreProtocolHandler for NoopPromStoreHandler {
async fn write(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> Result<Output> {
unreachable!("empty remote write v2 request should not write")
}
async fn read(
&self,
_request: ReadRequest,
_ctx: QueryContextRef,
) -> Result<PromStoreResponse> {
unimplemented!()
}
async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {
unimplemented!()
}
}
}

View File

@@ -20,6 +20,10 @@
pub mod decode;
pub(crate) mod row_builder;
pub(crate) mod types;
#[cfg(any(test, feature = "testing"))]
pub mod v2;
#[cfg(not(any(test, feature = "testing")))]
pub(crate) mod v2;
pub mod validation;
use bytes::Bytes;

View File

@@ -0,0 +1,633 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
#[cfg(test)]
use api::greptime_proto::io::prometheus::write::v2::{Exemplar, Histogram, Metadata, metadata};
use api::greptime_proto::io::prometheus::write::v2::{Request, Sample, TimeSeries};
use api::v1::{RowInsertRequest, Rows, Value};
use bytes::Bytes;
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use pipeline::{ContextOpt, ContextReq};
use prost::Message;
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::prom_remote_write::row_builder::PromCtx;
use crate::prom_remote_write::try_decompress;
#[allow(deprecated)]
use crate::prom_store::{
DATABASE_LABEL, DATABASE_LABEL_ALT, METRIC_NAME_LABEL, PHYSICAL_TABLE_LABEL,
PHYSICAL_TABLE_LABEL_ALT, SCHEMA_LABEL,
};
use crate::row_writer::{self, TableData};
type PromTags = Vec<(String, String)>;
type ResolvedSeriesLabels = (PromCtx, String, PromTags);
pub(crate) fn decode_remote_write_v2_request(is_zstd: bool, body: Bytes) -> Result<Request> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
// Match the v1 decoder's VictoriaMetrics fallback: some clients may send a
// mismatched content-encoding header, so try the other compression on failure.
let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
buf
} else {
try_decompress(!is_zstd, &body[..])?
};
Request::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
}
pub(crate) trait RemoteWriteV2RequestExt {
fn into_context_req(self) -> Result<ContextReq>;
}
impl RemoteWriteV2RequestExt for Request {
fn into_context_req(self) -> Result<ContextReq> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer();
let Request {
symbols,
timeseries,
} = self;
ensure!(
symbols.first().map(|s| s.as_str()) == Some(""),
error::InvalidPromRemoteRequestSnafu {
msg: "remote write v2 symbols must start with an empty string".to_string(),
}
);
let mut tables = HashMap::<PromCtx, HashMap<String, TableData>>::new();
for series in timeseries {
// Native histograms and exemplars are intentionally ignored for now.
// They will be converted into Greptime rows once their ingestion path is implemented.
let sample_count = series.samples.len();
if sample_count == 0 {
continue;
}
let (prom_ctx, table_name, tags) = resolve_series_labels(&symbols, &series)?;
let table_data = match tables.entry(prom_ctx).or_default().entry(table_name) {
Entry::Occupied(entry) => {
let table_data = entry.into_mut();
table_data.reserve_rows(sample_count);
table_data
}
Entry::Vacant(entry) => entry.insert(TableData::new(tags.len() + 2, sample_count)),
};
write_samples(table_data, series.samples, tags)?;
}
Ok(into_context_req(tables))
}
}
fn write_samples(
table_data: &mut TableData,
mut samples: Vec<Sample>,
tags: PromTags,
) -> Result<()> {
let Some(last_sample) = samples.pop() else {
return Ok(());
};
for sample in &samples {
write_sample(table_data, sample, tags.iter().cloned())?;
}
write_sample(table_data, &last_sample, tags.into_iter())
}
fn write_sample(
table_data: &mut TableData,
sample: &Sample,
tags: impl Iterator<Item = (String, String)>,
) -> Result<()> {
let mut row = table_data.alloc_one_row();
row_writer::write_ts_to_millis(
table_data,
greptime_timestamp(),
Some(sample.timestamp),
Precision::Millisecond,
&mut row,
)?;
row_writer::write_f64(table_data, greptime_value(), sample.value, &mut row)?;
row_writer::write_tags(table_data, tags, &mut row)?;
table_data.add_row(row);
Ok(())
}
fn resolve_series_labels(symbols: &[String], series: &TimeSeries) -> Result<ResolvedSeriesLabels> {
ensure!(
series.labels_refs.len().is_multiple_of(2),
error::InvalidPromRemoteRequestSnafu {
msg: "remote write v2 labels_refs must contain name/value pairs".to_string(),
}
);
let mut prom_ctx = PromCtx::default();
let mut table_name = None;
let mut tags = Vec::with_capacity(series.labels_refs.len() / 2);
let mut label_names = HashSet::with_capacity(series.labels_refs.len() / 2);
for pair in series.labels_refs.chunks_exact(2) {
let name = symbol_ref(symbols, pair[0], "label name")?;
let value = symbol_ref(symbols, pair[1], "label value")?;
validate_label(name, value)?;
ensure!(
label_names.insert(name),
error::InvalidPromRemoteRequestSnafu {
msg: format!("remote write v2 label name `{name}` is repeated"),
}
);
if name == METRIC_NAME_LABEL {
table_name = Some(value.to_string());
continue;
}
if apply_remote_write_special_label(name, value, &mut prom_ctx) {
continue;
}
tags.push((name.to_string(), value.to_string()));
}
let table_name = table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in time-series".to_string(),
})?;
Ok((prom_ctx, table_name, tags))
}
fn validate_label(name: &str, value: &str) -> Result<()> {
ensure!(
!name.is_empty(),
error::InvalidPromRemoteRequestSnafu {
msg: "remote write v2 label names must not be empty".to_string(),
}
);
ensure!(
!value.is_empty(),
error::InvalidPromRemoteRequestSnafu {
msg: format!("remote write v2 label `{name}` value must not be empty"),
}
);
Ok(())
}
fn symbol_ref<'a>(symbols: &'a [String], idx: u32, field: &str) -> Result<&'a str> {
symbols
.get(idx as usize)
.map(String::as_str)
.with_context(|| error::InvalidPromRemoteRequestSnafu {
msg: format!(
"remote write v2 {field} symbol reference {idx} is out of range, symbols len: {}",
symbols.len()
),
})
}
#[allow(deprecated)]
fn apply_remote_write_special_label(name: &str, value: &str, prom_ctx: &mut PromCtx) -> bool {
match name {
SCHEMA_LABEL => {
prom_ctx.schema = Some(value.to_string());
true
}
DATABASE_LABEL | DATABASE_LABEL_ALT => {
if prom_ctx.schema.is_none() {
prom_ctx.schema = Some(value.to_string());
}
true
}
PHYSICAL_TABLE_LABEL | PHYSICAL_TABLE_LABEL_ALT => {
prom_ctx.physical_table = Some(value.to_string());
true
}
_ => false,
}
}
fn into_context_req(tables: HashMap<PromCtx, HashMap<String, TableData>>) -> ContextReq {
let mut ctx_req = ContextReq::default();
for (prom_ctx, tables) in tables {
let mut opt = ContextOpt::default();
if let Some(schema) = prom_ctx.schema {
opt.set_schema(schema);
}
if let Some(physical_table) = prom_ctx.physical_table {
opt.set_physical_table(physical_table);
}
ctx_req.add_rows(
opt,
tables.into_iter().map(|(table_name, table_data)| {
table_data_to_row_insert_request(table_name, table_data)
}),
);
}
ctx_req
}
fn table_data_to_row_insert_request(table_name: String, table_data: TableData) -> RowInsertRequest {
let num_columns = table_data.num_columns();
let (schema, mut rows) = table_data.into_schema_and_rows();
for row in &mut rows {
if num_columns > row.values.len() {
row.values.resize(num_columns, Value { value_data: None });
}
}
RowInsertRequest {
table_name,
rows: Some(Rows { schema, rows }),
}
}
#[cfg(any(test, feature = "testing"))]
pub mod test_util {
use api::greptime_proto::io::prometheus::write::v2::{Histogram, Request, Sample, TimeSeries};
pub fn request_with_labels_and_samples(
labels: Vec<(&str, &str)>,
samples: Vec<Sample>,
) -> Request {
request_with_labels(labels, samples, Vec::new())
}
pub fn request_with_labels_and_histograms(
labels: Vec<(&str, &str)>,
histograms: Vec<Histogram>,
) -> Request {
request_with_labels(labels, Vec::new(), histograms)
}
pub fn histogram(timestamp: i64) -> Histogram {
Histogram {
timestamp,
..Default::default()
}
}
fn request_with_labels(
labels: Vec<(&str, &str)>,
samples: Vec<Sample>,
histograms: Vec<Histogram>,
) -> Request {
let mut symbols = vec!["".to_string()];
let mut labels_refs = Vec::with_capacity(labels.len() * 2);
for (name, value) in labels {
labels_refs.push(push_symbol(&mut symbols, name));
labels_refs.push(push_symbol(&mut symbols, value));
}
Request {
symbols,
timeseries: vec![TimeSeries {
labels_refs,
samples,
histograms,
exemplars: Vec::new(),
metadata: None,
}],
}
}
fn push_symbol(symbols: &mut Vec<String>, symbol: &str) -> u32 {
if let Some(idx) = symbols.iter().position(|s| s == symbol) {
return idx as u32;
}
let idx = symbols.len();
symbols.push(symbol.to_string());
idx as u32
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::value::ValueData;
use common_query::prelude::{greptime_timestamp, greptime_value};
use session::context::QueryContext;
use super::*;
use crate::error;
use crate::http::prom_store::PHYSICAL_TABLE_PARAM;
use crate::prom_store::{DATABASE_LABEL, PHYSICAL_TABLE_LABEL};
#[test]
fn test_decode_remote_write_v2_request() {
let request = Request {
symbols: vec![
"".to_string(),
"__name__".to_string(),
"http_requests_total".to_string(),
],
timeseries: vec![TimeSeries {
labels_refs: vec![1, 2],
samples: vec![Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
}],
histograms: Vec::new(),
exemplars: Vec::new(),
metadata: Some(Metadata {
r#type: metadata::MetricType::Counter as i32,
help_ref: 0,
unit_ref: 0,
}),
}],
};
let body =
Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap());
let decoded = decode_remote_write_v2_request(false, body).unwrap();
assert_eq!(decoded.symbols, request.symbols);
assert_eq!(decoded.timeseries.len(), 1);
assert_eq!(decoded.timeseries[0].labels_refs, vec![1, 2]);
assert_eq!(decoded.timeseries[0].samples.len(), 1);
assert_eq!(decoded.timeseries[0].samples[0].value, 42.0);
assert_eq!(decoded.timeseries[0].metadata.as_ref().unwrap().r#type, 1);
}
#[test]
fn test_into_context_req_samples() {
let ctx_req = test_util::request_with_labels_and_samples(
vec![
(METRIC_NAME_LABEL, "http_requests_total"),
("job", "api"),
("instance", "localhost:9090"),
],
vec![
Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
},
Sample {
value: 43.0,
timestamp: 2000,
start_timestamp: 0,
},
],
)
.into_context_req()
.unwrap();
let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
assert_eq!(inserts.len(), 1);
let request = inserts.pop().unwrap();
assert_eq!(request.table_name, "http_requests_total");
let rows = request.rows.unwrap();
assert_eq!(rows.rows.len(), 2);
assert_eq!(
rows.schema
.iter()
.map(|col| col.column_name.as_str())
.collect::<Vec<_>>(),
vec![greptime_timestamp(), greptime_value(), "job", "instance"]
);
assert_eq!(
rows.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1000))
);
assert_eq!(
rows.rows[0].values[1].value_data,
Some(ValueData::F64Value(42.0))
);
assert_eq!(
rows.rows[0].values[2].value_data,
Some(ValueData::StringValue("api".to_string()))
);
assert_eq!(
rows.rows[0].values[3].value_data,
Some(ValueData::StringValue("localhost:9090".to_string()))
);
assert_eq!(
rows.rows[1].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(2000))
);
assert_eq!(
rows.rows[1].values[1].value_data,
Some(ValueData::F64Value(43.0))
);
}
#[test]
fn test_into_context_req_accepts_utf8_label_names() {
let ctx_req = test_util::request_with_labels_and_samples(
vec![
(METRIC_NAME_LABEL, "http_requests_total"),
("service.name", "api"),
("区域", "华东"),
],
vec![Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
}],
)
.into_context_req()
.unwrap();
let mut inserts = ctx_req.all_req().collect::<Vec<_>>();
assert_eq!(inserts.len(), 1);
let rows = inserts.pop().unwrap().rows.unwrap();
assert_eq!(
rows.schema
.iter()
.map(|col| col.column_name.as_str())
.collect::<Vec<_>>(),
vec![
greptime_timestamp(),
greptime_value(),
"service.name",
"区域"
]
);
assert_eq!(
rows.rows[0].values[2].value_data,
Some(ValueData::StringValue("api".to_string()))
);
assert_eq!(
rows.rows[0].values[3].value_data,
Some(ValueData::StringValue("华东".to_string()))
);
}
#[test]
fn test_into_context_req_special_labels() {
let ctx_req = test_util::request_with_labels_and_samples(
vec![
(METRIC_NAME_LABEL, "cpu_usage"),
(DATABASE_LABEL, "tenant_a"),
(PHYSICAL_TABLE_LABEL, "metrics_physical"),
("job", "api"),
],
vec![Sample {
value: 1.0,
timestamp: 1000,
start_timestamp: 0,
}],
)
.into_context_req()
.unwrap();
let mut iter = ctx_req.as_req_iter(Arc::new(QueryContext::with("greptime", "public")));
let (ctx, reqs) = iter.next().unwrap();
assert!(iter.next().is_none());
assert_eq!(ctx.current_schema(), "tenant_a");
assert_eq!(
ctx.extension(PHYSICAL_TABLE_PARAM),
Some("metrics_physical")
);
assert_eq!(reqs.inserts.len(), 1);
let rows = reqs.inserts[0].rows.as_ref().unwrap();
assert_eq!(
rows.schema
.iter()
.map(|col| col.column_name.as_str())
.collect::<Vec<_>>(),
vec![greptime_timestamp(), greptime_value(), "job"]
);
}
#[test]
fn test_into_context_req_rejects_invalid_requests() {
let mut cases = Vec::new();
cases.push((
"missing metric name",
request_with_sample(vec![("job", "api")]),
"missing '__name__'",
));
let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
request.timeseries[0].labels_refs.push(1);
cases.push((
"odd label refs",
request,
"labels_refs must contain name/value pairs",
));
let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
request.timeseries[0].labels_refs[1] = 99;
cases.push((
"out of range symbol ref",
request,
"symbol reference 99 is out of range",
));
let mut request = request_with_sample(vec![(METRIC_NAME_LABEL, "metric")]);
request.symbols[0] = "not-empty".to_string();
cases.push((
"non-empty first symbol",
request,
"symbols must start with an empty string",
));
cases.push((
"repeated label name",
request_with_sample(vec![
(METRIC_NAME_LABEL, "metric"),
("job", "api"),
("job", "worker"),
]),
"label name `job` is repeated",
));
cases.push((
"empty label name",
request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("", "api")]),
"label names must not be empty",
));
cases.push((
"empty label value",
request_with_sample(vec![(METRIC_NAME_LABEL, "metric"), ("job", "")]),
"label `job` value must not be empty",
));
for (name, request, expected) in cases {
assert_invalid(name, request, expected);
}
}
#[test]
fn test_into_context_req_ignores_histograms_and_exemplars() {
let mut request = test_util::request_with_labels_and_samples(
vec![(METRIC_NAME_LABEL, "metric")],
vec![Sample {
value: 1.0,
timestamp: 1000,
start_timestamp: 0,
}],
);
request.timeseries[0].histograms.push(Histogram::default());
request.timeseries[0].exemplars.push(Exemplar::default());
let ctx_req = request.into_context_req().unwrap();
assert_eq!(ctx_req.all_req().count(), 1);
}
#[test]
fn test_into_context_req_skips_histogram_only_series() {
let mut request =
test_util::request_with_labels_and_samples(vec![(METRIC_NAME_LABEL, "metric")], vec![]);
request.timeseries[0].histograms.push(Histogram::default());
let ctx_req = request.into_context_req().unwrap();
assert_eq!(ctx_req.all_req().count(), 0);
}
fn request_with_sample(labels: Vec<(&str, &str)>) -> Request {
test_util::request_with_labels_and_samples(
labels,
vec![Sample {
value: 1.0,
timestamp: 1000,
start_timestamp: 0,
}],
)
}
fn assert_invalid(name: &str, request: Request, expected: &str) {
let err = request.into_context_req().unwrap_err();
assert!(
matches!(err, error::Error::InvalidPromRemoteRequest { .. }),
"{name}: expected invalid request error, got {err}"
);
assert!(
err.to_string().contains(expected),
"{name}: expected error containing {expected:?}, got {err}"
);
}
}

View File

@@ -69,6 +69,11 @@ impl TableData {
self.rows.push(Row { values })
}
#[inline]
pub fn reserve_rows(&mut self, additional: usize) {
self.rows.reserve(additional);
}
#[allow(dead_code)]
pub fn columns(&self) -> &Vec<ColumnSchema> {
&self.schema

View File

@@ -13,23 +13,33 @@
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use api::greptime_proto::io::prometheus::write::v2::{
Request as RemoteWriteV2Request, Sample as RemoteWriteV2Sample,
TimeSeries as RemoteWriteV2TimeSeries,
};
use api::prom_store::remote::{
LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
};
use api::v1::RowInsertRequests;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, SemanticType};
use async_trait::async_trait;
use axum::Router;
use axum::http::HeaderMap;
use common_query::Output;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_test_util::ports;
use datafusion_expr::LogicalPlan;
use prost::Message;
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use servers::error::Result;
use servers::error::{self, Result};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::test_helpers::TestClient;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::prom_remote_write::v2::test_util as remote_write_v2;
use servers::prom_remote_write::validation::PromValidationMode;
use servers::prom_store;
use servers::prom_store::{Metrics, snappy_compress};
@@ -39,24 +49,53 @@ use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
const REMOTE_WRITE_V2_CONTENT_TYPE: &str =
"application/x-protobuf;proto=io.prometheus.write.v2.Request";
struct DummyInstance {
tx: mpsc::Sender<(String, Vec<u8>)>,
read_tx: mpsc::Sender<(String, Vec<u8>)>,
write_tx: mpsc::Sender<RemoteWriteCapture>,
write_calls: Arc<AtomicUsize>,
fail_write_call: Option<usize>,
}
struct RemoteWriteCapture {
schema: String,
physical_table: Option<String>,
request: RowInsertRequests,
}
#[async_trait]
impl PromStoreProtocolHandler for DummyInstance {
async fn write(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
request: RowInsertRequests,
ctx: QueryContextRef,
_with_metric_engine: bool,
) -> Result<Output> {
let write_call = self.write_calls.fetch_add(1, Ordering::SeqCst) + 1;
if self.fail_write_call == Some(write_call) {
return error::InvalidPromRemoteRequestSnafu {
msg: "injected prometheus remote write failure".to_string(),
}
.fail();
}
let _ = self
.write_tx
.send(RemoteWriteCapture {
schema: ctx.current_schema(),
physical_table: ctx.extension(PHYSICAL_TABLE_PARAM).map(ToString::to_string),
request,
})
.await;
Ok(Output::new_with_affected_rows(0))
}
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
let _ = self
.tx
.read_tx
.send((ctx.current_schema(), request.encode_to_vec()))
.await;
@@ -112,12 +151,33 @@ impl SqlQueryHandler for DummyInstance {
}
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let (write_tx, _write_rx) = mpsc::channel(100);
make_test_app_with_write_capture(tx, write_tx)
}
fn make_test_app_with_write_capture(
read_tx: mpsc::Sender<(String, Vec<u8>)>,
write_tx: mpsc::Sender<RemoteWriteCapture>,
) -> Router {
make_test_app_with_write_failure(read_tx, write_tx, None)
}
fn make_test_app_with_write_failure(
read_tx: mpsc::Sender<(String, Vec<u8>)>,
write_tx: mpsc::Sender<RemoteWriteCapture>,
fail_write_call: Option<usize>,
) -> Router {
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let instance = Arc::new(DummyInstance { tx });
let instance = Arc::new(DummyInstance {
read_tx,
write_tx,
write_calls: Arc::new(AtomicUsize::new(0)),
fail_write_call,
});
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone())
.with_prom_handler(instance, None, true, PromValidationMode::Unchecked, None)
@@ -125,6 +185,144 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
server.build(server.make_app()).unwrap()
}
async fn post_remote_write_v2(client: &TestClient, request: &RemoteWriteV2Request) -> TestResponse {
post_remote_write_v2_body(client, snappy_compress(&request.encode_to_vec()).unwrap()).await
}
async fn post_remote_write_v2_body(client: &TestClient, body: Vec<u8>) -> TestResponse {
post_remote_write_with_content_type(client, REMOTE_WRITE_V2_CONTENT_TYPE, body).await
}
async fn post_remote_write_with_content_type(
client: &TestClient,
content_type: &str,
body: Vec<u8>,
) -> TestResponse {
post_remote_write_with_content_type_and_encoding(client, content_type, "snappy", body).await
}
async fn post_remote_write_with_content_type_and_encoding(
client: &TestClient,
content_type: &str,
content_encoding: &str,
body: Vec<u8>,
) -> TestResponse {
client
.post("/v1/prometheus/write")
.header("content-type", content_type)
.header("content-encoding", content_encoding)
.body(body)
.send()
.await
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_decode_error_has_written_headers() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let result = post_remote_write_v2_body(&client, vec![0xff, 0xff]).await;
assert_eq!(result.status(), 400);
assert_remote_write_v2_written_headers(&result.headers(), "0");
assert!(result.text().await.contains("error"));
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_convert_error_has_written_headers() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let write_request = remote_write_v2::request_with_labels_and_samples(
vec![("job", "api")],
vec![RemoteWriteV2Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
}],
);
let result = post_remote_write_v2(&client, &write_request).await;
assert_eq!(result.status(), 400);
assert_remote_write_v2_written_headers(&result.headers(), "0");
assert!(result.text().await.contains("missing '__name__'"));
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_write_error_has_partial_written_headers() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_failure(read_tx, write_tx, Some(2));
let client = TestClient::new(app).await;
let write_request = RemoteWriteV2Request {
symbols: vec![
String::new(),
prom_store::METRIC_NAME_LABEL.to_string(),
"http_requests_total".to_string(),
prom_store::DATABASE_LABEL.to_string(),
"tenant_a".to_string(),
"tenant_b".to_string(),
],
timeseries: vec![
RemoteWriteV2TimeSeries {
labels_refs: vec![1, 2, 3, 4],
samples: vec![RemoteWriteV2Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
}],
..Default::default()
},
RemoteWriteV2TimeSeries {
labels_refs: vec![1, 2, 3, 5],
samples: vec![RemoteWriteV2Sample {
value: 43.0,
timestamp: 2000,
start_timestamp: 0,
}],
..Default::default()
},
],
};
let result = post_remote_write_v2(&client, &write_request).await;
assert_eq!(result.status(), 400);
assert_remote_write_v2_written_headers(&result.headers(), "1");
assert!(
result
.text()
.await
.contains("injected prometheus remote write failure")
);
let captured = write_rx.recv().await.unwrap();
assert_eq!(
1,
captured.request.inserts[0]
.rows
.as_ref()
.unwrap()
.rows
.len()
);
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_read() {
common_telemetry::init_default_ut_logging();
@@ -220,3 +418,206 @@ async fn test_prometheus_remote_write_read() {
ReadRequest::decode(&(requests[1].1)[..]).unwrap()
);
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_samples() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let write_request = remote_write_v2::request_with_labels_and_samples(
vec![
(prom_store::METRIC_NAME_LABEL, "http_requests_total"),
(prom_store::DATABASE_LABEL, "tenant_a"),
(prom_store::PHYSICAL_TABLE_LABEL, "metrics_physical"),
("job", "api"),
],
vec![
RemoteWriteV2Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
},
RemoteWriteV2Sample {
value: 43.0,
timestamp: 2000,
start_timestamp: 0,
},
],
);
let result = post_remote_write_v2(&client, &write_request).await;
assert_eq!(result.status(), 204);
assert_remote_write_v2_written_headers(&result.headers(), "2");
assert!(result.text().await.is_empty());
let captured = write_rx.recv().await.unwrap();
assert_eq!("tenant_a", captured.schema);
assert_eq!(
Some("metrics_physical".to_string()),
captured.physical_table
);
assert_eq!(1, captured.request.inserts.len());
let insert = &captured.request.inserts[0];
assert_eq!("http_requests_total", insert.table_name);
let rows = insert.rows.as_ref().unwrap();
assert_eq!(
vec![greptime_timestamp(), greptime_value(), "job"],
rows.schema
.iter()
.map(|column| column.column_name.as_str())
.collect::<Vec<_>>()
);
assert_eq!(
vec![
ColumnDataType::TimestampMillisecond as i32,
ColumnDataType::Float64 as i32,
ColumnDataType::String as i32,
],
rows.schema
.iter()
.map(|column| column.datatype)
.collect::<Vec<_>>()
);
assert_eq!(
vec![
SemanticType::Timestamp as i32,
SemanticType::Field as i32,
SemanticType::Tag as i32,
],
rows.schema
.iter()
.map(|column| column.semantic_type)
.collect::<Vec<_>>()
);
assert_eq!(2, rows.rows.len());
assert_eq!(
Some(ValueData::TimestampMillisecondValue(1000)),
rows.rows[0].values[0].value_data
);
assert_eq!(
Some(ValueData::F64Value(42.0)),
rows.rows[0].values[1].value_data
);
assert_eq!(
Some(ValueData::StringValue("api".to_string())),
rows.rows[0].values[2].value_data
);
assert_eq!(
Some(ValueData::TimestampMillisecondValue(2000)),
rows.rows[1].values[0].value_data
);
assert_eq!(
Some(ValueData::F64Value(43.0)),
rows.rows[1].values[1].value_data
);
assert_eq!(
Some(ValueData::StringValue("api".to_string())),
rows.rows[1].values[2].value_data
);
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_ignores_histogram_only_series() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let write_request = remote_write_v2::request_with_labels_and_histograms(
vec![(
prom_store::METRIC_NAME_LABEL,
"http_request_duration_seconds",
)],
vec![remote_write_v2::histogram(1000)],
);
let result = post_remote_write_v2(&client, &write_request).await;
assert_eq!(result.status(), 204);
assert_remote_write_v2_written_headers(&result.headers(), "0");
assert!(result.text().await.is_empty());
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_rejects_unsupported_proto() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let result = post_remote_write_with_content_type(
&client,
"application/x-protobuf;proto=io.prometheus.write.v3.Request",
Vec::new(),
)
.await;
assert_eq!(result.status(), 415);
assert!(
result
.text()
.await
.contains("unsupported prometheus remote write content type")
);
assert!(write_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_prometheus_remote_write_v2_rejects_unsupported_content_encoding() {
common_telemetry::init_default_ut_logging();
let (read_tx, _read_rx) = mpsc::channel(100);
let (write_tx, mut write_rx) = mpsc::channel(100);
let app = make_test_app_with_write_capture(read_tx, write_tx);
let client = TestClient::new(app).await;
let result = post_remote_write_with_content_type_and_encoding(
&client,
REMOTE_WRITE_V2_CONTENT_TYPE,
"gzip",
Vec::new(),
)
.await;
assert_eq!(result.status(), 415);
assert!(
result
.text()
.await
.contains("unsupported prometheus remote write content encoding")
);
assert!(write_rx.try_recv().is_err());
}
fn assert_remote_write_v2_written_headers(headers: &HeaderMap, samples: &str) {
assert_eq!(
Some(samples),
headers
.get("X-Prometheus-Remote-Write-Samples-Written")
.map(|x| x.to_str().unwrap())
);
assert_eq!(
Some("0"),
headers
.get("X-Prometheus-Remote-Write-Histograms-Written")
.map(|x| x.to_str().unwrap())
);
assert_eq!(
Some("0"),
headers
.get("X-Prometheus-Remote-Write-Exemplars-Written")
.map(|x| x.to_str().unwrap())
);
}

View File

@@ -46,6 +46,8 @@ pub const SEMANTIC_PER_TABLE_INDEX_KEY: &str = "greptime.internal.semantic.per_t
pub const SEMANTIC_SIGNAL_TYPE: &str = "greptime.semantic.signal_type";
/// Ingestion ecosystem, e.g. [`SOURCE_OPENTELEMETRY`] / [`SOURCE_PROMETHEUS`].
pub const SEMANTIC_SOURCE: &str = "greptime.semantic.source";
/// Source protocol version, e.g. Prometheus remote write `1.0` / `2.0`.
pub const SEMANTIC_SOURCE_VERSION: &str = "greptime.semantic.source_version";
/// Internal ingestion pipeline / data model, e.g. `greptime_trace_v1`. The
/// signal-agnostic successor to the engine-specific `table_data_model` option.
pub const SEMANTIC_PIPELINE: &str = "greptime.semantic.pipeline";
@@ -103,6 +105,7 @@ pub const SEMANTIC_VALUE_MIXED: &str = "mixed";
pub const SEMANTIC_OPTION_KEYS: &[&str] = &[
SEMANTIC_SIGNAL_TYPE,
SEMANTIC_SOURCE,
SEMANTIC_SOURCE_VERSION,
SEMANTIC_PIPELINE,
SEMANTIC_TRACE_CONVENTIONS,
SEMANTIC_METRIC_TYPE,
@@ -131,6 +134,7 @@ pub fn is_semantic_option_key(key: &str) -> bool {
pub fn validate_semantic_option(key: &str, value: &str) -> bool {
match key {
SEMANTIC_PIPELINE
| SEMANTIC_SOURCE_VERSION
| SEMANTIC_METRIC_UNIT
| SEMANTIC_METRIC_ORIGINAL_NAME
| SEMANTIC_TRACE_CONVENTIONS => !value.is_empty(),
@@ -248,6 +252,7 @@ mod tests {
SOURCE_OPENTELEMETRY
));
assert!(validate_semantic_option(SEMANTIC_SOURCE, SOURCE_PROMETHEUS));
assert!(validate_semantic_option(SEMANTIC_SOURCE_VERSION, "2.0"));
assert!(validate_semantic_option(
SEMANTIC_METRIC_METADATA_QUALITY,
METADATA_QUALITY_INFERRED

View File

@@ -17,12 +17,13 @@ use std::io::Write;
use std::str::FromStr;
use std::time::Duration;
use api::greptime_proto::io::prometheus::write::v2::Sample as RemoteWriteV2Sample;
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest,
};
use auth::{UserProviderRef, user_provider_from_option};
use axum::http::{HeaderName, HeaderValue, StatusCode};
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use base64::prelude::{BASE64_STANDARD, Engine as _};
use chrono::Utc;
use cmd::options::GreptimeOptions;
@@ -64,6 +65,7 @@ use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::prom_remote_write::v2::test_util as remote_write_v2;
use servers::prom_store::{self, mock_timeseries_new_label};
use servers::request_memory_limiter::ServerMemoryLimiter;
use standalone::options::StandaloneOptions;
@@ -119,6 +121,7 @@ macro_rules! http_tests {
test_dashboard_path,
test_dashboard_api,
test_prometheus_remote_write,
test_prometheus_remote_write_v2,
test_prometheus_remote_write_batched,
test_prometheus_remote_special_labels,
test_prometheus_remote_schema_labels,
@@ -274,6 +277,27 @@ fn basic_auth(username: &str, password: &str) -> String {
)
}
fn assert_remote_write_v2_written_headers(headers: &HeaderMap, samples: &str) {
assert_eq!(
Some(samples),
headers
.get("X-Prometheus-Remote-Write-Samples-Written")
.map(|x| x.to_str().unwrap())
);
assert_eq!(
Some("0"),
headers
.get("X-Prometheus-Remote-Write-Histograms-Written")
.map(|x| x.to_str().unwrap())
);
assert_eq!(
Some("0"),
headers
.get("X-Prometheus-Remote-Write-Exemplars-Written")
.map(|x| x.to_str().unwrap())
);
}
pub async fn test_http_auth_from_standalone_user_provider_config() {
common_telemetry::init_default_ut_logging();
@@ -2268,6 +2292,106 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_prometheus_remote_write_v2(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write_v2").await;
let client = TestClient::new(app).await;
let write_request = remote_write_v2::request_with_labels_and_samples(
vec![
(prom_store::METRIC_NAME_LABEL, "remote_write_v2_total"),
("job", "api"),
("instance", "localhost:9090"),
],
vec![
RemoteWriteV2Sample {
value: 42.0,
timestamp: 1000,
start_timestamp: 0,
},
RemoteWriteV2Sample {
value: 43.0,
timestamp: 2000,
start_timestamp: 0,
},
],
);
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.header(
"Content-Type",
"application/x-protobuf;proto=io.prometheus.write.v2.Request",
)
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);
let headers = res.headers();
assert_remote_write_v2_written_headers(&headers, "2");
assert!(res.text().await.is_empty());
validate_data(
"prometheus_remote_write_v2_rows",
&client,
"select greptime_timestamp, greptime_value, job, instance from remote_write_v2_total order by greptime_timestamp;",
"[[1000,42.0,\"api\",\"localhost:9090\"],[2000,43.0,\"api\",\"localhost:9090\"]]",
)
.await;
validate_data(
"prometheus_remote_write_v2_semantic_identity",
&client,
"select count(*) from information_schema.tables where table_name = 'remote_write_v2_total' \
and create_options like '%greptime.semantic.signal_type=metric%' \
and create_options like '%greptime.semantic.source=prometheus%' \
and create_options like '%greptime.semantic.metric.metadata_quality=inferred%';",
"[[1]]",
)
.await;
let write_request = remote_write_v2::request_with_labels_and_histograms(
vec![(
prom_store::METRIC_NAME_LABEL,
"remote_write_v2_histogram_only",
)],
vec![remote_write_v2::histogram(3000)],
);
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.header(
"Content-Type",
"application/x-protobuf;proto=io.prometheus.write.v2.Request",
)
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);
let headers = res.headers();
assert_remote_write_v2_written_headers(&headers, "0");
assert!(res.text().await.is_empty());
validate_data(
"prometheus_remote_write_v2_histogram_only",
&client,
"select count(*) from information_schema.tables where table_name = 'remote_write_v2_histogram_only';",
"[[0]]",
)
.await;
guard.remove_all().await;
}
/// Covers the batched (pending-rows-batcher) Prometheus remote write path, which
/// bypasses `PromStoreProtocolHandler::write`. Verifies the metric table is created
/// asynchronously and still carries the Prometheus semantic identity stamped on the
@@ -2358,7 +2482,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
expected,
)
.await;
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f1'\\n)\"]]";
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n 'greptime.semantic.source_version' = '1.0',\\n on_physical_table = 'f1'\\n)\"]]";
validate_data(
"test_prometheus_remote_special_labels_idc3_show_create_table",
&client,
@@ -2384,7 +2508,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
expected,
)
.await;
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n on_physical_table = 'f2'\\n)\"]]";
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n 'greptime.semantic.metric.metadata_quality' = 'inferred',\\n 'greptime.semantic.signal_type' = 'metric',\\n 'greptime.semantic.source' = 'prometheus',\\n 'greptime.semantic.source_version' = '1.0',\\n on_physical_table = 'f2'\\n)\"]]";
validate_data(
"test_prometheus_remote_special_labels_idc4_show_create_table",
&client,

View File

@@ -9,6 +9,7 @@ DESC TABLE information_schema.table_semantics;
| table_id | UInt32 | | NO | | FIELD |
| signal_type | String | | YES | | FIELD |
| source | String | | YES | | FIELD |
| source_version | String | | YES | | FIELD |
| pipeline | String | | YES | | FIELD |
| metadata_quality | String | | YES | | FIELD |
| semantic_options | String | | YES | | FIELD |
@@ -21,6 +22,7 @@ CREATE TABLE metrics_tagged (
WITH (
'greptime.semantic.signal_type' = 'metric',
'greptime.semantic.source' = 'opentelemetry',
'greptime.semantic.source_version' = '2.0',
'greptime.semantic.pipeline' = 'greptime_metric_v1',
'greptime.semantic.metric.metadata_quality' = 'declared',
'greptime.semantic.metric.type' = 'counter',
@@ -49,16 +51,16 @@ CREATE TABLE plain_table (
Affected Rows: 0
SELECT table_schema, table_name, signal_type, source, pipeline, metadata_quality, semantic_options
SELECT table_schema, table_name, signal_type, source, source_version, pipeline, metadata_quality, semantic_options
FROM information_schema.table_semantics
ORDER BY table_name;
+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+
| table_schema | table_name | signal_type | source | pipeline | metadata_quality | semantic_options |
+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+
| public | metrics_tagged | metric | opentelemetry | greptime_metric_v1 | declared | {"metric.type":"counter","metric.unit":"By"} |
| public | traces_tagged | trace | opentelemetry | | | {"trace.conventions":"https://opentelemetry.io/schemas/1.27.0"} |
+--------------+----------------+-------------+---------------+--------------------+------------------+-----------------------------------------------------------------+
+--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+
| table_schema | table_name | signal_type | source | source_version | pipeline | metadata_quality | semantic_options |
+--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+
| public | metrics_tagged | metric | opentelemetry | 2.0 | greptime_metric_v1 | declared | {"metric.type":"counter","metric.unit":"By"} |
| public | traces_tagged | trace | opentelemetry | | | | {"trace.conventions":"https://opentelemetry.io/schemas/1.27.0"} |
+--------------+----------------+-------------+---------------+----------------+--------------------+------------------+-----------------------------------------------------------------+
-- Predicate pushdown on a promoted column.
SELECT table_name, signal_type

View File

@@ -7,6 +7,7 @@ CREATE TABLE metrics_tagged (
WITH (
'greptime.semantic.signal_type' = 'metric',
'greptime.semantic.source' = 'opentelemetry',
'greptime.semantic.source_version' = '2.0',
'greptime.semantic.pipeline' = 'greptime_metric_v1',
'greptime.semantic.metric.metadata_quality' = 'declared',
'greptime.semantic.metric.type' = 'counter',
@@ -29,7 +30,7 @@ CREATE TABLE plain_table (
val DOUBLE,
);
SELECT table_schema, table_name, signal_type, source, pipeline, metadata_quality, semantic_options
SELECT table_schema, table_name, signal_type, source, source_version, pipeline, metadata_quality, semantic_options
FROM information_schema.table_semantics
ORDER BY table_name;

View File

@@ -476,11 +476,12 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | table_privileges | table_catalog | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |
| greptime | information_schema | table_privileges | table_name | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |
| greptime | information_schema | table_privileges | table_schema | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |
| greptime | information_schema | table_semantics | metadata_quality | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | pipeline | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | semantic_options | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | metadata_quality | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | pipeline | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | semantic_options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | signal_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | source | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | source_version | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | YES | string | | |
| greptime | information_schema | table_semantics | table_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |
| greptime | information_schema | table_semantics | table_id | 4 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | NO | int unsigned | | |
| greptime | information_schema | table_semantics | table_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | NO | string | | |