Compare commits

..

1 Commits

Author SHA1 Message Date
trinity.pointard
74a510cb56 try to use select-nth instead of full sort in segment level agg top-k selection 2026-06-29 09:13:21 +00:00
2 changed files with 45 additions and 47 deletions

View File

@@ -981,19 +981,27 @@ where
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
let segment_size = term_req.req.segment_size as usize;
// select_nth_unstable_by_key(segment_size, ...) places the (k+1)-th element at
// entries[segment_size] and guarantees entries[0..segment_size] are the top-k,
// unordered. We need this to properly compute term_doc_count_before_cutoff.
match &term_req.req.order.target {
OrderTarget::Key => {
// We rely on the fact, that term ordinals match the order of the strings
// TODO: We could have a special collector, that keeps only TOP n results at any
// time.
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
} else {
entries.sort_unstable_by_key(|bucket| bucket.0);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries
.select_nth_unstable_by_key(segment_size, |b| std::cmp::Reverse(b.0));
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.0);
}
}
}
OrderTarget::SubAggregation(sub_agg_path) => {
// Peek segment-level metric values, sort, then fall through to
// Peek segment-level metric values, select top-k, then fall through to
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
// by a sub-agg: top-K results are approximate and may differ from the
// global ordering, especially for non-monotonic metrics like avg/min.
@@ -1003,7 +1011,7 @@ where
))
})?;
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
// Fetch values up-front; otherwise sort would re-compute per comparison
// Fetch values up-front; otherwise sort would re-compute per call
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
.into_iter()
.map(|bucket| {
@@ -1013,28 +1021,34 @@ where
(metric_value, bucket)
})
.collect();
if term_req.req.order.order == Order::Desc {
keyed.sort_unstable_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
if keyed.len() > segment_size {
if term_req.req.order.order == Order::Desc {
keyed.select_nth_unstable_by(segment_size, |a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.select_nth_unstable_by(segment_size, |a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
}
}
entries = keyed.into_iter().map(|(_, e)| e).collect();
}
OrderTarget::Count => {
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.1.count));
} else {
entries.sort_unstable_by_key(|bucket| bucket.1.count);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries.select_nth_unstable_by_key(segment_size, |b| {
std::cmp::Reverse(b.1.count)
});
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.1.count);
}
}
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) =
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
cut_off_buckets(&mut entries, segment_size);
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());

View File

@@ -283,8 +283,7 @@ impl InvertedIndexReader {
#[cfg(feature = "quickwit")]
impl InvertedIndexReader {
/// Resolves a `Term` to its [`TermInfo`] asynchronously, if present in the dictionary.
pub async fn get_term_info_async(&self, term: &Term) -> io::Result<Option<TermInfo>> {
pub(crate) async fn get_term_info_async(&self, term: &Term) -> io::Result<Option<TermInfo>> {
self.termdict.get_async(term.serialized_value_bytes()).await
}
@@ -337,38 +336,23 @@ impl InvertedIndexReader {
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<bool> {
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
self.warm_postings_from_term_info(&term_info, with_positions)
.await?;
let postings = self
.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone());
if with_positions {
let positions = self
.positions_file_slice
.read_bytes_slice_async(term_info.positions_range.clone());
futures_util::future::try_join(postings, positions).await?;
} else {
postings.await?;
}
Ok(true)
} else {
Ok(false)
}
}
/// Warmup a block postings given a `TermInfo`.
/// This method is for an advanced usage only.
///
/// Use this when the [`TermInfo`] is already known (e.g. resolved via
/// [`Self::get_term_info_async`]) to avoid a redundant dictionary lookup.
pub async fn warm_postings_from_term_info(
&self,
term_info: &TermInfo,
with_positions: bool,
) -> io::Result<()> {
let postings = self
.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone());
if with_positions {
let positions = self
.positions_file_slice
.read_bytes_slice_async(term_info.positions_range.clone());
futures_util::future::try_join(postings, positions).await?;
} else {
postings.await?;
}
Ok(())
}
/// Warmup a block postings given a range of `Term`s.
/// This method is for an advanced usage only.
///