diff --git a/Cargo.lock b/Cargo.lock index 11981e1b5d..9d06e8a660 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2912,7 +2912,7 @@ dependencies = [ "cast", "ciborium", "clap 3.2.25", - "criterion-plot", + "criterion-plot 0.5.0", "futures", "itertools 0.10.5", "lazy_static", @@ -2939,7 +2939,7 @@ dependencies = [ "cast", "ciborium", "clap 4.5.40", - "criterion-plot", + "criterion-plot 0.5.0", "is-terminal", "itertools 0.10.5", "num-traits", @@ -2955,6 +2955,29 @@ dependencies = [ "walkdir", ] +[[package]] +name = "criterion" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.5.40", + "criterion-plot 0.6.0", + "itertools 0.13.0", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "walkdir", +] + [[package]] name = "criterion-plot" version = "0.5.0" @@ -2965,6 +2988,16 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "criterion-plot" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338" +dependencies = [ + "cast", + "itertools 0.13.0", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -7433,6 +7466,7 @@ dependencies = [ "async-stream", "async-trait", "base64 0.22.1", + "bytes", "common-base", "common-error", "common-macro", @@ -7529,6 +7563,7 @@ dependencies = [ "common-recordbatch", "common-telemetry", "common-time", + "criterion 0.7.0", "datafusion-common", "datafusion-expr", "datatypes", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 5c750499e2..3d4e025f7e 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -13,6 +13,7 @@ aquamarine.workspace = true async-stream.workspace = true async-trait.workspace = true base64.workspace = true +bytes.workspace 
= true common-base.workspace = true common-error.workspace = true common-macro.workspace = true diff --git a/src/mito-codec/Cargo.toml b/src/mito-codec/Cargo.toml index 3fddf7c145..a96d24d963 100644 --- a/src/mito-codec/Cargo.toml +++ b/src/mito-codec/Cargo.toml @@ -26,5 +26,11 @@ snafu.workspace = true store-api.workspace = true [dev-dependencies] +criterion = "0.7" datafusion-common.workspace = true datafusion-expr.workspace = true + +[[bench]] +name = "bench_sparse_encoding" +harness = false +required-features = ["testing"] diff --git a/src/mito-codec/benches/bench_sparse_encoding.rs b/src/mito-codec/benches/bench_sparse_encoding.rs new file mode 100644 index 0000000000..b9dc7f3fa2 --- /dev/null +++ b/src/mito-codec/benches/bench_sparse_encoding.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::hint::black_box;
+
+use bytes::Bytes;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datatypes::prelude::ValueRef;
+use mito_codec::row_converter::sparse::{RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID};
+use mito_codec::row_converter::SparsePrimaryKeyCodec;
+
+/// Benchmarks two ways of encoding the same sparse primary key.
+///
+/// * `encode_to_vec` passes every column to `SparsePrimaryKeyCodec::encode_to_vec`
+///   as a [`ValueRef`].
+/// * `encode_by_raw` calls `SparsePrimaryKeyCodec::encode_internal` and then
+///   `SparsePrimaryKeyCodec::encode_raw_tag_value` on raw [`Bytes`] tag values.
+///
+/// Both variants allocate a fresh output buffer on every iteration.
+fn encode_sparse(c: &mut Criterion) {
+    let tag_count = 10;
+    let table_id = 1024;
+    let tsid = 42;
+    let codec = SparsePrimaryKeyCodec::from_columns(0..tag_count);
+
+    // Internal columns that are encoded ahead of the tags.
+    let reserved_columns = [
+        (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(table_id)),
+        (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(tsid)),
+    ];
+
+    // Tag `i` holds the decimal text of `i` repeated ten times.
+    let mut string_tags = Vec::new();
+    for column_id in 0..tag_count {
+        string_tags.push((column_id, column_id.to_string().repeat(10)));
+    }
+
+    let mut group = c.benchmark_group("encode");
+
+    // Generic path: every column is handed over as a `ValueRef`.
+    group.bench_function("encode_to_vec", |bencher| {
+        bencher.iter(|| {
+            let mut encoded = Vec::new();
+            codec
+                .encode_to_vec(reserved_columns.into_iter(), &mut encoded)
+                .unwrap();
+            let labels = string_tags
+                .iter()
+                .map(|(column_id, text)| (*column_id, ValueRef::String(text)));
+            codec.encode_to_vec(labels, &mut encoded).unwrap();
+            black_box(encoded);
+        });
+    });
+
+    // The raw path takes the tag values as `Bytes`.
+    let raw_tags: Vec<(_, Bytes)> = string_tags
+        .iter()
+        .map(|(column_id, text)| (*column_id, Bytes::copy_from_slice(text.as_bytes())))
+        .collect();
+
+    group.bench_function("encode_by_raw", |bencher| {
+        bencher.iter(|| {
+            let mut encoded = Vec::new();
+            codec.encode_internal(table_id, tsid, &mut encoded).unwrap();
+            let labels = raw_tags.iter().map(|(column_id, raw)| (*column_id, raw));
+            codec.encode_raw_tag_value(labels, &mut encoded).unwrap();
+            black_box(encoded);
+        });
+    });
+
+    group.finish();
+}
+
+criterion_group!(benches, encode_sparse);
+criterion_main!(benches);
diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs
index b858058506..114e56b92e 100644
--- a/src/mito-codec/src/row_converter/sparse.rs
+++
b/src/mito-codec/src/row_converter/sparse.rs
@@ -15,6 +15,7 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+use bytes::{BufMut, Bytes};
 use common_recordbatch::filter::SimpleFilterEvaluator;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::{Value, ValueRef};
@@ -85,32 +86,37 @@ impl SparseValues {
 }
 
 /// The column id of the tsid.
-const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
+pub const RESERVED_COLUMN_ID_TSID: ColumnId = ReservedColumnId::tsid();
 /// The column id of the table id.
-const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
+pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
 /// The size of the column id in the encoded sparse row.
 pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
 
 impl SparsePrimaryKeyCodec {
     /// Creates a new [`SparsePrimaryKeyCodec`] instance.
     ///
-    /// The `region_metadata` should be the metadata of the logical region.
-    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
+    /// `columns_ids` yields the column ids that the codec stores as its known
+    /// columns.
+    pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
+        let columns = columns_ids.collect();
         Self {
             inner: Arc::new(SparsePrimaryKeyCodecInner {
                 table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
                 tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
                 label_field: SortField::new(ConcreteDataType::string_datatype()),
-                columns: Some(
-                    region_metadata
-                        .primary_key_columns()
-                        .map(|c| c.column_id)
-                        .collect(),
-                ),
+                columns: Some(columns),
             }),
         }
     }
 
+    /// Creates a new [`SparsePrimaryKeyCodec`] instance from the primary key columns
+    /// of `region_metadata`.
+    ///
+    /// The `region_metadata` should be the metadata of the logical region.
+    pub fn new(region_metadata: &RegionMetadataRef) -> Self {
+        Self::from_columns(region_metadata.primary_key_columns().map(|c| c.column_id))
+    }
+
     /// Returns a new [`SparsePrimaryKeyCodec`] instance.
     ///
     /// It treats all unknown columns as primary key(label field).
@@ -177,6 +177,69 @@ impl SparsePrimaryKeyCodec {
         Ok(())
     }
 
+    /// Appends the sparse encoding of raw tag values to `buffer`.
+    ///
+    /// Each `(column id, value)` pair yielded by `row` is written as the big-endian
+    /// column id, a marker byte `1`, and the value in the byte-string layout of
+    /// `memcomparable`. `test_encode_by_short_cuts` checks that this matches the
+    /// output of `encode_to_vec` for the same tags passed as `ValueRef::String`.
+    pub fn encode_raw_tag_value<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = (ColumnId, &'a Bytes)>,
+    {
+        for (tag_column_id, tag_value) in row {
+            let value_len = tag_value.len();
+            // 4-byte column id + marker byte + non-empty flag, plus 9 bytes (8 data bytes
+            // and 1 length marker) for every started 8-byte group.
+            buffer.reserve(6 + value_len.div_ceil(8) * 9);
+            buffer.put_u32(tag_column_id);
+            buffer.put_u8(1);
+            buffer.put_u8(!tag_value.is_empty() as u8);
+
+            // Manual implementation of memcomparable::ser::Serializer::serialize_bytes
+            // to avoid byte-by-byte put.
+            let full_chunks = tag_value.chunks_exact(8);
+            let tail = full_chunks.remainder();
+            let mut len = 0;
+
+            for chunk in full_chunks {
+                buffer.extend_from_slice(chunk);
+                len += 8;
+                // append an extra byte that signals the number of significant bytes in this chunk
+                // 1-8: many bytes were significant and this group is the last group
+                // 9: all 8 bytes were significant and there is more data to come
+                let extra = if len == value_len { 8 } else { 9 };
+                buffer.put_u8(extra);
+            }
+
+            if !tail.is_empty() {
+                // Zero-pad the final partial group to 8 bytes; its marker is the number of
+                // significant bytes.
+                buffer.extend_from_slice(tail);
+                buffer.put_bytes(0, 8 - tail.len());
+                buffer.put_u8(tail.len() as u8);
+            }
+        }
+        Ok(())
+    }
+
+    /// Appends the encoded internal columns (table id and tsid) to `buffer`.
+    ///
+    /// Each column is written as its reserved column id, a marker byte `1`, and the
+    /// value, all integers big-endian: 22 bytes in total.
+    pub fn encode_internal(&self, table_id: u32, tsid: u64, buffer: &mut Vec<u8>) -> Result<()> {
+        // (4-byte column id + 1 marker byte) * 2 + 4-byte table id + 8-byte tsid.
+        // `reserve` rather than `reserve_exact`: callers keep appending tags.
+        buffer.reserve(22);
+        buffer.put_u32(RESERVED_COLUMN_ID_TABLE_ID);
+        buffer.put_u8(1);
+        buffer.put_u32(table_id);
+        buffer.put_u32(RESERVED_COLUMN_ID_TSID);
+        buffer.put_u8(1);
+        buffer.put_u64(tsid);
+        Ok(())
+    }
+
     /// Decodes the given bytes into a [`SparseValues`].
fn decode_sparse(&self, bytes: &[u8]) -> Result { let mut deserializer = Deserializer::new(bytes); @@ -487,6 +535,50 @@ mod tests { ] } + #[test] + fn test_encode_by_short_cuts() { + let region_metadata = test_region_metadata(); + let codec = SparsePrimaryKeyCodec::new(®ion_metadata); + let mut buffer = Vec::new(); + let internal_columns = [ + (RESERVED_COLUMN_ID_TABLE_ID, ValueRef::UInt32(1024)), + (RESERVED_COLUMN_ID_TSID, ValueRef::UInt64(42)), + ]; + let tags = [ + (1, "greptime-frontend-6989d9899-22222"), + (2, "greptime-cluster"), + (3, "greptime-frontend-6989d9899-22222"), + (4, "greptime-frontend-6989d9899-22222"), + (5, "10.10.10.10"), + ]; + codec + .encode_to_vec(internal_columns.into_iter(), &mut buffer) + .unwrap(); + codec + .encode_to_vec( + tags.iter() + .map(|(col_id, tag_value)| (*col_id, ValueRef::String(tag_value))), + &mut buffer, + ) + .unwrap(); + + let mut buffer_by_raw_encoding = Vec::new(); + codec + .encode_internal(1024, 42, &mut buffer_by_raw_encoding) + .unwrap(); + let tags: Vec<_> = tags + .into_iter() + .map(|(col_id, tag_value)| (col_id, Bytes::from_static(tag_value.as_bytes()))) + .collect(); + codec + .encode_raw_tag_value( + tags.iter().map(|(c, b)| (*c, b)), + &mut buffer_by_raw_encoding, + ) + .unwrap(); + assert_eq!(buffer, buffer_by_raw_encoding); + } + #[test] fn test_encode_to_vec() { let region_metadata = test_region_metadata(); diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index ad657af07c..5f23125008 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -209,7 +209,7 @@ pub async fn remote_read( state.prom_store_handler.read(request, query_ctx).await } -fn try_decompress(is_zstd: bool, body: &[u8]) -> Result { +pub fn try_decompress(is_zstd: bool, body: &[u8]) -> Result { Ok(Bytes::from(if is_zstd { zstd_decompress(body)? } else {