From e1156728fc0e8157d3b8b03d470b69a20039136d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 8 May 2026 16:35:34 +0800 Subject: [PATCH] perf(mito-codec): optimize SparseValues decode and lookup (#8057) * perf: add benchmarks for SparsePrimaryKeyCodec::has_column Add benchmarks covering table_id, tsid, first_tag, and last_tag lookups across 5, 10, 50, and 100 tag counts to measure the cost of the offset map construction in has_column. Signed-off-by: evenyag * perf: handle table id/tsid specially Signed-off-by: evenyag * perf: lazy decode Signed-off-by: evenyag * chore: use vec for small tags Signed-off-by: evenyag * feat: add bench Signed-off-by: evenyag * perf: use 32 as inline capacity Signed-off-by: evenyag * perf: benchmark sparse value Signed-off-by: evenyag * feat: change sparse values to use vec Signed-off-by: evenyag * chore: reserve capacity Signed-off-by: evenyag * chore: simplify comments Signed-off-by: evenyag * docs: update comment Signed-off-by: evenyag * chore: update benchmark for map sparse values Signed-off-by: evenyag * docs: update comment Signed-off-by: evenyag * chore: remove empty check Signed-off-by: evenyag --------- Signed-off-by: evenyag --- .../benches/bench_primary_key_filter.rs | 7 +- .../benches/bench_sparse_encoding.rs | 429 +++++++++++++++++- src/mito-codec/src/primary_key_filter.rs | 28 +- src/mito-codec/src/row_converter.rs | 6 +- src/mito-codec/src/row_converter/sparse.rs | 245 ++++++++-- 5 files changed, 655 insertions(+), 60 deletions(-) diff --git a/src/mito-codec/benches/bench_primary_key_filter.rs b/src/mito-codec/benches/bench_primary_key_filter.rs index 2cc35abb22..528c374761 100644 --- a/src/mito-codec/benches/bench_primary_key_filter.rs +++ b/src/mito-codec/benches/bench_primary_key_filter.rs @@ -25,7 +25,8 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::{Value, ValueRef}; use mito_codec::row_converter::{ - DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, + DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparseOffsetsCache, + SparsePrimaryKeyCodec, }; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::storage::{ColumnId, RegionId}; @@ -182,7 +183,7 @@ fn matches_sparse_scalar( codec: &SparsePrimaryKeyCodec, filters: &[SimpleFilterEvaluator], pk: &[u8], - offsets_map: &mut std::collections::HashMap, + offsets_map: &mut SparseOffsetsCache, ) -> bool { offsets_map.clear(); if filters.is_empty() || metadata.primary_key.is_empty() { @@ -252,7 +253,7 @@ fn bench_primary_key_filter(c: &mut Criterion) { let sparse_pk = encode_sparse_pk(&metadata, &row); let sparse_codec = SparsePrimaryKeyCodec::new(&metadata); let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone(), false); - let mut sparse_offsets = std::collections::HashMap::new(); + let mut sparse_offsets = SparseOffsetsCache::new(); let mut group = c.benchmark_group(format!("primary_key_filter/{case_name}")); diff --git a/src/mito-codec/benches/bench_sparse_encoding.rs b/src/mito-codec/benches/bench_sparse_encoding.rs index 5ad8964c0c..a0f978fd9f 100644 --- a/src/mito-codec/benches/bench_sparse_encoding.rs +++ b/src/mito-codec/benches/bench_sparse_encoding.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::hint::black_box; use bytes::Bytes; -use criterion::{Criterion, criterion_group, criterion_main}; +use criterion::measurement::WallTime; +use criterion::{BenchmarkGroup, Criterion, criterion_group, criterion_main}; use datatypes::prelude::ValueRef; -use mito_codec::row_converter::SparsePrimaryKeyCodec; +use datatypes::value::Value; use mito_codec::row_converter::sparse::{RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID}; +use mito_codec::row_converter::{SparseOffsetsCache, SparsePrimaryKeyCodec}; +use store_api::storage::ColumnId; fn encode_sparse(c: &mut Criterion) { let num_tags = 10; @@ -82,5 +86,424 @@ fn encode_sparse(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, encode_sparse); +/// Encodes a primary key with the given number of tags. +fn encode_pk(num_tags: u32) -> (SparsePrimaryKeyCodec, Vec) { + // Use schemaless() so all columns (including reserved ones) are recognized during parsing. + let codec = SparsePrimaryKeyCodec::schemaless(); + let dummy_table_id = 1024u32; + let dummy_tsid = 42u64; + + let tags: Vec<_> = (0..num_tags) + .map(|idx| { + let tag_value = idx.to_string().repeat(10); + (idx, Bytes::copy_from_slice(tag_value.as_bytes())) + }) + .collect(); + + let mut buffer = Vec::new(); + codec + .encode_internal(dummy_table_id, dummy_tsid, &mut buffer) + .unwrap(); + codec + .encode_raw_tag_value(tags.iter().map(|(c, b)| (*c, &b[..])), &mut buffer) + .unwrap(); + + (codec, buffer) +} + +fn bench_has_column(c: &mut Criterion) { + for num_tags in [5, 10, 50, 100] { + let (codec, pk) = encode_pk(num_tags); + let mut group = c.benchmark_group(format!("has_column/{num_tags}_tags")); + + group.bench_function("table_id", |b| { + b.iter(|| { + let mut offsets_map = SparseOffsetsCache::new(); + black_box(codec.has_column(&pk, &mut offsets_map, RESERVED_COLUMN_ID_TABLE_ID)); + }); + }); + + group.bench_function("tsid", |b| { + b.iter(|| { + let mut offsets_map = SparseOffsetsCache::new(); + black_box(codec.has_column(&pk, &mut offsets_map, RESERVED_COLUMN_ID_TSID)); + }); + }); + + group.bench_function("first_tag", |b| { + b.iter(|| { + let mut offsets_map = SparseOffsetsCache::new(); + black_box(codec.has_column(&pk, &mut offsets_map, 0)); + }); + }); + + group.bench_function("middle_tag", |b| { + b.iter(|| { + let mut offsets_map = SparseOffsetsCache::new(); + black_box(codec.has_column(&pk, &mut offsets_map, num_tags / 2)); + }); + }); + + group.bench_function("last_tag", |b| { + b.iter(|| { + let mut offsets_map = SparseOffsetsCache::new(); + black_box(codec.has_column(&pk, &mut offsets_map, num_tags - 1)); + }); + }); + + group.finish(); + } +} + +/// Benchmarks Vec linear scan vs HashMap lookup at various collection sizes +/// to find the optimal `SPARSE_OFFSETS_INLINE_CAP` threshold. +fn bench_inline_threshold(c: &mut Criterion) { + for size in [4, 8, 12, 16, 20, 24, 32, 48, 64] { + let vec: Vec<(u32, usize)> = (0..size).map(|i| (i as u32, i * 8)).collect(); + let map: HashMap = vec.iter().copied().collect(); + + let last_id = (size - 1) as u32; + let missing_id = size as u32; + + let mut group = c.benchmark_group(format!("inline_threshold/{size}")); + + // Vec: best case (first element) + group.bench_function("vec_first", |b| { + b.iter(|| { + let target = black_box(0u32); + for entry in &vec { + if entry.0 == target { + return black_box(Some(entry.1)); + } + } + black_box(None) + }); + }); + + // Vec: worst case (last element) + group.bench_function("vec_last", |b| { + b.iter(|| { + let target = black_box(last_id); + for entry in &vec { + if entry.0 == target { + return black_box(Some(entry.1)); + } + } + black_box(None) + }); + }); + + // Vec: miss + group.bench_function("vec_miss", |b| { + b.iter(|| { + let target = black_box(missing_id); + for entry in &vec { + if entry.0 == target { + return black_box(Some(entry.1)); + } + } + black_box(None) + }); + }); + + // HashMap: hit (last element) + group.bench_function("map_hit", |b| { + b.iter(|| black_box(map.get(&black_box(last_id)).copied())); + }); + + // HashMap: miss + group.bench_function("map_miss", |b| { + b.iter(|| black_box(map.get(&black_box(missing_id)).copied())); + }); + + group.finish(); + } +} + +/// Lookup-strategy comparison for `SparseValues`-shaped collections. +/// +/// `SparseValues` is now a `Vec<(ColumnId, Value)>` with linear-scan lookups; +/// these benches keep the `HashMap` and `HybridValues` baselines around +/// to track the crossover and to make it easy to revisit the choice later. +/// The companion `SparseOffsetsCache` uses a small-vec fast path with +/// `SPARSE_OFFSETS_INLINE_CAP = 32` entries against a `usize` payload; +/// `Value` is a much fatter enum, so its crossover may land elsewhere. +/// The benches measure both the crossover (`bench_sparse_values_threshold`) +/// and the workload impact at realistic primary-key widths +/// (`bench_sparse_values_lookup`). +fn sample_value(idx: u32) -> Value { + Value::String(idx.to_string().repeat(10).into()) +} + +/// Common lookup surface for the variants under test. +trait ValueLookup { + fn lookup_get(&self, col: ColumnId) -> Option<&Value>; + fn lookup_or_null(&self, col: ColumnId) -> &Value; +} + +impl ValueLookup for HashMap { + fn lookup_get(&self, col: ColumnId) -> Option<&Value> { + self.get(&col) + } + fn lookup_or_null(&self, col: ColumnId) -> &Value { + self.get(&col).unwrap_or(&Value::Null) + } +} + +impl ValueLookup for Vec<(ColumnId, Value)> { + fn lookup_get(&self, col: ColumnId) -> Option<&Value> { + for entry in self { + if entry.0 == col { + return Some(&entry.1); + } + } + None + } + fn lookup_or_null(&self, col: ColumnId) -> &Value { + for entry in self { + if entry.0 == col { + return &entry.1; + } + } + &Value::Null + } +} + +/// Inline-Vec + HashMap-overflow mirror of `SparseOffsetsCache`'s layout, +/// parameterized so the bench can sweep candidate inline caps. +struct HybridValues { + inline: Vec<(ColumnId, Value)>, + overflow: HashMap, +} + +impl HybridValues { + fn new() -> Self { + Self { + inline: Vec::with_capacity(CAP), + overflow: HashMap::new(), + } + } + + fn insert(&mut self, col: ColumnId, value: Value) { + if self.inline.len() < CAP { + self.inline.push((col, value)); + } else { + self.overflow.insert(col, value); + } + } +} + +impl ValueLookup for HybridValues { + fn lookup_get(&self, col: ColumnId) -> Option<&Value> { + for entry in &self.inline { + if entry.0 == col { + return Some(&entry.1); + } + } + if self.overflow.is_empty() { + return None; + } + self.overflow.get(&col) + } + fn lookup_or_null(&self, col: ColumnId) -> &Value { + self.lookup_get(col).unwrap_or(&Value::Null) + } +} + +fn build_hashmap_values(pairs: &[(ColumnId, Value)]) -> HashMap { + let mut m = HashMap::with_capacity(pairs.len()); + for (c, v) in pairs { + m.insert(*c, v.clone()); + } + m +} + +fn build_vec_values(pairs: &[(ColumnId, Value)]) -> Vec<(ColumnId, Value)> { + pairs.to_vec() +} + +fn build_hybrid_values(pairs: &[(ColumnId, Value)]) -> HybridValues { + let mut h = HybridValues::::new(); + for (c, v) in pairs { + h.insert(*c, v.clone()); + } + h +} + +/// Runs the five workloads (lookup_first/last/miss, iter_all, build_then_iter) +/// for one variant inside a `sparse_values/{size}` group. +fn run_lookup_workloads( + group: &mut BenchmarkGroup<'_, WallTime>, + name: &str, + column_ids: &[ColumnId], + last_id: ColumnId, + missing_id: ColumnId, + build: B, +) where + V: ValueLookup, + B: Fn() -> V, +{ + let built = build(); + + group.bench_function(format!("{name}/lookup_first"), |b| { + b.iter(|| { + let target = black_box(0 as ColumnId); + black_box(built.lookup_or_null(target)) + }); + }); + + group.bench_function(format!("{name}/lookup_last"), |b| { + b.iter(|| { + let target = black_box(last_id); + black_box(built.lookup_or_null(target)) + }); + }); + + group.bench_function(format!("{name}/lookup_miss"), |b| { + b.iter(|| { + let target = black_box(missing_id); + black_box(built.lookup_get(target)) + }); + }); + + group.bench_function(format!("{name}/iter_all"), |b| { + b.iter(|| { + for col in column_ids { + black_box(built.lookup_or_null(*col)); + } + }); + }); + + group.bench_function(format!("{name}/build_then_iter"), |b| { + b.iter(|| { + let c = build(); + for col in column_ids { + black_box(c.lookup_or_null(*col)); + } + c + }); + }); +} + +/// Sweeps collection sizes to find the linear-scan-vs-HashMap crossover for +/// `(ColumnId, Value)` entries — i.e. the natural inline cap for a +/// `SparseValues`-shaped hybrid. +fn bench_sparse_values_threshold(c: &mut Criterion) { + for size in [2usize, 4, 8, 12, 16, 20, 24, 32, 48, 64] { + let vec: Vec<(ColumnId, Value)> = (0..size as u32) + .map(|i| (i as ColumnId, sample_value(i))) + .collect(); + let map: HashMap = vec.iter().cloned().collect(); + let last_id = (size - 1) as ColumnId; + let missing_id = size as ColumnId; + + let mut group = c.benchmark_group(format!("sparse_values_threshold/{size}")); + + group.bench_function("vec_first", |b| { + b.iter(|| { + let target = black_box(0 as ColumnId); + for entry in &vec { + if entry.0 == target { + return black_box(Some(&entry.1)); + } + } + black_box(None) + }); + }); + + group.bench_function("vec_last", |b| { + b.iter(|| { + let target = black_box(last_id); + for entry in &vec { + if entry.0 == target { + return black_box(Some(&entry.1)); + } + } + black_box(None) + }); + }); + + group.bench_function("vec_miss", |b| { + b.iter(|| { + let target = black_box(missing_id); + for entry in &vec { + if entry.0 == target { + return black_box(Some(&entry.1)); + } + } + black_box(None) + }); + }); + + group.bench_function("map_hit_last", |b| { + b.iter(|| black_box(map.get(&black_box(last_id)))); + }); + + group.bench_function("map_miss", |b| { + b.iter(|| black_box(map.get(&black_box(missing_id)))); + }); + + group.finish(); + } +} + +/// Workload comparison at realistic primary-key widths. Compares the current +/// `Vec`-backed `SparseValues` against a `HashMap` and two hybrid candidates. +/// The hybrid caps are picked to bracket the likely crossover (smaller than +/// the offsets-cache's 32, since `Value` is fatter than `usize`); refine +/// after reading `bench_sparse_values_threshold`. +fn bench_sparse_values_lookup(c: &mut Criterion) { + for size in [2usize, 4, 8, 16, 32, 50] { + let pairs: Vec<(ColumnId, Value)> = (0..size as u32) + .map(|i| (i as ColumnId, sample_value(i))) + .collect(); + let column_ids: Vec = pairs.iter().map(|(c, _)| *c).collect(); + let last_id = (size - 1) as ColumnId; + let missing_id = size as ColumnId; + + let mut group = c.benchmark_group(format!("sparse_values/{size}")); + + run_lookup_workloads( + &mut group, + "hashmap", + &column_ids, + last_id, + missing_id, + || build_hashmap_values(&pairs), + ); + + run_lookup_workloads(&mut group, "vec", &column_ids, last_id, missing_id, || { + build_vec_values(&pairs) + }); + + run_lookup_workloads( + &mut group, + "hybrid_8", + &column_ids, + last_id, + missing_id, + || build_hybrid_values::<8>(&pairs), + ); + + run_lookup_workloads( + &mut group, + "hybrid_32", + &column_ids, + last_id, + missing_id, + || build_hybrid_values::<32>(&pairs), + ); + + group.finish(); + } +} + +criterion_group!( + benches, + encode_sparse, + bench_has_column, + bench_inline_threshold, + bench_sparse_values_threshold, + bench_sparse_values_lookup +); criterion_main!(benches); diff --git a/src/mito-codec/src/primary_key_filter.rs b/src/mito-codec/src/primary_key_filter.rs index 70fda7bf54..2450a6e44a 100644 --- a/src/mito-codec/src/primary_key_filter.rs +++ b/src/mito-codec/src/primary_key_filter.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; @@ -27,7 +26,7 @@ use store_api::storage::ColumnId; use crate::error::{EvaluateFilterSnafu, Result}; use crate::row_converter::{ - DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec, + DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec, }; /// Returns true if this is a partition column for metrics in the memtable. @@ -303,7 +302,7 @@ impl<'a> PrimaryKeyValueAccessor<'a> for DensePrimaryKeyValueAccessor<'a, '_> { pub struct SparsePrimaryKeyFilter { inner: PrimaryKeyFilterInner, codec: SparsePrimaryKeyCodec, - offsets_map: HashMap, + offsets_cache: SparseOffsetsCache, } impl SparsePrimaryKeyFilter { @@ -316,18 +315,18 @@ impl SparsePrimaryKeyFilter { Self { inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column), codec, - offsets_map: HashMap::new(), + offsets_cache: SparseOffsetsCache::new(), } } } impl PrimaryKeyFilter for SparsePrimaryKeyFilter { fn matches(&mut self, pk: &[u8]) -> Result { - self.offsets_map.clear(); + self.offsets_cache.clear(); let mut accessor = SparsePrimaryKeyValueAccessor { pk, codec: &self.codec, - offsets_map: &mut self.offsets_map, + offsets_cache: &mut self.offsets_cache, }; self.inner.evaluate_filters(&mut accessor) } @@ -336,19 +335,19 @@ impl PrimaryKeyFilter for SparsePrimaryKeyFilter { struct SparsePrimaryKeyValueAccessor<'a, 'b> { pk: &'a [u8], codec: &'b SparsePrimaryKeyCodec, - offsets_map: &'b mut HashMap, + offsets_cache: &'b mut SparseOffsetsCache, } impl<'a> PrimaryKeyValueAccessor<'a> for SparsePrimaryKeyValueAccessor<'a, '_> { fn encoded_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result> { self.codec - .encoded_value_for_column(self.pk, self.offsets_map, filter.column_id) + .encoded_value_for_column(self.pk, self.offsets_cache, filter.column_id) } fn decode_value(&mut self, filter: &CompiledPrimaryKeyFilter) -> Result { if let Some(offset) = self .codec - .has_column(self.pk, self.offsets_map, filter.column_id) + .has_column(self.pk, self.offsets_cache, filter.column_id) { self.codec .decode_value_at(self.pk, offset, filter.column_id) @@ -482,10 +481,13 @@ mod tests { fn encode_sparse_pk( metadata: &RegionMetadataRef, + table_id: u32, + tsid: u64, row: Vec<(ColumnId, ValueRef<'static>)>, ) -> Vec { let codec = SparsePrimaryKeyCodec::new(metadata); let mut pk = Vec::new(); + codec.encode_internal(table_id, tsid, &mut pk).unwrap(); codec.encode_to_vec(row.into_iter(), &mut pk).unwrap(); pk } @@ -509,7 +511,7 @@ mod tests { "pod", "greptime-frontend-6989d9899-22222", )]); - let pk = encode_sparse_pk(&metadata, create_test_row()); + let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); assert!(filter.matches(&pk).unwrap()); @@ -522,7 +524,7 @@ mod tests { "pod", "greptime-frontend-6989d9899-22223", )]); - let pk = encode_sparse_pk(&metadata, create_test_row()); + let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); assert!(!filter.matches(&pk).unwrap()); @@ -535,7 +537,7 @@ mod tests { "non-exist-label", "greptime-frontend-6989d9899-22222", )]); - let pk = encode_sparse_pk(&metadata, create_test_row()); + let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false); assert!(filter.matches(&pk).unwrap()); @@ -604,7 +606,7 @@ mod tests { #[test] fn test_sparse_primary_key_filter_order_ops() { let metadata = setup_metadata(); - let pk = encode_sparse_pk(&metadata, create_test_row()); + let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row()); let codec = SparsePrimaryKeyCodec::new(&metadata); let cases = [ diff --git a/src/mito-codec/src/row_converter.rs b/src/mito-codec/src/row_converter.rs index fa57e1d96e..0a3205ce9e 100644 --- a/src/mito-codec/src/row_converter.rs +++ b/src/mito-codec/src/row_converter.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use common_recordbatch::filter::SimpleFilterEvaluator; use datatypes::value::{Value, ValueRef}; pub use dense::{DensePrimaryKeyCodec, SortField}; -pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparsePrimaryKeyCodec, SparseValues}; +pub use sparse::{COLUMN_ID_ENCODE_SIZE, SparseOffsetsCache, SparsePrimaryKeyCodec, SparseValues}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; @@ -65,6 +65,10 @@ pub enum CompositeValues { impl CompositeValues { /// Extends the composite values with the given values. + /// + /// Append-only: `values` must not contain a column id already present in + /// the composite; otherwise the existing entry would shadow the new one on + /// `SparseValues` lookup. pub fn extend(&mut self, values: &[(ColumnId, Value)]) { match self { CompositeValues::Dense(dense_values) => { diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs index 1c97a18792..9c0e488576 100644 --- a/src/mito-codec/src/row_converter/sparse.rs +++ b/src/mito-codec/src/row_converter/sparse.rs @@ -71,36 +71,56 @@ struct SparsePrimaryKeyCodecInner { /// Sparse values representation. /// -/// A map of [`ColumnId`] to [`Value`]. -#[derive(Debug, Clone, PartialEq, Eq)] +/// Callers must not insert a column id that is already present; otherwise +/// the existing entry will shadow the newly inserted value on lookup. +#[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct SparseValues { - values: HashMap, + values: Vec<(ColumnId, Value)>, } impl SparseValues { - /// Creates a new [`SparseValues`] instance. - pub fn new(values: HashMap) -> Self { - Self { values } + /// Creates an empty [`SparseValues`]. + pub fn new() -> Self { + Self { values: Vec::new() } + } + + /// Creates an empty [`SparseValues`] with space reserved for `cap` entries. + pub fn with_capacity(cap: usize) -> Self { + Self { + values: Vec::with_capacity(cap), + } } /// Returns the value of the given column, or [`Value::Null`] if the column is not present. pub fn get_or_null(&self, column_id: ColumnId) -> &Value { - self.values.get(&column_id).unwrap_or(&Value::Null) + for (id, value) in &self.values { + if *id == column_id { + return value; + } + } + &Value::Null } /// Returns the value of the given column, or [`None`] if the column is not present. pub fn get(&self, column_id: &ColumnId) -> Option<&Value> { - self.values.get(column_id) + for (id, value) in &self.values { + if id == column_id { + return Some(value); + } + } + None } - /// Inserts a new value into the [`SparseValues`]. + /// Appends a new `(column_id, value)` pair. + /// + /// Append-only: the caller must ensure `column_id` is not already present. pub fn insert(&mut self, column_id: ColumnId, value: Value) { - self.values.insert(column_id, value); + self.values.push((column_id, value)); } /// Returns an iterator over all stored column id/value pairs. pub fn iter(&self) -> impl Iterator { - self.values.iter() + self.values.iter().map(|(id, value)| (id, value)) } } @@ -111,6 +131,88 @@ pub const RESERVED_COLUMN_ID_TABLE_ID: ColumnId = ReservedColumnId::table_id(); /// The size of the column id in the encoded sparse row. pub const COLUMN_ID_ENCODE_SIZE: usize = 4; +// Fixed byte offsets for reserved columns in the sparse encoding. +// Layout: [table_id_col_id: 4B][marker: 1B][table_id: 4B][tsid_col_id: 4B][marker: 1B][tsid: 8B] +/// Byte offset to the table_id value (after its 4-byte column id). +const TABLE_ID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE; +/// Byte offset to the tsid value (after 9-byte table_id entry + 4-byte tsid column id). +const TSID_VALUE_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE; +/// Byte offset where tag columns start (after 9-byte table_id + 13-byte tsid entries). +const TAGS_START_OFFSET: usize = COLUMN_ID_ENCODE_SIZE + 5 + COLUMN_ID_ENCODE_SIZE + 9; + +/// Inline capacity for the small-vec fast path of [`SparseOffsetsCache`]. +/// +/// Most sparse primary keys carry only a handful of tags, so a linear scan +/// over a short `Vec` beats a `HashMap` lookup. Tags beyond this capacity +/// spill into the overflow `HashMap`. +const SPARSE_OFFSETS_INLINE_CAP: usize = 32; + +/// A lazily populated cache of tag column offsets inside a sparse primary key. +#[derive(Debug, Clone)] +pub struct SparseOffsetsCache { + /// Small-vec fast path. Reserves [`SPARSE_OFFSETS_INLINE_CAP`] slots on + /// the first insert. + inline: Vec<(ColumnId, usize)>, + /// Overflow for columns beyond the inline capacity. Lazily allocated. + overflow: HashMap, + /// Next byte position in the pk to resume parsing from. + cursor: usize, + /// True once the decoder has walked past the last tag column (or stopped + /// on an unknown column id); no further offsets can be discovered. + finished: bool, +} + +impl Default for SparseOffsetsCache { + fn default() -> Self { + Self::new() + } +} + +impl SparseOffsetsCache { + pub fn new() -> Self { + Self { + inline: Vec::new(), + overflow: HashMap::new(), + cursor: TAGS_START_OFFSET, + finished: false, + } + } + + pub fn clear(&mut self) { + self.inline.clear(); + self.overflow.clear(); + self.cursor = TAGS_START_OFFSET; + self.finished = false; + } + + /// Returns the cached offset for `column_id`, if any. + fn get(&self, column_id: ColumnId) -> Option { + for entry in &self.inline { + if entry.0 == column_id { + return Some(entry.1); + } + } + self.overflow.get(&column_id).copied() + } + + /// Records a new `(column_id, offset)` entry. + fn insert(&mut self, column_id: ColumnId, offset: usize) { + if self.inline.len() < SPARSE_OFFSETS_INLINE_CAP { + if self.inline.capacity() == 0 { + self.inline.reserve_exact(SPARSE_OFFSETS_INLINE_CAP); + } + self.inline.push((column_id, offset)); + } else { + self.overflow.insert(column_id, offset); + } + } + + #[cfg(test)] + fn contains(&self, column_id: ColumnId) -> bool { + self.get(column_id).is_some() + } +} + impl SparsePrimaryKeyCodec { /// Creates a new [`SparsePrimaryKeyCodec`] instance. pub fn from_columns(columns_ids: impl Iterator) -> Self { @@ -247,7 +349,7 @@ impl SparsePrimaryKeyCodec { /// Decodes the given bytes into a [`SparseValues`]. fn decode_sparse(&self, bytes: &[u8]) -> Result { let mut deserializer = Deserializer::new(bytes); - let mut values = SparseValues::new(HashMap::new()); + let mut values = SparseValues::with_capacity(16); let column_id = u32::deserialize(&mut deserializer).context(DeserializeFieldSnafu)?; let value = self.inner.table_id_field.deserialize(&mut deserializer)?; @@ -275,31 +377,65 @@ impl SparsePrimaryKeyCodec { } /// Returns the offset of the given column id in the given primary key. + /// + /// The pk must start with the table_id + tsid prefix written by + /// `encode_internal`. + /// + /// # Panics + /// + /// Panics if `pk` is not a well-formed sparse primary key produced by + /// this codec (e.g. truncated or otherwise malformed bytes). pub fn has_column( &self, pk: &[u8], - offsets_map: &mut HashMap, + cache: &mut SparseOffsetsCache, column_id: ColumnId, ) -> Option { - if offsets_map.is_empty() { - let mut deserializer = Deserializer::new(pk); - let mut offset = 0; - while deserializer.has_remaining() { - let column_id = u32::deserialize(&mut deserializer).unwrap(); - offset += 4; - offsets_map.insert(column_id, offset); - let Some(field) = self.get_field(column_id) else { - break; - }; - - let skip = field.skip_deserialize(pk, &mut deserializer).unwrap(); - offset += skip; - } - - offsets_map.get(&column_id).copied() - } else { - offsets_map.get(&column_id).copied() + // Decoding is lazy: on each call we only advance the cache's cursor as + // far as needed to answer the query. A column that has already been + // seen returns immediately; a column we haven't reached yet causes the + // parser to resume from `cache.cursor` and stop as soon as the column + // is located. Once the cursor walks off the end (or hits an unknown + // column id) the cache is marked finished, so subsequent misses are + // O(1). + // table_id and tsid are at fixed offsets. + match column_id { + RESERVED_COLUMN_ID_TABLE_ID => return Some(TABLE_ID_VALUE_OFFSET), + RESERVED_COLUMN_ID_TSID => return Some(TSID_VALUE_OFFSET), + _ => {} } + + if let Some(offset) = cache.get(column_id) { + return Some(offset); + } + if cache.finished { + return None; + } + + let mut deserializer = Deserializer::new(pk); + deserializer.advance(cache.cursor); + let mut offset = cache.cursor; + while deserializer.has_remaining() { + let col = u32::deserialize(&mut deserializer).unwrap(); + offset += COLUMN_ID_ENCODE_SIZE; + let value_offset = offset; + cache.insert(col, value_offset); + let Some(field) = self.get_field(col) else { + cache.finished = true; + cache.cursor = offset; + return None; + }; + + let skip = field.skip_deserialize(pk, &mut deserializer).unwrap(); + offset += skip; + cache.cursor = offset; + if col == column_id { + return Some(value_offset); + } + } + + cache.finished = true; + None } /// Decode value at `offset` in `pk`. @@ -317,10 +453,10 @@ impl SparsePrimaryKeyCodec { pub fn encoded_value_for_column<'a>( &self, pk: &'a [u8], - offsets_map: &mut HashMap, + cache: &mut SparseOffsetsCache, column_id: ColumnId, ) -> Result> { - let Some(offset) = self.has_column(pk, offsets_map, column_id) else { + let Some(offset) = self.has_column(pk, cache, column_id) else { return Ok(None); }; @@ -537,9 +673,8 @@ mod tests { #[test] fn test_sparse_value_new_and_get_or_null() { - let mut values = HashMap::new(); - values.insert(1, Value::Int32(42)); - let sparse_value = SparseValues::new(values); + let mut sparse_value = SparseValues::new(); + sparse_value.insert(1, Value::Int32(42)); assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42)); assert_eq!(sparse_value.get_or_null(2), &Value::Null); @@ -547,7 +682,7 @@ mod tests { #[test] fn test_sparse_value_insert() { - let mut sparse_value = SparseValues::new(HashMap::new()); + let mut sparse_value = SparseValues::new(); sparse_value.insert(1, Value::Int32(42)); assert_eq!(sparse_value.get_or_null(1), &Value::Int32(42)); @@ -681,7 +816,7 @@ mod tests { codec.encode_to_vec(row.into_iter(), &mut buffer).unwrap(); assert!(!buffer.is_empty()); - let mut offsets_map = HashMap::new(); + let mut offsets_map = SparseOffsetsCache::new(); for column_id in [ RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID, @@ -699,6 +834,36 @@ mod tests { assert!(offset.is_none()); } + #[test] + fn test_has_column_lazy_resume() { + let region_metadata = test_region_metadata(); + let codec = SparsePrimaryKeyCodec::new(®ion_metadata); + let mut buffer = Vec::new(); + codec + .encode_to_vec(test_row().into_iter(), &mut buffer) + .unwrap(); + + let mut cache = SparseOffsetsCache::new(); + // Look up an early column: only a prefix of tags is decoded. + assert!(codec.has_column(&buffer, &mut cache, 1).is_some()); + assert!(!cache.finished); + assert!(cache.contains(1)); + assert!(!cache.contains(5)); + + // A later column resumes from the cursor. + assert!(codec.has_column(&buffer, &mut cache, 5).is_some()); + assert!(cache.contains(5)); + + // An earlier column that was already cached still resolves. + assert!(codec.has_column(&buffer, &mut cache, 2).is_some()); + + // A non-existent column walks off the end and marks the cache finished. + assert!(codec.has_column(&buffer, &mut cache, 999).is_none()); + assert!(cache.finished); + // Further misses are O(1). + assert!(codec.has_column(&buffer, &mut cache, 998).is_none()); + } + #[test] fn test_decode_value_at() { let region_metadata = test_region_metadata(); @@ -709,7 +874,7 @@ mod tests { assert!(!buffer.is_empty()); let row = test_row(); - let mut offsets_map = HashMap::new(); + let mut offsets_map = SparseOffsetsCache::new(); for column_id in [ RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID, @@ -744,7 +909,7 @@ mod tests { .unwrap(); assert!(!buffer.is_empty()); - let mut offsets_map = HashMap::new(); + let mut offsets_map = SparseOffsetsCache::new(); for column_id in [ RESERVED_COLUMN_ID_TABLE_ID, RESERVED_COLUMN_ID_TSID,