Compare commits

..

1 Commits

Author SHA1 Message Date
trinity.pointard
74a510cb56 try to use select-nth instead of full sort in segment level agg top-k selection 2026-06-29 09:13:21 +00:00
3 changed files with 45 additions and 50 deletions

View File

@@ -981,19 +981,27 @@ where
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
let segment_size = term_req.req.segment_size as usize;
// select_nth_unstable_by_key(segment_size, ...) places the (k+1)-th element at
// entries[segment_size] and guarantees entries[0..segment_size] are the top-k,
// unordered. We need this to properly compute term_doc_count_before_cutoff.
match &term_req.req.order.target {
OrderTarget::Key => {
// We rely on the fact, that term ordinals match the order of the strings
// TODO: We could have a special collector, that keeps only TOP n results at any
// time.
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
} else {
entries.sort_unstable_by_key(|bucket| bucket.0);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries
.select_nth_unstable_by_key(segment_size, |b| std::cmp::Reverse(b.0));
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.0);
}
}
}
OrderTarget::SubAggregation(sub_agg_path) => {
// Peek segment-level metric values, sort, then fall through to
// Peek segment-level metric values, select top-k, then fall through to
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
// by a sub-agg: top-K results are approximate and may differ from the
// global ordering, especially for non-monotonic metrics like avg/min.
@@ -1003,7 +1011,7 @@ where
))
})?;
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
// Fetch values up-front; otherwise sort would re-compute per comparison
// Fetch values up-front; otherwise sort would re-compute per call
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
.into_iter()
.map(|bucket| {
@@ -1013,28 +1021,34 @@ where
(metric_value, bucket)
})
.collect();
if term_req.req.order.order == Order::Desc {
keyed.sort_unstable_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
if keyed.len() > segment_size {
if term_req.req.order.order == Order::Desc {
keyed.select_nth_unstable_by(segment_size, |a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.select_nth_unstable_by(segment_size, |a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
}
}
entries = keyed.into_iter().map(|(_, e)| e).collect();
}
OrderTarget::Count => {
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.1.count));
} else {
entries.sort_unstable_by_key(|bucket| bucket.1.count);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries.select_nth_unstable_by_key(segment_size, |b| {
std::cmp::Reverse(b.1.count)
});
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.1.count);
}
}
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) =
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
cut_off_buckets(&mut entries, segment_size);
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());

View File

@@ -91,14 +91,10 @@ fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
num_docs: u32,
) -> Box<dyn Scorer> {
match scorer {
SpecializedScorer::TermUnion(mut term_scorers) => {
if term_scorers.len() == 1 {
Box::new(term_scorers.pop().unwrap())
} else {
let union_scorer =
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
Box::new(union_scorer)
}
SpecializedScorer::TermUnion(term_scorers) => {
let union_scorer =
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
Box::new(union_scorer)
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
@@ -508,15 +504,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
let scorer = self.complex_scorer(reader, 1.0, &self.score_combiner_fn)?;
let num_docs = reader.num_docs();
match scorer {
SpecializedScorer::TermUnion(mut term_scorers) => {
if term_scorers.len() == 1 {
let mut term_scorer = term_scorers.pop().unwrap();
for_each_scorer(&mut term_scorer, callback);
} else {
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_scorer(&mut union_scorer, callback);
}
SpecializedScorer::TermUnion(term_scorers) => {
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_scorer(&mut union_scorer, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
@@ -543,15 +534,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
match scorer {
SpecializedScorer::TermUnion(mut term_scorers) => {
if term_scorers.len() == 1 {
let mut term_scorer = term_scorers.pop().unwrap();
for_each_docset_buffered(&mut term_scorer, &mut buffer, callback);
} else {
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
}
SpecializedScorer::TermUnion(term_scorers) => {
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers

View File

@@ -55,11 +55,6 @@ pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
num_docs: u32,
}
// Keep this helper out-of-line. When LLVM inlines it into
// `BufferedUnionScorer::advance`, the full traversal path used by combined
// collectors such as `(TopDocs, Count)` becomes sensitive to unrelated codegen
// changes and regresses on large unions.
#[inline(never)]
fn refill<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
scorers: &mut Vec<TScorer>,
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],