mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-07 09:32:54 +00:00
Split Collector into an overall Collector and a per-segment SegmentCollector. Precursor to cross-segment parallelism, and as a side benefit cleans up any per-segment fields from being Option<T> to just T.
This commit is contained in:
@@ -4,22 +4,29 @@ use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
use collector::SegmentCollector;
|
||||
|
||||
/// Collector that does nothing.
|
||||
/// This is used in the chain Collector and will hopefully
|
||||
/// be optimized away by the compiler.
|
||||
pub struct DoNothingCollector;
|
||||
impl Collector for DoNothingCollector {
|
||||
type Child = DoNothingCollector;
|
||||
#[inline]
|
||||
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
|
||||
Ok(())
|
||||
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<DoNothingCollector> {
|
||||
Ok(DoNothingCollector)
|
||||
}
|
||||
#[inline]
|
||||
fn collect(&mut self, _doc: DocId, _score: Score) {}
|
||||
#[inline]
|
||||
fn requires_scoring(&self) -> bool {
|
||||
false
|
||||
}
|
||||
#[inline]
|
||||
fn merge_children(&mut self, _children: Vec<DoNothingCollector>) {}
|
||||
}
|
||||
|
||||
impl SegmentCollector for DoNothingCollector {
|
||||
#[inline]
|
||||
fn collect(&mut self, _doc: DocId, _score: Score) {}
|
||||
}
|
||||
|
||||
/// Zero-cost abstraction used to collect on multiple collectors.
|
||||
@@ -30,6 +37,11 @@ pub struct ChainedCollector<Left: Collector, Right: Collector> {
|
||||
right: Right,
|
||||
}
|
||||
|
||||
pub struct ChainedSegmentCollector<Left: SegmentCollector, Right: SegmentCollector> {
|
||||
left: Left,
|
||||
right: Right,
|
||||
}
|
||||
|
||||
impl<Left: Collector, Right: Collector> ChainedCollector<Left, Right> {
|
||||
/// Adds a collector
|
||||
pub fn push<C: Collector>(self, new_collector: &mut C) -> ChainedCollector<Self, &mut C> {
|
||||
@@ -41,24 +53,49 @@ impl<Left: Collector, Right: Collector> ChainedCollector<Left, Right> {
|
||||
}
|
||||
|
||||
impl<Left: Collector, Right: Collector> Collector for ChainedCollector<Left, Right> {
|
||||
fn set_segment(
|
||||
type Child = ChainedSegmentCollector<Left::Child, Right::Child>;
|
||||
fn for_segment(
|
||||
&mut self,
|
||||
segment_local_id: SegmentLocalId,
|
||||
segment: &SegmentReader,
|
||||
) -> Result<()> {
|
||||
self.left.set_segment(segment_local_id, segment)?;
|
||||
self.right.set_segment(segment_local_id, segment)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.left.collect(doc, score);
|
||||
self.right.collect(doc, score);
|
||||
) -> Result<Self::Child> {
|
||||
Ok(ChainedSegmentCollector {
|
||||
left: self.left.for_segment(segment_local_id, segment)?,
|
||||
right: self.right.for_segment(segment_local_id, segment)?,
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
self.left.requires_scoring() || self.right.requires_scoring()
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, children: Vec<Self::Child>) {
|
||||
let mut lefts = Vec::new();
|
||||
let mut rights = Vec::new();
|
||||
|
||||
for child in children.into_iter() {
|
||||
lefts.push(child.left);
|
||||
rights.push(child.right);
|
||||
}
|
||||
|
||||
self.left.merge_children(lefts);
|
||||
self.right.merge_children(rights);
|
||||
}
|
||||
}
|
||||
|
||||
impl<Left: SegmentCollector, Right: SegmentCollector> SegmentCollector for ChainedSegmentCollector<Left, Right> {
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.left.collect(doc, score);
|
||||
self.right.collect(doc, score);
|
||||
}
|
||||
}
|
||||
|
||||
// For unit tests to keep working. I don't know if this should stick around.
|
||||
impl<Left: Collector+SegmentCollector, Right: Collector+SegmentCollector> SegmentCollector for ChainedCollector<Left, Right> {
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.left.collect(doc, score);
|
||||
self.right.collect(doc, score);
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a `ChainedCollector`
|
||||
@@ -73,7 +110,7 @@ pub fn chain() -> ChainedCollector<DoNothingCollector, DoNothingCollector> {
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use collector::{Collector, CountCollector, TopCollector};
|
||||
use collector::{CountCollector, SegmentCollector, TopCollector};
|
||||
|
||||
#[test]
|
||||
fn test_chained_collector() {
|
||||
|
||||
@@ -4,6 +4,7 @@ use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
use collector::SegmentCollector;
|
||||
|
||||
/// `CountCollector` collector only counts how many
|
||||
/// documents match the query.
|
||||
@@ -21,23 +22,33 @@ impl CountCollector {
|
||||
}
|
||||
|
||||
impl Collector for CountCollector {
|
||||
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
type Child = CountCollector;
|
||||
|
||||
fn collect(&mut self, _: DocId, _: Score) {
|
||||
self.count += 1;
|
||||
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<CountCollector> {
|
||||
Ok(CountCollector::default())
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, children: Vec<CountCollector>) {
|
||||
for child in children.into_iter() {
|
||||
self.count += child.count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCollector for CountCollector {
|
||||
fn collect(&mut self, _: DocId, _: Score) {
|
||||
self.count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use collector::{Collector, CountCollector};
|
||||
use collector::{Collector, CountCollector, SegmentCollector};
|
||||
|
||||
#[test]
|
||||
fn test_count_collector() {
|
||||
|
||||
@@ -3,14 +3,12 @@ use docset::SkipResult;
|
||||
use fastfield::FacetReader;
|
||||
use schema::Facet;
|
||||
use schema::Field;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::btree_map;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::Bound;
|
||||
use std::iter::Peekable;
|
||||
use std::mem;
|
||||
use std::{u64, usize};
|
||||
use termdict::TermMerger;
|
||||
|
||||
@@ -20,6 +18,7 @@ use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
use collector::SegmentCollector;
|
||||
|
||||
struct Hit<'a> {
|
||||
count: u64,
|
||||
@@ -194,19 +193,22 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
|
||||
/// }
|
||||
/// ```
|
||||
pub struct FacetCollector {
|
||||
facet_ords: Vec<u64>,
|
||||
field: Field,
|
||||
ff_reader: Option<UnsafeCell<FacetReader>>,
|
||||
segment_counters: Vec<SegmentFacetCounter>,
|
||||
facets: BTreeSet<Facet>,
|
||||
}
|
||||
|
||||
pub struct FacetSegmentCollector {
|
||||
reader: FacetReader,
|
||||
|
||||
facet_ords_buf: Vec<u64>,
|
||||
|
||||
// facet_ord -> collapse facet_id
|
||||
current_segment_collapse_mapping: Vec<usize>,
|
||||
collapse_mapping: Vec<usize>,
|
||||
// collapse facet_id -> count
|
||||
current_segment_counts: Vec<u64>,
|
||||
counts: Vec<u64>,
|
||||
// collapse facet_id -> facet_ord
|
||||
current_collapse_facet_ords: Vec<u64>,
|
||||
|
||||
facets: BTreeSet<Facet>,
|
||||
collapse_facet_ords: Vec<u64>,
|
||||
}
|
||||
|
||||
fn skip<'a, I: Iterator<Item = &'a Facet>>(
|
||||
@@ -240,15 +242,9 @@ impl FacetCollector {
|
||||
/// is of the proper type.
|
||||
pub fn for_field(field: Field) -> FacetCollector {
|
||||
FacetCollector {
|
||||
facet_ords: Vec::with_capacity(255),
|
||||
segment_counters: Vec::new(),
|
||||
field,
|
||||
ff_reader: None,
|
||||
facets: BTreeSet::new(),
|
||||
|
||||
current_segment_collapse_mapping: Vec::new(),
|
||||
current_collapse_facet_ords: Vec::new(),
|
||||
current_segment_counts: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,69 +275,11 @@ impl FacetCollector {
|
||||
self.facets.insert(facet);
|
||||
}
|
||||
|
||||
fn set_collapse_mapping(&mut self, facet_reader: &FacetReader) {
|
||||
self.current_segment_collapse_mapping.clear();
|
||||
self.current_collapse_facet_ords.clear();
|
||||
self.current_segment_counts.clear();
|
||||
let mut collapse_facet_it = self.facets.iter().peekable();
|
||||
self.current_collapse_facet_ords.push(0);
|
||||
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
|
||||
if !facet_streamer.advance() {
|
||||
return;
|
||||
}
|
||||
'outer: loop {
|
||||
// at the begining of this loop, facet_streamer
|
||||
// is positionned on a term that has not been processed yet.
|
||||
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
|
||||
match skip_result {
|
||||
SkipResult::Reached => {
|
||||
// we reach a facet we decided to collapse.
|
||||
let collapse_depth = facet_depth(facet_streamer.key());
|
||||
let mut collapsed_id = 0;
|
||||
self.current_segment_collapse_mapping.push(0);
|
||||
while facet_streamer.advance() {
|
||||
let depth = facet_depth(facet_streamer.key());
|
||||
if depth <= collapse_depth {
|
||||
continue 'outer;
|
||||
}
|
||||
if depth == collapse_depth + 1 {
|
||||
collapsed_id = self.current_collapse_facet_ords.len();
|
||||
self.current_collapse_facet_ords
|
||||
.push(facet_streamer.term_ord());
|
||||
self.current_segment_collapse_mapping.push(collapsed_id);
|
||||
} else {
|
||||
self.current_segment_collapse_mapping.push(collapsed_id);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
SkipResult::End | SkipResult::OverStep => {
|
||||
self.current_segment_collapse_mapping.push(0);
|
||||
if !facet_streamer.advance() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_segment(&mut self) {
|
||||
if self.ff_reader.is_some() {
|
||||
self.segment_counters.push(SegmentFacetCounter {
|
||||
facet_reader: self.ff_reader.take().unwrap().into_inner(),
|
||||
facet_ords: mem::replace(&mut self.current_collapse_facet_ords, Vec::new()),
|
||||
facet_counts: mem::replace(&mut self.current_segment_counts, Vec::new()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the results of the collection.
|
||||
///
|
||||
/// This method does not just return the counters,
|
||||
/// it also translates the facet ordinals of the last segment.
|
||||
pub fn harvest(mut self) -> FacetCounts {
|
||||
self.finalize_segment();
|
||||
|
||||
pub fn harvest(self) -> FacetCounts {
|
||||
let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
|
||||
.iter()
|
||||
.map(|segment_counter| &segment_counter.facet_ords[..])
|
||||
@@ -387,30 +325,96 @@ impl FacetCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl FacetSegmentCollector {
|
||||
fn into_segment_facet_counter(self) -> SegmentFacetCounter {
|
||||
SegmentFacetCounter {
|
||||
facet_reader: self.reader,
|
||||
facet_ords: self.collapse_facet_ords,
|
||||
facet_counts: self.counts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Collector for FacetCollector {
|
||||
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
|
||||
self.finalize_segment();
|
||||
type Child = FacetSegmentCollector;
|
||||
|
||||
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FacetSegmentCollector> {
|
||||
let facet_reader = reader.facet_reader(self.field)?;
|
||||
self.set_collapse_mapping(&facet_reader);
|
||||
self.current_segment_counts
|
||||
.resize(self.current_collapse_facet_ords.len(), 0);
|
||||
self.ff_reader = Some(UnsafeCell::new(facet_reader));
|
||||
Ok(())
|
||||
|
||||
let mut collapse_mapping = Vec::new();
|
||||
let mut counts = Vec::new();
|
||||
let mut collapse_facet_ords = Vec::new();
|
||||
|
||||
let mut collapse_facet_it = self.facets.iter().peekable();
|
||||
collapse_facet_ords.push(0);
|
||||
{
|
||||
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
|
||||
if facet_streamer.advance() {
|
||||
'outer: loop {
|
||||
// at the begining of this loop, facet_streamer
|
||||
// is positionned on a term that has not been processed yet.
|
||||
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
|
||||
match skip_result {
|
||||
SkipResult::Reached => {
|
||||
// we reach a facet we decided to collapse.
|
||||
let collapse_depth = facet_depth(facet_streamer.key());
|
||||
let mut collapsed_id = 0;
|
||||
collapse_mapping.push(0);
|
||||
while facet_streamer.advance() {
|
||||
let depth = facet_depth(facet_streamer.key());
|
||||
if depth <= collapse_depth {
|
||||
continue 'outer;
|
||||
}
|
||||
if depth == collapse_depth + 1 {
|
||||
collapsed_id = collapse_facet_ords.len();
|
||||
collapse_facet_ords.push(facet_streamer.term_ord());
|
||||
collapse_mapping.push(collapsed_id);
|
||||
} else {
|
||||
collapse_mapping.push(collapsed_id);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
SkipResult::End | SkipResult::OverStep => {
|
||||
collapse_mapping.push(0);
|
||||
if !facet_streamer.advance() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
counts.resize(collapse_facet_ords.len(), 0);
|
||||
|
||||
Ok(FacetSegmentCollector {
|
||||
reader: facet_reader,
|
||||
facet_ords_buf: Vec::with_capacity(255),
|
||||
collapse_mapping,
|
||||
counts,
|
||||
collapse_facet_ords,
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, children: Vec<FacetSegmentCollector>) {
|
||||
for child in children.into_iter() {
|
||||
self.segment_counters.push(child.into_segment_facet_counter());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCollector for FacetSegmentCollector {
|
||||
fn collect(&mut self, doc: DocId, _: Score) {
|
||||
let facet_reader: &mut FacetReader = unsafe {
|
||||
&mut *self.ff_reader
|
||||
.as_ref()
|
||||
.expect("collect() was called before set_segment. This should never happen.")
|
||||
.get()
|
||||
};
|
||||
facet_reader.facet_ords(doc, &mut self.facet_ords);
|
||||
self.reader.facet_ords(doc, &mut self.facet_ords_buf);
|
||||
let mut previous_collapsed_ord: usize = usize::MAX;
|
||||
for &facet_ord in &self.facet_ords {
|
||||
let collapsed_ord = self.current_segment_collapse_mapping[facet_ord as usize];
|
||||
self.current_segment_counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord
|
||||
{
|
||||
for &facet_ord in &self.facet_ords_buf {
|
||||
let collapsed_ord = self.collapse_mapping[facet_ord as usize];
|
||||
self.counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord {
|
||||
0
|
||||
} else {
|
||||
1
|
||||
@@ -418,10 +422,6 @@ impl Collector for FacetCollector {
|
||||
previous_collapsed_ord = collapsed_ord;
|
||||
}
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Intermediary result of the `FacetCollector` that stores
|
||||
|
||||
@@ -7,13 +7,12 @@ use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
use query::Query;
|
||||
use Searcher;
|
||||
|
||||
mod count_collector;
|
||||
pub use self::count_collector::CountCollector;
|
||||
|
||||
mod multi_collector;
|
||||
pub use self::multi_collector::MultiCollector;
|
||||
|
||||
mod top_collector;
|
||||
pub use self::top_collector::TopCollector;
|
||||
|
||||
@@ -53,36 +52,76 @@ pub use self::chained_collector::chain;
|
||||
///
|
||||
/// Segments are not guaranteed to be visited in any specific order.
|
||||
pub trait Collector {
|
||||
type Child : SegmentCollector;
|
||||
/// `set_segment` is called before beginning to enumerate
|
||||
/// on this segment.
|
||||
fn set_segment(
|
||||
fn for_segment(
|
||||
&mut self,
|
||||
segment_local_id: SegmentLocalId,
|
||||
segment: &SegmentReader,
|
||||
) -> Result<()>;
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score);
|
||||
) -> Result<Self::Child>;
|
||||
|
||||
/// Returns true iff the collector requires to compute scores for documents.
|
||||
fn requires_scoring(&self) -> bool;
|
||||
|
||||
fn merge_children(&mut self, children: Vec<Self::Child>);
|
||||
|
||||
/// Search works as follows :
|
||||
///
|
||||
/// First the weight object associated to the query is created.
|
||||
///
|
||||
/// Then, the query loops over the segments and for each segment :
|
||||
/// - setup the collector and informs it that the segment being processed has changed.
|
||||
/// - creates a SegmentCollector for collecting documents associated to the segment
|
||||
/// - creates a `Scorer` object associated for this segment
|
||||
/// - iterate throw the matched documents and push them to the segment collector.
|
||||
///
|
||||
/// Finally, the Collector merges each of the child collectors into itself for result usability
|
||||
/// by the caller.
|
||||
fn search(&mut self, searcher: &Searcher, query: &Query) -> Result<()> {
|
||||
let scoring_enabled = self.requires_scoring();
|
||||
let weight = query.weight(searcher, scoring_enabled)?;
|
||||
let mut children = Vec::new();
|
||||
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
|
||||
let mut child: Self::Child = self.for_segment(segment_ord as SegmentLocalId, segment_reader)?;
|
||||
let mut scorer = weight.scorer(segment_reader)?;
|
||||
scorer.collect(&mut child, segment_reader.delete_bitset());
|
||||
children.push(child);
|
||||
}
|
||||
self.merge_children(children);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SegmentCollector {
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score);
|
||||
}
|
||||
|
||||
impl<'a, C: Collector> Collector for &'a mut C {
|
||||
fn set_segment(
|
||||
type Child = C::Child;
|
||||
|
||||
fn for_segment(
|
||||
&mut self,
|
||||
segment_local_id: SegmentLocalId,
|
||||
segment: &SegmentReader,
|
||||
) -> Result<()> {
|
||||
(*self).set_segment(segment_local_id, segment)
|
||||
}
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
C::collect(self, doc, score)
|
||||
) -> Result<C::Child> {
|
||||
(*self).for_segment(segment_local_id, segment)
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
C::requires_scoring(self)
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, children: Vec<C::Child>) {
|
||||
(*self).merge_children(children);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: SegmentCollector> SegmentCollector for &'a mut S {
|
||||
fn collect(&mut self, doc: u32, score: f32) {
|
||||
(*self).collect(doc, score);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -101,8 +140,13 @@ pub mod tests {
|
||||
/// It is unusable in practise, as it does not store
|
||||
/// the segment ordinals
|
||||
pub struct TestCollector {
|
||||
next_offset: DocId,
|
||||
docs: Vec<DocId>,
|
||||
scores: Vec<Score>,
|
||||
}
|
||||
|
||||
pub struct TestSegmentCollector {
|
||||
offset: DocId,
|
||||
segment_max_doc: DocId,
|
||||
docs: Vec<DocId>,
|
||||
scores: Vec<Score>,
|
||||
}
|
||||
@@ -121,8 +165,7 @@ pub mod tests {
|
||||
impl Default for TestCollector {
|
||||
fn default() -> TestCollector {
|
||||
TestCollector {
|
||||
offset: 0,
|
||||
segment_max_doc: 0,
|
||||
next_offset: 0,
|
||||
docs: Vec::new(),
|
||||
scores: Vec::new(),
|
||||
}
|
||||
@@ -130,20 +173,36 @@ pub mod tests {
|
||||
}
|
||||
|
||||
impl Collector for TestCollector {
|
||||
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
|
||||
self.offset += self.segment_max_doc;
|
||||
self.segment_max_doc = reader.max_doc();
|
||||
Ok(())
|
||||
}
|
||||
type Child = TestSegmentCollector;
|
||||
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.docs.push(doc + self.offset);
|
||||
self.scores.push(score);
|
||||
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<TestSegmentCollector> {
|
||||
let offset = self.next_offset;
|
||||
self.next_offset += reader.max_doc();
|
||||
Ok(TestSegmentCollector {
|
||||
offset,
|
||||
docs: Vec::new(),
|
||||
scores: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, mut children: Vec<TestSegmentCollector>) {
|
||||
children.sort_by_key(|x| x.offset);
|
||||
for child in children.into_iter() {
|
||||
self.docs.extend(child.docs);
|
||||
self.scores.extend(child.scores);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCollector for TestSegmentCollector {
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.docs.push(doc + self.offset);
|
||||
self.scores.push(score);
|
||||
}
|
||||
}
|
||||
|
||||
/// Collects in order all of the fast fields for all of the
|
||||
@@ -151,17 +210,23 @@ pub mod tests {
|
||||
///
|
||||
/// This collector is mainly useful for tests.
|
||||
pub struct FastFieldTestCollector {
|
||||
next_counter: usize,
|
||||
vals: Vec<u64>,
|
||||
field: Field,
|
||||
ff_reader: Option<FastFieldReader<u64>>,
|
||||
}
|
||||
|
||||
pub struct FastFieldSegmentCollector {
|
||||
counter: usize,
|
||||
vals: Vec<u64>,
|
||||
reader: FastFieldReader<u64>,
|
||||
}
|
||||
|
||||
impl FastFieldTestCollector {
|
||||
pub fn for_field(field: Field) -> FastFieldTestCollector {
|
||||
FastFieldTestCollector {
|
||||
next_counter: 0,
|
||||
vals: Vec::new(),
|
||||
field,
|
||||
ff_reader: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,20 +236,36 @@ pub mod tests {
|
||||
}
|
||||
|
||||
impl Collector for FastFieldTestCollector {
|
||||
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
|
||||
self.ff_reader = Some(reader.fast_field_reader(self.field)?);
|
||||
Ok(())
|
||||
type Child = FastFieldSegmentCollector;
|
||||
|
||||
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FastFieldSegmentCollector> {
|
||||
let counter = self.next_counter;
|
||||
self.next_counter += 1;
|
||||
Ok(FastFieldSegmentCollector {
|
||||
counter,
|
||||
vals: Vec::new(),
|
||||
reader: reader.fast_field_reader(self.field)?,
|
||||
})
|
||||
}
|
||||
|
||||
fn collect(&mut self, doc: DocId, _score: Score) {
|
||||
let val = self.ff_reader.as_ref().unwrap().get(doc);
|
||||
self.vals.push(val);
|
||||
}
|
||||
fn requires_scoring(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, mut children: Vec<FastFieldSegmentCollector>) {
|
||||
children.sort_by_key(|x| x.counter);
|
||||
for child in children.into_iter() {
|
||||
self.vals.extend(child.vals);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCollector for FastFieldSegmentCollector {
|
||||
fn collect(&mut self, doc: DocId, _score: Score) {
|
||||
let val = self.reader.get(doc);
|
||||
self.vals.push(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
use super::Collector;
|
||||
use DocId;
|
||||
use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
|
||||
/// Multicollector makes it possible to collect on more than one collector.
|
||||
/// It should only be used for use cases where the Collector types is unknown
|
||||
/// at compile time.
|
||||
/// If the type of the collectors is known, you should prefer to use `ChainedCollector`.
|
||||
pub struct MultiCollector<'a> {
|
||||
collectors: Vec<&'a mut Collector>,
|
||||
}
|
||||
|
||||
impl<'a> MultiCollector<'a> {
|
||||
/// Constructor
|
||||
pub fn from(collectors: Vec<&'a mut Collector>) -> MultiCollector {
|
||||
MultiCollector { collectors }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Collector for MultiCollector<'a> {
|
||||
fn set_segment(
|
||||
&mut self,
|
||||
segment_local_id: SegmentLocalId,
|
||||
segment: &SegmentReader,
|
||||
) -> Result<()> {
|
||||
for collector in &mut self.collectors {
|
||||
collector.set_segment(segment_local_id, segment)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
for collector in &mut self.collectors {
|
||||
collector.collect(doc, score);
|
||||
}
|
||||
}
|
||||
fn requires_scoring(&self) -> bool {
|
||||
self.collectors
|
||||
.iter()
|
||||
.any(|collector| collector.requires_scoring())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use collector::{Collector, CountCollector, TopCollector};
|
||||
|
||||
#[test]
|
||||
fn test_multi_collector() {
|
||||
let mut top_collector = TopCollector::with_limit(2);
|
||||
let mut count_collector = CountCollector::default();
|
||||
{
|
||||
let mut collectors =
|
||||
MultiCollector::from(vec![&mut top_collector, &mut count_collector]);
|
||||
collectors.collect(1, 0.2);
|
||||
collectors.collect(2, 0.1);
|
||||
collectors.collect(3, 0.5);
|
||||
}
|
||||
assert_eq!(count_collector.count(), 3);
|
||||
assert!(top_collector.at_capacity());
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ use Result;
|
||||
use Score;
|
||||
use SegmentLocalId;
|
||||
use SegmentReader;
|
||||
use collector::SegmentCollector;
|
||||
|
||||
// Rust heap is a max-heap and we need a min heap.
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -99,11 +100,32 @@ impl TopCollector {
|
||||
}
|
||||
|
||||
impl Collector for TopCollector {
|
||||
fn set_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<()> {
|
||||
self.segment_id = segment_id;
|
||||
Ok(())
|
||||
type Child = TopCollector;
|
||||
|
||||
fn for_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<TopCollector> {
|
||||
Ok(TopCollector {
|
||||
limit: self.limit,
|
||||
heap: BinaryHeap::new(),
|
||||
segment_id,
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn merge_children(&mut self, children: Vec<TopCollector>) {
|
||||
// TODO: Could this be much better?
|
||||
for mut child in children.into_iter() {
|
||||
self.segment_id = child.segment_id;
|
||||
while let Some(doc) = child.heap.pop() {
|
||||
self.collect(doc.doc_address.doc(), doc.score)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentCollector for TopCollector {
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
if self.at_capacity() {
|
||||
// It's ok to unwrap as long as a limit of 0 is forbidden.
|
||||
@@ -125,17 +147,12 @@ impl Collector for TopCollector {
|
||||
self.heap.push(wrapped_doc);
|
||||
}
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::*;
|
||||
use collector::Collector;
|
||||
use DocId;
|
||||
use Score;
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ impl Searcher {
|
||||
|
||||
/// Runs a query on the segment readers wrapped by the searcher
|
||||
pub fn search<C: Collector>(&self, query: &Query, collector: &mut C) -> Result<()> {
|
||||
query.search(self, collector)
|
||||
collector.search(self, query)
|
||||
}
|
||||
|
||||
/// Return the field searcher associated to a `Field`.
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use super::Weight;
|
||||
use collector::Collector;
|
||||
use core::searcher::Searcher;
|
||||
use std::fmt;
|
||||
use Result;
|
||||
use SegmentLocalId;
|
||||
|
||||
/// The `Query` trait defines a set of documents and a scoring method
|
||||
/// for those documents.
|
||||
@@ -56,24 +54,4 @@ pub trait Query: fmt::Debug {
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Search works as follows :
|
||||
///
|
||||
/// First the weight object associated to the query is created.
|
||||
///
|
||||
/// Then, the query loops over the segments and for each segment :
|
||||
/// - setup the collector and informs it that the segment being processed has changed.
|
||||
/// - creates a `Scorer` object associated for this segment
|
||||
/// - iterate throw the matched documents and push them to the collector.
|
||||
///
|
||||
fn search(&self, searcher: &Searcher, collector: &mut Collector) -> Result<()> {
|
||||
let scoring_enabled = collector.requires_scoring();
|
||||
let weight = self.weight(searcher, scoring_enabled)?;
|
||||
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
|
||||
collector.set_segment(segment_ord as SegmentLocalId, segment_reader)?;
|
||||
let mut scorer = weight.scorer(segment_reader)?;
|
||||
scorer.collect(collector, segment_reader.delete_bitset());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,8 +41,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
|
||||
/// # extern crate tantivy;
|
||||
/// # use tantivy::Index;
|
||||
/// # use tantivy::schema::{SchemaBuilder, INT_INDEXED};
|
||||
/// # use tantivy::collector::CountCollector;
|
||||
/// # use tantivy::query::Query;
|
||||
/// # use tantivy::collector::{Collector, CountCollector};
|
||||
/// # use tantivy::Result;
|
||||
/// # use tantivy::query::RangeQuery;
|
||||
/// #
|
||||
@@ -68,7 +67,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
|
||||
/// let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
|
||||
///
|
||||
/// let mut count_collector = CountCollector::default();
|
||||
/// docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
|
||||
/// count_collector.search(&*searcher, &docs_in_the_sixties)?;
|
||||
///
|
||||
/// let num_60s_books = count_collector.count();
|
||||
///
|
||||
@@ -259,8 +258,7 @@ impl Weight for RangeWeight {
|
||||
mod tests {
|
||||
|
||||
use super::RangeQuery;
|
||||
use collector::CountCollector;
|
||||
use query::Query;
|
||||
use collector::{Collector, CountCollector};
|
||||
use schema::{Document, Field, SchemaBuilder, INT_INDEXED};
|
||||
use std::collections::Bound;
|
||||
use Index;
|
||||
@@ -291,7 +289,7 @@ mod tests {
|
||||
|
||||
// ... or `1960..=1969` if inclusive range is enabled.
|
||||
let mut count_collector = CountCollector::default();
|
||||
docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
|
||||
count_collector.search(&*searcher, &docs_in_the_sixties)?;
|
||||
assert_eq!(count_collector.count(), 2285);
|
||||
Ok(())
|
||||
}
|
||||
@@ -328,9 +326,7 @@ mod tests {
|
||||
let searcher = index.searcher();
|
||||
let count_multiples = |range_query: RangeQuery| {
|
||||
let mut count_collector = CountCollector::default();
|
||||
range_query
|
||||
.search(&*searcher, &mut count_collector)
|
||||
.unwrap();
|
||||
count_collector.search(&*searcher, &range_query).unwrap();
|
||||
count_collector.count()
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use collector::Collector;
|
||||
use collector::SegmentCollector;
|
||||
use common::BitSet;
|
||||
use docset::{DocSet, SkipResult};
|
||||
use downcast;
|
||||
@@ -18,7 +18,7 @@ pub trait Scorer: downcast::Any + DocSet + 'static {
|
||||
|
||||
/// Consumes the complete `DocSet` and
|
||||
/// push the scored documents to the collector.
|
||||
fn collect(&mut self, collector: &mut Collector, delete_bitset_opt: Option<&DeleteBitSet>) {
|
||||
fn collect(&mut self, collector: &mut SegmentCollector, delete_bitset_opt: Option<&DeleteBitSet>) {
|
||||
if let Some(delete_bitset) = delete_bitset_opt {
|
||||
while self.advance() {
|
||||
let doc = self.doc();
|
||||
@@ -44,7 +44,7 @@ impl Scorer for Box<Scorer> {
|
||||
self.deref_mut().score()
|
||||
}
|
||||
|
||||
fn collect(&mut self, collector: &mut Collector, delete_bitset: Option<&DeleteBitSet>) {
|
||||
fn collect(&mut self, collector: &mut SegmentCollector, delete_bitset: Option<&DeleteBitSet>) {
|
||||
let scorer = self.deref_mut();
|
||||
scorer.collect(collector, delete_bitset);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user