perf: sparse encoder (#6809)

* perf/sparse-encoder:
 - **Update Dependencies**: Updated `criterion-plot` to version `0.5.0` and added `criterion` version `0.7.0` in `Cargo.lock`. Added `bytes` to `Cargo.toml` in `src/metric-engine`.
 - **Benchmarking**: Added a new benchmark for sparse encoding in `bench_sparse_encoding.rs` and updated `Cargo.toml` in `src/mito-codec` to include `criterion` as a dev-dependency.
 - **Sparse Encoding Enhancements**: Modified `SparsePrimaryKeyCodec` in `sparse.rs` to include new methods `encode_raw_tag_value` and `encode_internal`. Added public constants `RESERVED_COLUMN_ID_TSID` and `RESERVED_COLUMN_ID_TABLE_ID`.
 - **HTTP Server**: Made `try_decompress` function public in `prom_store.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/sparse-encoder:
 Improve buffer handling in `sparse.rs`

 - Refactored buffer reservation logic to use `value_len` for clarity.
 - Optimized chunk processing by calculating `num_chunks` and `remainder` for efficient data handling.
 - Enhanced manual serialization of bytes to avoid byte-by-byte operations, improving performance.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Update src/mito-codec/src/row_converter/sparse.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Lei, HUANG
2025-08-26 12:10:11 +08:00
committed by GitHub
parent eb5e627ddd
commit d99734b97b
6 changed files with 234 additions and 14 deletions

39
Cargo.lock generated
View File

@@ -2912,7 +2912,7 @@ dependencies = [
"cast",
"ciborium",
"clap 3.2.25",
"criterion-plot",
"criterion-plot 0.5.0",
"futures",
"itertools 0.10.5",
"lazy_static",
@@ -2939,7 +2939,7 @@ dependencies = [
"cast",
"ciborium",
"clap 4.5.40",
"criterion-plot",
"criterion-plot 0.5.0",
"is-terminal",
"itertools 0.10.5",
"num-traits",
@@ -2955,6 +2955,29 @@ dependencies = [
"walkdir",
]
[[package]]
name = "criterion"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928"
dependencies = [
"anes",
"cast",
"ciborium",
"clap 4.5.40",
"criterion-plot 0.6.0",
"itertools 0.13.0",
"num-traits",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_json",
"tinytemplate",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.5.0"
@@ -2965,6 +2988,16 @@ dependencies = [
"itertools 0.10.5",
]
[[package]]
name = "criterion-plot"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338"
dependencies = [
"cast",
"itertools 0.13.0",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
@@ -7433,6 +7466,7 @@ dependencies = [
"async-stream",
"async-trait",
"base64 0.22.1",
"bytes",
"common-base",
"common-error",
"common-macro",
@@ -7529,6 +7563,7 @@ dependencies = [
"common-recordbatch",
"common-telemetry",
"common-time",
"criterion 0.7.0",
"datafusion-common",
"datafusion-expr",
"datatypes",

View File

@@ -13,6 +13,7 @@ aquamarine.workspace = true
async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -26,5 +26,11 @@ snafu.workspace = true
store-api.workspace = true
[dev-dependencies]
criterion = "0.7"
datafusion-common.workspace = true
datafusion-expr.workspace = true
[[bench]]
name = "bench_sparse_encoding"
harness = false
required-features = ["testing"]

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hint::black_box;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::prelude::ValueRef;
use mito_codec::row_converter::sparse::{RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
/// Benchmarks two ways of encoding a sparse primary key with the same inputs:
///
/// - `encode_to_vec`: the reserved columns and the tags are passed as
///   [`ValueRef`]s to `SparsePrimaryKeyCodec::encode_to_vec`.
/// - `encode_by_raw`: the reserved columns go through `encode_internal` and the
///   tags are passed as raw [`Bytes`] to `encode_raw_tag_value`.
///
/// Every iteration starts from a fresh, empty `Vec`, so buffer growth is part of
/// what is measured in both cases.
fn encode_sparse(c: &mut Criterion) {
    let table_id = 1024;
    let tsid = 42;
    let tag_count = 10;
    let codec = SparsePrimaryKeyCodec::from_columns(0..tag_count);

    let reserved_columns = [
        (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(table_id)),
        (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(tsid)),
    ];
    // One tag per column id; the value is the decimal column id repeated 10 times.
    let string_tags: Vec<_> = (0..tag_count)
        .map(|column_id| (column_id, column_id.to_string().repeat(10)))
        .collect();

    let mut group = c.benchmark_group("encode");

    group.bench_function("encode_to_vec", |bencher| {
        bencher.iter(|| {
            let mut encoded = Vec::new();
            codec
                .encode_to_vec(reserved_columns.into_iter(), &mut encoded)
                .unwrap();
            let tag_values = string_tags
                .iter()
                .map(|(column_id, text)| (*column_id, ValueRef::String(text)));
            codec.encode_to_vec(tag_values, &mut encoded).unwrap();
            black_box(encoded);
        });
    });

    // Same tags, converted once up front to the `Bytes` form the raw encoder takes.
    let bytes_tags: Vec<_> = string_tags
        .into_iter()
        .map(|(column_id, text)| (column_id, Bytes::copy_from_slice(text.as_bytes())))
        .collect();

    group.bench_function("encode_by_raw", |bencher| {
        bencher.iter(|| {
            let mut encoded = Vec::new();
            codec.encode_internal(table_id, tsid, &mut encoded).unwrap();
            let tag_values = bytes_tags.iter().map(|(column_id, raw)| (*column_id, raw));
            codec
                .encode_raw_tag_value(tag_values, &mut encoded)
                .unwrap();
            black_box(encoded);
        });
    });

    group.finish();
}
criterion_group!(benches, encode_sparse);
criterion_main!(benches);

View File

@@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use bytes::{BufMut, Bytes};
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
@@ -85,32 +86,31 @@ impl SparseValues {
}
/// The column id of the tsid.
const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
/// The column id of the table id.
const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
/// The size of the column id in the encoded sparse row.
pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
impl SparsePrimaryKeyCodec {
/// Creates a new [`SparsePrimaryKeyCodec`] instance.
///
/// The `region_metadata` should be the metadata of the logical region.
pub fn new(region_metadata: &RegionMetadataRef) -> Self {
pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
let columns = columns_ids.collect();
Self {
inner: Arc::new(SparsePrimaryKeyCodecInner {
table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
label_field: SortField::new(ConcreteDataType::string_datatype()),
columns: Some(
region_metadata
.primary_key_columns()
.map(|c| c.column_id)
.collect(),
),
columns: Some(columns),
}),
}
}
/// Creates a new [`SparsePrimaryKeyCodec`] instance.
pub fn new(region_metadata: &RegionMetadataRef) -> Self {
Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
}
/// Returns a new [`SparsePrimaryKeyCodec`] instance.
///
/// It treats all unknown columns as primary key(label field).
@@ -177,6 +177,54 @@ impl SparsePrimaryKeyCodec {
Ok(())
}
/// Appends the encoded form of raw tag values to `buffer`.
///
/// For every `(column id, value)` pair in `row` this writes:
///
/// 1. the column id as a big-endian `u32`,
/// 2. a single `1` byte,
/// 3. a flag byte: `0` if the value is empty, `1` otherwise,
/// 4. the value split into 8-byte groups, each followed by one marker byte.
///
/// Step 3 and 4 are a manual implementation of
/// `memcomparable::ser::Serializer::serialize_bytes` that copies whole groups
/// instead of pushing byte by byte. `test_encode_by_short_cuts` checks that the
/// output matches `encode_to_vec` for string tags.
///
/// Existing content of `buffer` is preserved. Currently always returns `Ok(())`.
pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
    I: Iterator<Item = (ColumnId, &'a Bytes)>,
{
    for (tag_column_id, tag_value) in row {
        let value: &[u8] = tag_value;
        let value_len = value.len();

        // 4 bytes column id + 1 byte marker + 1 byte non-empty flag, then 9 bytes
        // (8 data/padding + 1 marker) per group. Round the group count up so the
        // zero-padded trailing group is reserved as well.
        buffer.reserve(6 + value_len.div_ceil(8) * 9);
        buffer.put_u32(tag_column_id);
        buffer.put_u8(1);
        buffer.put_u8(!value.is_empty() as u8);

        let full_groups = value.chunks_exact(8);
        let num_chunks = value_len / 8;
        let remainder = full_groups.remainder();

        for (idx, group) in full_groups.enumerate() {
            buffer.extend_from_slice(group);
            // The marker byte after each group tells how many of its bytes are significant:
            // 1-8: that many bytes are significant and this is the last group
            // 9: all 8 bytes are significant and more data follows
            let is_last_group = remainder.is_empty() && idx + 1 == num_chunks;
            buffer.put_u8(if is_last_group { 8 } else { 9 });
        }

        if !remainder.is_empty() {
            // Trailing partial group: pad with zeros up to 8 bytes, then record how
            // many bytes were real data. `remainder.len()` is always in 1..=7 here.
            buffer.extend_from_slice(remainder);
            buffer.put_bytes(0, 8 - remainder.len());
            buffer.put_u8(remainder.len() as u8);
        }
    }
    Ok(())
}
/// Appends the two reserved internal columns to `buffer`: first the table id
/// entry, then the tsid entry.
///
/// Each entry is the reserved column id (`u32`), a single `1` byte, and the
/// value itself (`u32` table id / `u64` tsid). Existing content of `buffer` is
/// preserved. Currently always returns `Ok(())`.
pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
    // (column id + marker + u32 table id) + (column id + marker + u64 tsid)
    const ENCODED_LEN: usize = (4 + 1 + 4) + (4 + 1 + 8);
    buffer.reserve_exact(ENCODED_LEN);

    // Table id entry.
    buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
    buffer.put_u8(1);
    buffer.put_u32(table_id);

    // Tsid entry.
    buffer.put_u32(RESERVED_COLUMN_ID_TSID);
    buffer.put_u8(1);
    buffer.put_u64(tsid);

    Ok(())
}
/// Decodes the given bytes into a [`SparseValues`].
fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
let mut deserializer = Deserializer::new(bytes);
@@ -487,6 +535,50 @@ mod tests {
]
}
/// The shortcut encoders (`encode_internal` + `encode_raw_tag_value`) must
/// produce exactly the same bytes as `encode_to_vec` for the same reserved
/// columns and string tags.
#[test]
fn test_encode_by_short_cuts() {
    let region_metadata = test_region_metadata();
    let codec = SparsePrimaryKeyCodec::new(&region_metadata);

    // Tag lengths cover a multiple of 8 (16 bytes) as well as values with a
    // trailing partial group (33 and 11 bytes).
    let tags = [
        (1, "greptime-frontend-6989d9899-22222"),
        (2, "greptime-cluster"),
        (3, "greptime-frontend-6989d9899-22222"),
        (4, "greptime-frontend-6989d9899-22222"),
        (5, "10.10.10.10"),
    ];

    // Reference encoding through the generic `ValueRef` path.
    let mut expected = Vec::new();
    let reserved_columns = [
        (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)),
        (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)),
    ];
    codec
        .encode_to_vec(reserved_columns.into_iter(), &mut expected)
        .unwrap();
    let string_tags = tags
        .iter()
        .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value)));
    codec.encode_to_vec(string_tags, &mut expected).unwrap();

    // Same data through the raw shortcut encoders.
    let mut actual = Vec::new();
    codec.encode_internal(1024, 42, &mut actual).unwrap();
    let raw_tags: Vec<_> = tags
        .into_iter()
        .map(|(col_id, tag_value)| (col_id, Bytes::from_static(tag_value.as_bytes())))
        .collect();
    codec
        .encode_raw_tag_value(raw_tags.iter().map(|(col_id, raw)| (*col_id, raw)), &mut actual)
        .unwrap();

    assert_eq!(expected, actual);
}
#[test]
fn test_encode_to_vec() {
let region_metadata = test_region_metadata();

View File

@@ -209,7 +209,7 @@ pub async fn remote_read(
state.prom_store_handler.read(request, query_ctx).await
}
fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
pub fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
Ok(Bytes::from(if is_zstd {
zstd_decompress(body)?
} else {