mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat(http): lossy string validation in prom remote write (#6213)
* feat/lossy-string-validation-in-prom-remote-write: ### Commit Message #### Refactor Prometheus Validation Mode - **Replace `is_strict_mode` with `PromValidationMode` Enum:** - Updated `HttpOptions` and related structures to use `PromValidationMode` enum instead of the boolean `is_strict_mode`. - Modified functions and tests to accommodate the new enum, ensuring flexible validation modes (`Strict`, `Lossy`, `Unchecked`). - Affected files: `server.rs`, `prom_decode.rs`, `http.rs`, `prom_store.rs`, `prom_row_builder.rs`, `proto.rs`, `prom_store_test.rs`, `test_util.rs`, `http.rs`. - **Enhance UTF-8 String Decoding:** - Introduced `decode_string` function to handle UTF-8 string decoding based on the selected `PromValidationMode`. - Affected files: `proto.rs`, `prom_row_builder.rs`. This refactor improves the flexibility and clarity of Prometheus request handling by allowing different validation strategies. * feat/lossy-string-validation-in-prom-remote-write: - **Add Prometheus Validation Mode Configuration:** - Updated `config/config.md`, `config/frontend.example.toml`, and `config/standalone.example.toml` to include `http.prom_validation_mode` setting for Prometheus remote write requests. - **Enhance Benchmarking for Prometheus Requests:** - Modified `src/servers/benches/prom_decode.rs` to benchmark different Prometheus validation modes (`Strict`, `Lossy`, `Unchecked`). - **Implement and Test String Decoding:** - Added `decode_string` function and comprehensive tests in `src/servers/src/proto.rs` to handle string decoding with different validation modes. * feat/lossy-string-validation-in-prom-remote-write: ### Add Histogram Buckets to Metrics - **Files Modified**: `src/servers/src/metrics.rs` - **Key Changes**: - Added specific histogram buckets to `METRIC_MYSQL_QUERY_TIMER`, `METRIC_POSTGRES_QUERY_TIMER`, and `METRIC_SERVER_GRPC_PROM_REQUEST_TIMER` to enhance granularity in query elapsed time metrics. * feat/lossy-string-validation-in-prom-remote-write: ### Update Prometheus Validation Mode Default - **Config Documentation**: Updated the default description for `http.prom_validation_mode` to indicate that "strict" is the default option in `config.md`, `frontend.example.toml`, and `standalone.example.toml`. - **HTTP Server Implementation**: Changed the default `prom_validation_mode` to `PromValidationMode::Strict` in `src/servers/src/http.rs`. * feat/lossy-string-validation-in-prom-remote-write: **Commit Message:** Update Prometheus Validation Mode to Strict - Changed `http.prom_validation_mode` from `unchecked` to `strict` in `config.md`, `frontend.example.toml`, and `standalone.example.toml` to enforce strict validation of Prometheus remote write requests.
This commit is contained in:
@@ -27,6 +27,7 @@
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||
@@ -226,6 +227,7 @@
|
||||
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
|
||||
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
|
||||
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
|
||||
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
|
||||
| `grpc` | -- | -- | The gRPC server options. |
|
||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||
|
||||
@@ -37,6 +37,12 @@ enable_cors = true
|
||||
## Customize allowed origins for HTTP CORS.
|
||||
## @toml2docs:none-default
|
||||
cors_allowed_origins = ["https://example.com"]
|
||||
## Whether to enable validation for Prometheus remote write requests.
|
||||
## Available options:
|
||||
## - strict: deny invalid UTF-8 strings (default).
|
||||
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
|
||||
## - unchecked: do not valid strings.
|
||||
prom_validation_mode = "strict"
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
|
||||
@@ -43,6 +43,13 @@ enable_cors = true
|
||||
## @toml2docs:none-default
|
||||
cors_allowed_origins = ["https://example.com"]
|
||||
|
||||
## Whether to enable validation for Prometheus remote write requests.
|
||||
## Available options:
|
||||
## - strict: deny invalid UTF-8 strings (default).
|
||||
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
|
||||
## - unchecked: do not valid strings.
|
||||
prom_validation_mode = "strict"
|
||||
|
||||
## The gRPC server options.
|
||||
[grpc]
|
||||
## The address to bind the gRPC server.
|
||||
|
||||
@@ -104,7 +104,7 @@ where
|
||||
self.instance.clone(),
|
||||
Some(self.instance.clone()),
|
||||
opts.prom_store.with_metric_engine,
|
||||
opts.http.is_strict_mode,
|
||||
opts.http.prom_validation_mode,
|
||||
)
|
||||
.with_prometheus_handler(self.instance.clone());
|
||||
}
|
||||
|
||||
@@ -16,76 +16,55 @@ use std::time::Duration;
|
||||
|
||||
use api::prom_store::remote::WriteRequest;
|
||||
use bytes::Bytes;
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
|
||||
use prost::Message;
|
||||
use servers::http::PromValidationMode;
|
||||
use servers::prom_store::to_grpc_row_insert_requests;
|
||||
use servers::proto::{PromSeriesProcessor, PromWriteRequest};
|
||||
|
||||
fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) {
|
||||
fn bench_decode_prom_request(c: &mut Criterion) {
|
||||
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
d.push("benches");
|
||||
d.push("write_request.pb.data");
|
||||
|
||||
let data = Bytes::from(std::fs::read(d).unwrap());
|
||||
|
||||
let mut request = WriteRequest::default();
|
||||
let mut prom_request = PromWriteRequest::default();
|
||||
let is_strict_mode = false;
|
||||
let mut p = PromSeriesProcessor::default_processor();
|
||||
let mut group = c.benchmark_group("decode_prom_request");
|
||||
group.measurement_time(Duration::from_secs(3));
|
||||
|
||||
c.benchmark_group("decode")
|
||||
.measurement_time(Duration::from_secs(3))
|
||||
.bench_function("write_request", |b| {
|
||||
b.iter(|| {
|
||||
request.clear();
|
||||
let data = data.clone();
|
||||
request.merge(data).unwrap();
|
||||
to_grpc_row_insert_requests(&request).unwrap();
|
||||
});
|
||||
})
|
||||
.bench_function("prom_write_request", |b| {
|
||||
b.iter(|| {
|
||||
let data = data.clone();
|
||||
prom_request.merge(data, is_strict_mode, &mut p).unwrap();
|
||||
prom_request.as_row_insert_requests();
|
||||
});
|
||||
// Benchmark standard WriteRequest decoding as a baseline
|
||||
let mut request = WriteRequest::default();
|
||||
group.bench_function("standard_write_request", |b| {
|
||||
b.iter(|| {
|
||||
let data = data.clone();
|
||||
request.merge(data).unwrap();
|
||||
to_grpc_row_insert_requests(&request).unwrap();
|
||||
});
|
||||
});
|
||||
|
||||
// Benchmark each validation mode
|
||||
for mode in [
|
||||
PromValidationMode::Strict,
|
||||
PromValidationMode::Lossy,
|
||||
PromValidationMode::Unchecked,
|
||||
] {
|
||||
let mut prom_request = PromWriteRequest::default();
|
||||
let mut p = PromSeriesProcessor::default_processor();
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("validation_mode", format!("{:?}", mode)),
|
||||
&mode,
|
||||
|b, &mode| {
|
||||
b.iter(|| {
|
||||
let data = data.clone();
|
||||
prom_request.merge(data, mode, &mut p).unwrap();
|
||||
prom_request.as_row_insert_requests();
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) {
|
||||
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
d.push("benches");
|
||||
d.push("write_request.pb.data");
|
||||
|
||||
let data = Bytes::from(std::fs::read(d).unwrap());
|
||||
|
||||
let mut request = WriteRequest::default();
|
||||
let mut prom_request = PromWriteRequest::default();
|
||||
let is_strict_mode = true;
|
||||
let mut p = PromSeriesProcessor::default_processor();
|
||||
|
||||
c.benchmark_group("decode")
|
||||
.measurement_time(Duration::from_secs(3))
|
||||
.bench_function("write_request", |b| {
|
||||
b.iter(|| {
|
||||
request.clear();
|
||||
let data = data.clone();
|
||||
request.merge(data).unwrap();
|
||||
to_grpc_row_insert_requests(&request).unwrap();
|
||||
});
|
||||
})
|
||||
.bench_function("prom_write_request", |b| {
|
||||
b.iter(|| {
|
||||
let data = data.clone();
|
||||
prom_request.merge(data, is_strict_mode, &mut p).unwrap();
|
||||
prom_request.as_row_insert_requests();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_decode_prom_request_without_strict_mode,
|
||||
bench_decode_prom_request_with_strict_mode
|
||||
);
|
||||
criterion_group!(benches, bench_decode_prom_request);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -146,13 +146,25 @@ pub struct HttpOptions {
|
||||
|
||||
pub body_limit: ReadableSize,
|
||||
|
||||
pub is_strict_mode: bool,
|
||||
/// Validation mode while decoding Prometheus remote write requests.
|
||||
pub prom_validation_mode: PromValidationMode,
|
||||
|
||||
pub cors_allowed_origins: Vec<String>,
|
||||
|
||||
pub enable_cors: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum PromValidationMode {
|
||||
/// Force UTF8 validation
|
||||
Strict,
|
||||
/// Allow lossy UTF8 strings
|
||||
Lossy,
|
||||
/// Do not validate UTF8 strings.
|
||||
Unchecked,
|
||||
}
|
||||
|
||||
impl Default for HttpOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -160,9 +172,9 @@ impl Default for HttpOptions {
|
||||
timeout: Duration::from_secs(0),
|
||||
disable_dashboard: false,
|
||||
body_limit: DEFAULT_BODY_LIMIT,
|
||||
is_strict_mode: false,
|
||||
cors_allowed_origins: Vec::new(),
|
||||
enable_cors: true,
|
||||
prom_validation_mode: PromValidationMode::Strict,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -556,13 +568,13 @@ impl HttpServerBuilder {
|
||||
handler: PromStoreProtocolHandlerRef,
|
||||
pipeline_handler: Option<PipelineHandlerRef>,
|
||||
prom_store_with_metric_engine: bool,
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
) -> Self {
|
||||
let state = PromStoreState {
|
||||
prom_store_handler: handler,
|
||||
pipeline_handler,
|
||||
prom_store_with_metric_engine,
|
||||
is_strict_mode,
|
||||
prom_validation_mode,
|
||||
};
|
||||
|
||||
Self {
|
||||
|
||||
@@ -38,6 +38,7 @@ use snafu::prelude::*;
|
||||
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
|
||||
use crate::http::extractor::PipelineInfo;
|
||||
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::prom_store::{snappy_decompress, zstd_decompress};
|
||||
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
|
||||
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
|
||||
@@ -57,7 +58,7 @@ pub struct PromStoreState {
|
||||
pub prom_store_handler: PromStoreProtocolHandlerRef,
|
||||
pub pipeline_handler: Option<PipelineHandlerRef>,
|
||||
pub prom_store_with_metric_engine: bool,
|
||||
pub is_strict_mode: bool,
|
||||
pub prom_validation_mode: PromValidationMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -97,7 +98,7 @@ pub async fn remote_write(
|
||||
prom_store_handler,
|
||||
pipeline_handler,
|
||||
prom_store_with_metric_engine,
|
||||
is_strict_mode,
|
||||
prom_validation_mode,
|
||||
} = state;
|
||||
|
||||
if let Some(_vm_handshake) = params.get_vm_proto_version {
|
||||
@@ -133,7 +134,7 @@ pub async fn remote_write(
|
||||
}
|
||||
|
||||
let (request, samples) =
|
||||
decode_remote_write_request(is_zstd, body, is_strict_mode, &mut processor).await?;
|
||||
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
|
||||
|
||||
let output = prom_store_handler
|
||||
.write(request, query_ctx, prom_store_with_metric_engine)
|
||||
@@ -199,7 +200,7 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
|
||||
async fn decode_remote_write_request(
|
||||
is_zstd: bool,
|
||||
body: Bytes,
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
processor: &mut PromSeriesProcessor,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
|
||||
@@ -220,7 +221,7 @@ async fn decode_remote_write_request(
|
||||
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
|
||||
|
||||
request
|
||||
.merge(buf, is_strict_mode, processor)
|
||||
.merge(buf, prom_validation_mode, processor)
|
||||
.context(error::DecodePromRemoteRequestSnafu)?;
|
||||
|
||||
if processor.use_pipeline {
|
||||
|
||||
@@ -209,7 +209,8 @@ lazy_static! {
|
||||
pub static ref METRIC_MYSQL_QUERY_TIMER: HistogramVec = register_histogram_vec!(
|
||||
"greptime_servers_mysql_query_elapsed",
|
||||
"servers mysql query elapsed",
|
||||
&[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL]
|
||||
&[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
|
||||
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_MYSQL_PREPARED_COUNT: IntCounterVec = register_int_counter_vec!(
|
||||
@@ -226,7 +227,8 @@ lazy_static! {
|
||||
pub static ref METRIC_POSTGRES_QUERY_TIMER: HistogramVec = register_histogram_vec!(
|
||||
"greptime_servers_postgres_query_elapsed",
|
||||
"servers postgres query elapsed",
|
||||
&[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL]
|
||||
&[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL],
|
||||
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_POSTGRES_PREPARED_COUNT: IntCounter = register_int_counter!(
|
||||
@@ -243,7 +245,8 @@ lazy_static! {
|
||||
pub static ref METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: HistogramVec = register_histogram_vec!(
|
||||
"greptime_servers_grpc_prom_request_elapsed",
|
||||
"servers grpc prom request elapsed",
|
||||
&[METRIC_DB_LABEL]
|
||||
&[METRIC_DB_LABEL],
|
||||
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref METRIC_HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
|
||||
|
||||
@@ -25,7 +25,8 @@ use api::v1::{
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use prost::DecodeError;
|
||||
|
||||
use crate::proto::PromLabel;
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::proto::{decode_string, PromLabel};
|
||||
use crate::repeated_field::Clear;
|
||||
|
||||
/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
|
||||
@@ -125,27 +126,13 @@ impl TableBuilder {
|
||||
&mut self,
|
||||
labels: &[PromLabel],
|
||||
samples: &[Sample],
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
) -> Result<(), DecodeError> {
|
||||
let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
|
||||
|
||||
for PromLabel { name, value } in labels {
|
||||
let (tag_name, tag_value) = if is_strict_mode {
|
||||
let tag_name = match String::from_utf8(name.to_vec()) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Err(DecodeError::new("invalid utf-8")),
|
||||
};
|
||||
let tag_value = match String::from_utf8(value.to_vec()) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Err(DecodeError::new("invalid utf-8")),
|
||||
};
|
||||
(tag_name, tag_value)
|
||||
} else {
|
||||
let tag_name = unsafe { String::from_utf8_unchecked(name.to_vec()) };
|
||||
let tag_value = unsafe { String::from_utf8_unchecked(value.to_vec()) };
|
||||
(tag_name, tag_value)
|
||||
};
|
||||
|
||||
let tag_name = decode_string(name, prom_validation_mode)?;
|
||||
let tag_value = decode_string(value, prom_validation_mode)?;
|
||||
let tag_value = Some(ValueData::StringValue(tag_value));
|
||||
let tag_num = self.col_indexes.len();
|
||||
|
||||
@@ -215,12 +202,12 @@ mod tests {
|
||||
use bytes::Bytes;
|
||||
use prost::DecodeError;
|
||||
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::prom_row_builder::TableBuilder;
|
||||
use crate::proto::PromLabel;
|
||||
#[test]
|
||||
fn test_table_builder() {
|
||||
let mut builder = TableBuilder::default();
|
||||
let is_strict_mode = true;
|
||||
let _ = builder.add_labels_and_samples(
|
||||
&[
|
||||
PromLabel {
|
||||
@@ -236,7 +223,7 @@ mod tests {
|
||||
value: 0.0,
|
||||
timestamp: 0,
|
||||
}],
|
||||
is_strict_mode,
|
||||
PromValidationMode::Strict,
|
||||
);
|
||||
|
||||
let _ = builder.add_labels_and_samples(
|
||||
@@ -254,7 +241,7 @@ mod tests {
|
||||
value: 0.1,
|
||||
timestamp: 1,
|
||||
}],
|
||||
is_strict_mode,
|
||||
PromValidationMode::Strict,
|
||||
);
|
||||
|
||||
let request = builder.as_row_insert_request("test".to_string());
|
||||
@@ -310,7 +297,7 @@ mod tests {
|
||||
value: 0.1,
|
||||
timestamp: 1,
|
||||
}],
|
||||
is_strict_mode,
|
||||
PromValidationMode::Strict,
|
||||
);
|
||||
assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ use api::prom_store::remote::Sample;
|
||||
use api::v1::RowInsertRequests;
|
||||
use bytes::{Buf, Bytes};
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use common_telemetry::debug;
|
||||
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value};
|
||||
use prost::encoding::message::merge;
|
||||
use prost::encoding::{decode_key, decode_varint, WireType};
|
||||
@@ -30,6 +31,7 @@ use snafu::OptionExt;
|
||||
|
||||
use crate::error::InternalSnafu;
|
||||
use crate::http::event::PipelineIngestRequest;
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::pipeline::run_pipeline;
|
||||
use crate::prom_row_builder::TablesBuilder;
|
||||
use crate::prom_store::METRIC_NAME_LABEL_BYTES;
|
||||
@@ -160,7 +162,7 @@ impl PromTimeSeries {
|
||||
tag: u32,
|
||||
wire_type: WireType,
|
||||
buf: &mut Bytes,
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
) -> Result<(), DecodeError> {
|
||||
const STRUCT_NAME: &str = "PromTimeSeries";
|
||||
match tag {
|
||||
@@ -186,15 +188,7 @@ impl PromTimeSeries {
|
||||
return Err(DecodeError::new("delimited length exceeded"));
|
||||
}
|
||||
if label.name.deref() == METRIC_NAME_LABEL_BYTES {
|
||||
let table_name = if is_strict_mode {
|
||||
match String::from_utf8(label.value.to_vec()) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Err(DecodeError::new("invalid utf-8")),
|
||||
}
|
||||
} else {
|
||||
unsafe { String::from_utf8_unchecked(label.value.to_vec()) }
|
||||
};
|
||||
self.table_name = table_name;
|
||||
self.table_name = decode_string(&label.value, prom_validation_mode)?;
|
||||
self.labels.truncate(self.labels.len() - 1); // remove last label
|
||||
}
|
||||
Ok(())
|
||||
@@ -218,7 +212,7 @@ impl PromTimeSeries {
|
||||
fn add_to_table_data(
|
||||
&mut self,
|
||||
table_builders: &mut TablesBuilder,
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
) -> Result<(), DecodeError> {
|
||||
let label_num = self.labels.len();
|
||||
let row_num = self.samples.len();
|
||||
@@ -230,13 +224,36 @@ impl PromTimeSeries {
|
||||
table_data.add_labels_and_samples(
|
||||
self.labels.as_slice(),
|
||||
self.samples.as_slice(),
|
||||
is_strict_mode,
|
||||
prom_validation_mode,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes bytes into String values according provided validation mode.
|
||||
pub(crate) fn decode_string(
|
||||
bytes: &Bytes,
|
||||
mode: PromValidationMode,
|
||||
) -> Result<String, DecodeError> {
|
||||
let result = match mode {
|
||||
PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"Invalid UTF-8 string value: {:?}, error: {:?}",
|
||||
&bytes[..],
|
||||
e
|
||||
);
|
||||
return Err(DecodeError::new("invalid utf-8"));
|
||||
}
|
||||
},
|
||||
PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(),
|
||||
PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) },
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct PromWriteRequest {
|
||||
table_data: TablesBuilder,
|
||||
@@ -258,7 +275,7 @@ impl PromWriteRequest {
|
||||
pub fn merge(
|
||||
&mut self,
|
||||
mut buf: Bytes,
|
||||
is_strict_mode: bool,
|
||||
prom_validation_mode: PromValidationMode,
|
||||
processor: &mut PromSeriesProcessor,
|
||||
) -> Result<(), DecodeError> {
|
||||
const STRUCT_NAME: &str = "PromWriteRequest";
|
||||
@@ -281,7 +298,7 @@ impl PromWriteRequest {
|
||||
while buf.remaining() > limit {
|
||||
let (tag, wire_type) = decode_key(&mut buf)?;
|
||||
self.series
|
||||
.merge_field(tag, wire_type, &mut buf, is_strict_mode)?;
|
||||
.merge_field(tag, wire_type, &mut buf, prom_validation_mode)?;
|
||||
}
|
||||
if buf.remaining() != limit {
|
||||
return Err(DecodeError::new("delimited length exceeded"));
|
||||
@@ -291,7 +308,7 @@ impl PromWriteRequest {
|
||||
processor.consume_series_to_pipeline_map(&mut self.series)?;
|
||||
} else {
|
||||
self.series
|
||||
.add_to_table_data(&mut self.table_data, is_strict_mode)?;
|
||||
.add_to_table_data(&mut self.table_data, prom_validation_mode)?;
|
||||
}
|
||||
|
||||
// clear state
|
||||
@@ -441,8 +458,9 @@ mod tests {
|
||||
use bytes::Bytes;
|
||||
use prost::Message;
|
||||
|
||||
use crate::http::PromValidationMode;
|
||||
use crate::prom_store::to_grpc_row_insert_requests;
|
||||
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
|
||||
use crate::proto::{decode_string, PromSeriesProcessor, PromWriteRequest};
|
||||
use crate::repeated_field::Clear;
|
||||
|
||||
fn sort_rows(rows: Rows) -> Rows {
|
||||
@@ -469,7 +487,7 @@ mod tests {
|
||||
let mut p = PromSeriesProcessor::default_processor();
|
||||
prom_write_request.clear();
|
||||
prom_write_request
|
||||
.merge(data.clone(), true, &mut p)
|
||||
.merge(data.clone(), PromValidationMode::Strict, &mut p)
|
||||
.unwrap();
|
||||
let (prom_rows, samples) = prom_write_request.as_row_insert_requests();
|
||||
|
||||
@@ -510,4 +528,152 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_strict_mode_valid_utf8() {
|
||||
let valid_utf8 = Bytes::from("hello world");
|
||||
let result = decode_string(&valid_utf8, PromValidationMode::Strict);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "hello world");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_strict_mode_empty() {
|
||||
let empty = Bytes::new();
|
||||
let result = decode_string(&empty, PromValidationMode::Strict);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_strict_mode_unicode() {
|
||||
let unicode = Bytes::from("Hello 世界 🌍");
|
||||
let result = decode_string(&unicode, PromValidationMode::Strict);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "Hello 世界 🌍");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_strict_mode_invalid_utf8() {
|
||||
// Invalid UTF-8 sequence
|
||||
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
|
||||
let result = decode_string(&invalid_utf8, PromValidationMode::Strict);
|
||||
assert!(result.is_err());
|
||||
assert_eq!(
|
||||
result.unwrap_err().to_string(),
|
||||
"failed to decode Protobuf message: invalid utf-8"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_strict_mode_incomplete_utf8() {
|
||||
// Incomplete UTF-8 sequence (missing continuation bytes)
|
||||
let incomplete_utf8 = Bytes::from(vec![0xC2]); // Start of 2-byte sequence but missing second byte
|
||||
let result = decode_string(&incomplete_utf8, PromValidationMode::Strict);
|
||||
assert!(result.is_err());
|
||||
assert_eq!(
|
||||
result.unwrap_err().to_string(),
|
||||
"failed to decode Protobuf message: invalid utf-8"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_lossy_mode_valid_utf8() {
|
||||
let valid_utf8 = Bytes::from("hello world");
|
||||
let result = decode_string(&valid_utf8, PromValidationMode::Lossy);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "hello world");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_lossy_mode_empty() {
|
||||
let empty = Bytes::new();
|
||||
let result = decode_string(&empty, PromValidationMode::Lossy);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_lossy_mode_unicode() {
|
||||
let unicode = Bytes::from("Hello 世界 🌍");
|
||||
let result = decode_string(&unicode, PromValidationMode::Lossy);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "Hello 世界 🌍");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_lossy_mode_invalid_utf8() {
|
||||
// Invalid UTF-8 sequence - should be replaced with replacement character
|
||||
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
|
||||
let result = decode_string(&invalid_utf8, PromValidationMode::Lossy);
|
||||
assert!(result.is_ok());
|
||||
// Each invalid byte should be replaced with the Unicode replacement character
|
||||
assert_eq!(result.unwrap(), "<EFBFBD><EFBFBD><EFBFBD>");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_lossy_mode_mixed_valid_invalid() {
|
||||
// Mix of valid and invalid UTF-8
|
||||
let mut mixed = Vec::new();
|
||||
mixed.extend_from_slice(b"hello");
|
||||
mixed.push(0xFF); // Invalid byte
|
||||
mixed.extend_from_slice(b"world");
|
||||
let mixed_utf8 = Bytes::from(mixed);
|
||||
|
||||
let result = decode_string(&mixed_utf8, PromValidationMode::Lossy);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "hello<EFBFBD>world");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_unchecked_mode_valid_utf8() {
|
||||
let valid_utf8 = Bytes::from("hello world");
|
||||
let result = decode_string(&valid_utf8, PromValidationMode::Unchecked);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "hello world");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_unchecked_mode_empty() {
|
||||
let empty = Bytes::new();
|
||||
let result = decode_string(&empty, PromValidationMode::Unchecked);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_unchecked_mode_unicode() {
|
||||
let unicode = Bytes::from("Hello 世界 🌍");
|
||||
let result = decode_string(&unicode, PromValidationMode::Unchecked);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), "Hello 世界 🌍");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_unchecked_mode_invalid_utf8() {
|
||||
// Invalid UTF-8 sequence - unchecked mode doesn't validate
|
||||
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
|
||||
let result = decode_string(&invalid_utf8, PromValidationMode::Unchecked);
|
||||
// This should succeed but the resulting string may contain invalid UTF-8
|
||||
assert!(result.is_ok());
|
||||
// We can't easily test the exact content since it's invalid UTF-8,
|
||||
// but we can verify it doesn't panic and returns something
|
||||
let _string = result.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_string_all_modes_ascii() {
|
||||
let ascii = Bytes::from("simple_ascii_123");
|
||||
|
||||
// All modes should handle ASCII identically
|
||||
let strict_result = decode_string(&ascii, PromValidationMode::Strict).unwrap();
|
||||
let lossy_result = decode_string(&ascii, PromValidationMode::Lossy).unwrap();
|
||||
let unchecked_result = decode_string(&ascii, PromValidationMode::Unchecked).unwrap();
|
||||
|
||||
assert_eq!(strict_result, "simple_ascii_123");
|
||||
assert_eq!(lossy_result, "simple_ascii_123");
|
||||
assert_eq!(unchecked_result, "simple_ascii_123");
|
||||
assert_eq!(strict_result, lossy_result);
|
||||
assert_eq!(lossy_result, unchecked_result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use query::query_engine::DescribeResult;
|
||||
use servers::error::{Error, Result};
|
||||
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
|
||||
use servers::http::test_helpers::TestClient;
|
||||
use servers::http::{HttpOptions, HttpServerBuilder};
|
||||
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
||||
use servers::prom_store;
|
||||
use servers::prom_store::{snappy_compress, Metrics};
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
@@ -120,11 +120,10 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let is_strict_mode = false;
|
||||
let instance = Arc::new(DummyInstance { tx });
|
||||
let server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(instance.clone())
|
||||
.with_prom_handler(instance, None, true, is_strict_mode)
|
||||
.with_prom_handler(instance, None, true, PromValidationMode::Unchecked)
|
||||
.build();
|
||||
server.build(server.make_app()).unwrap()
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ use object_store::ObjectStore;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||
use servers::http::{HttpOptions, HttpServerBuilder};
|
||||
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||
use servers::postgres::PostgresServer;
|
||||
@@ -533,7 +533,6 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
..Default::default()
|
||||
};
|
||||
let frontend_ref = instance.fe_instance().clone();
|
||||
let is_strict_mode = true;
|
||||
let http_server = HttpServerBuilder::new(http_opts)
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
|
||||
.with_logs_handler(instance.fe_instance().clone())
|
||||
@@ -541,7 +540,7 @@ pub async fn setup_test_prom_app_with_frontend(
|
||||
frontend_ref.clone(),
|
||||
Some(frontend_ref.clone()),
|
||||
true,
|
||||
is_strict_mode,
|
||||
PromValidationMode::Strict,
|
||||
)
|
||||
.with_prometheus_handler(frontend_ref)
|
||||
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
|
||||
|
||||
@@ -1011,7 +1011,7 @@ init_regions_parallelism = 16
|
||||
addr = "127.0.0.1:4000"
|
||||
timeout = "0s"
|
||||
body_limit = "64MiB"
|
||||
is_strict_mode = false
|
||||
prom_validation_mode = "strict"
|
||||
cors_allowed_origins = []
|
||||
enable_cors = true
|
||||
|
||||
|
||||
Reference in New Issue
Block a user