feat: improve prom write requests decode performance (#3478)

* feat: optimize decode performance

* fix: some cr comments
This commit is contained in:
Lei, HUANG
2024-03-12 20:00:38 +08:00
committed by GitHub
parent 58bd065c6b
commit 9afe327bca
4 changed files with 86 additions and 44 deletions

1
Cargo.lock generated
View File

@@ -9193,6 +9193,7 @@ dependencies = [
"derive_builder 0.12.0",
"digest",
"futures",
"hashbrown 0.14.3",
"headers",
"hex",
"hostname",

View File

@@ -49,6 +49,7 @@ datatypes.workspace = true
derive_builder.workspace = true
digest = "0.10"
futures = "0.3"
hashbrown = "0.14"
headers = "0.3"
hex = { version = "0.4" }
hostname = "0.3.1"

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::string::ToString;
use api::prom_store::remote::Sample;
@@ -23,6 +21,8 @@ use api::v1::{
Value,
};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use hashbrown::hash_map::Entry;
use hashbrown::HashMap;
use crate::proto::PromLabel;
use crate::repeated_field::Clear;
@@ -85,7 +85,7 @@ impl Default for TableBuilder {
impl TableBuilder {
pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
let mut col_indexes = HashMap::with_capacity(cols);
let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
col_indexes.insert(GREPTIME_VALUE.to_string(), 1);

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use std::ops::Deref;
use std::slice;
use api::prom_store::remote::Sample;
use api::v1::RowInsertRequests;
use bytes::{Buf, Bytes};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, DecodeContext, WireType};
use prost::encoding::{decode_key, decode_varint, WireType};
use prost::DecodeError;
use crate::prom_row_builder::TablesBuilder;
@@ -36,29 +37,22 @@ pub struct PromLabel {
}
impl Clear for PromLabel {
fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
fn clear(&mut self) {}
}
impl PromLabel {
pub fn merge_field<B>(
pub fn merge_field(
&mut self,
tag: u32,
wire_type: WireType,
buf: &mut B,
ctx: DecodeContext,
) -> Result<(), DecodeError>
where
B: Buf,
{
buf: &mut Bytes,
) -> Result<(), DecodeError> {
const STRUCT_NAME: &str = "PromLabel";
match tag {
1u32 => {
// decode label name
let value = &mut self.name;
prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| {
merge_bytes(value, buf).map_err(|mut error| {
error.push(STRUCT_NAME, "name");
error
})
@@ -66,16 +60,69 @@ impl PromLabel {
2u32 => {
// decode label value
let value = &mut self.value;
prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| {
merge_bytes(value, buf).map_err(|mut error| {
error.push(STRUCT_NAME, "value");
error
})
}
_ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
_ => prost::encoding::skip_field(wire_type, tag, buf, Default::default()),
}
}
}
#[inline(always)]
fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes {
if len == data.remaining() {
std::mem::replace(data, Bytes::new())
} else {
let ret = split_to(data, len);
data.advance(len);
ret
}
}
/// Similar to `Bytes::split_to`, but directly operates on underlying memory region.
/// # Safety
/// This function is safe as long as `data` is backed by a consecutive region of memory,
/// for example `Vec<u8>` or `&[u8]`, and caller must ensure that `buf` outlives
/// the `Bytes` returned.
#[inline(always)]
fn split_to(buf: &mut Bytes, end: usize) -> Bytes {
let len = buf.len();
assert!(
end <= len,
"range end out of bounds: {:?} <= {:?}",
end,
len,
);
if end == 0 {
return Bytes::new();
}
let ptr = buf.as_ptr();
let x = unsafe { slice::from_raw_parts(ptr, end) };
// `Bytes::drop` does nothing when it's built via `from_static`.
Bytes::from_static(x)
}
/// Reads a variable-length encoded bytes field from `buf` and assign it to `value`.
/// # Safety
/// Callers must ensure `buf` outlives `value`.
#[inline(always)]
fn merge_bytes(value: &mut Bytes, buf: &mut Bytes) -> Result<(), DecodeError> {
let len = decode_varint(buf)?;
if len > buf.remaining() as u64 {
return Err(DecodeError::new(format!(
"buffer underflow, len: {}, remaining: {}",
len,
buf.remaining()
)));
}
*value = copy_to_bytes(buf, len as usize);
Ok(())
}
#[derive(Default)]
pub struct PromTimeSeries {
pub table_name: String,
@@ -92,16 +139,12 @@ impl Clear for PromTimeSeries {
}
impl PromTimeSeries {
pub fn merge_field<B>(
pub fn merge_field(
&mut self,
tag: u32,
wire_type: WireType,
buf: &mut B,
ctx: DecodeContext,
) -> Result<(), DecodeError>
where
B: Buf,
{
buf: &mut Bytes,
) -> Result<(), DecodeError> {
const STRUCT_NAME: &str = "PromTimeSeries";
match tag {
1u32 => {
@@ -120,7 +163,7 @@ impl PromTimeSeries {
let limit = remaining - len as usize;
while buf.remaining() > limit {
let (tag, wire_type) = decode_key(buf)?;
label.merge_field(tag, wire_type, buf, ctx.clone())?;
label.merge_field(tag, wire_type, buf)?;
}
if buf.remaining() != limit {
return Err(DecodeError::new("delimited length exceeded"));
@@ -135,15 +178,17 @@ impl PromTimeSeries {
}
2u32 => {
let sample = self.samples.push_default();
merge(WireType::LengthDelimited, sample, buf, ctx).map_err(|mut error| {
error.push(STRUCT_NAME, "samples");
error
})?;
merge(WireType::LengthDelimited, sample, buf, Default::default()).map_err(
|mut error| {
error.push(STRUCT_NAME, "samples");
error
},
)?;
Ok(())
}
// skip exemplars
3u32 => prost::encoding::skip_field(wire_type, tag, buf, ctx),
_ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
// todo(hl): exemplars are skipped temporarily
3u32 => prost::encoding::skip_field(wire_type, tag, buf, Default::default()),
_ => prost::encoding::skip_field(wire_type, tag, buf, Default::default()),
}
}
@@ -178,13 +223,9 @@ impl PromWriteRequest {
self.table_data.as_insert_requests()
}
pub fn merge<B>(&mut self, mut buf: B) -> Result<(), DecodeError>
where
B: Buf,
Self: Sized,
{
// todo(hl): maybe use &[u8] can reduce the overhead introduced with Bytes.
pub fn merge(&mut self, mut buf: Bytes) -> Result<(), DecodeError> {
const STRUCT_NAME: &str = "PromWriteRequest";
let ctx = DecodeContext::default();
while buf.has_remaining() {
let (tag, wire_type) = decode_key(&mut buf)?;
assert_eq!(WireType::LengthDelimited, wire_type);
@@ -203,8 +244,7 @@ impl PromWriteRequest {
let limit = remaining - len as usize;
while buf.remaining() > limit {
let (tag, wire_type) = decode_key(&mut buf)?;
self.series
.merge_field(tag, wire_type, &mut buf, ctx.clone())?;
self.series.merge_field(tag, wire_type, &mut buf)?;
}
if buf.remaining() != limit {
return Err(DecodeError::new("delimited length exceeded"));
@@ -212,10 +252,10 @@ impl PromWriteRequest {
self.series.add_to_table_data(&mut self.table_data);
}
3u32 => {
// we can ignore metadata for now.
prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?;
// todo(hl): metadata are skipped.
prost::encoding::skip_field(wire_type, tag, &mut buf, Default::default())?;
}
_ => prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?,
_ => prost::encoding::skip_field(wire_type, tag, &mut buf, Default::default())?,
}
}
Ok(())