perf: prom decode (#7737)

* perf/prom-decode:
 **Refactor `PromLabel` to Use Raw Byte Slices**

 - Updated `PromLabel` struct in `proto.rs` to use `RawBytes` for `name` and `value` fields, replacing `Bytes` with static byte slices.
 - Modified test cases in `prom_row_builder.rs` to accommodate changes in `PromLabel` by using byte literals.
 - Simplified `merge_bytes` function in `proto.rs` to directly assign byte slices, removing unnecessary memory operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/prom-decode:
 - **Add UTF-8 Validation**: Introduced `validate_bytes` method in `http.rs` to validate UTF-8 encoding using `simdutf8` for `PromValidationMode::Strict`.
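 - **Update Column Indexing**: Changed the `col_indexes` keys in `prom_row_builder.rs` from `String` to `Vec<u8>`, so existing label columns are looked up by the raw label-name bytes and a `String` is only decoded when a new column is added. Label names are still UTF-8 validated (via `validate_bytes`, in strict mode) before the lookup.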
 - **Dependency Update**: Added `simdutf8` to `Cargo.toml` with version requirement `0.1` (`0.1.5` at the time of this change) and updated `Cargo.lock` to include this new dependency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: style issues

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-03-03 14:29:32 +08:00
committed by GitHub
parent be78137165
commit 7ba23a999b
5 changed files with 65 additions and 86 deletions

1
Cargo.lock generated
View File

@@ -12222,6 +12222,7 @@ dependencies = [
"serde_json",
"session",
"simd-json",
"simdutf8",
"snafu 0.8.6",
"snap",
"socket2 0.5.10",

View File

@@ -112,6 +112,7 @@ serde.workspace = true
serde_json.workspace = true
session.workspace = true
simd-json.workspace = true
simdutf8 = "0.1"
snafu.workspace = true
snap = "1"
socket2 = "0.5"

View File

@@ -192,6 +192,16 @@ impl PromValidationMode {
};
Ok(result)
}
/// Checks `bytes` for UTF-8 well-formedness when the validation mode demands it.
///
/// In `Strict` mode the input is run through `simdutf8::basic::from_utf8`; a failure is
/// reported as a `DecodeError` carrying the message "invalid utf-8". The `Lossy` and
/// `Unchecked` modes accept any input without inspecting it.
pub(crate) fn validate_bytes(&self, bytes: &[u8]) -> std::result::Result<(), DecodeError> {
    match self {
        // Nothing to verify in the permissive modes.
        PromValidationMode::Lossy | PromValidationMode::Unchecked => Ok(()),
        PromValidationMode::Strict => simdutf8::basic::from_utf8(bytes)
            // Only validity matters here; the decoded `&str` is discarded.
            .map(|_| ())
            .map_err(|_| DecodeError::new("invalid utf-8")),
    }
}
}
impl Default for HttpOptions {

View File

@@ -13,11 +13,9 @@
// limitations under the License.
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::string::ToString;
use api::prom_store::remote::Sample;
use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
use api::v1::helper::{field_column_schema, time_index_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{greptime_timestamp, greptime_value};
@@ -102,7 +100,7 @@ pub struct TableBuilder {
/// Rows written.
rows: Vec<Row>,
/// Indices of columns inside `schema`.
col_indexes: HashMap<String, usize>,
col_indexes: HashMap<Vec<u8>, usize>,
}
impl Default for TableBuilder {
@@ -114,8 +112,8 @@ impl Default for TableBuilder {
impl TableBuilder {
pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
col_indexes.insert(greptime_timestamp().to_string(), 0);
col_indexes.insert(greptime_value().to_string(), 1);
col_indexes.insert(greptime_timestamp().as_bytes().to_owned(), 0);
col_indexes.insert(greptime_value().as_bytes().to_owned(), 1);
let mut schema = Vec::with_capacity(cols);
schema.push(time_index_column_schema(
@@ -144,24 +142,29 @@ impl TableBuilder {
let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
for PromLabel { name, value } in labels {
let tag_name = prom_validation_mode.decode_string(name)?;
let tag_value = prom_validation_mode.decode_string(value)?;
let tag_value = Some(ValueData::StringValue(tag_value));
prom_validation_mode.validate_bytes(name)?;
let raw_tag_name = name;
let tag_value = Some(ValueData::StringValue(
prom_validation_mode.decode_string(value)?,
));
let tag_num = self.col_indexes.len();
match self.col_indexes.entry(tag_name) {
Entry::Occupied(e) => {
row[*e.get()].value_data = tag_value;
}
Entry::Vacant(e) => {
self.schema
.push(tag_column_schema(e.key(), ColumnDataType::String));
e.insert(tag_num);
row.push(Value {
value_data: tag_value,
});
}
if let Some(e) = self.col_indexes.get_mut(*raw_tag_name) {
row[*e].value_data = tag_value;
continue;
}
let tag_name = prom_validation_mode.decode_string(raw_tag_name)?;
self.schema.push(ColumnSchema {
column_name: tag_name.clone(),
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Tag as i32,
..Default::default()
});
self.col_indexes.insert(tag_name.into_bytes(), tag_num);
row.push(Value {
value_data: tag_value,
});
}
if samples.len() == 1 {
@@ -212,8 +215,6 @@ mod tests {
use api::prom_store::remote::Sample;
use api::v1::Value;
use api::v1::value::ValueData;
use arrow::datatypes::ToByteSlice;
use bytes::Bytes;
use prost::DecodeError;
use crate::http::PromValidationMode;
@@ -225,12 +226,12 @@ mod tests {
let _ = builder.add_labels_and_samples(
&[
PromLabel {
name: Bytes::from("tag0"),
value: Bytes::from("v0"),
name: b"tag0",
value: b"v0",
},
PromLabel {
name: Bytes::from("tag1"),
value: Bytes::from("v1"),
name: b"tag1",
value: b"v1",
},
],
&[Sample {
@@ -243,12 +244,12 @@ mod tests {
let _ = builder.add_labels_and_samples(
&[
PromLabel {
name: Bytes::from("tag0"),
value: Bytes::from("v0"),
name: b"tag0",
value: b"v0",
},
PromLabel {
name: Bytes::from("tag2"),
value: Bytes::from("v2"),
name: b"tag2",
value: b"v2",
},
],
&[Sample {
@@ -304,8 +305,8 @@ mod tests {
let res = builder.add_labels_and_samples(
&[PromLabel {
name: Bytes::from("tag0"),
value: invalid_utf8_bytes.to_byte_slice().into(),
name: b"tag0",
value: invalid_utf8_bytes,
}],
&[Sample {
value: 0.1,

View File

@@ -14,7 +14,6 @@
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ops::Deref;
use std::slice;
use api::prom_store::remote::Sample;
@@ -42,6 +41,8 @@ use crate::prom_store::{
use crate::query_handler::PipelineHandlerRef;
use crate::repeated_field::{Clear, RepeatedField};
pub type RawBytes = &'static [u8];
impl Clear for Sample {
fn clear(&mut self) {
self.timestamp = 0;
@@ -51,8 +52,8 @@ impl Clear for Sample {
#[derive(Default, Clone, Debug)]
pub struct PromLabel {
pub name: Bytes,
pub value: Bytes,
pub name: RawBytes,
pub value: RawBytes,
}
impl Clear for PromLabel {
@@ -92,56 +93,21 @@ impl PromLabel {
}
}
#[inline(always)]
fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes {
if len == data.remaining() {
std::mem::replace(data, Bytes::new())
} else {
let ret = split_to(data, len);
data.advance(len);
ret
}
}
/// Similar to `Bytes::split_to`, but directly operates on underlying memory region.
/// Reads a variable-length encoded bytes field from `src` and assigns it to `dst`.
/// # Safety
/// This function is safe as long as `data` is backed by a consecutive region of memory,
/// for example `Vec<u8>` or `&[u8]`, and caller must ensure that `buf` outlives
/// the `Bytes` returned.
/// Callers must ensure `src` outlives `dst`.
#[inline(always)]
fn split_to(buf: &mut Bytes, end: usize) -> Bytes {
let len = buf.len();
assert!(
end <= len,
"range end out of bounds: {:?} <= {:?}",
end,
len,
);
if end == 0 {
return Bytes::new();
}
let ptr = buf.as_ptr();
let x = unsafe { slice::from_raw_parts(ptr, end) };
// `Bytes::drop` does nothing when it's built via `from_static`.
Bytes::from_static(x)
}
/// Reads a variable-length encoded bytes field from `buf` and assign it to `value`.
/// # Safety
/// Callers must ensure `buf` outlives `value`.
#[inline(always)]
fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
let len = decode_varint(buf)?;
if len > buf.remaining() as u64 {
fn merge_bytes(dst: &mut RawBytes, src: &mut Bytes) -> Result<(), DecodeError> {
let len = decode_varint(src)? as usize;
if len > src.remaining() {
return Err(DecodeError::new(format!(
"buffer underflow, len: {}, remaining: {}",
len,
buf.remaining()
src.remaining()
)));
}
*value = copy_to_bytes(buf, len as usize);
*dst = unsafe { slice::from_raw_parts(src.as_ptr(), len) };
src.advance(len);
Ok(())
}
@@ -197,26 +163,26 @@ impl PromTimeSeries {
return Err(DecodeError::new("delimited length exceeded"));
}
match label.name.deref() {
match label.name {
METRIC_NAME_LABEL_BYTES => {
self.table_name = prom_validation_mode.decode_string(&label.value)?;
self.table_name = prom_validation_mode.decode_string(label.value)?;
self.labels.truncate(self.labels.len() - 1); // remove last label
}
#[allow(deprecated)]
crate::prom_store::SCHEMA_LABEL_BYTES => {
self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
self.schema = Some(prom_validation_mode.decode_string(label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
DATABASE_LABEL_BYTES | DATABASE_LABEL_ALT_BYTES => {
// Only set schema from __database__ if __schema__ hasn't been set yet
if self.schema.is_none() {
self.schema = Some(prom_validation_mode.decode_string(&label.value)?);
self.schema = Some(prom_validation_mode.decode_string(label.value)?);
}
self.labels.truncate(self.labels.len() - 1); // remove last label
}
PHYSICAL_TABLE_LABEL_BYTES | PHYSICAL_TABLE_LABEL_ALT_BYTES => {
self.physical_table =
Some(prom_validation_mode.decode_string(&label.value)?);
Some(prom_validation_mode.decode_string(label.value)?);
self.labels.truncate(self.labels.len() - 1); // remove last label
}
_ => {}
@@ -394,8 +360,8 @@ impl PromSeriesProcessor {
let mut vec_pipeline_map = Vec::new();
let mut pipeline_map = BTreeMap::new();
for l in series.labels.iter() {
let name = prom_validation_mode.decode_string(&l.name)?;
let value = prom_validation_mode.decode_string(&l.value)?;
let name = prom_validation_mode.decode_string(l.name)?;
let value = prom_validation_mode.decode_string(l.value)?;
pipeline_map.insert(KeyString::from(name), VrlValue::Bytes(value.into()));
}