From 4bb5d00a4b8ae83ec0c51ef7ae66e6eb5df8d508 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 26 Jun 2025 02:56:35 +0800 Subject: [PATCH] fix(http): apply string validation mode to pipeline processor (#6378) * fix/apply-string-validation-to-pipeline: ### Commit Summary - **Refactor `decode_string` Functionality**: - Moved `decode_string` logic into `PromValidationMode` as a method `decode_string`. - Updated all references to use the new method. - Files affected: `http.rs`, `prom_row_builder.rs`, `proto.rs`. - **Logging Enhancements**: - Added `debug` logging for invalid UTF-8 string values. - File affected: `http.rs`. - **Test Updates**: - Modified tests to use the new `decode_string` method in `PromValidationMode`. - File affected: `proto.rs`. Signed-off-by: Lei, HUANG * fix clippy Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/servers/src/http.rs | 21 +++++++- src/servers/src/prom_row_builder.rs | 6 +-- src/servers/src/proto.rs | 78 +++++++++++------------------ 3 files changed, 51 insertions(+), 54 deletions(-) diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index dd7a8804ab..32406a37a5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -28,7 +28,7 @@ use axum::{middleware, routing, Router}; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_recordbatch::RecordBatch; -use common_telemetry::{error, info}; +use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::DataType; @@ -37,6 +37,7 @@ use datatypes::value::transform_value_ref_to_json_value; use event::{LogState, LogValidatorRef}; use futures::FutureExt; use http::{HeaderValue, Method}; +use prost::DecodeError; use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{ensure, ResultExt}; @@ -165,6 +166,24 @@ pub enum PromValidationMode { Unchecked, } +impl PromValidationMode { + /// Decodes provided bytes to [String] with optional UTF-8 validation. + pub fn decode_string(&self, bytes: &[u8]) -> std::result::Result { + let result = match self { + PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) { + Ok(s) => s, + Err(e) => { + debug!("Invalid UTF-8 string value: {:?}, error: {:?}", bytes, e); + return Err(DecodeError::new("invalid utf-8")); + } + }, + PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(), + PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) }, + }; + Ok(result) + } +} + impl Default for HttpOptions { fn default() -> Self { Self { diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 95f9f4ee83..f23eefe642 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -24,7 +24,7 @@ use pipeline::{ContextOpt, ContextReq}; use prost::DecodeError; use crate::http::PromValidationMode; -use crate::proto::{decode_string, PromLabel}; +use crate::proto::PromLabel; use crate::repeated_field::Clear; // Prometheus remote write context @@ -150,8 +150,8 @@ impl TableBuilder { let mut row = vec![Value { value_data: None }; self.col_indexes.len()]; for PromLabel { name, value } in labels { - let tag_name = decode_string(name, prom_validation_mode)?; - let tag_value = decode_string(value, prom_validation_mode)?; + let tag_name = prom_validation_mode.decode_string(name)?; + let tag_value = prom_validation_mode.decode_string(value)?; let tag_value = Some(ValueData::StringValue(tag_value)); let tag_num = self.col_indexes.len(); diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 8b2a73461f..e111cf517f 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -20,7 +20,6 @@ use std::slice; use api::prom_store::remote::Sample; use bytes::{Buf, Bytes}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; -use common_telemetry::debug; use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value}; use prost::encoding::message::merge; use prost::encoding::{decode_key, decode_varint, WireType}; @@ -196,16 +195,16 @@ impl PromTimeSeries { match label.name.deref() { METRIC_NAME_LABEL_BYTES => { - self.table_name = decode_string(&label.value, prom_validation_mode)?; + self.table_name = prom_validation_mode.decode_string(&label.value)?; self.labels.truncate(self.labels.len() - 1); // remove last label } DATABASE_LABEL_BYTES => { - self.schema = Some(decode_string(&label.value, prom_validation_mode)?); + self.schema = Some(prom_validation_mode.decode_string(&label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label } PHYSICAL_TABLE_LABEL_BYTES => { self.physical_table = - Some(decode_string(&label.value, prom_validation_mode)?); + Some(prom_validation_mode.decode_string(&label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label } _ => {} @@ -258,29 +257,6 @@ impl PromTimeSeries { } } -/// Decodes bytes into String values according provided validation mode. -pub(crate) fn decode_string( - bytes: &Bytes, - mode: PromValidationMode, -) -> Result { - let result = match mode { - PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) { - Ok(s) => s, - Err(e) => { - debug!( - "Invalid UTF-8 string value: {:?}, error: {:?}", - &bytes[..], - e - ); - return Err(DecodeError::new("invalid utf-8")); - } - }, - PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(), - PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) }, - }; - Ok(result) -} - #[derive(Default, Debug)] pub struct PromWriteRequest { table_data: TablesBuilder, @@ -332,7 +308,10 @@ impl PromWriteRequest { } if processor.use_pipeline { - processor.consume_series_to_pipeline_map(&mut self.series)?; + processor.consume_series_to_pipeline_map( + &mut self.series, + prom_validation_mode, + )?; } else { self.series .add_to_table_data(&mut self.table_data, prom_validation_mode)?; @@ -398,14 +377,13 @@ impl PromSeriesProcessor { pub(crate) fn consume_series_to_pipeline_map( &mut self, series: &mut PromTimeSeries, + prom_validation_mode: PromValidationMode, ) -> Result<(), DecodeError> { let mut vec_pipeline_map: Vec = Vec::new(); let mut pipeline_map = BTreeMap::new(); for l in series.labels.iter() { - let name = String::from_utf8(l.name.to_vec()) - .map_err(|_| DecodeError::new("invalid utf-8"))?; - let value = String::from_utf8(l.value.to_vec()) - .map_err(|_| DecodeError::new("invalid utf-8"))?; + let name = prom_validation_mode.decode_string(&l.name)?; + let value = prom_validation_mode.decode_string(&l.value)?; pipeline_map.insert(name, Value::String(value)); } @@ -478,7 +456,7 @@ mod tests { use crate::http::PromValidationMode; use crate::prom_store::to_grpc_row_insert_requests; - use crate::proto::{decode_string, PromSeriesProcessor, PromWriteRequest}; + use crate::proto::{PromSeriesProcessor, PromWriteRequest}; use crate::repeated_field::Clear; fn sort_rows(rows: Rows) -> Rows { @@ -559,7 +537,7 @@ mod tests { #[test] fn test_decode_string_strict_mode_valid_utf8() { let valid_utf8 = Bytes::from("hello world"); - let result = decode_string(&valid_utf8, PromValidationMode::Strict); + let result = PromValidationMode::Strict.decode_string(&valid_utf8); assert!(result.is_ok()); assert_eq!(result.unwrap(), "hello world"); } @@ -567,7 +545,7 @@ mod tests { #[test] fn test_decode_string_strict_mode_empty() { let empty = Bytes::new(); - let result = decode_string(&empty, PromValidationMode::Strict); + let result = PromValidationMode::Strict.decode_string(&empty); assert!(result.is_ok()); assert_eq!(result.unwrap(), ""); } @@ -575,7 +553,7 @@ mod tests { #[test] fn test_decode_string_strict_mode_unicode() { let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); - let result = decode_string(&unicode, PromValidationMode::Strict); + let result = PromValidationMode::Strict.decode_string(&unicode); assert!(result.is_ok()); assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); } @@ -584,7 +562,7 @@ mod tests { fn test_decode_string_strict_mode_invalid_utf8() { // Invalid UTF-8 sequence let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); - let result = decode_string(&invalid_utf8, PromValidationMode::Strict); + let result = PromValidationMode::Strict.decode_string(&invalid_utf8); assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), @@ -596,7 +574,7 @@ mod tests { fn test_decode_string_strict_mode_incomplete_utf8() { // Incomplete UTF-8 sequence (missing continuation bytes) let incomplete_utf8 = Bytes::from(vec![0xC2]); // Start of 2-byte sequence but missing second byte - let result = decode_string(&incomplete_utf8, PromValidationMode::Strict); + let result = PromValidationMode::Strict.decode_string(&incomplete_utf8); assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), @@ -607,7 +585,7 @@ mod tests { #[test] fn test_decode_string_lossy_mode_valid_utf8() { let valid_utf8 = Bytes::from("hello world"); - let result = decode_string(&valid_utf8, PromValidationMode::Lossy); + let result = PromValidationMode::Lossy.decode_string(&valid_utf8); assert!(result.is_ok()); assert_eq!(result.unwrap(), "hello world"); } @@ -615,7 +593,7 @@ mod tests { #[test] fn test_decode_string_lossy_mode_empty() { let empty = Bytes::new(); - let result = decode_string(&empty, PromValidationMode::Lossy); + let result = PromValidationMode::Lossy.decode_string(&empty); assert!(result.is_ok()); assert_eq!(result.unwrap(), ""); } @@ -623,7 +601,7 @@ mod tests { #[test] fn test_decode_string_lossy_mode_unicode() { let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); - let result = decode_string(&unicode, PromValidationMode::Lossy); + let result = PromValidationMode::Lossy.decode_string(&unicode); assert!(result.is_ok()); assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); } @@ -632,7 +610,7 @@ mod tests { fn test_decode_string_lossy_mode_invalid_utf8() { // Invalid UTF-8 sequence - should be replaced with replacement character let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); - let result = decode_string(&invalid_utf8, PromValidationMode::Lossy); + let result = PromValidationMode::Lossy.decode_string(&invalid_utf8); assert!(result.is_ok()); // Each invalid byte should be replaced with the Unicode replacement character assert_eq!(result.unwrap(), "๏ฟฝ๏ฟฝ๏ฟฝ"); @@ -647,7 +625,7 @@ mod tests { mixed.extend_from_slice(b"world"); let mixed_utf8 = Bytes::from(mixed); - let result = decode_string(&mixed_utf8, PromValidationMode::Lossy); + let result = PromValidationMode::Lossy.decode_string(&mixed_utf8); assert!(result.is_ok()); assert_eq!(result.unwrap(), "hello๏ฟฝworld"); } @@ -655,7 +633,7 @@ mod tests { #[test] fn test_decode_string_unchecked_mode_valid_utf8() { let valid_utf8 = Bytes::from("hello world"); - let result = decode_string(&valid_utf8, PromValidationMode::Unchecked); + let result = PromValidationMode::Unchecked.decode_string(&valid_utf8); assert!(result.is_ok()); assert_eq!(result.unwrap(), "hello world"); } @@ -663,7 +641,7 @@ mod tests { #[test] fn test_decode_string_unchecked_mode_empty() { let empty = Bytes::new(); - let result = decode_string(&empty, PromValidationMode::Unchecked); + let result = PromValidationMode::Unchecked.decode_string(&empty); assert!(result.is_ok()); assert_eq!(result.unwrap(), ""); } @@ -671,7 +649,7 @@ mod tests { #[test] fn test_decode_string_unchecked_mode_unicode() { let unicode = Bytes::from("Hello ไธ–็•Œ ๐ŸŒ"); - let result = decode_string(&unicode, PromValidationMode::Unchecked); + let result = PromValidationMode::Unchecked.decode_string(&unicode); assert!(result.is_ok()); assert_eq!(result.unwrap(), "Hello ไธ–็•Œ ๐ŸŒ"); } @@ -680,7 +658,7 @@ mod tests { fn test_decode_string_unchecked_mode_invalid_utf8() { // Invalid UTF-8 sequence - unchecked mode doesn't validate let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]); - let result = decode_string(&invalid_utf8, PromValidationMode::Unchecked); + let result = PromValidationMode::Unchecked.decode_string(&invalid_utf8); // This should succeed but the resulting string may contain invalid UTF-8 assert!(result.is_ok()); // We can't easily test the exact content since it's invalid UTF-8, @@ -693,9 +671,9 @@ mod tests { let ascii = Bytes::from("simple_ascii_123"); // All modes should handle ASCII identically - let strict_result = decode_string(&ascii, PromValidationMode::Strict).unwrap(); - let lossy_result = decode_string(&ascii, PromValidationMode::Lossy).unwrap(); - let unchecked_result = decode_string(&ascii, PromValidationMode::Unchecked).unwrap(); + let strict_result = PromValidationMode::Strict.decode_string(&ascii).unwrap(); + let lossy_result = PromValidationMode::Lossy.decode_string(&ascii).unwrap(); + let unchecked_result = PromValidationMode::Unchecked.decode_string(&ascii).unwrap(); assert_eq!(strict_result, "simple_ascii_123"); assert_eq!(lossy_result, "simple_ascii_123");