mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-30 14:20:42 +00:00
Compare commits
1 Commits
pub_term_i
...
trinity.po
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
74a510cb56 |
@@ -981,19 +981,27 @@ where
|
||||
) -> crate::Result<IntermediateBucketResult> {
|
||||
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
|
||||
|
||||
let segment_size = term_req.req.segment_size as usize;
|
||||
|
||||
// select_nth_unstable_by_key(segment_size, ...) places the (k+1)-th element at
|
||||
// entries[segment_size] and guarantees entries[0..segment_size] are the top-k,
|
||||
// unordered. We need this to properly compute term_doc_count_before_cutoff.
|
||||
match &term_req.req.order.target {
|
||||
OrderTarget::Key => {
|
||||
// We rely on the fact, that term ordinals match the order of the strings
|
||||
// TODO: We could have a special collector, that keeps only TOP n results at any
|
||||
// time.
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.0));
|
||||
} else {
|
||||
entries.sort_unstable_by_key(|bucket| bucket.0);
|
||||
if entries.len() > segment_size {
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
entries
|
||||
.select_nth_unstable_by_key(segment_size, |b| std::cmp::Reverse(b.0));
|
||||
} else {
|
||||
entries.select_nth_unstable_by_key(segment_size, |b| b.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
OrderTarget::SubAggregation(sub_agg_path) => {
|
||||
// Peek segment-level metric values, sort, then fall through to
|
||||
// Peek segment-level metric values, select top-k, then fall through to
|
||||
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
|
||||
// by a sub-agg: top-K results are approximate and may differ from the
|
||||
// global ordering, especially for non-monotonic metrics like avg/min.
|
||||
@@ -1003,7 +1011,7 @@ where
|
||||
))
|
||||
})?;
|
||||
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
|
||||
// Fetch values up-front; otherwise sort would re-compute per comparison
|
||||
// Fetch values up-front; otherwise sort would re-compute per call
|
||||
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
|
||||
.into_iter()
|
||||
.map(|bucket| {
|
||||
@@ -1013,28 +1021,34 @@ where
|
||||
(metric_value, bucket)
|
||||
})
|
||||
.collect();
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
keyed.sort_unstable_by(|a, b| {
|
||||
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
} else {
|
||||
keyed.sort_unstable_by(|a, b| {
|
||||
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
if keyed.len() > segment_size {
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
keyed.select_nth_unstable_by(segment_size, |a, b| {
|
||||
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
} else {
|
||||
keyed.select_nth_unstable_by(segment_size, |a, b| {
|
||||
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
|
||||
});
|
||||
}
|
||||
}
|
||||
entries = keyed.into_iter().map(|(_, e)| e).collect();
|
||||
}
|
||||
OrderTarget::Count => {
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
entries.sort_unstable_by_key(|bucket| std::cmp::Reverse(bucket.1.count));
|
||||
} else {
|
||||
entries.sort_unstable_by_key(|bucket| bucket.1.count);
|
||||
if entries.len() > segment_size {
|
||||
if term_req.req.order.order == Order::Desc {
|
||||
entries.select_nth_unstable_by_key(segment_size, |b| {
|
||||
std::cmp::Reverse(b.1.count)
|
||||
});
|
||||
} else {
|
||||
entries.select_nth_unstable_by_key(segment_size, |b| b.1.count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (term_doc_count_before_cutoff, sum_other_doc_count) =
|
||||
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
|
||||
cut_off_buckets(&mut entries, segment_size);
|
||||
|
||||
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
|
||||
dict.reserve(entries.len());
|
||||
|
||||
@@ -283,8 +283,7 @@ impl InvertedIndexReader {
|
||||
|
||||
#[cfg(feature = "quickwit")]
|
||||
impl InvertedIndexReader {
|
||||
/// Resolves a `Term` to its [`TermInfo`] asynchronously, if present in the dictionary.
|
||||
pub async fn get_term_info_async(&self, term: &Term) -> io::Result<Option<TermInfo>> {
|
||||
pub(crate) async fn get_term_info_async(&self, term: &Term) -> io::Result<Option<TermInfo>> {
|
||||
self.termdict.get_async(term.serialized_value_bytes()).await
|
||||
}
|
||||
|
||||
@@ -337,38 +336,23 @@ impl InvertedIndexReader {
|
||||
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<bool> {
|
||||
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
|
||||
if let Some(term_info) = term_info_opt {
|
||||
self.warm_postings_from_term_info(&term_info, with_positions)
|
||||
.await?;
|
||||
let postings = self
|
||||
.postings_file_slice
|
||||
.read_bytes_slice_async(term_info.postings_range.clone());
|
||||
if with_positions {
|
||||
let positions = self
|
||||
.positions_file_slice
|
||||
.read_bytes_slice_async(term_info.positions_range.clone());
|
||||
futures_util::future::try_join(postings, positions).await?;
|
||||
} else {
|
||||
postings.await?;
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Warmup a block postings given a `TermInfo`.
|
||||
/// This method is for an advanced usage only.
|
||||
///
|
||||
/// Use this when the [`TermInfo`] is already known (e.g. resolved via
|
||||
/// [`Self::get_term_info_async`]) to avoid a redundant dictionary lookup.
|
||||
pub async fn warm_postings_from_term_info(
|
||||
&self,
|
||||
term_info: &TermInfo,
|
||||
with_positions: bool,
|
||||
) -> io::Result<()> {
|
||||
let postings = self
|
||||
.postings_file_slice
|
||||
.read_bytes_slice_async(term_info.postings_range.clone());
|
||||
if with_positions {
|
||||
let positions = self
|
||||
.positions_file_slice
|
||||
.read_bytes_slice_async(term_info.positions_range.clone());
|
||||
futures_util::future::try_join(postings, positions).await?;
|
||||
} else {
|
||||
postings.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Warmup a block postings given a range of `Term`s.
|
||||
/// This method is for an advanced usage only.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user