Compare commits

...

11 Commits

13 changed files with 678 additions and 368 deletions

View File

@@ -4,24 +4,36 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::CollectorWrapper;
/// Collector that does nothing.
/// This is used in the chain Collector and will hopefully
/// be optimized away by the compiler.
pub struct DoNothingCollector;
impl Collector for DoNothingCollector {
type Child = DoNothingCollector;
#[inline]
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
Ok(())
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<DoNothingCollector> {
Ok(DoNothingCollector)
}
#[inline]
fn collect(&mut self, _doc: DocId, _score: Score) {}
#[inline]
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for DoNothingCollector {
type CollectionResult = ();
#[inline]
fn collect(&mut self, _doc: DocId, _score: Score) {}
fn finalize(self) -> () {
()
}
}
/// Zero-cost abstraction used to collect on multiple collectors.
/// This contraption is only usable if the type of your collectors
/// are known at compile time.
@@ -30,34 +42,49 @@ pub struct ChainedCollector<Left: Collector, Right: Collector> {
right: Right,
}
pub struct ChainedSegmentCollector<Left: SegmentCollector, Right: SegmentCollector> {
left: Left,
right: Right,
}
impl<Left: Collector, Right: Collector> ChainedCollector<Left, Right> {
/// Adds a collector
pub fn push<C: Collector>(self, new_collector: &mut C) -> ChainedCollector<Self, &mut C> {
pub fn push<C: Collector>(self, new_collector: &mut C) -> ChainedCollector<Self, CollectorWrapper<C>> {
ChainedCollector {
left: self,
right: new_collector,
right: CollectorWrapper::new(new_collector),
}
}
}
impl<Left: Collector, Right: Collector> Collector for ChainedCollector<Left, Right> {
fn set_segment(
type Child = ChainedSegmentCollector<Left::Child, Right::Child>;
fn for_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
self.left.set_segment(segment_local_id, segment)?;
self.right.set_segment(segment_local_id, segment)?;
Ok(())
) -> Result<Self::Child> {
Ok(ChainedSegmentCollector {
left: self.left.for_segment(segment_local_id, segment)?,
right: self.right.for_segment(segment_local_id, segment)?,
})
}
fn requires_scoring(&self) -> bool {
self.left.requires_scoring() || self.right.requires_scoring()
}
}
impl<Left: SegmentCollector, Right: SegmentCollector> SegmentCollector for ChainedSegmentCollector<Left, Right> {
type CollectionResult = (Left::CollectionResult, Right::CollectionResult);
fn collect(&mut self, doc: DocId, score: Score) {
self.left.collect(doc, score);
self.right.collect(doc, score);
}
fn requires_scoring(&self) -> bool {
self.left.requires_scoring() || self.right.requires_scoring()
fn finalize(self) -> Self::CollectionResult {
(self.left.finalize(), self.right.finalize())
}
}
@@ -71,19 +98,35 @@ pub fn chain() -> ChainedCollector<DoNothingCollector, DoNothingCollector> {
#[cfg(test)]
mod tests {
use super::*;
use collector::{Collector, CountCollector, TopCollector};
use collector::{CountCollector, SegmentCollector, TopCollector};
use schema::SchemaBuilder;
use Index;
use Document;
#[test]
fn test_chained_collector() {
let schema_builder = SchemaBuilder::new();
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer(3_000_000).unwrap();
let doc = Document::new();
index_writer.add_document(doc);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_readers = searcher.segment_readers();
let mut top_collector = TopCollector::with_limit(2);
let mut count_collector = CountCollector::default();
{
let mut collectors = chain().push(&mut top_collector).push(&mut count_collector);
collectors.collect(1, 0.2);
collectors.collect(2, 0.1);
collectors.collect(3, 0.5);
let mut segment_collector = collectors.for_segment(0, &segment_readers[0]).unwrap();
segment_collector.collect(1, 0.2);
segment_collector.collect(2, 0.1);
segment_collector.collect(3, 0.5);
collectors.merge_children(vec![segment_collector]);
}
assert_eq!(count_collector.count(), 3);
assert!(top_collector.at_capacity());

View File

@@ -4,6 +4,8 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
/// `CountCollector` collector only counts how many
/// documents match the query.
@@ -21,12 +23,10 @@ impl CountCollector {
}
impl Collector for CountCollector {
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
Ok(())
}
type Child = CountCollector;
fn collect(&mut self, _: DocId, _: Score) {
self.count += 1;
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<CountCollector> {
Ok(CountCollector::default())
}
fn requires_scoring(&self) -> bool {
@@ -34,10 +34,28 @@ impl Collector for CountCollector {
}
}
impl Combinable for CountCollector {
fn combine_into(&mut self, other: Self) {
self.count += other.count;
}
}
impl SegmentCollector for CountCollector {
type CollectionResult = CountCollector;
fn collect(&mut self, _: DocId, _: Score) {
self.count += 1;
}
fn finalize(self) -> CountCollector {
self
}
}
#[cfg(test)]
mod tests {
use collector::{Collector, CountCollector};
use collector::{Collector, CountCollector, SegmentCollector};
#[test]
fn test_count_collector() {

View File

@@ -3,14 +3,12 @@ use docset::SkipResult;
use fastfield::FacetReader;
use schema::Facet;
use schema::Field;
use std::cell::UnsafeCell;
use std::collections::btree_map;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::BinaryHeap;
use std::collections::Bound;
use std::iter::Peekable;
use std::mem;
use std::{u64, usize};
use termdict::TermMerger;
@@ -20,6 +18,7 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
struct Hit<'a> {
count: u64,
@@ -194,19 +193,22 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// }
/// ```
pub struct FacetCollector {
facet_ords: Vec<u64>,
field: Field,
ff_reader: Option<UnsafeCell<FacetReader>>,
segment_counters: Vec<SegmentFacetCounter>,
facets: BTreeSet<Facet>,
}
pub struct FacetSegmentCollector {
reader: FacetReader,
facet_ords_buf: Vec<u64>,
// facet_ord -> collapse facet_id
current_segment_collapse_mapping: Vec<usize>,
collapse_mapping: Vec<usize>,
// collapse facet_id -> count
current_segment_counts: Vec<u64>,
counts: Vec<u64>,
// collapse facet_id -> facet_ord
current_collapse_facet_ords: Vec<u64>,
facets: BTreeSet<Facet>,
collapse_facet_ords: Vec<u64>,
}
fn skip<'a, I: Iterator<Item = &'a Facet>>(
@@ -240,15 +242,9 @@ impl FacetCollector {
/// is of the proper type.
pub fn for_field(field: Field) -> FacetCollector {
FacetCollector {
facet_ords: Vec::with_capacity(255),
segment_counters: Vec::new(),
field,
ff_reader: None,
facets: BTreeSet::new(),
current_segment_collapse_mapping: Vec::new(),
current_collapse_facet_ords: Vec::new(),
current_segment_counts: Vec::new(),
}
}
@@ -279,69 +275,11 @@ impl FacetCollector {
self.facets.insert(facet);
}
fn set_collapse_mapping(&mut self, facet_reader: &FacetReader) {
self.current_segment_collapse_mapping.clear();
self.current_collapse_facet_ords.clear();
self.current_segment_counts.clear();
let mut collapse_facet_it = self.facets.iter().peekable();
self.current_collapse_facet_ords.push(0);
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if !facet_streamer.advance() {
return;
}
'outer: loop {
// at the begining of this loop, facet_streamer
// is positionned on a term that has not been processed yet.
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
match skip_result {
SkipResult::Reached => {
// we reach a facet we decided to collapse.
let collapse_depth = facet_depth(facet_streamer.key());
let mut collapsed_id = 0;
self.current_segment_collapse_mapping.push(0);
while facet_streamer.advance() {
let depth = facet_depth(facet_streamer.key());
if depth <= collapse_depth {
continue 'outer;
}
if depth == collapse_depth + 1 {
collapsed_id = self.current_collapse_facet_ords.len();
self.current_collapse_facet_ords
.push(facet_streamer.term_ord());
self.current_segment_collapse_mapping.push(collapsed_id);
} else {
self.current_segment_collapse_mapping.push(collapsed_id);
}
}
break;
}
SkipResult::End | SkipResult::OverStep => {
self.current_segment_collapse_mapping.push(0);
if !facet_streamer.advance() {
break;
}
}
}
}
}
fn finalize_segment(&mut self) {
if self.ff_reader.is_some() {
self.segment_counters.push(SegmentFacetCounter {
facet_reader: self.ff_reader.take().unwrap().into_inner(),
facet_ords: mem::replace(&mut self.current_collapse_facet_ords, Vec::new()),
facet_counts: mem::replace(&mut self.current_segment_counts, Vec::new()),
});
}
}
/// Returns the results of the collection.
///
/// This method does not just return the counters,
/// it also translates the facet ordinals of the last segment.
pub fn harvest(mut self) -> FacetCounts {
self.finalize_segment();
pub fn harvest(self) -> FacetCounts {
let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_ords[..])
@@ -389,30 +327,92 @@ impl FacetCollector {
}
}
impl FacetSegmentCollector {
fn into_segment_facet_counter(self) -> SegmentFacetCounter {
SegmentFacetCounter {
facet_reader: self.reader,
facet_ords: self.collapse_facet_ords,
facet_counts: self.counts,
}
}
}
impl Collector for FacetCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.finalize_segment();
type Child = FacetSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FacetSegmentCollector> {
let facet_reader = reader.facet_reader(self.field)?;
self.set_collapse_mapping(&facet_reader);
self.current_segment_counts
.resize(self.current_collapse_facet_ords.len(), 0);
self.ff_reader = Some(UnsafeCell::new(facet_reader));
Ok(())
let mut collapse_mapping = Vec::new();
let mut counts = Vec::new();
let mut collapse_facet_ords = Vec::new();
let mut collapse_facet_it = self.facets.iter().peekable();
collapse_facet_ords.push(0);
{
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if facet_streamer.advance() {
'outer: loop {
// at the begining of this loop, facet_streamer
// is positionned on a term that has not been processed yet.
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
match skip_result {
SkipResult::Reached => {
// we reach a facet we decided to collapse.
let collapse_depth = facet_depth(facet_streamer.key());
let mut collapsed_id = 0;
collapse_mapping.push(0);
while facet_streamer.advance() {
let depth = facet_depth(facet_streamer.key());
if depth <= collapse_depth {
continue 'outer;
}
if depth == collapse_depth + 1 {
collapsed_id = collapse_facet_ords.len();
collapse_facet_ords.push(facet_streamer.term_ord());
collapse_mapping.push(collapsed_id);
} else {
collapse_mapping.push(collapsed_id);
}
}
break;
}
SkipResult::End | SkipResult::OverStep => {
collapse_mapping.push(0);
if !facet_streamer.advance() {
break;
}
}
}
}
}
}
counts.resize(collapse_facet_ords.len(), 0);
Ok(FacetSegmentCollector {
reader: facet_reader,
facet_ords_buf: Vec::with_capacity(255),
collapse_mapping,
counts,
collapse_facet_ords,
})
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for FacetSegmentCollector {
type CollectionResult = Vec<SegmentFacetCounter>;
fn collect(&mut self, doc: DocId, _: Score) {
let facet_reader: &mut FacetReader = unsafe {
&mut *self.ff_reader
.as_ref()
.expect("collect() was called before set_segment. This should never happen.")
.get()
};
facet_reader.facet_ords(doc, &mut self.facet_ords);
self.reader.facet_ords(doc, &mut self.facet_ords_buf);
let mut previous_collapsed_ord: usize = usize::MAX;
for &facet_ord in &self.facet_ords {
let collapsed_ord = self.current_segment_collapse_mapping[facet_ord as usize];
self.current_segment_counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord
{
for &facet_ord in &self.facet_ords_buf {
let collapsed_ord = self.collapse_mapping[facet_ord as usize];
self.counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord {
0
} else {
1
@@ -421,8 +421,8 @@ impl Collector for FacetCollector {
}
}
fn requires_scoring(&self) -> bool {
false
fn finalize(self) -> Vec<SegmentFacetCounter> {
vec![self.into_segment_facet_counter()]
}
}

View File

@@ -7,12 +7,15 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use query::Query;
use Searcher;
use downcast;
mod count_collector;
pub use self::count_collector::CountCollector;
mod multi_collector;
pub use self::multi_collector::MultiCollector;
//mod multi_collector;
//pub use self::multi_collector::MultiCollector;
mod top_collector;
pub use self::top_collector::TopCollector;
@@ -53,31 +56,90 @@ pub use self::chained_collector::chain;
///
/// Segments are not guaranteed to be visited in any specific order.
pub trait Collector {
type Child : SegmentCollector + 'static;
/// `set_segment` is called before beginning to enumerate
/// on this segment.
fn set_segment(
fn for_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()>;
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
) -> Result<Self::Child>;
/// Returns true iff the collector requires to compute scores for documents.
fn requires_scoring(&self) -> bool;
/// Search works as follows :
///
/// First the weight object associated to the query is created.
///
/// Then, the query loops over the segments and for each segment :
/// - setup the collector and informs it that the segment being processed has changed.
/// - creates a SegmentCollector for collecting documents associated to the segment
/// - creates a `Scorer` object associated for this segment
/// - iterate through the matched documents and push them to the segment collector.
/// - turn the segment collector into a Combinable segment result
///
/// Combining all of the segment results gives a single Child::CollectionResult, which is returned.
///
/// The result will be Ok(None) in case of having no segments.
fn search(&mut self, searcher: &Searcher, query: &Query) -> Result<Option<<Self::Child as SegmentCollector>::CollectionResult>> {
let scoring_enabled = self.requires_scoring();
let weight = query.weight(searcher, scoring_enabled)?;
let mut results = Vec::new();
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
let mut child: Self::Child = self.for_segment(segment_ord as SegmentLocalId, segment_reader)?;
let mut scorer = weight.scorer(segment_reader)?;
scorer.collect(&mut child, segment_reader.delete_bitset());
results.push(child.finalize());
}
Ok(results.into_iter().fold1(|x,y| {
x.combine_into(y);
x
}))
}
}
pub trait Combinable {
fn combine_into(&mut self, other: Self);
}
impl Combinable for () {
fn combine_into(&mut self, other: Self) {
()
}
}
impl<T> Combinable for Vec<T> {
fn combine_into(&mut self, other: Self) {
self.extend(other.into_iter());
}
}
impl<L: Combinable, R: Combinable> Combinable for (L, R) {
fn combine_into(&mut self, other: Self) {
self.0.combine_into(other.0);
self.1.combine_into(other.1);
}
}
pub trait SegmentCollector: downcast::Any + 'static {
type CollectionResult: Combinable + downcast::Any + 'static;
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
/// Turn into the final result
fn finalize(self) -> Self::CollectionResult;
}
impl<'a, C: Collector> Collector for &'a mut C {
fn set_segment(
&mut self,
type Child = C::Child;
fn for_segment(
&mut self, // TODO Ask Jason : why &mut self here!?
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
(*self).set_segment(segment_local_id, segment)
}
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score) {
C::collect(self, doc, score)
) -> Result<C::Child> {
(*self).for_segment(segment_local_id, segment)
}
fn requires_scoring(&self) -> bool {
@@ -85,6 +147,61 @@ impl<'a, C: Collector> Collector for &'a mut C {
}
}
pub struct CollectorWrapper<'a, TCollector: 'a + Collector>(&'a mut TCollector);
impl<'a, T: 'a + Collector> CollectorWrapper<'a, T> {
pub fn new(collector: &'a mut T) -> CollectorWrapper<'a, T> {
CollectorWrapper(collector)
}
}
impl<'a, T: 'a + Collector> Collector for CollectorWrapper<'a, T> {
type Child = T::Child;
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<T::Child> {
self.0.for_segment(segment_local_id, segment)
}
fn requires_scoring(&self) -> bool {
self.0.requires_scoring()
}
}
trait UntypedCollector {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>>;
}
impl<'a, TCollector:'a + Collector> UntypedCollector for CollectorWrapper<'a, TCollector> {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>> {
let segment_collector = self.0.for_segment(segment_local_id, segment)?;
Ok(Box::new(segment_collector))
}
}
trait UntypedSegmentCollector {
fn finalize(self) -> Box<UntypedCombinable>;
}
trait UntypedCombinable {
fn combine_into(&mut self, other: Box<UntypedCombinable>);
}
pub struct CombinableWrapper<'a, T: 'a + Combinable>(&'a mut T);
impl<'a, T: 'a + Combinable> CombinableWrapper<'a, T> {
pub fn new(combinable: &'a mut T) -> CombinableWrapper<'a, T> {
CombinableWrapper(combinable)
}
}
impl<'a, T: 'a + Combinable> Combinable for CombinableWrapper<'a, T> {
fn combine_into(&mut self, other: Self) {
self.0.combine_into(*::downcast::Downcast::<T>::downcast(other).unwrap())
}
}
#[cfg(test)]
pub mod tests {
@@ -102,8 +219,13 @@ pub mod tests {
/// It is unusable in practise, as it does not store
/// the segment ordinals
pub struct TestCollector {
next_offset: DocId,
docs: Vec<DocId>,
scores: Vec<Score>,
}
pub struct TestSegmentCollector {
offset: DocId,
segment_max_doc: DocId,
docs: Vec<DocId>,
scores: Vec<Score>,
}
@@ -122,8 +244,7 @@ pub mod tests {
impl Default for TestCollector {
fn default() -> TestCollector {
TestCollector {
offset: 0,
segment_max_doc: 0,
next_offset: 0,
docs: Vec::new(),
scores: Vec::new(),
}
@@ -131,19 +252,33 @@ pub mod tests {
}
impl Collector for TestCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.offset += self.segment_max_doc;
self.segment_max_doc = reader.max_doc();
Ok(())
type Child = TestSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<TestSegmentCollector> {
let offset = self.next_offset;
self.next_offset += reader.max_doc();
Ok(TestSegmentCollector {
offset,
docs: Vec::new(),
scores: Vec::new(),
})
}
fn requires_scoring(&self) -> bool {
true
}
}
impl SegmentCollector for TestSegmentCollector {
type CollectionResult = Vec<TestSegmentCollector>;
fn collect(&mut self, doc: DocId, score: Score) {
self.docs.push(doc + self.offset);
self.scores.push(score);
}
fn requires_scoring(&self) -> bool {
true
fn finalize(self) -> Vec<TestSegmentCollector> {
vec![self]
}
}
@@ -152,17 +287,26 @@ pub mod tests {
///
/// This collector is mainly useful for tests.
pub struct FastFieldTestCollector {
vals: Vec<u64>,
next_counter: usize,
field: Field,
ff_reader: Option<FastFieldReader<u64>>,
}
#[derive(Default)]
pub struct FastFieldSegmentCollectorState {
counter: usize,
vals: Vec<u64>,
}
pub struct FastFieldSegmentCollector {
state: FastFieldSegmentCollectorState,
reader: FastFieldReader<u64>,
}
impl FastFieldTestCollector {
pub fn for_field(field: Field) -> FastFieldTestCollector {
FastFieldTestCollector {
vals: Vec::new(),
next_counter: 0,
field,
ff_reader: None,
}
}
@@ -172,20 +316,35 @@ pub mod tests {
}
impl Collector for FastFieldTestCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.ff_reader = Some(reader.fast_field_reader(self.field)?);
Ok(())
type Child = FastFieldSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FastFieldSegmentCollector> {
let counter = self.next_counter;
self.next_counter += 1;
Ok(FastFieldSegmentCollector {
state: FastFieldSegmentCollectorState::default(),
reader: reader.fast_field_reader(self.field)?,
})
}
fn collect(&mut self, doc: DocId, _score: Score) {
let val = self.ff_reader.as_ref().unwrap().get(doc);
self.vals.push(val);
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for FastFieldSegmentCollector {
type CollectionResult = Vec<FastFieldSegmentCollectorState>;
fn collect(&mut self, doc: DocId, _score: Score) {
let val = self.reader.get(doc);
self.vals.push(val);
}
fn finalize(self) -> Vec<FastFieldSegmentCollectorState> {
vec![self.state]
}
}
/// Collects in order all of the fast field bytes for all of the
/// docs in the `DocSet`
///
@@ -193,7 +352,11 @@ pub mod tests {
pub struct BytesFastFieldTestCollector {
vals: Vec<u8>,
field: Field,
ff_reader: Option<BytesFastFieldReader>,
}
pub struct BytesFastFieldSegmentCollector {
vals: Vec<u8>,
reader: BytesFastFieldReader,
}
impl BytesFastFieldTestCollector {
@@ -201,7 +364,6 @@ pub mod tests {
BytesFastFieldTestCollector {
vals: Vec::new(),
field,
ff_reader: None,
}
}
@@ -211,20 +373,32 @@ pub mod tests {
}
impl Collector for BytesFastFieldTestCollector {
fn set_segment(&mut self, _segment_local_id: u32, segment: &SegmentReader) -> Result<()> {
self.ff_reader = Some(segment.bytes_fast_field_reader(self.field)?);
Ok(())
}
type Child = BytesFastFieldSegmentCollector;
fn collect(&mut self, doc: u32, _score: f32) {
let val = self.ff_reader.as_ref().unwrap().get_val(doc);
self.vals.extend(val);
fn for_segment(&mut self, _segment_local_id: u32, segment: &SegmentReader) -> Result<BytesFastFieldSegmentCollector> {
Ok(BytesFastFieldSegmentCollector {
vals: Vec::new(),
reader: segment.bytes_fast_field_reader(self.field)?,
})
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for BytesFastFieldSegmentCollector {
type CollectionResult = Vec<Vec<u8>>;
fn collect(&mut self, doc: u32, _score: f32) {
let val = self.reader.get_val(doc);
self.vals.extend(val);
}
fn finalize(self) -> Vec<Vec<u8>> {
vec![self.vals]
}
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -1,67 +1,122 @@
use super::Collector;
use super::SegmentCollector;
use DocId;
use Result;
use Score;
use Result;
use SegmentLocalId;
use SegmentReader;
use downcast::Downcast;
/// Multicollector makes it possible to collect on more than one collector.
/// It should only be used for use cases where the Collector types is unknown
/// at compile time.
/// If the type of the collectors is known, you should prefer to use `ChainedCollector`.
pub struct MultiCollector<'a> {
collectors: Vec<&'a mut Collector>,
collector_wrappers: Vec<Box<UntypedCollector + 'a>>
}
impl<'a> MultiCollector<'a> {
/// Constructor
pub fn from(collectors: Vec<&'a mut Collector>) -> MultiCollector {
MultiCollector { collectors }
pub fn new() -> MultiCollector<'a> {
MultiCollector {
collector_wrappers: Vec::new()
}
}
pub fn add_collector<TCollector: 'a + Collector>(&mut self, collector: &'a mut TCollector) {
let collector_wrapper = CollectorWrapper(collector);
self.collector_wrappers.push(Box::new(collector_wrapper));
}
}
impl<'a> Collector for MultiCollector<'a> {
fn set_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
for collector in &mut self.collectors {
collector.set_segment(segment_local_id, segment)?;
}
Ok(())
type Child = MultiCollectorChild;
fn for_segment(&mut self, segment_local_id: SegmentLocalId, segment: &SegmentReader) -> Result<MultiCollectorChild> {
let children = self.collector_wrappers
.iter_mut()
.map(|collector_wrapper| {
collector_wrapper.for_segment(segment_local_id, segment)
})
.collect::<Result<Vec<_>>>()?;
Ok(MultiCollectorChild {
children
})
}
fn collect(&mut self, doc: DocId, score: Score) {
for collector in &mut self.collectors {
collector.collect(doc, score);
fn requires_scoring(&self) -> bool {
self.collector_wrappers
.iter()
.any(|c| c.requires_scoring())
}
fn merge_children(&mut self, children: Vec<MultiCollectorChild>) {
let mut per_collector_children: Vec<Vec<Box<SegmentCollector>>> =
(0..self.collector_wrappers.len())
.map(|_| Vec::with_capacity(children.len()))
.collect::<Vec<_>>();
for child in children {
for (idx, segment_collector) in child.children.into_iter().enumerate() {
per_collector_children[idx].push(segment_collector);
}
}
for (collector, children) in self.collector_wrappers.iter_mut().zip(per_collector_children) {
collector.merge_children_anys(children);
}
}
fn requires_scoring(&self) -> bool {
self.collectors
.iter()
.any(|collector| collector.requires_scoring())
}
pub struct MultiCollectorChild {
children: Vec<Box<SegmentCollector>>
}
impl SegmentCollector for MultiCollectorChild {
fn collect(&mut self, doc: DocId, score: Score) {
for child in &mut self.children {
child.collect(doc, score);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use collector::{Collector, CountCollector, TopCollector};
use schema::{TEXT, SchemaBuilder};
use query::TermQuery;
use Index;
use Term;
use schema::IndexRecordOption;
#[test]
fn test_multi_collector() {
let mut schema_builder = SchemaBuilder::new();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(text=>"abc"));
index_writer.add_document(doc!(text=>"abc abc abc"));
index_writer.add_document(doc!(text=>"abc abc"));
index_writer.commit().unwrap();
index_writer.add_document(doc!(text=>""));
index_writer.add_document(doc!(text=>"abc abc abc abc"));
index_writer.add_document(doc!(text=>"abc"));
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let term = Term::from_field_text(text, "abc");
let query = TermQuery::new(term, IndexRecordOption::Basic);
let mut top_collector = TopCollector::with_limit(2);
let mut count_collector = CountCollector::default();
{
let mut collectors =
MultiCollector::from(vec![&mut top_collector, &mut count_collector]);
collectors.collect(1, 0.2);
collectors.collect(2, 0.1);
collectors.collect(3, 0.5);
let mut collectors = MultiCollector::new();
collectors.add_collector(&mut top_collector);
collectors.add_collector(&mut count_collector);
collectors.search(&*searcher, &query).unwrap();
}
assert_eq!(count_collector.count(), 3);
assert!(top_collector.at_capacity());
assert_eq!(count_collector.count(), 5);
}
}

View File

@@ -7,6 +7,8 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
// Rust heap is a max-heap and we need a min heap.
#[derive(Clone, Copy)]
@@ -99,11 +101,34 @@ impl TopCollector {
}
impl Collector for TopCollector {
fn set_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<()> {
self.segment_id = segment_id;
Ok(())
type Child = TopCollector;
fn for_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<TopCollector> {
Ok(TopCollector {
limit: self.limit,
heap: BinaryHeap::new(),
segment_id,
})
}
fn requires_scoring(&self) -> bool {
true
}
}
impl Combinable for TopCollector {
// TODO: I think this could be a bit better
fn combine_into(&mut self, other: Self) {
self.segment_id = other.segment_id;
while let Some(doc) = other.heap.pop() {
self.collect(doc.doc_address.doc(), doc.score);
}
}
}
impl SegmentCollector for TopCollector {
type CollectionResult = TopCollector;
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
@@ -126,8 +151,8 @@ impl Collector for TopCollector {
}
}
fn requires_scoring(&self) -> bool {
true
fn finalize(self) -> TopCollector {
self
}
}
@@ -135,7 +160,6 @@ impl Collector for TopCollector {
mod tests {
use super::*;
use collector::Collector;
use DocId;
use Score;

View File

@@ -73,7 +73,7 @@ impl Searcher {
/// Runs a query on the segment readers wrapped by the searcher
pub fn search<C: Collector>(&self, query: &Query, collector: &mut C) -> Result<()> {
query.search(self, collector)
collector.search(self, query)
}
/// Return the field searcher associated to a `Field`.

View File

@@ -1138,125 +1138,126 @@ mod tests {
}
}
#[test]
fn test_merge_facets() {
let mut schema_builder = schema::SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet");
let index = Index::create_in_ram(schema_builder.build());
use schema::Facet;
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
let mut doc = Document::default();
for facet in doc_facets {
doc.add_facet(facet_field, Facet::from(facet));
}
index_writer.add_document(doc);
};
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
index_doc(&mut index_writer, &["/top/a", "/top/b"]);
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b", "/top/d"]);
index_doc(&mut index_writer, &["/top/d"]);
index_doc(&mut index_writer, &["/top/e"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b"]);
index_doc(&mut index_writer, &["/top/c"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/e", "/top/f"]);
index_writer.commit().expect("committed");
}
index.load_searchers().unwrap();
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
let searcher = index.searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top"));
use collector::{CountCollector, MultiCollector};
let mut count_collector = CountCollector::default();
{
let mut multi_collectors =
MultiCollector::from(vec![&mut count_collector, &mut facet_collector]);
searcher.search(&AllQuery, &mut multi_collectors).unwrap();
}
assert_eq!(count_collector.count(), expected_num_docs);
let facet_counts = facet_collector.harvest();
let facets: Vec<(String, u64)> = facet_counts
.get("/top")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
assert_eq!(
facets,
expected
.iter()
.map(|&(facet_str, count)| (String::from(facet_str), count))
.collect::<Vec<_>>()
);
};
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
// Merging the segments
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
index.load_searchers().unwrap();
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
// Deleting one term
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
test_searcher(
9,
&[
("/top/a", 3),
("/top/b", 3),
("/top/c", 1),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
}
// #[test]
// fn test_merge_facets() {
// let mut schema_builder = schema::SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet");
// let index = Index::create_in_ram(schema_builder.build());
// use schema::Facet;
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
// let mut doc = Document::default();
// for facet in doc_facets {
// doc.add_facet(facet_field, Facet::from(facet));
// }
// index_writer.add_document(doc);
// };
//
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
// index_doc(&mut index_writer, &["/top/a", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a"]);
//
// index_doc(&mut index_writer, &["/top/b", "/top/d"]);
// index_doc(&mut index_writer, &["/top/d"]);
// index_doc(&mut index_writer, &["/top/e"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/a"]);
// index_doc(&mut index_writer, &["/top/b"]);
// index_doc(&mut index_writer, &["/top/c"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/e", "/top/f"]);
// index_writer.commit().expect("committed");
// }
// index.load_searchers().unwrap();
// let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
// let searcher = index.searcher();
// let mut facet_collector = FacetCollector::for_field(facet_field);
// facet_collector.add_facet(Facet::from("/top"));
// use collector::{CountCollector, MultiCollector};
// let mut count_collector = CountCollector::default();
// {
// let mut multi_collectors = MultiCollector::new();
// multi_collectors.add_collector(&mut count_collector);
// multi_collectors.add_collector(&mut facet_collector);
// searcher.search(&AllQuery, &mut multi_collectors).unwrap();
// }
// assert_eq!(count_collector.count(), expected_num_docs);
// let facet_counts = facet_collector.harvest();
// let facets: Vec<(String, u64)> = facet_counts
// .get("/top")
// .map(|(facet, count)| (facet.to_string(), count))
// .collect();
// assert_eq!(
// facets,
// expected
// .iter()
// .map(|&(facet_str, count)| (String::from(facet_str), count))
// .collect::<Vec<_>>()
// );
// };
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
//
// // Merging the segments
// {
// let segment_ids = index
// .searchable_segment_ids()
// .expect("Searchable segments failed.");
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// index_writer
// .merge(&segment_ids)
// .wait()
// .expect("Merging failed");
// index_writer.wait_merging_threads().unwrap();
//
// index.load_searchers().unwrap();
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
//
// // Deleting one term
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
// let facet_term = Term::from_facet(facet_field, &facet);
// index_writer.delete_term(facet_term);
// index_writer.commit().unwrap();
// index.load_searchers().unwrap();
// test_searcher(
// 9,
// &[
// ("/top/a", 3),
// ("/top/b", 3),
// ("/top/c", 1),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
// }
#[test]
fn test_merge_multivalued_int_fields_all_deleted() {

View File

@@ -1,10 +1,8 @@
use super::Weight;
use collector::Collector;
use core::searcher::Searcher;
use downcast;
use std::fmt;
use Result;
use SegmentLocalId;
/// The `Query` trait defines a set of documents and a scoring method
/// for those documents.
@@ -57,26 +55,6 @@ pub trait Query: QueryClone + downcast::Any + fmt::Debug {
}
Ok(result)
}
/// Search works as follows :
///
/// First the weight object associated to the query is created.
///
/// Then, the query loops over the segments and for each segment :
/// - setup the collector and informs it that the segment being processed has changed.
/// - creates a `Scorer` object associated for this segment
/// - iterate throw the matched documents and push them to the collector.
///
fn search(&self, searcher: &Searcher, collector: &mut Collector) -> Result<()> {
let scoring_enabled = collector.requires_scoring();
let weight = self.weight(searcher, scoring_enabled)?;
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
collector.set_segment(segment_ord as SegmentLocalId, segment_reader)?;
let mut scorer = weight.scorer(segment_reader)?;
scorer.collect(collector, segment_reader.delete_bitset());
}
Ok(())
}
}
pub trait QueryClone {

View File

@@ -12,14 +12,14 @@ use std::ops::Range;
use termdict::{TermDictionary, TermStreamer};
use Result;
fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
bound: Bound<TFrom>,
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
bound: &Bound<TFrom>,
transform: &Transform,
) -> Bound<Vec<u8>> {
) -> Bound<TTo> {
use self::Bound::*;
match bound {
Excluded(from_val) => Excluded(transform(from_val)),
Included(from_val) => Included(transform(from_val)),
Excluded(ref from_val) => Excluded(transform(from_val)),
Included(ref from_val) => Included(transform(from_val)),
Unbounded => Unbounded,
}
}
@@ -41,8 +41,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
/// # extern crate tantivy;
/// # use tantivy::Index;
/// # use tantivy::schema::{SchemaBuilder, INT_INDEXED};
/// # use tantivy::collector::CountCollector;
/// # use tantivy::query::Query;
/// # use tantivy::collector::{Collector, CountCollector};
/// # use tantivy::Result;
/// # use tantivy::query::RangeQuery;
/// #
@@ -68,7 +67,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
/// let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
///
/// let mut count_collector = CountCollector::default();
/// docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
/// count_collector.search(&*searcher, &docs_in_the_sixties)?;
///
/// let num_60s_books = count_collector.count();
///
@@ -113,12 +112,12 @@ impl RangeQuery {
left_bound: Bound<i64>,
right_bound: Bound<i64>,
) -> RangeQuery {
let make_term_val = |val: i64| Term::from_field_i64(field, val).value_bytes().to_owned();
let make_term_val = |val: &i64| Term::from_field_i64(field, *val).value_bytes().to_owned();
RangeQuery {
field,
value_type: Type::I64,
left_bound: map_bound(left_bound, &make_term_val),
right_bound: map_bound(right_bound, &make_term_val),
left_bound: map_bound(&left_bound, &make_term_val),
right_bound: map_bound(&right_bound, &make_term_val),
}
}
@@ -134,12 +133,12 @@ impl RangeQuery {
left_bound: Bound<u64>,
right_bound: Bound<u64>,
) -> RangeQuery {
let make_term_val = |val: u64| Term::from_field_u64(field, val).value_bytes().to_owned();
let make_term_val = |val: &u64| Term::from_field_u64(field, *val).value_bytes().to_owned();
RangeQuery {
field,
value_type: Type::U64,
left_bound: map_bound(left_bound, &make_term_val),
right_bound: map_bound(right_bound, &make_term_val),
left_bound: map_bound(&left_bound, &make_term_val),
right_bound: map_bound(&right_bound, &make_term_val),
}
}
@@ -167,12 +166,12 @@ impl RangeQuery {
left: Bound<&'b str>,
right: Bound<&'b str>,
) -> RangeQuery {
let make_term_val = |val: &str| val.as_bytes().to_vec();
let make_term_val = |val: &&str| val.as_bytes().to_vec();
RangeQuery {
field,
value_type: Type::Str,
left_bound: map_bound(left, &make_term_val),
right_bound: map_bound(right, &make_term_val),
left_bound: map_bound(&left, &make_term_val),
right_bound: map_bound(&right, &make_term_val),
}
}
@@ -187,6 +186,21 @@ impl RangeQuery {
Bound::Excluded(range.end),
)
}
/// Field to search over
pub fn field(&self) -> Field {
self.field
}
/// Lower bound of range
pub fn left_bound(&self) -> Bound<Term> {
map_bound(&self.left_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
}
/// Upper bound of range
pub fn right_bound(&self) -> Bound<Term> {
map_bound(&self.right_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
}
}
impl Query for RangeQuery {
@@ -259,8 +273,7 @@ impl Weight for RangeWeight {
mod tests {
use super::RangeQuery;
use collector::CountCollector;
use query::Query;
use collector::{Collector, CountCollector};
use schema::{Document, Field, SchemaBuilder, INT_INDEXED};
use std::collections::Bound;
use Index;
@@ -291,7 +304,7 @@ mod tests {
// ... or `1960..=1969` if inclusive range is enabled.
let mut count_collector = CountCollector::default();
docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
count_collector.search(&*searcher, &docs_in_the_sixties)?;
assert_eq!(count_collector.count(), 2285);
Ok(())
}
@@ -328,9 +341,7 @@ mod tests {
let searcher = index.searcher();
let count_multiples = |range_query: RangeQuery| {
let mut count_collector = CountCollector::default();
range_query
.search(&*searcher, &mut count_collector)
.unwrap();
count_collector.search(&*searcher, &range_query).unwrap();
count_collector.count()
};

View File

@@ -1,4 +1,4 @@
use collector::Collector;
use collector::SegmentCollector;
use common::BitSet;
use docset::{DocSet, SkipResult};
use downcast;
@@ -18,7 +18,7 @@ pub trait Scorer: downcast::Any + DocSet + 'static {
/// Consumes the complete `DocSet` and
/// push the scored documents to the collector.
fn collect(&mut self, collector: &mut Collector, delete_bitset_opt: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset_opt: Option<&DeleteBitSet>) {
if let Some(delete_bitset) = delete_bitset_opt {
while self.advance() {
let doc = self.doc();
@@ -44,7 +44,7 @@ impl Scorer for Box<Scorer> {
self.deref_mut().score()
}
fn collect(&mut self, collector: &mut Collector, delete_bitset: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset: Option<&DeleteBitSet>) {
let scorer = self.deref_mut();
scorer.collect(collector, delete_bitset);
}

View File

@@ -6,7 +6,7 @@ use Result;
/// for a given set of segments.
///
/// See [`Query`](./trait.Query.html).
pub trait Weight {
pub trait Weight: Send + Sync + 'static {
/// Returns the scorer for the given segment.
/// See [`Query`](./trait.Query.html).
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>>;

View File

@@ -109,6 +109,12 @@ impl Term {
self.0.extend(bytes);
}
pub(crate) fn from_field_bytes(field: Field, bytes: &[u8]) -> Term {
let mut term = Term::for_field(field);
term.set_bytes(bytes);
term
}
/// Set the texts only, keeping the field untouched.
pub fn set_text(&mut self, text: &str) {
self.set_bytes(text.as_bytes());