Cache term_ord_to_str to avoid having too many, possibly

redundant calls to sorted_ords_to_term_cb.

Closes #2892
This commit is contained in:
Paul Masurel
2026-04-20 12:18:13 +02:00
committed by Pascal Seitz
parent e015abab8e
commit af9b7985fd
4 changed files with 413 additions and 35 deletions

View File

@@ -206,6 +206,7 @@ fn terms_7(index: &Index) {
});
execute_agg(index, agg_req);
}
fn terms_all_unique(index: &Index) {
let agg_req = json!({
"my_texts": { "terms": { "field": "text_all_unique_terms" } },

View File

@@ -28,6 +28,7 @@ mod histogram;
mod range;
mod term_agg;
mod term_missing_agg;
mod term_ord_to_str_cache;
use std::collections::HashMap;
use std::fmt;

View File

@@ -7,9 +7,10 @@ use columnar::{
NumericalValue, StrColumn,
};
use common::{BitSet, TinySet};
use rustc_hash::FxHashMap;
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
use super::term_ord_to_str_cache::{StringArena, StringRef, TermOrdToStrCache};
use super::{CustomOrder, Order, OrderTarget};
use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
@@ -397,6 +398,7 @@ pub(crate) fn build_segment_term_collector(
bucket_id_provider,
max_term_id,
terms_req_data,
term_ord_cache: None,
};
Ok(Box::new(collector))
} else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
@@ -408,6 +410,7 @@ pub(crate) fn build_segment_term_collector(
bucket_id_provider,
max_term_id,
terms_req_data,
term_ord_cache: None,
};
Ok(Box::new(collector))
} else if max_term_id < 8_000_000 && is_top_level {
@@ -422,6 +425,7 @@ pub(crate) fn build_segment_term_collector(
bucket_id_provider,
max_term_id,
terms_req_data,
term_ord_cache: None,
};
Ok(Box::new(collector))
} else {
@@ -435,6 +439,7 @@ pub(crate) fn build_segment_term_collector(
bucket_id_provider,
max_term_id,
terms_req_data,
term_ord_cache: None,
};
Ok(Box::new(collector))
}
@@ -470,6 +475,9 @@ trait TermAggregationMap: Clone + Debug + 'static {
/// Returns the term aggregation as a vector of (term_id, bucket) pairs,
/// in any order.
fn into_vec(self) -> Vec<(u64, Bucket)>;
/// Collects all term ordinals present in this map into the given set.
fn collect_term_ords(&self, set: &mut FxHashSet<u64>);
}
#[derive(Clone, Debug)]
@@ -622,6 +630,20 @@ impl TermAggregationMap for PagedTermMap {
Self { pages, mem_usage }
}
/// Inserts into `set` every term ordinal whose presence bit is set in an allocated page.
///
/// Unallocated pages are skipped. An ordinal is rebuilt from the page index (shifted by
/// `PAGE_SHIFT`), the index of the presence word within the page (64 bits per word) and the
/// bit position within that word.
fn collect_term_ords(&self, set: &mut FxHashSet<u64>) {
    for (page_no, slot) in self.pages.iter().enumerate() {
        let Some(page) = slot else {
            continue;
        };
        let page_first_ord = (page_no << PAGE_SHIFT) as u64;
        for (word_no, &word) in page.presence.iter().enumerate() {
            let word_first_bit = word_no * 64;
            set.extend(
                word.into_iter()
                    .map(|bit| page_first_ord + (word_first_bit + bit as usize) as u64),
            );
        }
    }
}
}
impl TermAggregationMap for HashMapTermBuckets {
@@ -648,6 +670,10 @@ impl TermAggregationMap for HashMapTermBuckets {
fn new(_max_term_id: u64, _bucket_id_provider: &mut BucketIdProvider) -> Self {
Self::default()
}
/// Inserts every key of the bucket map (i.e. every term ordinal it holds) into `set`.
fn collect_term_ords(&self, set: &mut FxHashSet<u64>) {
    for &term_ord in self.bucket_map.keys() {
        set.insert(term_ord);
    }
}
}
/// An optimized term map implementation for a compact set of term ordinals.
@@ -704,6 +730,14 @@ impl TermAggregationMap for VecTermBucketsNoAgg {
.collect(),
}
}
/// Inserts into `set` the index of every slot whose count is non-zero.
///
/// The slot index is used as the term ordinal.
fn collect_term_ords(&self, set: &mut FxHashSet<u64>) {
    let present_ords = self
        .buckets
        .iter()
        .enumerate()
        .filter(|&(_, &doc_count)| doc_count > 0)
        .map(|(ord, _)| ord as u64);
    set.extend(present_ords);
}
}
/// An optimized term map implementation for a compact set of term ordinals.
@@ -753,6 +787,54 @@ impl TermAggregationMap for VecTermBuckets {
.collect(),
}
}
/// Inserts into `set` the index of every bucket whose `count` is non-zero.
///
/// The bucket index is used as the term ordinal.
fn collect_term_ords(&self, set: &mut FxHashSet<u64>) {
    let present_ords = self
        .buckets
        .iter()
        .enumerate()
        .filter_map(|(ord, bucket)| (bucket.count > 0).then_some(ord as u64));
    set.extend(present_ords);
}
}
/// Builds a [`TermOrdToStrCache`] covering every term ordinal present in `parent_buckets`.
///
/// The ordinals of all parent buckets are deduplicated, the missing-value sentinel (when the
/// request defines one) is discarded, and the remaining ordinals are resolved against
/// `dictionary` in one sorted pass. The configured `missing` key, if any, is converted to an
/// [`IntermediateKey`] and stored alongside the resolved terms.
///
/// # Errors
/// Propagates any I/O error returned by the dictionary lookup.
///
/// # Panics
/// Panics if a term is not valid UTF-8, or if the dictionary reports that it could not resolve
/// every requested ordinal.
fn build_term_ord_cache<TermMap: TermAggregationMap>(
    parent_buckets: &[TermMap],
    dictionary: &Dictionary,
    term_req: &TermsAggReqData,
) -> std::io::Result<TermOrdToStrCache> {
    // Gather the distinct ordinals used by any parent bucket.
    let mut unique_ords: FxHashSet<u64> =
        FxHashSet::with_capacity_and_hasher(parent_buckets.len() * 64, FxBuildHasher);
    parent_buckets
        .iter()
        .for_each(|term_map| term_map.collect_term_ords(&mut unique_ords));
    if let Some(sentinel) = term_req.missing_value_for_accessor {
        unique_ords.remove(&sentinel);
    }

    let mut term_ords: Vec<u64> = unique_ords.into_iter().collect();
    term_ords.sort_unstable();
    // After sorting, the largest ordinal is last; drop it when it is not a valid dictionary
    // ordinal. At most one ordinal is removed here.
    if let Some(&highest_term_ord) = term_ords.last() {
        if highest_term_ord >= dictionary.num_terms() as u64 {
            term_ords.pop();
        }
    }

    // Resolve all ordinals in one pass, storing the strings back to back in the arena.
    let mut arena = StringArena::default();
    let mut refs: Vec<StringRef> = Vec::with_capacity(term_ords.len());
    let resolved_all: bool = dictionary.sorted_ords_to_term_cb(&term_ords, |raw_term| {
        let term = std::str::from_utf8(raw_term).expect("could not convert to str");
        refs.push(arena.register_str(term));
    })?;
    assert!(resolved_all);

    let missing_key: Option<IntermediateKey> = match term_req.req.missing.as_ref() {
        None => None,
        Some(Key::Str(text)) => Some(IntermediateKey::Str(text.clone())),
        Some(Key::F64(value)) => Some(IntermediateKey::F64(*value)),
        Some(Key::U64(value)) => Some(IntermediateKey::U64(*value)),
        Some(Key::I64(value)) => Some(IntermediateKey::I64(*value)),
    };
    Ok(TermOrdToStrCache::new(term_ords, refs, arena, missing_key))
}
/// The collector puts values from the fast field into the correct buckets and does a conversion to
@@ -765,6 +847,7 @@ struct SegmentTermCollector<TermMap: TermAggregationMap, B: SubAggBuffer> {
bucket_id_provider: BucketIdProvider,
max_term_id: u64,
terms_req_data: TermsAggReqData,
term_ord_cache: Option<TermOrdToStrCache>,
}
pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
@@ -783,6 +866,17 @@ impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
) -> crate::Result<()> {
// TODO: avoid prepare_max_bucket here and handle empty buckets.
self.prepare_max_bucket(bucket, agg_data)?;
if self.terms_req_data.column_type == ColumnType::Str && self.term_ord_cache.is_none() {
if let Some(str_dict_column) = &self.terms_req_data.str_dict_column {
self.term_ord_cache = Some(build_term_ord_cache(
&self.parent_buckets,
str_dict_column.dictionary(),
&self.terms_req_data,
)?);
}
}
let bucket = std::mem::replace(
&mut self.parent_buckets[bucket as usize],
TermMap::new(0, &mut self.bucket_id_provider),
@@ -797,6 +891,7 @@ impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
.map(BufferedSubAggs::get_sub_agg_collector),
bucket,
agg_data,
self.term_ord_cache.as_ref(),
)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
@@ -957,6 +1052,7 @@ where
mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
term_buckets: TermMap,
agg_data: &AggregationsSegmentCtx,
term_ord_cache: Option<&TermOrdToStrCache>,
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
@@ -1005,43 +1101,77 @@ where
.map(|el| el.dictionary())
.unwrap_or_else(|| &fallback_dict);
if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req)
{
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
dict.insert(intermediate_key, intermediate_entry);
}
if let Some(cache) = term_ord_cache {
// Use cached term resolution: missing value is handled via the cache.
if let Some(missing_sentinel) = term_req.missing_value_for_accessor {
if let Some(pos) = entries.iter().position(|(tid, _)| *tid == missing_sentinel)
{
let (_tid, bucket) = entries.swap_remove(pos);
if let Some(missing_key) = cache.missing_key() {
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
dict.insert(missing_key.clone(), intermediate_entry);
}
}
}
// Sort by term ord
entries.sort_unstable_by_key(|bucket| bucket.0);
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
.into_iter()
.map(|bucket| {
into_intermediate_bucket_entry(
for (term_ord, bucket) in entries {
if let Some(term_str) = cache.get(term_ord) {
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
dict.insert(
IntermediateKey::Str(term_str.to_string()),
intermediate_entry,
);
}
}
} else {
if let Some((intermediate_key, bucket)) =
extract_missing_value(&mut entries, term_req)
{
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)
})
.collect::<crate::Result<_>>()?;
)?;
dict.insert(intermediate_key, intermediate_entry);
}
let mut intermediate_entry_it = intermediate_entries.into_iter();
// Sort by term ord
entries.sort_unstable_by_key(|bucket| bucket.0);
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
let intermediate_entry = intermediate_entry_it.next().unwrap();
dict.insert(
IntermediateKey::Str(
String::from_utf8(term.to_vec()).expect("could not convert to String"),
),
intermediate_entry,
);
})?;
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
.into_iter()
.map(|bucket| {
into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)
})
.collect::<crate::Result<_>>()?;
let mut intermediate_entry_it = intermediate_entries.into_iter();
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
let intermediate_entry = intermediate_entry_it.next().unwrap();
dict.insert(
IntermediateKey::Str(
String::from_utf8(term.to_vec())
.expect("could not convert to String"),
),
intermediate_entry,
);
})?;
}
if term_req.req.min_doc_count == 0 {
// TODO: Handle rev streaming for descending sorting by keys
@@ -1162,13 +1292,13 @@ where
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentTermCollector<TermMap, B> {
#[inline]
fn collect_terms_with_docs(
iter: impl Iterator<Item = (crate::DocId, u64)>,
doc_term_ord_iter: impl Iterator<Item = (crate::DocId, u64)>,
term_buckets: &mut TermMap,
bucket_id_provider: &mut BucketIdProvider,
sub_agg: &mut BufferedSubAggs<B>,
) {
for (doc, term_id) in iter {
let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
for (doc, term_ord) in doc_term_ord_iter {
let bucket_id = term_buckets.term_entry(term_ord, bucket_id_provider);
sub_agg.push(bucket_id, doc);
}
}
@@ -2932,4 +3062,48 @@ mod tests {
Ok(())
}
// Smoke test: a terms aggregation nested inside another terms aggregation, both on string
// fast fields. The aggregation result is not inspected; the test only requires that the
// search returns `Ok`.
#[test]
fn test_terms_double_nesting() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let outer_field = schema_builder.add_text_field("outer_term", STRING | FAST);
let inner_field = schema_builder.add_text_field("inner_term", STRING | FAST);
let index = Index::create_in_ram(schema_builder.build());
// 10_000 distinct outer terms and 4 distinct inner terms.
let outer_values = (0..10_000).map(|i| format!("outer_{i}")).collect::<Vec<_>>();
let inner_values = ["INFO", "ERROR", "WARN", "DEBUG"];
{
let mut index_writer: IndexWriter = index.writer_with_num_threads(1, 20_000_000)?;
// One million documents; both values are picked round-robin from the doc id.
for doc_id in 0..1_000_000u64 {
let outer_val = &outer_values[doc_id as usize % outer_values.len()];
let inner_val = inner_values[doc_id as usize % inner_values.len()];
index_writer.add_document(doc!(
outer_field => outer_val.as_str(),
inner_field => inner_val,
))?;
}
index_writer.commit()?;
}
// Outer terms aggregation (size 200) with an inner terms sub-aggregation.
let agg_req: Aggregations = serde_json::from_value(json!({
"outer": {
"terms": { "field": "outer_term", "size": 200 },
"aggs": {
"inner": {
"terms": { "field": "inner_term" }
}
}
}
}))
.unwrap();
let reader = index.reader()?;
let searcher = reader.searcher();
let collector =
crate::aggregation::AggregationCollector::from_aggs(agg_req, Default::default());
// NOTE(review): the result is discarded, so this does not verify bucket contents.
let _agg_result = searcher.search(&AllQuery, &collector)?;
Ok(())
}
}

View File

@@ -0,0 +1,202 @@
use rustc_hash::FxHashMap;
use crate::aggregation::intermediate_agg_result::IntermediateKey;
/// Handle to a string stored in a [`StringArena`]: byte offset and byte length in its buffer.
#[derive(Clone, Copy, Debug)]
pub(crate) struct StringRef {
    start: u32,
    len: u32,
}

/// Stores many strings back to back in one contiguous buffer.
///
/// Strings are added with [`StringArena::register_str`] and read back through the returned
/// [`StringRef`].
#[derive(Clone, Debug, Default)]
pub(crate) struct StringArena {
    buffer: String,
}

impl StringArena {
    /// Appends `value` to the arena and returns a handle that can be passed to
    /// [`StringArena::get_str`].
    ///
    /// # Panics
    /// Panics if the current arena size or the length of `value` does not fit in a `u32`.
    /// Offsets are stored as `u32`, so a silent truncation would make handles point at the
    /// wrong bytes.
    pub fn register_str(&mut self, value: &str) -> StringRef {
        let start =
            u32::try_from(self.buffer.len()).expect("string arena offset does not fit in u32");
        let len = u32::try_from(value.len()).expect("string length does not fit in u32");
        self.buffer.push_str(value);
        StringRef { start, len }
    }

    /// Returns the string designated by `string_ref`.
    ///
    /// `string_ref` is expected to come from `register_str` on this same arena; slicing with
    /// a foreign handle may panic or return unrelated text.
    pub fn get_str(&self, string_ref: StringRef) -> &str {
        let start = string_ref.start as usize;
        let end = start + string_ref.len as usize;
        &self.buffer[start..end]
    }

    /// Total number of bytes currently stored in the arena.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }
}
/// Maps term ordinals to their string form.
///
/// The strings live in `string_arena`; `addr` maps a term ordinal to the location of its
/// string in that arena. `missing_key` is an optional key supplied at construction and
/// returned as-is by `missing_key()`.
pub(crate) struct TermOrdToStrCache {
string_arena: StringArena,
missing_key: Option<IntermediateKey>,
addr: TermOrdToAddr,
}
/// Lookup structure from a term ordinal to the `StringRef` of its string.
enum TermOrdToAddr {
// Vector indexed directly by term ordinal; `None` marks an ordinal that is not cached.
Dense {
offsets: Vec<Option<StringRef>>,
},
// Hash map keyed by term ordinal; only cached ordinals have an entry.
Sparse {
terms: FxHashMap<u64, StringRef>,
},
}
impl std::fmt::Debug for TermOrdToStrCache {
    /// Prints a size summary (variant, number of slots/terms, arena bytes) rather than the
    /// cached contents.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let (struct_name, count_label, count) = match &self.addr {
            TermOrdToAddr::Dense { offsets } => {
                ("TermOrdToStrCache::Dense", "num_slots", offsets.len())
            }
            TermOrdToAddr::Sparse { terms } => {
                ("TermOrdToStrCache::Sparse", "num_terms", terms.len())
            }
        };
        f.debug_struct(struct_name)
            .field(count_label, &count)
            .field("arena_bytes", &self.string_arena.len())
            .finish()
    }
}
impl TermOrdToStrCache {
    /// Creates a cache from parallel vectors of term ordinals and string handles.
    ///
    /// `term_ords` must be sorted in ascending order (its last element is taken as the
    /// highest ordinal), and `string_refs[i]` must designate, inside `string_arena`, the
    /// string of `term_ords[i]`.
    ///
    /// A dense vector indexed by ordinal is used when the highest ordinal is below
    /// 1,000,000 or below three times the number of terms; otherwise a hash map is used.
    ///
    /// # Panics
    /// Panics if `term_ords` and `string_refs` differ in length.
    pub fn new(
        term_ords: Vec<u64>,
        string_refs: Vec<StringRef>,
        string_arena: StringArena,
        missing_key: Option<IntermediateKey>,
    ) -> TermOrdToStrCache {
        let num_terms = term_ords.len();
        assert_eq!(num_terms, string_refs.len());

        let addr = match term_ords.last().copied() {
            // No terms at all: an empty dense table answers `None` for every ordinal.
            None => TermOrdToAddr::Dense {
                offsets: Vec::new(),
            },
            Some(highest_term_ord) => {
                let prefer_dense = highest_term_ord < 1_000_000u64
                    || highest_term_ord < num_terms as u64 * 3u64;
                let ord_ref_pairs = term_ords.into_iter().zip(string_refs);
                if prefer_dense {
                    let mut offsets: Vec<Option<StringRef>> =
                        vec![None; highest_term_ord as usize + 1];
                    for (ord, string_ref) in ord_ref_pairs {
                        offsets[ord as usize] = Some(string_ref);
                    }
                    TermOrdToAddr::Dense { offsets }
                } else {
                    let terms: FxHashMap<u64, StringRef> = ord_ref_pairs.collect();
                    TermOrdToAddr::Sparse { terms }
                }
            }
        };

        TermOrdToStrCache {
            string_arena,
            missing_key,
            addr,
        }
    }

    /// Returns the cached string for `term_ord`, or `None` if that ordinal is not cached.
    pub fn get(&self, term_ord: u64) -> Option<&str> {
        let located: Option<StringRef> = match &self.addr {
            TermOrdToAddr::Dense { offsets } => offsets.get(term_ord as usize).copied().flatten(),
            TermOrdToAddr::Sparse { terms } => terms.get(&term_ord).copied(),
        };
        located.map(|string_ref| self.string_arena.get_str(string_ref))
    }

    /// Returns the missing key given at construction, if any.
    pub fn missing_key(&self) -> Option<&IntermediateKey> {
        self.missing_key.as_ref()
    }
}
// Unit tests for `StringArena` and `TermOrdToStrCache`.
#[cfg(test)]
mod tests {
use super::*;
// Registered strings, including the empty string, are read back unchanged.
#[test]
fn test_string_arena() {
let mut arena = StringArena::default();
let r1 = arena.register_str("hello");
let r2 = arena.register_str("world");
let r3 = arena.register_str("");
let r4 = arena.register_str("!");
assert_eq!(arena.get_str(r1), "hello");
assert_eq!(arena.get_str(r2), "world");
assert_eq!(arena.get_str(r3), "");
assert_eq!(arena.get_str(r4), "!");
}
// Helper: registers `terms` in a fresh arena and returns the arena with one ref per term,
// in the same order.
fn build_arena(terms: &[&str]) -> (StringArena, Vec<StringRef>) {
let mut arena = StringArena::default();
let refs: Vec<StringRef> = terms.iter().map(|t| arena.register_str(t)).collect();
(arena, refs)
}
// Highest ordinal (5) is below 1_000_000, so `new` picks the dense representation.
// Gaps and out-of-range ordinals must yield `None`.
#[test]
fn test_dense_cache() {
let term_ords = vec![0, 2, 5];
let (arena, refs) = build_arena(&["alpha", "beta", "gamma"]);
let cache = TermOrdToStrCache::new(term_ords, refs, arena, None);
assert_eq!(cache.get(0), Some("alpha"));
assert_eq!(cache.get(1), None);
assert_eq!(cache.get(2), Some("beta"));
assert_eq!(cache.get(3), None);
assert_eq!(cache.get(5), Some("gamma"));
assert_eq!(cache.get(6), None);
assert_eq!(cache.get(100), None);
}
// Highest ordinal (5_000_000) is neither below 1_000_000 nor below 3 * 3 terms, so `new`
// picks the sparse representation.
#[test]
fn test_sparse_cache() {
let term_ords = vec![1_000_000, 2_000_000, 5_000_000];
let (arena, refs) = build_arena(&["foo", "bar", "baz"]);
let cache = TermOrdToStrCache::new(term_ords, refs, arena, None);
assert_eq!(cache.get(1_000_000), Some("foo"));
assert_eq!(cache.get(2_000_000), Some("bar"));
assert_eq!(cache.get(5_000_000), Some("baz"));
assert_eq!(cache.get(0), None);
assert_eq!(cache.get(3_000_000), None);
}
// A cache built from no terms answers `None` for any ordinal.
#[test]
fn test_empty_cache() {
let cache =
TermOrdToStrCache::new(Vec::new(), Vec::new(), StringArena::default(), None);
assert_eq!(cache.get(0), None);
assert_eq!(cache.get(100), None);
}
// `missing_key()` returns exactly what was passed to `new`.
#[test]
fn test_missing_key() {
let missing = IntermediateKey::Str("N/A".to_string());
let (arena, refs) = build_arena(&["x"]);
let cache = TermOrdToStrCache::new(vec![0], refs, arena, Some(missing));
assert_eq!(
cache.missing_key(),
Some(&IntermediateKey::Str("N/A".to_string()))
);
let (arena, refs) = build_arena(&["x"]);
let cache_no_missing = TermOrdToStrCache::new(vec![0], refs, arena, None);
assert_eq!(cache_no_missing.missing_key(), None);
}
}