Compare commits

..

1 Commits

Author SHA1 Message Date
trinity.pointard
74a510cb56 try to use select-nth instead of full sort in segment level agg top-k selection 2026-06-29 09:13:21 +00:00
2 changed files with 34 additions and 20 deletions

View File

@@ -44,6 +44,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: 'Upload to code-scanning'
uses: github/codeql-action/upload-sarif@8aad20d150bbac5944a9f9d289da16a4b0d87c1e # v4.36.2
uses: github/codeql-action/upload-sarif@87557b9c84dde89fdd9b10e88954ac2f4248e463 # v4.36.1
with:
sarif_file: results.sarif

View File

@@ -981,19 +981,27 @@ where
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
let segment_size = term_req.req.segment_size as usize;
// select_nth_unstable_by_key(segment_size, ...) places the (k+1)-th element at
// entries[segment_size] and guarantees entries[0..segment_size] are the top-k,
// unordered. We need this to properly compute term_doc_count_before_cutoff.
match &term_req.req.order.target {
OrderTarget::Key => {
// We rely on the fact, that term ordinals match the order of the strings
// TODO: We could have a special collector, that keeps only TOP n results at any
// time.
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
} else {
entries.sort_unstable_by_key(|bucket| bucket.0);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries
.select_nth_unstable_by_key(segment_size, |b| std::cmp::Reverse(b.0));
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.0);
}
}
}
OrderTarget::SubAggregation(sub_agg_path) => {
// Peek segment-level metric values, sort, then fall through to
// Peek segment-level metric values, select top-k, then fall through to
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
// by a sub-agg: top-K results are approximate and may differ from the
// global ordering, especially for non-monotonic metrics like avg/min.
@@ -1003,7 +1011,7 @@ where
))
})?;
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
// Fetch values up-front; otherwise sort would re-compute per comparison
// Fetch values up-front; otherwise sort would re-compute per call
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
.into_iter()
.map(|bucket| {
@@ -1013,28 +1021,34 @@ where
(metric_value, bucket)
})
.collect();
if term_req.req.order.order == Order::Desc {
keyed.sort_unstable_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
if keyed.len() > segment_size {
if term_req.req.order.order == Order::Desc {
keyed.select_nth_unstable_by(segment_size, |a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.select_nth_unstable_by(segment_size, |a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
}
}
entries = keyed.into_iter().map(|(_, e)| e).collect();
}
OrderTarget::Count => {
if term_req.req.order.order == Order::Desc {
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.1.count));
} else {
entries.sort_unstable_by_key(|bucket| bucket.1.count);
if entries.len() > segment_size {
if term_req.req.order.order == Order::Desc {
entries.select_nth_unstable_by_key(segment_size, |b| {
std::cmp::Reverse(b.1.count)
});
} else {
entries.select_nth_unstable_by_key(segment_size, |b| b.1.count);
}
}
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) =
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
cut_off_buckets(&mut entries, segment_size);
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());