fix(http): apply string validation mode to pipeline processor (#6378)

* fix/apply-string-validation-to-pipeline:
 ### Commit Summary

 - **Refactor `decode_string` Functionality**:
   - Moved `decode_string` logic into `PromValidationMode` as a method `decode_string`.
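   - Updated all references to use the new method, including `consume_series_to_pipeline_map`, which previously always decoded labels with strict UTF-8 validation regardless of the configured mode.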
   - Files affected: `http.rs`, `prom_row_builder.rs`, `proto.rs`.

 - **Logging Enhancements**:
   - Moved the `debug` logging for invalid UTF-8 string values (strict mode) together with `decode_string`; it was previously emitted from `proto.rs`.
   - File affected: `http.rs`.

 - **Test Updates**:
   - Modified tests to use the new `decode_string` method in `PromValidationMode`.
   - File affected: `proto.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-26 02:56:35 +08:00
committed by GitHub
parent 1d07864b29
commit 4bb5d00a4b
3 changed files with 51 additions and 54 deletions

View File

@@ -28,7 +28,7 @@ use axum::{middleware, routing, Router};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_recordbatch::RecordBatch;
use common_telemetry::{error, info};
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datatypes::data_type::DataType;
@@ -37,6 +37,7 @@ use datatypes::value::transform_value_ref_to_json_value;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use http::{HeaderValue, Method};
use prost::DecodeError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{ensure, ResultExt};
@@ -165,6 +166,24 @@ pub enum PromValidationMode {
Unchecked,
}
impl PromValidationMode {
    /// Decodes `bytes` into an owned [`String`], applying the UTF-8 handling
    /// selected by this validation mode.
    ///
    /// - `Strict`: the bytes must be valid UTF-8, otherwise decoding fails.
    /// - `Lossy`: invalid sequences are replaced with U+FFFD (the Unicode
    ///   replacement character); decoding never fails.
    /// - `Unchecked`: the bytes are copied into a `String` without any
    ///   validation; decoding never fails.
    ///
    /// # Errors
    ///
    /// Returns a [`DecodeError`] with the message `invalid utf-8` when the mode
    /// is `Strict` and `bytes` is not valid UTF-8. The offending bytes and the
    /// underlying error are logged at debug level before returning.
    pub fn decode_string(&self, bytes: &[u8]) -> std::result::Result<String, DecodeError> {
        let decoded = match self {
            PromValidationMode::Strict => String::from_utf8(bytes.to_vec()).map_err(|e| {
                debug!("Invalid UTF-8 string value: {:?}, error: {:?}", bytes, e);
                DecodeError::new("invalid utf-8")
            })?,
            // `into_owned` hands back the buffer `from_utf8_lossy` already
            // allocated when a replacement happened, instead of copying it
            // again as `to_string` would.
            PromValidationMode::Lossy => String::from_utf8_lossy(bytes).into_owned(),
            // SAFETY: this mode deliberately skips validation. Whoever selects
            // `Unchecked` must guarantee that `bytes` is valid UTF-8; if it is
            // not, the returned `String` violates its UTF-8 invariant and any
            // later use of it is undefined behavior.
            PromValidationMode::Unchecked => unsafe {
                String::from_utf8_unchecked(bytes.to_vec())
            },
        };
        Ok(decoded)
    }
}
impl Default for HttpOptions {
fn default() -> Self {
Self {

View File

@@ -24,7 +24,7 @@ use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError;
use crate::http::PromValidationMode;
use crate::proto::{decode_string, PromLabel};
use crate::proto::PromLabel;
use crate::repeated_field::Clear;
// Prometheus remote write context
@@ -150,8 +150,8 @@ impl TableBuilder {
let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
for PromLabel { name, value } in labels {
let tag_name = decode_string(name, prom_validation_mode)?;
let tag_value = decode_string(value, prom_validation_mode)?;
let tag_name = prom_validation_mode.decode_string(name)?;
let tag_value = prom_validation_mode.decode_string(value)?;
let tag_value = Some(ValueData::StringValue(tag_value));
let tag_num = self.col_indexes.len();

View File

@@ -20,7 +20,6 @@ use std::slice;
use api::prom_store::remote::Sample;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::debug;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType};
@@ -196,16 +195,16 @@ impl PromTimeSeries {
match label.name.deref() {
METRIC_NAME_LABEL_BYTES => {
self.table_name = decode_string(&label.value, prom_validation_mode)?;
self.table_name = prom_validation_mode.decode_string(&label.value)?;
self.labels.truncate(self.labels.len() - 1); // remove last label
}
DATABASE_LABEL_BYTES => {
self.schema = Some(decode_string(&label.value, prom_validation_mode)?);
self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
PHYSICAL_TABLE_LABEL_BYTES => {
self.physical_table =
Some(decode_string(&label.value, prom_validation_mode)?);
Some(prom_validation_mode.decode_string(&label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
_ => {}
@@ -258,29 +257,6 @@ impl PromTimeSeries {
}
}
/// Decodes bytes into a `String` according to the provided validation mode.
///
/// - `Strict`: returns `DecodeError` ("invalid utf-8") when the bytes are not
///   valid UTF-8, after logging the bytes and the error at debug level.
/// - `Lossy`: decodes with `String::from_utf8_lossy`, so it never fails.
/// - `Unchecked`: copies the bytes into a `String` without validating them.
pub(crate) fn decode_string(
bytes: &Bytes,
mode: PromValidationMode,
) -> Result<String, DecodeError> {
let result = match mode {
PromValidationMode::Strict => match String::from_utf8(bytes.to_vec()) {
Ok(s) => s,
Err(e) => {
debug!(
"Invalid UTF-8 string value: {:?}, error: {:?}",
&bytes[..],
e
);
return Err(DecodeError::new("invalid utf-8"));
}
},
PromValidationMode::Lossy => String::from_utf8_lossy(bytes).to_string(),
// NOTE(review): no validation happens here; `from_utf8_unchecked` requires
// the caller to guarantee the bytes are valid UTF-8.
PromValidationMode::Unchecked => unsafe { String::from_utf8_unchecked(bytes.to_vec()) },
};
Ok(result)
}
#[derive(Default, Debug)]
pub struct PromWriteRequest {
table_data: TablesBuilder,
@@ -332,7 +308,10 @@ impl PromWriteRequest {
}
if processor.use_pipeline {
processor.consume_series_to_pipeline_map(&mut self.series)?;
processor.consume_series_to_pipeline_map(
&mut self.series,
prom_validation_mode,
)?;
} else {
self.series
.add_to_table_data(&mut self.table_data, prom_validation_mode)?;
@@ -398,14 +377,13 @@ impl PromSeriesProcessor {
pub(crate) fn consume_series_to_pipeline_map(
&mut self,
series: &mut PromTimeSeries,
prom_validation_mode: PromValidationMode,
) -> Result<(), DecodeError> {
let mut vec_pipeline_map: Vec<Value> = Vec::new();
let mut pipeline_map = BTreeMap::new();
for l in series.labels.iter() {
let name = String::from_utf8(l.name.to_vec())
.map_err(|_| DecodeError::new("invalid utf-8"))?;
let value = String::from_utf8(l.value.to_vec())
.map_err(|_| DecodeError::new("invalid utf-8"))?;
let name = prom_validation_mode.decode_string(&l.name)?;
let value = prom_validation_mode.decode_string(&l.value)?;
pipeline_map.insert(name, Value::String(value));
}
@@ -478,7 +456,7 @@ mod tests {
use crate::http::PromValidationMode;
use crate::prom_store::to_grpc_row_insert_requests;
use crate::proto::{decode_string, PromSeriesProcessor, PromWriteRequest};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::repeated_field::Clear;
fn sort_rows(rows: Rows) -> Rows {
@@ -559,7 +537,7 @@ mod tests {
#[test]
fn test_decode_string_strict_mode_valid_utf8() {
let valid_utf8 = Bytes::from("hello world");
let result = decode_string(&valid_utf8, PromValidationMode::Strict);
let result = PromValidationMode::Strict.decode_string(&valid_utf8);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "hello world");
}
@@ -567,7 +545,7 @@ mod tests {
#[test]
fn test_decode_string_strict_mode_empty() {
let empty = Bytes::new();
let result = decode_string(&empty, PromValidationMode::Strict);
let result = PromValidationMode::Strict.decode_string(&empty);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "");
}
@@ -575,7 +553,7 @@ mod tests {
#[test]
fn test_decode_string_strict_mode_unicode() {
let unicode = Bytes::from("Hello 世界 🌍");
let result = decode_string(&unicode, PromValidationMode::Strict);
let result = PromValidationMode::Strict.decode_string(&unicode);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "Hello 世界 🌍");
}
@@ -584,7 +562,7 @@ mod tests {
fn test_decode_string_strict_mode_invalid_utf8() {
// Invalid UTF-8 sequence
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
let result = decode_string(&invalid_utf8, PromValidationMode::Strict);
let result = PromValidationMode::Strict.decode_string(&invalid_utf8);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
@@ -596,7 +574,7 @@ mod tests {
fn test_decode_string_strict_mode_incomplete_utf8() {
// Incomplete UTF-8 sequence (missing continuation bytes)
let incomplete_utf8 = Bytes::from(vec![0xC2]); // Start of 2-byte sequence but missing second byte
let result = decode_string(&incomplete_utf8, PromValidationMode::Strict);
let result = PromValidationMode::Strict.decode_string(&incomplete_utf8);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
@@ -607,7 +585,7 @@ mod tests {
#[test]
fn test_decode_string_lossy_mode_valid_utf8() {
let valid_utf8 = Bytes::from("hello world");
let result = decode_string(&valid_utf8, PromValidationMode::Lossy);
let result = PromValidationMode::Lossy.decode_string(&valid_utf8);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "hello world");
}
@@ -615,7 +593,7 @@ mod tests {
#[test]
fn test_decode_string_lossy_mode_empty() {
let empty = Bytes::new();
let result = decode_string(&empty, PromValidationMode::Lossy);
let result = PromValidationMode::Lossy.decode_string(&empty);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "");
}
@@ -623,7 +601,7 @@ mod tests {
#[test]
fn test_decode_string_lossy_mode_unicode() {
let unicode = Bytes::from("Hello 世界 🌍");
let result = decode_string(&unicode, PromValidationMode::Lossy);
let result = PromValidationMode::Lossy.decode_string(&unicode);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "Hello 世界 🌍");
}
@@ -632,7 +610,7 @@ mod tests {
fn test_decode_string_lossy_mode_invalid_utf8() {
// Invalid UTF-8 sequence - should be replaced with replacement character
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
let result = decode_string(&invalid_utf8, PromValidationMode::Lossy);
let result = PromValidationMode::Lossy.decode_string(&invalid_utf8);
assert!(result.is_ok());
// Each invalid byte should be replaced with the Unicode replacement character
assert_eq!(result.unwrap(), "���");
@@ -647,7 +625,7 @@ mod tests {
mixed.extend_from_slice(b"world");
let mixed_utf8 = Bytes::from(mixed);
let result = decode_string(&mixed_utf8, PromValidationMode::Lossy);
let result = PromValidationMode::Lossy.decode_string(&mixed_utf8);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "hello�world");
}
@@ -655,7 +633,7 @@ mod tests {
#[test]
fn test_decode_string_unchecked_mode_valid_utf8() {
let valid_utf8 = Bytes::from("hello world");
let result = decode_string(&valid_utf8, PromValidationMode::Unchecked);
let result = PromValidationMode::Unchecked.decode_string(&valid_utf8);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "hello world");
}
@@ -663,7 +641,7 @@ mod tests {
#[test]
fn test_decode_string_unchecked_mode_empty() {
let empty = Bytes::new();
let result = decode_string(&empty, PromValidationMode::Unchecked);
let result = PromValidationMode::Unchecked.decode_string(&empty);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "");
}
@@ -671,7 +649,7 @@ mod tests {
#[test]
fn test_decode_string_unchecked_mode_unicode() {
let unicode = Bytes::from("Hello 世界 🌍");
let result = decode_string(&unicode, PromValidationMode::Unchecked);
let result = PromValidationMode::Unchecked.decode_string(&unicode);
assert!(result.is_ok());
assert_eq!(result.unwrap(), "Hello 世界 🌍");
}
@@ -680,7 +658,7 @@ mod tests {
fn test_decode_string_unchecked_mode_invalid_utf8() {
// Invalid UTF-8 sequence - unchecked mode doesn't validate
let invalid_utf8 = Bytes::from(vec![0xFF, 0xFE, 0xFD]);
let result = decode_string(&invalid_utf8, PromValidationMode::Unchecked);
let result = PromValidationMode::Unchecked.decode_string(&invalid_utf8);
// This should succeed but the resulting string may contain invalid UTF-8
assert!(result.is_ok());
// We can't easily test the exact content since it's invalid UTF-8,
@@ -693,9 +671,9 @@ mod tests {
let ascii = Bytes::from("simple_ascii_123");
// All modes should handle ASCII identically
let strict_result = decode_string(&ascii, PromValidationMode::Strict).unwrap();
let lossy_result = decode_string(&ascii, PromValidationMode::Lossy).unwrap();
let unchecked_result = decode_string(&ascii, PromValidationMode::Unchecked).unwrap();
let strict_result = PromValidationMode::Strict.decode_string(&ascii).unwrap();
let lossy_result = PromValidationMode::Lossy.decode_string(&ascii).unwrap();
let unchecked_result = PromValidationMode::Unchecked.decode_string(&ascii).unwrap();
assert_eq!(strict_result, "simple_ascii_123");
assert_eq!(lossy_result, "simple_ascii_123");