mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 14:30:43 +00:00
perf(mito-codec): optimize SparseValues decode and lookup (#8057)
* perf: add benchmarks for SparsePrimaryKeyCodec::has_column Add benchmarks covering table_id, tsid, first_tag, and last_tag lookups across 5, 10, 50, and 100 tag counts to measure the cost of the offset map construction in has_column. Signed-off-by: evenyag <realevenyag@gmail.com> * perf: handle table id/tsid specially Signed-off-by: evenyag <realevenyag@gmail.com> * perf: lazy decode Signed-off-by: evenyag <realevenyag@gmail.com> * chore: use vec for small tags Signed-off-by: evenyag <realevenyag@gmail.com> * feat: add bench Signed-off-by: evenyag <realevenyag@gmail.com> * perf: use 32 as inline capacity Signed-off-by: evenyag <realevenyag@gmail.com> * perf: benchmark sparse value Signed-off-by: evenyag <realevenyag@gmail.com> * feat: change sparse values to use vec Signed-off-by: evenyag <realevenyag@gmail.com> * chore: reserve capacity Signed-off-by: evenyag <realevenyag@gmail.com> * chore: simplify comments Signed-off-by: evenyag <realevenyag@gmail.com> * docs: update comment Signed-off-by: evenyag <realevenyag@gmail.com> * chore: update benchmark for map sparse values Signed-off-by: evenyag <realevenyag@gmail.com> * docs: update comment Signed-off-by: evenyag <realevenyag@gmail.com> * chore: remove empty check Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -25,7 +25,8 @@ use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use mito_codec::row_converter::{
|
||||
DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
|
||||
DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparseOffsetsCache,
|
||||
SparsePrimaryKeyCodec,
|
||||
};
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
@@ -182,7 +183,7 @@ fn matches_sparse_scalar(
|
||||
codec: &SparsePrimaryKeyCodec,
|
||||
filters: &[SimpleFilterEvaluator],
|
||||
pk: &[u8],
|
||||
offsets_map: &mut std::collections::HashMap<ColumnId, usize>,
|
||||
offsets_map: &mut SparseOffsetsCache,
|
||||
) -> bool {
|
||||
offsets_map.clear();
|
||||
if filters.is_empty() || metadata.primary_key.is_empty() {
|
||||
@@ -252,7 +253,7 @@ fn bench_primary_key_filter(c: &mut Criterion) {
|
||||
let sparse_pk = encode_sparse_pk(&metadata, &row);
|
||||
let sparse_codec = SparsePrimaryKeyCodec::new(&metadata);
|
||||
let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone(), false);
|
||||
let mut sparse_offsets = std::collections::HashMap::new();
|
||||
let mut sparse_offsets = SparseOffsetsCache::new();
|
||||
|
||||
let mut group = c.benchmark_group(format!("primary_key_filter/{case_name}"));
|
||||
|
||||
|
||||
@@ -12,13 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::hint::black_box;
|
||||
|
||||
use bytes::Bytes;
|
||||
use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use criterion::measurement::WallTime;
|
||||
use criterion::{BenchmarkGroup, Criterion, criterion_group, criterion_main};
|
||||
use datatypes::prelude::ValueRef;
|
||||
use mito_codec::row_converter::SparsePrimaryKeyCodec;
|
||||
use datatypes::value::Value;
|
||||
use mito_codec::row_converter::sparse::{RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID};
|
||||
use mito_codec::row_converter::{SparseOffsetsCache, SparsePrimaryKeyCodec};
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
fn encode_sparse(c: &mut Criterion) {
|
||||
let num_tags = 10;
|
||||
@@ -82,5 +86,424 @@ fn encode_sparse(c: &mut Criterion) {
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, encode_sparse);
|
||||
/// Encodes a primary key with the given number of tags.
|
||||
fn encode_pk(num_tags: u32) -> (SparsePrimaryKeyCodec, Vec<u8>) {
|
||||
// Use schemaless() so all columns (including reserved ones) are recognized during parsing.
|
||||
let codec = SparsePrimaryKeyCodec::schemaless();
|
||||
let dummy_table_id = 1024u32;
|
||||
let dummy_tsid = 42u64;
|
||||
|
||||
let tags: Vec<_> = (0..num_tags)
|
||||
.map(|idx| {
|
||||
let tag_value = idx.to_string().repeat(10);
|
||||
(idx, Bytes::copy_from_slice(tag_value.as_bytes()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
codec
|
||||
.encode_internal(dummy_table_id, dummy_tsid, &mut buffer)
|
||||
.unwrap();
|
||||
codec
|
||||
.encode_raw_tag_value(tags.iter().map(|(c, b)| (*c, &b[..])), &mut buffer)
|
||||
.unwrap();
|
||||
|
||||
(codec, buffer)
|
||||
}
|
||||
|
||||
fn bench_has_column(c: &mut Criterion) {
|
||||
for num_tags in [5, 10, 50, 100] {
|
||||
let (codec, pk) = encode_pk(num_tags);
|
||||
let mut group = c.benchmark_group(format!("has_column/{num_tags}_tags"));
|
||||
|
||||
group.bench_function("table_id", |b| {
|
||||
b.iter(|| {
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
black_box(codec.has_column(&pk, &mut offsets_map, RESERVED_COLUMN_ID_TABLE_ID));
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("tsid", |b| {
|
||||
b.iter(|| {
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
black_box(codec.has_column(&pk, &mut offsets_map, RESERVED_COLUMN_ID_TSID));
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("first_tag", |b| {
|
||||
b.iter(|| {
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
black_box(codec.has_column(&pk, &mut offsets_map, 0));
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("middle_tag", |b| {
|
||||
b.iter(|| {
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
black_box(codec.has_column(&pk, &mut offsets_map, num_tags / 2));
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("last_tag", |b| {
|
||||
b.iter(|| {
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
black_box(codec.has_column(&pk, &mut offsets_map, num_tags - 1));
|
||||
});
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmarks Vec linear scan vs HashMap lookup at various collection sizes
|
||||
/// to find the optimal `SPARSE_OFFSETS_INLINE_CAP` threshold.
|
||||
fn bench_inline_threshold(c: &mut Criterion) {
|
||||
for size in [4, 8, 12, 16, 20, 24, 32, 48, 64] {
|
||||
let vec: Vec<(u32, usize)> = (0..size).map(|i| (i as u32, i * 8)).collect();
|
||||
let map: HashMap<u32, usize> = vec.iter().copied().collect();
|
||||
|
||||
let last_id = (size - 1) as u32;
|
||||
let missing_id = size as u32;
|
||||
|
||||
let mut group = c.benchmark_group(format!("inline_threshold/{size}"));
|
||||
|
||||
// Vec: best case (first element)
|
||||
group.bench_function("vec_first", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(0u32);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
// Vec: worst case (last element)
|
||||
group.bench_function("vec_last", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(last_id);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
// Vec: miss
|
||||
group.bench_function("vec_miss", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(missing_id);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
// HashMap: hit (last element)
|
||||
group.bench_function("map_hit", |b| {
|
||||
b.iter(|| black_box(map.get(&black_box(last_id)).copied()));
|
||||
});
|
||||
|
||||
// HashMap: miss
|
||||
group.bench_function("map_miss", |b| {
|
||||
b.iter(|| black_box(map.get(&black_box(missing_id)).copied()));
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup-strategy comparison for `SparseValues`-shaped collections.
|
||||
///
|
||||
/// `SparseValues` is now a `Vec<(ColumnId, Value)>` with linear-scan lookups;
|
||||
/// these benches keep the `HashMap` and `HybridValues<CAP>` baselines around
|
||||
/// to track the crossover and to make it easy to revisit the choice later.
|
||||
/// The companion `SparseOffsetsCache` uses a small-vec fast path with
|
||||
/// `SPARSE_OFFSETS_INLINE_CAP = 32` entries against a `usize` payload;
|
||||
/// `Value` is a much fatter enum, so its crossover may land elsewhere.
|
||||
/// The benches measure both the crossover (`bench_sparse_values_threshold`)
|
||||
/// and the workload impact at realistic primary-key widths
|
||||
/// (`bench_sparse_values_lookup`).
|
||||
fn sample_value(idx: u32) -> Value {
|
||||
Value::String(idx.to_string().repeat(10).into())
|
||||
}
|
||||
|
||||
/// Common lookup surface for the variants under test.
|
||||
trait ValueLookup {
|
||||
fn lookup_get(&self, col: ColumnId) -> Option<&Value>;
|
||||
fn lookup_or_null(&self, col: ColumnId) -> &Value;
|
||||
}
|
||||
|
||||
impl ValueLookup for HashMap<ColumnId, Value> {
|
||||
fn lookup_get(&self, col: ColumnId) -> Option<&Value> {
|
||||
self.get(&col)
|
||||
}
|
||||
fn lookup_or_null(&self, col: ColumnId) -> &Value {
|
||||
self.get(&col).unwrap_or(&Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
impl ValueLookup for Vec<(ColumnId, Value)> {
|
||||
fn lookup_get(&self, col: ColumnId) -> Option<&Value> {
|
||||
for entry in self {
|
||||
if entry.0 == col {
|
||||
return Some(&entry.1);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
fn lookup_or_null(&self, col: ColumnId) -> &Value {
|
||||
for entry in self {
|
||||
if entry.0 == col {
|
||||
return &entry.1;
|
||||
}
|
||||
}
|
||||
&Value::Null
|
||||
}
|
||||
}
|
||||
|
||||
/// Inline-Vec + HashMap-overflow mirror of `SparseOffsetsCache`'s layout,
|
||||
/// parameterized so the bench can sweep candidate inline caps.
|
||||
struct HybridValues<const CAP: usize> {
|
||||
inline: Vec<(ColumnId, Value)>,
|
||||
overflow: HashMap<ColumnId, Value>,
|
||||
}
|
||||
|
||||
impl<const CAP: usize> HybridValues<CAP> {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
inline: Vec::with_capacity(CAP),
|
||||
overflow: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&mut self, col: ColumnId, value: Value) {
|
||||
if self.inline.len() < CAP {
|
||||
self.inline.push((col, value));
|
||||
} else {
|
||||
self.overflow.insert(col, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const CAP: usize> ValueLookup for HybridValues<CAP> {
|
||||
fn lookup_get(&self, col: ColumnId) -> Option<&Value> {
|
||||
for entry in &self.inline {
|
||||
if entry.0 == col {
|
||||
return Some(&entry.1);
|
||||
}
|
||||
}
|
||||
if self.overflow.is_empty() {
|
||||
return None;
|
||||
}
|
||||
self.overflow.get(&col)
|
||||
}
|
||||
fn lookup_or_null(&self, col: ColumnId) -> &Value {
|
||||
self.lookup_get(col).unwrap_or(&Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
fn build_hashmap_values(pairs: &[(ColumnId, Value)]) -> HashMap<ColumnId, Value> {
|
||||
let mut m = HashMap::with_capacity(pairs.len());
|
||||
for (c, v) in pairs {
|
||||
m.insert(*c, v.clone());
|
||||
}
|
||||
m
|
||||
}
|
||||
|
||||
fn build_vec_values(pairs: &[(ColumnId, Value)]) -> Vec<(ColumnId, Value)> {
|
||||
pairs.to_vec()
|
||||
}
|
||||
|
||||
fn build_hybrid_values<const CAP: usize>(pairs: &[(ColumnId, Value)]) -> HybridValues<CAP> {
|
||||
let mut h = HybridValues::<CAP>::new();
|
||||
for (c, v) in pairs {
|
||||
h.insert(*c, v.clone());
|
||||
}
|
||||
h
|
||||
}
|
||||
|
||||
/// Runs the five workloads (lookup_first/last/miss, iter_all, build_then_iter)
|
||||
/// for one variant inside a `sparse_values/{size}` group.
|
||||
fn run_lookup_workloads<V, B>(
|
||||
group: &mut BenchmarkGroup<'_, WallTime>,
|
||||
name: &str,
|
||||
column_ids: &[ColumnId],
|
||||
last_id: ColumnId,
|
||||
missing_id: ColumnId,
|
||||
build: B,
|
||||
) where
|
||||
V: ValueLookup,
|
||||
B: Fn() -> V,
|
||||
{
|
||||
let built = build();
|
||||
|
||||
group.bench_function(format!("{name}/lookup_first"), |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(0 as ColumnId);
|
||||
black_box(built.lookup_or_null(target))
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function(format!("{name}/lookup_last"), |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(last_id);
|
||||
black_box(built.lookup_or_null(target))
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function(format!("{name}/lookup_miss"), |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(missing_id);
|
||||
black_box(built.lookup_get(target))
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function(format!("{name}/iter_all"), |b| {
|
||||
b.iter(|| {
|
||||
for col in column_ids {
|
||||
black_box(built.lookup_or_null(*col));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function(format!("{name}/build_then_iter"), |b| {
|
||||
b.iter(|| {
|
||||
let c = build();
|
||||
for col in column_ids {
|
||||
black_box(c.lookup_or_null(*col));
|
||||
}
|
||||
c
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/// Sweeps collection sizes to find the linear-scan-vs-HashMap crossover for
|
||||
/// `(ColumnId, Value)` entries — i.e. the natural inline cap for a
|
||||
/// `SparseValues`-shaped hybrid.
|
||||
fn bench_sparse_values_threshold(c: &mut Criterion) {
|
||||
for size in [2usize, 4, 8, 12, 16, 20, 24, 32, 48, 64] {
|
||||
let vec: Vec<(ColumnId, Value)> = (0..size as u32)
|
||||
.map(|i| (i as ColumnId, sample_value(i)))
|
||||
.collect();
|
||||
let map: HashMap<ColumnId, Value> = vec.iter().cloned().collect();
|
||||
let last_id = (size - 1) as ColumnId;
|
||||
let missing_id = size as ColumnId;
|
||||
|
||||
let mut group = c.benchmark_group(format!("sparse_values_threshold/{size}"));
|
||||
|
||||
group.bench_function("vec_first", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(0 as ColumnId);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(&entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("vec_last", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(last_id);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(&entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("vec_miss", |b| {
|
||||
b.iter(|| {
|
||||
let target = black_box(missing_id);
|
||||
for entry in &vec {
|
||||
if entry.0 == target {
|
||||
return black_box(Some(&entry.1));
|
||||
}
|
||||
}
|
||||
black_box(None)
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("map_hit_last", |b| {
|
||||
b.iter(|| black_box(map.get(&black_box(last_id))));
|
||||
});
|
||||
|
||||
group.bench_function("map_miss", |b| {
|
||||
b.iter(|| black_box(map.get(&black_box(missing_id))));
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
}
|
||||
|
||||
/// Workload comparison at realistic primary-key widths. Compares the current
|
||||
/// `Vec`-backed `SparseValues` against a `HashMap` and two hybrid candidates.
|
||||
/// The hybrid caps are picked to bracket the likely crossover (smaller than
|
||||
/// the offsets-cache's 32, since `Value` is fatter than `usize`); refine
|
||||
/// after reading `bench_sparse_values_threshold`.
|
||||
fn bench_sparse_values_lookup(c: &mut Criterion) {
|
||||
for size in [2usize, 4, 8, 16, 32, 50] {
|
||||
let pairs: Vec<(ColumnId, Value)> = (0..size as u32)
|
||||
.map(|i| (i as ColumnId, sample_value(i)))
|
||||
.collect();
|
||||
let column_ids: Vec<ColumnId> = pairs.iter().map(|(c, _)| *c).collect();
|
||||
let last_id = (size - 1) as ColumnId;
|
||||
let missing_id = size as ColumnId;
|
||||
|
||||
let mut group = c.benchmark_group(format!("sparse_values/{size}"));
|
||||
|
||||
run_lookup_workloads(
|
||||
&mut group,
|
||||
"hashmap",
|
||||
&column_ids,
|
||||
last_id,
|
||||
missing_id,
|
||||
|| build_hashmap_values(&pairs),
|
||||
);
|
||||
|
||||
run_lookup_workloads(&mut group, "vec", &column_ids, last_id, missing_id, || {
|
||||
build_vec_values(&pairs)
|
||||
});
|
||||
|
||||
run_lookup_workloads(
|
||||
&mut group,
|
||||
"hybrid_8",
|
||||
&column_ids,
|
||||
last_id,
|
||||
missing_id,
|
||||
|| build_hybrid_values::<8>(&pairs),
|
||||
);
|
||||
|
||||
run_lookup_workloads(
|
||||
&mut group,
|
||||
"hybrid_32",
|
||||
&column_ids,
|
||||
last_id,
|
||||
missing_id,
|
||||
|| build_hybrid_values::<32>(&pairs),
|
||||
);
|
||||
|
||||
group.finish();
|
||||
}
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
encode_sparse,
|
||||
bench_has_column,
|
||||
bench_inline_threshold,
|
||||
bench_sparse_values_threshold,
|
||||
bench_sparse_values_lookup
|
||||
);
|
||||
criterion_main!(benches);
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
@@ -27,7 +26,7 @@ use store_api::storage::ColumnId;
|
||||
|
||||
use crate::error::{EvaluateFilterSnafu, Result};
|
||||
use crate::row_converter::{
|
||||
DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec,
|
||||
DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec,
|
||||
};
|
||||
|
||||
/// Returns true if this is a partition column for metrics in the memtable.
|
||||
@@ -303,7 +302,7 @@ impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> {
|
||||
pub struct SparsePrimaryKeyFilter {
|
||||
inner: PrimaryKeyFilterInner,
|
||||
codec: SparsePrimaryKeyCodec,
|
||||
offsets_map: HashMap<ColumnId, usize>,
|
||||
offsets_cache: SparseOffsetsCache,
|
||||
}
|
||||
|
||||
impl SparsePrimaryKeyFilter {
|
||||
@@ -316,18 +315,18 @@ impl SparsePrimaryKeyFilter {
|
||||
Self {
|
||||
inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
|
||||
codec,
|
||||
offsets_map: HashMap::new(),
|
||||
offsets_cache: SparseOffsetsCache::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
|
||||
fn matches(&mut self, pk: &[u8]) -> Result<bool> {
|
||||
self.offsets_map.clear();
|
||||
self.offsets_cache.clear();
|
||||
let mut accessor = SparsePrimaryKeyValueAccessor {
|
||||
pk,
|
||||
codec: &self.codec,
|
||||
offsets_map: &mut self.offsets_map,
|
||||
offsets_cache: &mut self.offsets_cache,
|
||||
};
|
||||
self.inner.evaluate_filters(&mut accessor)
|
||||
}
|
||||
@@ -336,19 +335,19 @@ impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
|
||||
struct SparsePrimaryKeyValueAccessor<'a, 'b> {
|
||||
pk: &'a [u8],
|
||||
codec: &'b SparsePrimaryKeyCodec,
|
||||
offsets_map: &'b mut HashMap<ColumnId, usize>,
|
||||
offsets_cache: &'b mut SparseOffsetsCache,
|
||||
}
|
||||
|
||||
impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> {
|
||||
fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Option<&'a [u8]>> {
|
||||
self.codec
|
||||
.encoded_value_for_column(self.pk, self.offsets_map, filter.column_id)
|
||||
.encoded_value_for_column(self.pk, self.offsets_cache, filter.column_id)
|
||||
}
|
||||
|
||||
fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result<Value> {
|
||||
if let Some(offset) = self
|
||||
.codec
|
||||
.has_column(self.pk, self.offsets_map, filter.column_id)
|
||||
.has_column(self.pk, self.offsets_cache, filter.column_id)
|
||||
{
|
||||
self.codec
|
||||
.decode_value_at(self.pk, offset, filter.column_id)
|
||||
@@ -482,10 +481,13 @@ mod tests {
|
||||
|
||||
fn encode_sparse_pk(
|
||||
metadata: &RegionMetadataRef,
|
||||
table_id: u32,
|
||||
tsid: u64,
|
||||
row: Vec<(ColumnId, ValueRef<'static>)>,
|
||||
) -> Vec<u8> {
|
||||
let codec = SparsePrimaryKeyCodec::new(metadata);
|
||||
let mut pk = Vec::new();
|
||||
codec.encode_internal(table_id, tsid, &mut pk).unwrap();
|
||||
codec.encode_to_vec(row.into_iter(), &mut pk).unwrap();
|
||||
pk
|
||||
}
|
||||
@@ -509,7 +511,7 @@ mod tests {
|
||||
"pod",
|
||||
"greptime-frontend-6989d9899-22222",
|
||||
)]);
|
||||
let pk = encode_sparse_pk(&metadata, create_test_row());
|
||||
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
|
||||
let codec = SparsePrimaryKeyCodec::new(&metadata);
|
||||
let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
|
||||
assert!(filter.matches(&pk).unwrap());
|
||||
@@ -522,7 +524,7 @@ mod tests {
|
||||
"pod",
|
||||
"greptime-frontend-6989d9899-22223",
|
||||
)]);
|
||||
let pk = encode_sparse_pk(&metadata, create_test_row());
|
||||
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
|
||||
let codec = SparsePrimaryKeyCodec::new(&metadata);
|
||||
let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
|
||||
assert!(!filter.matches(&pk).unwrap());
|
||||
@@ -535,7 +537,7 @@ mod tests {
|
||||
"non-exist-label",
|
||||
"greptime-frontend-6989d9899-22222",
|
||||
)]);
|
||||
let pk = encode_sparse_pk(&metadata, create_test_row());
|
||||
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
|
||||
let codec = SparsePrimaryKeyCodec::new(&metadata);
|
||||
let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
|
||||
assert!(filter.matches(&pk).unwrap());
|
||||
@@ -604,7 +606,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_sparse_primary_key_filter_order_ops() {
|
||||
let metadata = setup_metadata();
|
||||
let pk = encode_sparse_pk(&metadata, create_test_row());
|
||||
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
|
||||
let codec = SparsePrimaryKeyCodec::new(&metadata);
|
||||
|
||||
let cases = [
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::sync::Arc;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
pub use dense::{DensePrimaryKeyCodec, SortField};
|
||||
pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparsePrimaryKeyCodec, SparseValues};
|
||||
pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparseOffsetsCache, SparsePrimaryKeyCodec, SparseValues};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
@@ -65,6 +65,10 @@ pub enum CompositeValues {
|
||||
|
||||
impl CompositeValues {
|
||||
/// Extends the composite values with the given values.
|
||||
///
|
||||
/// Append-only: `values` must not contain a column id already present in
|
||||
/// the composite; otherwise the existing entry would shadow the new one on
|
||||
/// `SparseValues` lookup.
|
||||
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
|
||||
match self {
|
||||
CompositeValues::Dense(dense_values) => {
|
||||
|
||||
@@ -71,36 +71,56 @@ struct SparsePrimaryKeyCodecInner {
|
||||
|
||||
/// Sparse values representation.
|
||||
///
|
||||
/// A map of [`ColumnId`] to [`Value`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
/// Callers must not insert a column id that is already present; otherwise
|
||||
/// the existing entry will shadow the newly inserted value on lookup.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default)]
|
||||
pub struct SparseValues {
|
||||
values: HashMap<ColumnId, Value>,
|
||||
values: Vec<(ColumnId, Value)>,
|
||||
}
|
||||
|
||||
impl SparseValues {
|
||||
/// Creates a new [`SparseValues`] instance.
|
||||
pub fn new(values: HashMap<ColumnId, Value>) -> Self {
|
||||
Self { values }
|
||||
/// Creates an empty [`SparseValues`].
|
||||
pub fn new() -> Self {
|
||||
Self { values: Vec::new() }
|
||||
}
|
||||
|
||||
/// Creates an empty [`SparseValues`] with space reserved for `cap` entries.
|
||||
pub fn with_capacity(cap: usize) -> Self {
|
||||
Self {
|
||||
values: Vec::with_capacity(cap),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the value of the given column, or [`Value::Null`] if the column is not present.
|
||||
pub fn get_or_null(&self, column_id: ColumnId) -> &Value {
|
||||
self.values.get(&column_id).unwrap_or(&Value::Null)
|
||||
for (id, value) in &self.values {
|
||||
if *id == column_id {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
&Value::Null
|
||||
}
|
||||
|
||||
/// Returns the value of the given column, or [`None`] if the column is not present.
|
||||
pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
|
||||
self.values.get(column_id)
|
||||
for (id, value) in &self.values {
|
||||
if id == column_id {
|
||||
return Some(value);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Inserts a new value into the [`SparseValues`].
|
||||
/// Appends a new `(column_id, value)` pair.
|
||||
///
|
||||
/// Append-only: the caller must ensure `column_id` is not already present.
|
||||
pub fn insert(&mut self, column_id: ColumnId, value: Value) {
|
||||
self.values.insert(column_id, value);
|
||||
self.values.push((column_id, value));
|
||||
}
|
||||
|
||||
/// Returns an iterator over all stored column id/value pairs.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&ColumnId, &Value)> {
|
||||
self.values.iter()
|
||||
self.values.iter().map(|(id, value)| (id, value))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,6 +131,88 @@ pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id();
|
||||
/// The size of the column id in the encoded sparse row.
|
||||
pub const COLUMN_ID_ENCODE_SIZE: usize = 4;
|
||||
|
||||
// Fixed byte offsets for reserved columns in the sparse encoding.
|
||||
// Layout: [table_id_col_id: 4B][marker: 1B][table_id: 4B][tsid_col_id: 4B][marker: 1B][tsid: 8B]
|
||||
/// Byte offset to the table_id value (after its 4-byte column id).
|
||||
const TABLE_ID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE;
|
||||
/// Byte offset to the tsid value (after 9-byte table_id entry + 4-byte tsid column id).
|
||||
const TSID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE;
|
||||
/// Byte offset where tag columns start (after 9-byte table_id + 13-byte tsid entries).
|
||||
const TAGS_START_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE + 9;
|
||||
|
||||
/// Inline capacity for the small-vec fast path of [`SparseOffsetsCache`].
|
||||
///
|
||||
/// Most sparse primary keys carry only a handful of tags, so a linear scan
|
||||
/// over a short `Vec` beats a `HashMap` lookup. Tags beyond this capacity
|
||||
/// spill into the overflow `HashMap`.
|
||||
const SPARSE_OFFSETS_INLINE_CAP: usize = 32;
|
||||
|
||||
/// A lazily populated cache of tag column offsets inside a sparse primary key.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SparseOffsetsCache {
|
||||
/// Small-vec fast path. Reserves [`SPARSE_OFFSETS_INLINE_CAP`] slots on
|
||||
/// the first insert.
|
||||
inline: Vec<(ColumnId, usize)>,
|
||||
/// Overflow for columns beyond the inline capacity. Lazily allocated.
|
||||
overflow: HashMap<ColumnId, usize>,
|
||||
/// Next byte position in the pk to resume parsing from.
|
||||
cursor: usize,
|
||||
/// True once the decoder has walked past the last tag column (or stopped
|
||||
/// on an unknown column id); no further offsets can be discovered.
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl Default for SparseOffsetsCache {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl SparseOffsetsCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inline: Vec::new(),
|
||||
overflow: HashMap::new(),
|
||||
cursor: TAGS_START_OFFSET,
|
||||
finished: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.inline.clear();
|
||||
self.overflow.clear();
|
||||
self.cursor = TAGS_START_OFFSET;
|
||||
self.finished = false;
|
||||
}
|
||||
|
||||
/// Returns the cached offset for `column_id`, if any.
|
||||
fn get(&self, column_id: ColumnId) -> Option<usize> {
|
||||
for entry in &self.inline {
|
||||
if entry.0 == column_id {
|
||||
return Some(entry.1);
|
||||
}
|
||||
}
|
||||
self.overflow.get(&column_id).copied()
|
||||
}
|
||||
|
||||
/// Records a new `(column_id, offset)` entry.
|
||||
fn insert(&mut self, column_id: ColumnId, offset: usize) {
|
||||
if self.inline.len() < SPARSE_OFFSETS_INLINE_CAP {
|
||||
if self.inline.capacity() == 0 {
|
||||
self.inline.reserve_exact(SPARSE_OFFSETS_INLINE_CAP);
|
||||
}
|
||||
self.inline.push((column_id, offset));
|
||||
} else {
|
||||
self.overflow.insert(column_id, offset);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn contains(&self, column_id: ColumnId) -> bool {
|
||||
self.get(column_id).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl SparsePrimaryKeyCodec {
|
||||
/// Creates a new [`SparsePrimaryKeyCodec`] instance.
|
||||
pub fn from_columns(columns_ids: impl Iterator<Item = ColumnId>) -> Self {
|
||||
@@ -247,7 +349,7 @@ impl SparsePrimaryKeyCodec {
|
||||
/// Decodes the given bytes into a [`SparseValues`].
|
||||
fn decode_sparse(&self, bytes: &[u8]) -> Result<SparseValues> {
|
||||
let mut deserializer = Deserializer::new(bytes);
|
||||
let mut values = SparseValues::new(HashMap::new());
|
||||
let mut values = SparseValues::with_capacity(16);
|
||||
|
||||
let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?;
|
||||
let value = self.inner.table_id_field.deserialize(&mut deserializer)?;
|
||||
@@ -275,31 +377,65 @@ impl SparsePrimaryKeyCodec {
|
||||
}
|
||||
|
||||
/// Returns the offset of the given column id in the given primary key.
|
||||
///
|
||||
/// The pk must start with the table_id + tsid prefix written by
|
||||
/// `encode_internal`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `pk` is not a well-formed sparse primary key produced by
|
||||
/// this codec (e.g. truncated or otherwise malformed bytes).
|
||||
pub fn has_column(
|
||||
&self,
|
||||
pk: &[u8],
|
||||
offsets_map: &mut HashMap<u32, usize>,
|
||||
cache: &mut SparseOffsetsCache,
|
||||
column_id: ColumnId,
|
||||
) -> Option<usize> {
|
||||
if offsets_map.is_empty() {
|
||||
let mut deserializer = Deserializer::new(pk);
|
||||
let mut offset = 0;
|
||||
while deserializer.has_remaining() {
|
||||
let column_id = u32::deserialize(&mut deserializer).unwrap();
|
||||
offset += 4;
|
||||
offsets_map.insert(column_id, offset);
|
||||
let Some(field) = self.get_field(column_id) else {
|
||||
break;
|
||||
};
|
||||
|
||||
let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
|
||||
offset += skip;
|
||||
}
|
||||
|
||||
offsets_map.get(&column_id).copied()
|
||||
} else {
|
||||
offsets_map.get(&column_id).copied()
|
||||
// Decoding is lazy: on each call we only advance the cache's cursor as
|
||||
// far as needed to answer the query. A column that has already been
|
||||
// seen returns immediately; a column we haven't reached yet causes the
|
||||
// parser to resume from `cache.cursor` and stop as soon as the column
|
||||
// is located. Once the cursor walks off the end (or hits an unknown
|
||||
// column id) the cache is marked finished, so subsequent misses are
|
||||
// O(1).
|
||||
// table_id and tsid are at fixed offsets.
|
||||
match column_id {
|
||||
RESERVED_COLUMN_ID_TABLE_ID => return Some(TABLE_ID_VALUE_OFFSET),
|
||||
RESERVED_COLUMN_ID_TSID => return Some(TSID_VALUE_OFFSET),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if let Some(offset) = cache.get(column_id) {
|
||||
return Some(offset);
|
||||
}
|
||||
if cache.finished {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut deserializer = Deserializer::new(pk);
|
||||
deserializer.advance(cache.cursor);
|
||||
let mut offset = cache.cursor;
|
||||
while deserializer.has_remaining() {
|
||||
let col = u32::deserialize(&mut deserializer).unwrap();
|
||||
offset += COLUMN_ID_ENCODE_SIZE;
|
||||
let value_offset = offset;
|
||||
cache.insert(col, value_offset);
|
||||
let Some(field) = self.get_field(col) else {
|
||||
cache.finished = true;
|
||||
cache.cursor = offset;
|
||||
return None;
|
||||
};
|
||||
|
||||
let skip = field.skip_deserialize(pk, &mut deserializer).unwrap();
|
||||
offset += skip;
|
||||
cache.cursor = offset;
|
||||
if col == column_id {
|
||||
return Some(value_offset);
|
||||
}
|
||||
}
|
||||
|
||||
cache.finished = true;
|
||||
None
|
||||
}
|
||||
|
||||
/// Decode value at `offset` in `pk`.
|
||||
@@ -317,10 +453,10 @@ impl SparsePrimaryKeyCodec {
|
||||
pub fn encoded_value_for_column<'a>(
|
||||
&self,
|
||||
pk: &'a [u8],
|
||||
offsets_map: &mut HashMap<u32, usize>,
|
||||
cache: &mut SparseOffsetsCache,
|
||||
column_id: ColumnId,
|
||||
) -> Result<Option<&'a [u8]>> {
|
||||
let Some(offset) = self.has_column(pk, offsets_map, column_id) else {
|
||||
let Some(offset) = self.has_column(pk, cache, column_id) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -537,9 +673,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_sparse_value_new_and_get_or_null() {
|
||||
let mut values = HashMap::new();
|
||||
values.insert(1, Value::Int32(42));
|
||||
let sparse_value = SparseValues::new(values);
|
||||
let mut sparse_value = SparseValues::new();
|
||||
sparse_value.insert(1, Value::Int32(42));
|
||||
|
||||
assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
|
||||
assert_eq!(sparse_value.get_or_null(2), &Value::Null);
|
||||
@@ -547,7 +682,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_sparse_value_insert() {
|
||||
let mut sparse_value = SparseValues::new(HashMap::new());
|
||||
let mut sparse_value = SparseValues::new();
|
||||
sparse_value.insert(1, Value::Int32(42));
|
||||
|
||||
assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42));
|
||||
@@ -681,7 +816,7 @@ mod tests {
|
||||
codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap();
|
||||
assert!(!buffer.is_empty());
|
||||
|
||||
let mut offsets_map = HashMap::new();
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
for column_id in [
|
||||
RESERVED_COLUMN_ID_TABLE_ID,
|
||||
RESERVED_COLUMN_ID_TSID,
|
||||
@@ -699,6 +834,36 @@ mod tests {
|
||||
assert!(offset.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_has_column_lazy_resume() {
|
||||
let region_metadata = test_region_metadata();
|
||||
let codec = SparsePrimaryKeyCodec::new(®ion_metadata);
|
||||
let mut buffer = Vec::new();
|
||||
codec
|
||||
.encode_to_vec(test_row().into_iter(), &mut buffer)
|
||||
.unwrap();
|
||||
|
||||
let mut cache = SparseOffsetsCache::new();
|
||||
// Look up an early column: only a prefix of tags is decoded.
|
||||
assert!(codec.has_column(&buffer, &mut cache, 1).is_some());
|
||||
assert!(!cache.finished);
|
||||
assert!(cache.contains(1));
|
||||
assert!(!cache.contains(5));
|
||||
|
||||
// A later column resumes from the cursor.
|
||||
assert!(codec.has_column(&buffer, &mut cache, 5).is_some());
|
||||
assert!(cache.contains(5));
|
||||
|
||||
// An earlier column that was already cached still resolves.
|
||||
assert!(codec.has_column(&buffer, &mut cache, 2).is_some());
|
||||
|
||||
// A non-existent column walks off the end and marks the cache finished.
|
||||
assert!(codec.has_column(&buffer, &mut cache, 999).is_none());
|
||||
assert!(cache.finished);
|
||||
// Further misses are O(1).
|
||||
assert!(codec.has_column(&buffer, &mut cache, 998).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_decode_value_at() {
|
||||
let region_metadata = test_region_metadata();
|
||||
@@ -709,7 +874,7 @@ mod tests {
|
||||
assert!(!buffer.is_empty());
|
||||
|
||||
let row = test_row();
|
||||
let mut offsets_map = HashMap::new();
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
for column_id in [
|
||||
RESERVED_COLUMN_ID_TABLE_ID,
|
||||
RESERVED_COLUMN_ID_TSID,
|
||||
@@ -744,7 +909,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert!(!buffer.is_empty());
|
||||
|
||||
let mut offsets_map = HashMap::new();
|
||||
let mut offsets_map = SparseOffsetsCache::new();
|
||||
for column_id in [
|
||||
RESERVED_COLUMN_ID_TABLE_ID,
|
||||
RESERVED_COLUMN_ID_TSID,
|
||||
|
||||
Reference in New Issue
Block a user