diff --git a/config/config.md b/config/config.md index 423e05a668..384b9992c8 100644 --- a/config/config.md +++ b/config/config.md @@ -27,6 +27,7 @@ | `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. | | `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default
This allows browser to access http APIs without CORS restrictions | | `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | +| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.
Available options:
- strict: deny invalid UTF-8 strings (default).
- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
- unchecked: do not validate strings. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | @@ -226,6 +227,7 @@ | `http.body_limit` | String | `64MB` | HTTP request body limit.
The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
Set to 0 to disable limit. | | `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default
This allows browser to access http APIs without CORS restrictions | | `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. | +| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.
Available options:
- strict: deny invalid UTF-8 strings (default).
- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
- unchecked: do not validate strings. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `grpc.bind_addr`. | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 6d9f9e08ea..753ef6ae30 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -37,6 +37,12 @@ enable_cors = true ## Customize allowed origins for HTTP CORS. ## @toml2docs:none-default cors_allowed_origins = ["https://example.com"] +## Whether to enable validation for Prometheus remote write requests. +## Available options: +## - strict: deny invalid UTF-8 strings (default). +## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD). +## - unchecked: do not validate strings. +prom_validation_mode = "strict" ## The gRPC server options. [grpc] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 4093ab4423..2470ac4775 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -43,6 +43,13 @@ enable_cors = true ## @toml2docs:none-default cors_allowed_origins = ["https://example.com"] +## Whether to enable validation for Prometheus remote write requests. +## Available options: +## - strict: deny invalid UTF-8 strings (default). +## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD). +## - unchecked: do not validate strings. +prom_validation_mode = "strict" + ## The gRPC server options. [grpc] ## The address to bind the gRPC server. 
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 396d001a4f..abd80c940b 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -104,7 +104,7 @@ where self.instance.clone(), Some(self.instance.clone()), opts.prom_store.with_metric_engine, - opts.http.is_strict_mode, + opts.http.prom_validation_mode, ) .with_prometheus_handler(self.instance.clone()); } diff --git a/src/servers/benches/prom_decode.rs b/src/servers/benches/prom_decode.rs index 040a31f0d9..52759db3d0 100644 --- a/src/servers/benches/prom_decode.rs +++ b/src/servers/benches/prom_decode.rs @@ -16,76 +16,55 @@ use std::time::Duration; use api::prom_store::remote::WriteRequest; use bytes::Bytes; -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use prost::Message; +use servers::http::PromValidationMode; use servers::prom_store::to_grpc_row_insert_requests; use servers::proto::{PromSeriesProcessor, PromWriteRequest}; -fn bench_decode_prom_request_without_strict_mode(c: &mut Criterion) { +fn bench_decode_prom_request(c: &mut Criterion) { let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); d.push("benches"); d.push("write_request.pb.data"); let data = Bytes::from(std::fs::read(d).unwrap()); - let mut request = WriteRequest::default(); - let mut prom_request = PromWriteRequest::default(); - let is_strict_mode = false; - let mut p = PromSeriesProcessor::default_processor(); + let mut group = c.benchmark_group("decode_prom_request"); + group.measurement_time(Duration::from_secs(3)); - c.benchmark_group("decode") - .measurement_time(Duration::from_secs(3)) - .bench_function("write_request", |b| { - b.iter(|| { - request.clear(); - let data = data.clone(); - request.merge(data).unwrap(); - to_grpc_row_insert_requests(&request).unwrap(); - }); - }) - .bench_function("prom_write_request", |b| { - b.iter(|| { - let data = data.clone(); - prom_request.merge(data, 
is_strict_mode, &mut p).unwrap(); - prom_request.as_row_insert_requests(); - }); + // Benchmark standard WriteRequest decoding as a baseline + let mut request = WriteRequest::default(); + group.bench_function("standard_write_request", |b| { + b.iter(|| { + let data = data.clone(); + request.merge(data).unwrap(); + to_grpc_row_insert_requests(&request).unwrap(); }); + }); + + // Benchmark each validation mode + for mode in [ + PromValidationMode::Strict, + PromValidationMode::Lossy, + PromValidationMode::Unchecked, + ] { + let mut prom_request = PromWriteRequest::default(); + let mut p = PromSeriesProcessor::default_processor(); + group.bench_with_input( + BenchmarkId::new("validation_mode", format!("{:?}", mode)), + &mode, + |b, &mode| { + b.iter(|| { + let data = data.clone(); + prom_request.merge(data, mode, &mut p).unwrap(); + prom_request.as_row_insert_requests(); + }); + }, + ); + } + + group.finish(); } -fn bench_decode_prom_request_with_strict_mode(c: &mut Criterion) { - let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); - d.push("benches"); - d.push("write_request.pb.data"); - - let data = Bytes::from(std::fs::read(d).unwrap()); - - let mut request = WriteRequest::default(); - let mut prom_request = PromWriteRequest::default(); - let is_strict_mode = true; - let mut p = PromSeriesProcessor::default_processor(); - - c.benchmark_group("decode") - .measurement_time(Duration::from_secs(3)) - .bench_function("write_request", |b| { - b.iter(|| { - request.clear(); - let data = data.clone(); - request.merge(data).unwrap(); - to_grpc_row_insert_requests(&request).unwrap(); - }); - }) - .bench_function("prom_write_request", |b| { - b.iter(|| { - let data = data.clone(); - prom_request.merge(data, is_strict_mode, &mut p).unwrap(); - prom_request.as_row_insert_requests(); - }); - }); -} - -criterion_group!( - benches, - bench_decode_prom_request_without_strict_mode, - bench_decode_prom_request_with_strict_mode -); +criterion_group!(benches, 
bench_decode_prom_request); criterion_main!(benches); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d464babcc7..38575de1ac 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -146,13 +146,25 @@ pub struct HttpOptions { pub body_limit: ReadableSize, - pub is_strict_mode: bool, + /// Validation mode while decoding Prometheus remote write requests. + pub prom_validation_mode: PromValidationMode, pub cors_allowed_origins: Vec, pub enable_cors: bool, } +#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PromValidationMode { + /// Force UTF8 validation + Strict, + /// Allow lossy UTF8 strings + Lossy, + /// Do not validate UTF8 strings. + Unchecked, +} + impl Default for HttpOptions { fn default() -> Self { Self { @@ -160,9 +172,9 @@ impl Default for HttpOptions { timeout: Duration::from_secs(0), disable_dashboard: false, body_limit: DEFAULT_BODY_LIMIT, - is_strict_mode: false, cors_allowed_origins: Vec::new(), enable_cors: true, + prom_validation_mode: PromValidationMode::Strict, } } } @@ -556,13 +568,13 @@ impl HttpServerBuilder { handler: PromStoreProtocolHandlerRef, pipeline_handler: Option, prom_store_with_metric_engine: bool, - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, ) -> Self { let state = PromStoreState { prom_store_handler: handler, pipeline_handler, prom_store_with_metric_engine, - is_strict_mode, + prom_validation_mode, }; Self { diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 13b7f0bed0..a609edb988 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -38,6 +38,7 @@ use snafu::prelude::*; use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::http::extractor::PipelineInfo; use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; +use crate::http::PromValidationMode; use crate::prom_store::{snappy_decompress, 
zstd_decompress}; use crate::proto::{PromSeriesProcessor, PromWriteRequest}; use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse}; @@ -57,7 +58,7 @@ pub struct PromStoreState { pub prom_store_handler: PromStoreProtocolHandlerRef, pub pipeline_handler: Option, pub prom_store_with_metric_engine: bool, - pub is_strict_mode: bool, + pub prom_validation_mode: PromValidationMode, } #[derive(Debug, Serialize, Deserialize)] @@ -97,7 +98,7 @@ pub async fn remote_write( prom_store_handler, pipeline_handler, prom_store_with_metric_engine, - is_strict_mode, + prom_validation_mode, } = state; if let Some(_vm_handshake) = params.get_vm_proto_version { @@ -133,7 +134,7 @@ pub async fn remote_write( } let (request, samples) = - decode_remote_write_request(is_zstd, body, is_strict_mode, &mut processor).await?; + decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?; let output = prom_store_handler .write(request, query_ctx, prom_store_with_metric_engine) @@ -199,7 +200,7 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result { async fn decode_remote_write_request( is_zstd: bool, body: Bytes, - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, processor: &mut PromSeriesProcessor, ) -> Result<(RowInsertRequests, usize)> { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); @@ -220,7 +221,7 @@ async fn decode_remote_write_request( let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); request - .merge(buf, is_strict_mode, processor) + .merge(buf, prom_validation_mode, processor) .context(error::DecodePromRemoteRequestSnafu)?; if processor.use_pipeline { diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index cc10b38708..88e8ccf395 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -209,7 +209,8 @@ lazy_static! 
{ pub static ref METRIC_MYSQL_QUERY_TIMER: HistogramVec = register_histogram_vec!( "greptime_servers_mysql_query_elapsed", "servers mysql query elapsed", - &[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL] + &[METRIC_MYSQL_SUBPROTOCOL_LABEL, METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); pub static ref METRIC_MYSQL_PREPARED_COUNT: IntCounterVec = register_int_counter_vec!( @@ -226,7 +227,8 @@ lazy_static! { pub static ref METRIC_POSTGRES_QUERY_TIMER: HistogramVec = register_histogram_vec!( "greptime_servers_postgres_query_elapsed", "servers postgres query elapsed", - &[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL] + &[METRIC_POSTGRES_SUBPROTOCOL_LABEL, METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); pub static ref METRIC_POSTGRES_PREPARED_COUNT: IntCounter = register_int_counter!( @@ -243,7 +245,8 @@ lazy_static! { pub static ref METRIC_SERVER_GRPC_PROM_REQUEST_TIMER: HistogramVec = register_histogram_vec!( "greptime_servers_grpc_prom_request_elapsed", "servers grpc prom request elapsed", - &[METRIC_DB_LABEL] + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); pub static ref METRIC_HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!( diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index e58c683d01..008440b224 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -25,7 +25,8 @@ use api::v1::{ use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use prost::DecodeError; -use crate::proto::PromLabel; +use crate::http::PromValidationMode; +use crate::proto::{decode_string, PromLabel}; use crate::repeated_field::Clear; /// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. 
@@ -125,27 +126,13 @@ impl TableBuilder { &mut self, labels: &[PromLabel], samples: &[Sample], - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, ) -> Result<(), DecodeError> { let mut row = vec![Value { value_data: None }; self.col_indexes.len()]; for PromLabel { name, value } in labels { - let (tag_name, tag_value) = if is_strict_mode { - let tag_name = match String::from_utf8(name.to_vec()) { - Ok(s) => s, - Err(_) => return Err(DecodeError::new("invalid utf-8")), - }; - let tag_value = match String::from_utf8(value.to_vec()) { - Ok(s) => s, - Err(_) => return Err(DecodeError::new("invalid utf-8")), - }; - (tag_name, tag_value) - } else { - let tag_name = unsafe { String::from_utf8_unchecked(name.to_vec()) }; - let tag_value = unsafe { String::from_utf8_unchecked(value.to_vec()) }; - (tag_name, tag_value) - }; - + let tag_name = decode_string(name, prom_validation_mode)?; + let tag_value = decode_string(value, prom_validation_mode)?; let tag_value = Some(ValueData::StringValue(tag_value)); let tag_num = self.col_indexes.len(); @@ -215,12 +202,12 @@ mod tests { use bytes::Bytes; use prost::DecodeError; + use crate::http::PromValidationMode; use crate::prom_row_builder::TableBuilder; use crate::proto::PromLabel; #[test] fn test_table_builder() { let mut builder = TableBuilder::default(); - let is_strict_mode = true; let _ = builder.add_labels_and_samples( &[ PromLabel { @@ -236,7 +223,7 @@ mod tests { value: 0.0, timestamp: 0, }], - is_strict_mode, + PromValidationMode::Strict, ); let _ = builder.add_labels_and_samples( @@ -254,7 +241,7 @@ mod tests { value: 0.1, timestamp: 1, }], - is_strict_mode, + PromValidationMode::Strict, ); let request = builder.as_row_insert_request("test".to_string()); @@ -310,7 +297,7 @@ mod tests { value: 0.1, timestamp: 1, }], - is_strict_mode, + PromValidationMode::Strict, ); assert_eq!(res, Err(DecodeError::new("invalid utf-8"))); } diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 
07b444f18c..210f45e849 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -21,6 +21,7 @@ use api::prom_store::remote::Sample; use api::v1::RowInsertRequests; use bytes::{Buf, Bytes}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_telemetry::debug; use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap, Value}; use prost::encoding::message::merge; use prost::encoding::{decode_key, decode_varint, WireType}; @@ -30,6 +31,7 @@ use snafu::OptionExt; use crate::error::InternalSnafu; use crate::http::event::PipelineIngestRequest; +use crate::http::PromValidationMode; use crate::pipeline::run_pipeline; use crate::prom_row_builder::TablesBuilder; use crate::prom_store::METRIC_NAME_LABEL_BYTES; @@ -160,7 +162,7 @@ impl PromTimeSeries { tag: u32, wire_type: WireType, buf: &mut Bytes, - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, ) -> Result<(), DecodeError> { const STRUCT_NAME: &str = "PromTimeSeries"; match tag { @@ -186,15 +188,7 @@ impl PromTimeSeries { return Err(DecodeError::new("delimited length exceeded")); } if label.name.deref() == METRIC_NAME_LABEL_BYTES { - let table_name = if is_strict_mode { - match String::from_utf8(label.value.to_vec()) { - Ok(s) => s, - Err(_) => return Err(DecodeError::new("invalid utf-8")), - } - } else { - unsafe { String::from_utf8_unchecked(label.value.to_vec()) } - }; - self.table_name = table_name; + self.table_name = decode_string(&label.value, prom_validation_mode)?; self.labels.truncate(self.labels.len() - 1); // remove last label } Ok(()) @@ -218,7 +212,7 @@ impl PromTimeSeries { fn add_to_table_data( &mut self, table_builders: &mut TablesBuilder, - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, ) -> Result<(), DecodeError> { let label_num = self.labels.len(); let row_num = self.samples.len(); @@ -230,13 +224,36 @@ impl PromTimeSeries { table_data.add_labels_and_samples( self.labels.as_slice(), 
self.samples.as_slice(), - is_strict_mode, + prom_validation_mode, )?; Ok(()) } } +/// Decodes bytes into String values according to the provided validation mode. +pub(crate) fn decode_string( + bytes: &Bytes, + mode: PromValidationMode, +) -> Result { + let result = match mode { + PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) { + Ok(s) => s, + Err(e) => { + debug!( + "Invalid UTF-8 string value: {:?}, error: {:?}", + &bytes[..], + e + ); + return Err(DecodeError::new("invalid utf-8")); + } + }, + PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(), + PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) }, + }; + Ok(result) +} + #[derive(Default, Debug)] pub struct PromWriteRequest { table_data: TablesBuilder, @@ -258,7 +275,7 @@ impl PromWriteRequest { pub fn merge( &mut self, mut buf: Bytes, - is_strict_mode: bool, + prom_validation_mode: PromValidationMode, processor: &mut PromSeriesProcessor, ) -> Result<(), DecodeError> { const STRUCT_NAME: &str = "PromWriteRequest"; @@ -281,7 +298,7 @@ impl PromWriteRequest { while buf.remaining() > limit { let (tag, wire_type) = decode_key(&mut buf)?; self.series - .merge_field(tag, wire_type, &mut buf, is_strict_mode)?; + .merge_field(tag, wire_type, &mut buf, prom_validation_mode)?; } if buf.remaining() != limit { return Err(DecodeError::new("delimited length exceeded")); @@ -291,7 +308,7 @@ impl PromWriteRequest { processor.consume_series_to_pipeline_map(&mut self.series)?; } else { self.series - .add_to_table_data(&mut self.table_data, is_strict_mode)?; + .add_to_table_data(&mut self.table_data, prom_validation_mode)?; } // clear state @@ -441,8 +458,9 @@ mod tests { use bytes::Bytes; use prost::Message; + use crate::http::PromValidationMode; use crate::prom_store::to_grpc_row_insert_requests; - use crate::proto::{PromSeriesProcessor, PromWriteRequest}; + use crate::proto::{decode_string, PromSeriesProcessor, PromWriteRequest}; use 
crate::repeated_field::Clear; fn sort_rows(rows: Rows) -> Rows { @@ -469,7 +487,7 @@ mod tests { let mut p = PromSeriesProcessor::default_processor(); prom_write_request.clear(); prom_write_request - .merge(data.clone(), true, &mut p) + .merge(data.clone(), PromValidationMode::Strict, &mut p) .unwrap(); let (prom_rows, samples) = prom_write_request.as_row_insert_requests(); @@ -510,4 +528,152 @@ mod tests { ); } } + + #[test] + fn test_decode_string_strict_mode_valid_utf8() { + let valid_utf8 = Bytes::from("hello world"); + let result = decode_string(&valid_utf8, PromValidationMode::Strict); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "hello world"); + } + + #[test] + fn test_decode_string_strict_mode_empty() { + let empty = Bytes::new(); + let result = decode_string(&empty, PromValidationMode::Strict); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_decode_string_strict_mode_unicode() { + let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); + let result = decode_string(&unicode, PromValidationMode::Strict); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); + } + + #[test] + fn test_decode_string_strict_mode_invalid_utf8() { + // Invalid UTF-8 sequence + let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); + let result = decode_string(&invalid_utf8, PromValidationMode::Strict); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "failed to decode Protobuf message: invalid utf-8" + ); + } + + #[test] + fn test_decode_string_strict_mode_incomplete_utf8() { + // Incomplete UTF-8 sequence (missing continuation bytes) + let incomplete_utf8 = Bytes::from(vec![0xC2]); // Start of 2-byte sequence but missing second byte + let result = decode_string(&incomplete_utf8, PromValidationMode::Strict); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "failed to decode Protobuf message: invalid utf-8" + ); + } + + #[test] + fn 
test_decode_string_lossy_mode_valid_utf8() { + let valid_utf8 = Bytes::from("hello world"); + let result = decode_string(&valid_utf8, PromValidationMode::Lossy); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "hello world"); + } + + #[test] + fn test_decode_string_lossy_mode_empty() { + let empty = Bytes::new(); + let result = decode_string(&empty, PromValidationMode::Lossy); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_decode_string_lossy_mode_unicode() { + let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); + let result = decode_string(&unicode, PromValidationMode::Lossy); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); + } + + #[test] + fn test_decode_string_lossy_mode_invalid_utf8() { + // Invalid UTF-8 sequence - should be replaced with replacement character + let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); + let result = decode_string(&invalid_utf8, PromValidationMode::Lossy); + assert!(result.is_ok()); + // Each invalid byte should be replaced with the Unicode replacement character + assert_eq!(result.unwrap(), "๏ฟฝ๏ฟฝ๏ฟฝ"); + } + + #[test] + fn test_decode_string_lossy_mode_mixed_valid_invalid() { + // Mix of valid and invalid UTF-8 + let mut mixed = Vec::new(); + mixed.extend_from_slice(b"hello"); + mixed.push(0xFF); // Invalid byte + mixed.extend_from_slice(b"world"); + let mixed_utf8 = Bytes::from(mixed); + + let result = decode_string(&mixed_utf8, PromValidationMode::Lossy); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "hello๏ฟฝworld"); + } + + #[test] + fn test_decode_string_unchecked_mode_valid_utf8() { + let valid_utf8 = Bytes::from("hello world"); + let result = decode_string(&valid_utf8, PromValidationMode::Unchecked); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "hello world"); + } + + #[test] + fn test_decode_string_unchecked_mode_empty() { + let empty = Bytes::new(); + let result = decode_string(&empty, PromValidationMode::Unchecked); 
+ assert!(result.is_ok()); + assert_eq!(result.unwrap(), ""); + } + + #[test] + fn test_decode_string_unchecked_mode_unicode() { + let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); + let result = decode_string(&unicode, PromValidationMode::Unchecked); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); + } + + #[test] + fn test_decode_string_unchecked_mode_invalid_utf8() { + // Invalid UTF-8 sequence - unchecked mode doesn't validate + let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); + let result = decode_string(&invalid_utf8, PromValidationMode::Unchecked); + // This should succeed but the resulting string may contain invalid UTF-8 + assert!(result.is_ok()); + // We can't easily test the exact content since it's invalid UTF-8, + // but we can verify it doesn't panic and returns something + let _string = result.unwrap(); + } + + #[test] + fn test_decode_string_all_modes_ascii() { + let ascii = Bytes::from("simple_ascii_123"); + + // All modes should handle ASCII identically + let strict_result = decode_string(&ascii, PromValidationMode::Strict).unwrap(); + let lossy_result = decode_string(&ascii, PromValidationMode::Lossy).unwrap(); + let unchecked_result = decode_string(&ascii, PromValidationMode::Unchecked).unwrap(); + + assert_eq!(strict_result, "simple_ascii_123"); + assert_eq!(lossy_result, "simple_ascii_123"); + assert_eq!(unchecked_result, "simple_ascii_123"); + assert_eq!(strict_result, lossy_result); + assert_eq!(lossy_result, unchecked_result); + } } diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index fef100b056..f87697cf4d 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -29,7 +29,7 @@ use query::query_engine::DescribeResult; use servers::error::{Error, Result}; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::test_helpers::TestClient; -use servers::http::{HttpOptions, 
HttpServerBuilder}; +use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::prom_store; use servers::prom_store::{snappy_compress, Metrics}; use servers::query_handler::sql::SqlQueryHandler; @@ -120,11 +120,10 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { ..Default::default() }; - let is_strict_mode = false; let instance = Arc::new(DummyInstance { tx }); let server = HttpServerBuilder::new(http_opts) .with_sql_handler(instance.clone()) - .with_prom_handler(instance, None, true, is_strict_mode) + .with_prom_handler(instance, None, true, PromValidationMode::Unchecked) .build(); server.build(server.make_app()).unwrap() } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 018347e64f..616dc245e7 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -43,7 +43,7 @@ use object_store::ObjectStore; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig}; -use servers::http::{HttpOptions, HttpServerBuilder}; +use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::postgres::PostgresServer; @@ -533,7 +533,6 @@ pub async fn setup_test_prom_app_with_frontend( ..Default::default() }; let frontend_ref = instance.fe_instance().clone(); - let is_strict_mode = true; let http_server = HttpServerBuilder::new(http_opts) .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone())) .with_logs_handler(instance.fe_instance().clone()) @@ -541,7 +540,7 @@ pub async fn setup_test_prom_app_with_frontend( frontend_ref.clone(), Some(frontend_ref.clone()), true, - is_strict_mode, + PromValidationMode::Strict, ) .with_prometheus_handler(frontend_ref) 
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 0e6543e435..6cf765ae12 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1011,7 +1011,7 @@ init_regions_parallelism = 16 addr = "127.0.0.1:4000" timeout = "0s" body_limit = "64MiB" -is_strict_mode = false +prom_validation_mode = "strict" cors_allowed_origins = [] enable_cors = true