From 7ba23a999bbfdf75eb7de6f6b2c56a0205054cb4 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 3 Mar 2026 14:29:32 +0800 Subject: [PATCH] perf: prom decode (#7737) * perf/prom-decode: **Refactor `PromLabel` to Use Raw Byte Slices** - Updated `PromLabel` struct in `proto.rs` to use `RawBytes` for `name` and `value` fields, replacing `Bytes` with static byte slices. - Modified test cases in `prom_row_builder.rs` to accommodate changes in `PromLabel` by using byte literals. - Simplified `merge_bytes` function in `proto.rs` to directly assign byte slices, removing unnecessary memory operations. Signed-off-by: Lei, HUANG * perf/prom-decode: - **Add UTF-8 Validation**: Introduced `validate_bytes` method in `http.rs` to validate UTF-8 encoding using `simdutf8` for `PromValidationMode::Strict`. - **Update Column Indexing**: Modified `prom_row_builder.rs` to use `Vec<u8>` for `col_indexes` keys, ensuring UTF-8 validation for label names. - **Dependency Update**: Added `simdutf8` (version requirement `0.1`) to `src/servers/Cargo.toml` and updated `Cargo.lock` to include this new dependency. 
Signed-off-by: Lei, HUANG * fix: style issues Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- Cargo.lock | 1 + src/servers/Cargo.toml | 1 + src/servers/src/http.rs | 10 ++++ src/servers/src/prom_row_builder.rs | 67 ++++++++++++++------------- src/servers/src/proto.rs | 72 ++++++++--------------------- 5 files changed, 65 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d619c5c994..26db6b11f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12222,6 +12222,7 @@ dependencies = [ "serde_json", "session", "simd-json", + "simdutf8", "snafu 0.8.6", "snap", "socket2 0.5.10", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 0bab854dc5..e75192c9ba 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -112,6 +112,7 @@ serde.workspace = true serde_json.workspace = true session.workspace = true simd-json.workspace = true +simdutf8 = "0.1" snafu.workspace = true snap = "1" socket2 = "0.5" diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 5211360985..53402fdd54 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -192,6 +192,16 @@ impl PromValidationMode { }; Ok(result) } + + pub(crate) fn validate_bytes(&self, bytes: &[u8]) -> std::result::Result<(), DecodeError> { + match self { + PromValidationMode::Strict => { + simdutf8::basic::from_utf8(bytes).map_err(|_| DecodeError::new("invalid utf-8"))?; + Ok(()) + } + PromValidationMode::Lossy | PromValidationMode::Unchecked => Ok(()), + } + } } impl Default for HttpOptions { diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index b17048a4dd..df38f9b5fe 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -13,11 +13,9 @@ // limitations under the License. 
use std::collections::HashMap; -use std::collections::hash_map::Entry; -use std::string::ToString; use api::prom_store::remote::Sample; -use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema}; +use api::v1::helper::{field_column_schema, time_index_column_schema}; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value}; use common_query::prelude::{greptime_timestamp, greptime_value}; @@ -102,7 +100,7 @@ pub struct TableBuilder { /// Rows written. rows: Vec, /// Indices of columns inside `schema`. - col_indexes: HashMap, + col_indexes: HashMap, usize>, } impl Default for TableBuilder { @@ -114,8 +112,8 @@ impl Default for TableBuilder { impl TableBuilder { pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self { let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default()); - col_indexes.insert(greptime_timestamp().to_string(), 0); - col_indexes.insert(greptime_value().to_string(), 1); + col_indexes.insert(greptime_timestamp().as_bytes().to_owned(), 0); + col_indexes.insert(greptime_value().as_bytes().to_owned(), 1); let mut schema = Vec::with_capacity(cols); schema.push(time_index_column_schema( @@ -144,24 +142,29 @@ impl TableBuilder { let mut row = vec![Value { value_data: None }; self.col_indexes.len()]; for PromLabel { name, value } in labels { - let tag_name = prom_validation_mode.decode_string(name)?; - let tag_value = prom_validation_mode.decode_string(value)?; - let tag_value = Some(ValueData::StringValue(tag_value)); + prom_validation_mode.validate_bytes(name)?; + let raw_tag_name = name; + let tag_value = Some(ValueData::StringValue( + prom_validation_mode.decode_string(value)?, + )); let tag_num = self.col_indexes.len(); - match self.col_indexes.entry(tag_name) { - Entry::Occupied(e) => { - row[*e.get()].value_data = tag_value; - } - Entry::Vacant(e) => { - self.schema - .push(tag_column_schema(e.key(), ColumnDataType::String)); 
- e.insert(tag_num); - row.push(Value { - value_data: tag_value, - }); - } + if let Some(e) = self.col_indexes.get_mut(*raw_tag_name) { + row[*e].value_data = tag_value; + continue; } + let tag_name = prom_validation_mode.decode_string(raw_tag_name)?; + self.schema.push(ColumnSchema { + column_name: tag_name.clone(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }); + self.col_indexes.insert(tag_name.into_bytes(), tag_num); + + row.push(Value { + value_data: tag_value, + }); } if samples.len() == 1 { @@ -212,8 +215,6 @@ mod tests { use api::prom_store::remote::Sample; use api::v1::Value; use api::v1::value::ValueData; - use arrow::datatypes::ToByteSlice; - use bytes::Bytes; use prost::DecodeError; use crate::http::PromValidationMode; @@ -225,12 +226,12 @@ mod tests { let _ = builder.add_labels_and_samples( &[ PromLabel { - name: Bytes::from("tag0"), - value: Bytes::from("v0"), + name: b"tag0", + value: b"v0", }, PromLabel { - name: Bytes::from("tag1"), - value: Bytes::from("v1"), + name: b"tag1", + value: b"v1", }, ], &[Sample { @@ -243,12 +244,12 @@ mod tests { let _ = builder.add_labels_and_samples( &[ PromLabel { - name: Bytes::from("tag0"), - value: Bytes::from("v0"), + name: b"tag0", + value: b"v0", }, PromLabel { - name: Bytes::from("tag2"), - value: Bytes::from("v2"), + name: b"tag2", + value: b"v2", }, ], &[Sample { @@ -304,8 +305,8 @@ mod tests { let res = builder.add_labels_and_samples( &[PromLabel { - name: Bytes::from("tag0"), - value: invalid_utf8_bytes.to_byte_slice().into(), + name: b"tag0", + value: invalid_utf8_bytes, }], &[Sample { value: 0.1, diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 79e5b38e77..d46489124a 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -14,7 +14,6 @@ use std::collections::BTreeMap; use std::collections::btree_map::Entry; -use std::ops::Deref; use std::slice; use api::prom_store::remote::Sample; @@ -42,6 +41,8 @@ 
use crate::prom_store::{ use crate::query_handler::PipelineHandlerRef; use crate::repeated_field::{Clear, RepeatedField}; +pub type RawBytes = &'static [u8]; + impl Clear for Sample { fn clear(&mut self) { self.timestamp = 0; @@ -51,8 +52,8 @@ impl Clear for Sample { #[derive(Default, Clone, Debug)] pub struct PromLabel { - pub name: Bytes, - pub value: Bytes, + pub name: RawBytes, + pub value: RawBytes, } impl Clear for PromLabel { @@ -92,56 +93,21 @@ impl PromLabel { } } -#[inline(always)] -fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes { - if len == data.remaining() { - std::mem::replace(data, Bytes::new()) - } else { - let ret = split_to(data, len); - data.advance(len); - ret - } -} - -/// Similar to `Bytes::split_to`, but directly operates on underlying memory region. +/// Reads a variable-length encoded bytes field from `src` and assign it to `dst`. /// # Safety -/// This function is safe as long as `data` is backed by a consecutive region of memory, -/// for example `Vec` or `&[u8]`, and caller must ensure that `buf` outlives -/// the `Bytes` returned. +/// Callers must ensure `src` outlives `dst`. #[inline(always)] -fn split_to(buf: &mut Bytes, end: usize) -> Bytes { - let len = buf.len(); - assert!( - end <= len, - "range end out of bounds: {:?} <= {:?}", - end, - len, - ); - - if end == 0 { - return Bytes::new(); - } - - let ptr = buf.as_ptr(); - let x = unsafe { slice::from_raw_parts(ptr, end) }; - // `Bytes::drop` does nothing when it's built via `from_static`. - Bytes::from_static(x) -} - -/// Reads a variable-length encoded bytes field from `buf` and assign it to `value`. -/// # Safety -/// Callers must ensure `buf` outlives `value`. -#[inline(always)] -fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> { - let len = decode_varint(buf)?; - if len > buf.remaining() as u64 { +fn merge_bytes(dst: &mut RawBytes, src: &mut Bytes) -> Result<(), DecodeError> { + let len = decode_varint(src)? 
as usize; + if len > src.remaining() { return Err(DecodeError::new(format!( "buffer underflow, len: {}, remaining: {}", len, - buf.remaining() + src.remaining() ))); } - *value = copy_to_bytes(buf, len as usize); + *dst = unsafe { slice::from_raw_parts(src.as_ptr(), len) }; + src.advance(len); Ok(()) } @@ -197,26 +163,26 @@ impl PromTimeSeries { return Err(DecodeError::new("delimited length exceeded")); } - match label.name.deref() { + match label.name { METRIC_NAME_LABEL_BYTES => { - self.table_name = prom_validation_mode.decode_string(&label.value)?; + self.table_name = prom_validation_mode.decode_string(label.value)?; self.labels.truncate(self.labels.len() - 1); // remove last label } #[allow(deprecated)] crate::prom_store::SCHEMA_LABEL_BYTES => { - self.schema = Some(prom_validation_mode.decode_string(&label.value)?); + self.schema = Some(prom_validation_mode.decode_string(label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label } DATABASE_LABEL_BYTES | DATABASE_LABEL_ALT_BYTES => { // Only set schema from __database__ if __schema__ hasn't been set yet if self.schema.is_none() { - self.schema = Some(prom_validation_mode.decode_string(&label.value)?); + self.schema = Some(prom_validation_mode.decode_string(label.value)?); } self.labels.truncate(self.labels.len() - 1); // remove last label } PHYSICAL_TABLE_LABEL_BYTES | PHYSICAL_TABLE_LABEL_ALT_BYTES => { self.physical_table = - Some(prom_validation_mode.decode_string(&label.value)?); + Some(prom_validation_mode.decode_string(label.value)?); self.labels.truncate(self.labels.len() - 1); // remove last label } _ => {} @@ -394,8 +360,8 @@ impl PromSeriesProcessor { let mut vec_pipeline_map = Vec::new(); let mut pipeline_map = BTreeMap::new(); for l in series.labels.iter() { - let name = prom_validation_mode.decode_string(&l.name)?; - let value = prom_validation_mode.decode_string(&l.value)?; + let name = prom_validation_mode.decode_string(l.name)?; + let value = 
prom_validation_mode.decode_string(l.value)?; pipeline_map.insert(KeyString::from(name), VrlValue::Bytes(value.into())); }