From 9afe327bca0ee236d62c2d807c4a95ebe8c6633c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 12 Mar 2024 20:00:38 +0800 Subject: [PATCH] feat: improve prom write requests decode performance (#3478) * feat: optimize decode performance * fix: some cr comments --- Cargo.lock | 1 + src/servers/Cargo.toml | 1 + src/servers/src/prom_row_builder.rs | 6 +- src/servers/src/proto.rs | 122 ++++++++++++++++++---------- 4 files changed, 86 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ae09096a9..c2b4dc0b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9193,6 +9193,7 @@ dependencies = [ "derive_builder 0.12.0", "digest", "futures", + "hashbrown 0.14.3", "headers", "hex", "hostname", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index a9363a0efe..04e226e3cd 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -49,6 +49,7 @@ datatypes.workspace = true derive_builder.workspace = true digest = "0.10" futures = "0.3" +hashbrown = "0.14" headers = "0.3" hex = { version = "0.4" } hostname = "0.3.1" diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 20a049f472..0829973092 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::string::ToString; use api::prom_store::remote::Sample; @@ -23,6 +21,8 @@ use api::v1::{ Value, }; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use hashbrown::hash_map::Entry; +use hashbrown::HashMap; use crate::proto::PromLabel; use crate::repeated_field::Clear; @@ -85,7 +85,7 @@ impl Default for TableBuilder { impl TableBuilder { pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self { - let mut col_indexes = HashMap::with_capacity(cols); + let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default()); col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0); col_indexes.insert(GREPTIME_VALUE.to_string(), 1); diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 1a96cd9ed8..adc9cc0ad3 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::ops::Deref; +use std::slice; use api::prom_store::remote::Sample; use api::v1::RowInsertRequests; use bytes::{Buf, Bytes}; use prost::encoding::message::merge; -use prost::encoding::{decode_key, decode_varint, DecodeContext, WireType}; +use prost::encoding::{decode_key, decode_varint, WireType}; use prost::DecodeError; use crate::prom_row_builder::TablesBuilder; @@ -36,29 +37,22 @@ pub struct PromLabel { } impl Clear for PromLabel { - fn clear(&mut self) { - self.name.clear(); - self.value.clear(); - } + fn clear(&mut self) {} } impl PromLabel { - pub fn merge_field( + pub fn merge_field( &mut self, tag: u32, wire_type: WireType, - buf: &mut B, - ctx: DecodeContext, - ) -> Result<(), DecodeError> - where - B: Buf, - { + buf: &mut Bytes, + ) -> Result<(), DecodeError> { const STRUCT_NAME: &str = "PromLabel"; match tag { 1u32 => { // decode label name let value = &mut self.name; - prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + merge_bytes(value, buf).map_err(|mut error| { error.push(STRUCT_NAME, "name"); error }) @@ -66,16 +60,69 @@ impl PromLabel { 2u32 => { // decode label value let value = &mut self.value; - prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + merge_bytes(value, buf).map_err(|mut error| { error.push(STRUCT_NAME, "value"); error }) } - _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + _ => prost::encoding::skip_field(wire_type, tag, buf, Default::default()), } } } +#[inline(always)] +fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes { + if len == data.remaining() { + std::mem::replace(data, Bytes::new()) + } else { + let ret = split_to(data, len); + data.advance(len); + ret + } +} + +/// Similar to `Bytes::split_to`, but directly operates on underlying memory region. +/// # Safety +/// This function is safe as long as `data` is backed by a consecutive region of memory, +/// for example `Vec` or `&[u8]`, and caller must ensure that `buf` outlives +/// the `Bytes` returned. +#[inline(always)] +fn split_to(buf: &mut Bytes, end: usize) -> Bytes { + let len = buf.len(); + assert!( + end <= len, + "range end out of bounds: {:?} <= {:?}", + end, + len, + ); + + if end == 0 { + return Bytes::new(); + } + + let ptr = buf.as_ptr(); + let x = unsafe { slice::from_raw_parts(ptr, end) }; + // `Bytes::drop` does nothing when it's built via `from_static`. + Bytes::from_static(x) +} + +/// Reads a variable-length encoded bytes field from `buf` and assign it to `value`. +/// # Safety +/// Callers must ensure `buf` outlives `value`. +#[inline(always)] +fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> { + let len = decode_varint(buf)?; + if len > buf.remaining() as u64 { + return Err(DecodeError::new(format!( + "buffer underflow, len: {}, remaining: {}", + len, + buf.remaining() + ))); + } + *value = copy_to_bytes(buf, len as usize); + Ok(()) +} + #[derive(Default)] pub struct PromTimeSeries { pub table_name: String, @@ -92,16 +139,12 @@ impl Clear for PromTimeSeries { } impl PromTimeSeries { - pub fn merge_field( + pub fn merge_field( &mut self, tag: u32, wire_type: WireType, - buf: &mut B, - ctx: DecodeContext, - ) -> Result<(), DecodeError> - where - B: Buf, - { + buf: &mut Bytes, + ) -> Result<(), DecodeError> { const STRUCT_NAME: &str = "PromTimeSeries"; match tag { 1u32 => { @@ -120,7 +163,7 @@ impl PromTimeSeries { let limit = remaining - len as usize; while buf.remaining() > limit { let (tag, wire_type) = decode_key(buf)?; - label.merge_field(tag, wire_type, buf, ctx.clone())?; + label.merge_field(tag, wire_type, buf)?; } if buf.remaining() != limit { return Err(DecodeError::new("delimited length exceeded")); @@ -135,15 +178,17 @@ impl PromTimeSeries { } 2u32 => { let sample = self.samples.push_default(); - merge(WireType::LengthDelimited, sample, buf, ctx).map_err(|mut error| { - error.push(STRUCT_NAME, "samples"); - error - })?; + merge(WireType::LengthDelimited, sample, buf, Default::default()).map_err( + |mut error| { + error.push(STRUCT_NAME, "samples"); + error + }, + )?; Ok(()) } - // skip exemplars - 3u32 => prost::encoding::skip_field(wire_type, tag, buf, ctx), - _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + // todo(hl): exemplars are skipped temporarily + 3u32 => prost::encoding::skip_field(wire_type, tag, buf, Default::default()), + _ => prost::encoding::skip_field(wire_type, tag, buf, Default::default()), } } @@ -178,13 +223,9 @@ impl PromWriteRequest { self.table_data.as_insert_requests() } - pub fn merge(&mut self, mut buf: B) -> Result<(), DecodeError> - where - B: Buf, - Self: Sized, - { + // todo(hl): maybe use &[u8] can reduce the overhead introduced with Bytes. + pub fn merge(&mut self, mut buf: Bytes) -> Result<(), DecodeError> { const STRUCT_NAME: &str = "PromWriteRequest"; - let ctx = DecodeContext::default(); while buf.has_remaining() { let (tag, wire_type) = decode_key(&mut buf)?; assert_eq!(WireType::LengthDelimited, wire_type); @@ -203,8 +244,7 @@ impl PromWriteRequest { let limit = remaining - len as usize; while buf.remaining() > limit { let (tag, wire_type) = decode_key(&mut buf)?; - self.series - .merge_field(tag, wire_type, &mut buf, ctx.clone())?; + self.series.merge_field(tag, wire_type, &mut buf)?; } if buf.remaining() != limit { return Err(DecodeError::new("delimited length exceeded")); @@ -212,10 +252,10 @@ impl PromWriteRequest { self.series.add_to_table_data(&mut self.table_data); } 3u32 => { - // we can ignore metadata for now. - prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?; + // todo(hl): metadata are skipped. + prost::encoding::skip_field(wire_type, tag, &mut buf, Default::default())?; } - _ => prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?, + _ => prost::encoding::skip_field(wire_type, tag, &mut buf, Default::default())?, } } Ok(())