Compare commits


16 Commits

Author SHA1 Message Date
Jason Wolfe   c9d8031664   Committing for shared discussion   2018-05-19 14:03:38 +09:00
jwolfe   9628413386   Merge branch 'issue/query-ergonomics-3' into staged_collector_with_multi   2018-05-18 17:05:14 +09:00
jwolfe   b2ce65f52d   Expose parameters of RangeQuery for external usage   2018-05-18 17:04:26 +09:00
Jason Wolfe   0cea706f10   Add docs to new Query methods (#307)   2018-05-18 13:53:29 +09:00
jwolfe   ad81e131ec   Merge branch 'master' into staged_collector_with_multi   2018-05-18 12:22:44 +09:00
Paul Masurel   71d41ca209   Added Google to the license   2018-05-18 10:13:23 +09:00
Paul Masurel   bc69dab822   cargo fmt   2018-05-18 10:08:05 +09:00
Jason Wolfe   72acad0921   Add box_clone() and downcast::Any to Query (#303)   2018-05-18 09:53:11 +09:00
Paul Masurel   c9459f74e8   Update docs about TermDict.   2018-05-18 09:20:39 +09:00
jwolfe   327ca2ab02   Make Weight Send+Sync for parallelization purposes   2018-05-16 10:49:38 +09:00
jwolfe   16ca6a0e5c   Fix test   2018-05-14 16:40:00 +09:00
jwolfe   8c07ae653d   Merge branch 'master' into staged_collector_with_multi   2018-05-14 15:35:23 +09:00
jwolfe   3d483f8711   Fix chained collector   2018-05-14 14:20:07 +09:00
Paul Masurel   56b2e9731f   working. Chained collector is broken though   2018-05-12 16:00:58 -07:00
Jason Wolfe   c85668cabe   Attempt to add MultiCollector back   2018-05-11 22:18:43 +09:00
Jason Wolfe   8a33ddaca7   Split Collector into an overall Collector and a per-segment SegmentCollector. Precursor to cross-segment parallelism, and as a side benefit cleans up any per-segment fields from being Option<T> to just T.   2018-05-02 14:08:52 +09:00
25 changed files with 798 additions and 622 deletions
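For readers skimming the diff below, here is a minimal, hypothetical sketch of the two-level collector design the last commit describes. The names mirror the diff, but the signatures are deliberately simplified and are not the exact trait definitions:

// Simplified sketch only: the real traits in collector/mod.rs below also
// carry requires_scoring, a default search() method, and result merging.
trait SegmentCollector {
    type CollectionResult;
    // Receives the matching documents of a single segment.
    fn collect(&mut self, doc: u32, score: f32);
    // Turns the per-segment state into a result that can be merged later.
    fn finalize(self) -> Self::CollectionResult;
}

trait Collector {
    type Child: SegmentCollector;
    // One child per segment, so per-segment fields are plain T rather than Option<T>.
    fn for_segment(&mut self, segment_ord: u32) -> Self::Child;
}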

View File

@@ -1,4 +1,4 @@
Copyright (c) 2016 Paul Masurel
Copyright (c) 2018 by Paul Masurel, Google LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View File

@@ -4,24 +4,36 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::CollectorWrapper;
/// Collector that does nothing.
/// This is used in the chain Collector and will hopefully
/// be optimized away by the compiler.
pub struct DoNothingCollector;
impl Collector for DoNothingCollector {
type Child = DoNothingCollector;
#[inline]
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
Ok(())
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<DoNothingCollector> {
Ok(DoNothingCollector)
}
#[inline]
fn collect(&mut self, _doc: DocId, _score: Score) {}
#[inline]
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for DoNothingCollector {
type CollectionResult = ();
#[inline]
fn collect(&mut self, _doc: DocId, _score: Score) {}
fn finalize(self) -> () {
()
}
}
/// Zero-cost abstraction used to collect on multiple collectors.
/// This contraption is only usable if the types of your collectors
/// are known at compile time.
@@ -30,34 +42,49 @@ pub struct ChainedCollector<Left: Collector, Right: Collector> {
right: Right,
}
pub struct ChainedSegmentCollector<Left: SegmentCollector, Right: SegmentCollector> {
left: Left,
right: Right,
}
impl<Left: Collector, Right: Collector> ChainedCollector<Left, Right> {
/// Adds a collector
pub fn push<C: Collector>(self, new_collector: &mut C) -> ChainedCollector<Self, &mut C> {
pub fn push<C: Collector>(self, new_collector: &mut C) -> ChainedCollector<Self, CollectorWrapper<C>> {
ChainedCollector {
left: self,
right: new_collector,
right: CollectorWrapper::new(new_collector),
}
}
}
impl<Left: Collector, Right: Collector> Collector for ChainedCollector<Left, Right> {
fn set_segment(
type Child = ChainedSegmentCollector<Left::Child, Right::Child>;
fn for_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
self.left.set_segment(segment_local_id, segment)?;
self.right.set_segment(segment_local_id, segment)?;
Ok(())
) -> Result<Self::Child> {
Ok(ChainedSegmentCollector {
left: self.left.for_segment(segment_local_id, segment)?,
right: self.right.for_segment(segment_local_id, segment)?,
})
}
fn requires_scoring(&self) -> bool {
self.left.requires_scoring() || self.right.requires_scoring()
}
}
impl<Left: SegmentCollector, Right: SegmentCollector> SegmentCollector for ChainedSegmentCollector<Left, Right> {
type CollectionResult = (Left::CollectionResult, Right::CollectionResult);
fn collect(&mut self, doc: DocId, score: Score) {
self.left.collect(doc, score);
self.right.collect(doc, score);
}
fn requires_scoring(&self) -> bool {
self.left.requires_scoring() || self.right.requires_scoring()
fn finalize(self) -> Self::CollectionResult {
(self.left.finalize(), self.right.finalize())
}
}
@@ -71,19 +98,35 @@ pub fn chain() -> ChainedCollector<DoNothingCollector, DoNothingCollector> {
#[cfg(test)]
mod tests {
use super::*;
use collector::{Collector, CountCollector, TopCollector};
use collector::{CountCollector, SegmentCollector, TopCollector};
use schema::SchemaBuilder;
use Index;
use Document;
#[test]
fn test_chained_collector() {
let schema_builder = SchemaBuilder::new();
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer(3_000_000).unwrap();
let doc = Document::new();
index_writer.add_document(doc);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_readers = searcher.segment_readers();
let mut top_collector = TopCollector::with_limit(2);
let mut count_collector = CountCollector::default();
{
let mut collectors = chain().push(&mut top_collector).push(&mut count_collector);
collectors.collect(1, 0.2);
collectors.collect(2, 0.1);
collectors.collect(3, 0.5);
let mut segment_collector = collectors.for_segment(0, &segment_readers[0]).unwrap();
segment_collector.collect(1, 0.2);
segment_collector.collect(2, 0.1);
segment_collector.collect(3, 0.5);
collectors.merge_children(vec![segment_collector]);
}
assert_eq!(count_collector.count(), 3);
assert!(top_collector.at_capacity());

View File

@@ -4,6 +4,8 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
/// `CountCollector` collector only counts how many
/// documents match the query.
@@ -21,12 +23,10 @@ impl CountCollector {
}
impl Collector for CountCollector {
fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> {
Ok(())
}
type Child = CountCollector;
fn collect(&mut self, _: DocId, _: Score) {
self.count += 1;
fn for_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<CountCollector> {
Ok(CountCollector::default())
}
fn requires_scoring(&self) -> bool {
@@ -34,10 +34,28 @@ impl Collector for CountCollector {
}
}
impl Combinable for CountCollector {
fn combine_into(&mut self, other: Self) {
self.count += other.count;
}
}
impl SegmentCollector for CountCollector {
type CollectionResult = CountCollector;
fn collect(&mut self, _: DocId, _: Score) {
self.count += 1;
}
fn finalize(self) -> CountCollector {
self
}
}
#[cfg(test)]
mod tests {
use collector::{Collector, CountCollector};
use collector::{Collector, CountCollector, SegmentCollector};
#[test]
fn test_count_collector() {

View File

@@ -3,14 +3,12 @@ use docset::SkipResult;
use fastfield::FacetReader;
use schema::Facet;
use schema::Field;
use std::cell::UnsafeCell;
use std::collections::btree_map;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::BinaryHeap;
use std::collections::Bound;
use std::iter::Peekable;
use std::mem;
use std::{u64, usize};
use termdict::TermMerger;
@@ -20,6 +18,7 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
struct Hit<'a> {
count: u64,
@@ -194,19 +193,22 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// }
/// ```
pub struct FacetCollector {
facet_ords: Vec<u64>,
field: Field,
ff_reader: Option<UnsafeCell<FacetReader>>,
segment_counters: Vec<SegmentFacetCounter>,
facets: BTreeSet<Facet>,
}
pub struct FacetSegmentCollector {
reader: FacetReader,
facet_ords_buf: Vec<u64>,
// facet_ord -> collapse facet_id
current_segment_collapse_mapping: Vec<usize>,
collapse_mapping: Vec<usize>,
// collapse facet_id -> count
current_segment_counts: Vec<u64>,
counts: Vec<u64>,
// collapse facet_id -> facet_ord
current_collapse_facet_ords: Vec<u64>,
facets: BTreeSet<Facet>,
collapse_facet_ords: Vec<u64>,
}
fn skip<'a, I: Iterator<Item = &'a Facet>>(
@@ -240,15 +242,9 @@ impl FacetCollector {
/// is of the proper type.
pub fn for_field(field: Field) -> FacetCollector {
FacetCollector {
facet_ords: Vec::with_capacity(255),
segment_counters: Vec::new(),
field,
ff_reader: None,
facets: BTreeSet::new(),
current_segment_collapse_mapping: Vec::new(),
current_collapse_facet_ords: Vec::new(),
current_segment_counts: Vec::new(),
}
}
@@ -279,69 +275,11 @@ impl FacetCollector {
self.facets.insert(facet);
}
fn set_collapse_mapping(&mut self, facet_reader: &FacetReader) {
self.current_segment_collapse_mapping.clear();
self.current_collapse_facet_ords.clear();
self.current_segment_counts.clear();
let mut collapse_facet_it = self.facets.iter().peekable();
self.current_collapse_facet_ords.push(0);
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if !facet_streamer.advance() {
return;
}
'outer: loop {
// at the beginning of this loop, facet_streamer
// is positioned on a term that has not been processed yet.
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
match skip_result {
SkipResult::Reached => {
// we reach a facet we decided to collapse.
let collapse_depth = facet_depth(facet_streamer.key());
let mut collapsed_id = 0;
self.current_segment_collapse_mapping.push(0);
while facet_streamer.advance() {
let depth = facet_depth(facet_streamer.key());
if depth <= collapse_depth {
continue 'outer;
}
if depth == collapse_depth + 1 {
collapsed_id = self.current_collapse_facet_ords.len();
self.current_collapse_facet_ords
.push(facet_streamer.term_ord());
self.current_segment_collapse_mapping.push(collapsed_id);
} else {
self.current_segment_collapse_mapping.push(collapsed_id);
}
}
break;
}
SkipResult::End | SkipResult::OverStep => {
self.current_segment_collapse_mapping.push(0);
if !facet_streamer.advance() {
break;
}
}
}
}
}
fn finalize_segment(&mut self) {
if self.ff_reader.is_some() {
self.segment_counters.push(SegmentFacetCounter {
facet_reader: self.ff_reader.take().unwrap().into_inner(),
facet_ords: mem::replace(&mut self.current_collapse_facet_ords, Vec::new()),
facet_counts: mem::replace(&mut self.current_segment_counts, Vec::new()),
});
}
}
/// Returns the results of the collection.
///
/// This method does not just return the counters,
/// it also translates the facet ordinals of the last segment.
pub fn harvest(mut self) -> FacetCounts {
self.finalize_segment();
pub fn harvest(self) -> FacetCounts {
let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_ords[..])
@@ -389,30 +327,92 @@ impl FacetCollector {
}
}
impl FacetSegmentCollector {
fn into_segment_facet_counter(self) -> SegmentFacetCounter {
SegmentFacetCounter {
facet_reader: self.reader,
facet_ords: self.collapse_facet_ords,
facet_counts: self.counts,
}
}
}
impl Collector for FacetCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.finalize_segment();
type Child = FacetSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FacetSegmentCollector> {
let facet_reader = reader.facet_reader(self.field)?;
self.set_collapse_mapping(&facet_reader);
self.current_segment_counts
.resize(self.current_collapse_facet_ords.len(), 0);
self.ff_reader = Some(UnsafeCell::new(facet_reader));
Ok(())
let mut collapse_mapping = Vec::new();
let mut counts = Vec::new();
let mut collapse_facet_ords = Vec::new();
let mut collapse_facet_it = self.facets.iter().peekable();
collapse_facet_ords.push(0);
{
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if facet_streamer.advance() {
'outer: loop {
// at the beginning of this loop, facet_streamer
// is positioned on a term that has not been processed yet.
let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it);
match skip_result {
SkipResult::Reached => {
// we reach a facet we decided to collapse.
let collapse_depth = facet_depth(facet_streamer.key());
let mut collapsed_id = 0;
collapse_mapping.push(0);
while facet_streamer.advance() {
let depth = facet_depth(facet_streamer.key());
if depth <= collapse_depth {
continue 'outer;
}
if depth == collapse_depth + 1 {
collapsed_id = collapse_facet_ords.len();
collapse_facet_ords.push(facet_streamer.term_ord());
collapse_mapping.push(collapsed_id);
} else {
collapse_mapping.push(collapsed_id);
}
}
break;
}
SkipResult::End | SkipResult::OverStep => {
collapse_mapping.push(0);
if !facet_streamer.advance() {
break;
}
}
}
}
}
}
counts.resize(collapse_facet_ords.len(), 0);
Ok(FacetSegmentCollector {
reader: facet_reader,
facet_ords_buf: Vec::with_capacity(255),
collapse_mapping,
counts,
collapse_facet_ords,
})
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for FacetSegmentCollector {
type CollectionResult = Vec<SegmentFacetCounter>;
fn collect(&mut self, doc: DocId, _: Score) {
let facet_reader: &mut FacetReader = unsafe {
&mut *self.ff_reader
.as_ref()
.expect("collect() was called before set_segment. This should never happen.")
.get()
};
facet_reader.facet_ords(doc, &mut self.facet_ords);
self.reader.facet_ords(doc, &mut self.facet_ords_buf);
let mut previous_collapsed_ord: usize = usize::MAX;
for &facet_ord in &self.facet_ords {
let collapsed_ord = self.current_segment_collapse_mapping[facet_ord as usize];
self.current_segment_counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord
{
for &facet_ord in &self.facet_ords_buf {
let collapsed_ord = self.collapse_mapping[facet_ord as usize];
self.counts[collapsed_ord] += if collapsed_ord == previous_collapsed_ord {
0
} else {
1
@@ -421,8 +421,8 @@ impl Collector for FacetCollector {
}
}
fn requires_scoring(&self) -> bool {
false
fn finalize(self) -> Vec<SegmentFacetCounter> {
vec![self.into_segment_facet_counter()]
}
}

View File

@@ -7,12 +7,15 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use query::Query;
use Searcher;
use downcast;
mod count_collector;
pub use self::count_collector::CountCollector;
mod multi_collector;
pub use self::multi_collector::MultiCollector;
//mod multi_collector;
//pub use self::multi_collector::MultiCollector;
mod top_collector;
pub use self::top_collector::TopCollector;
@@ -53,31 +56,90 @@ pub use self::chained_collector::chain;
///
/// Segments are not guaranteed to be visited in any specific order.
pub trait Collector {
type Child : SegmentCollector + 'static;
/// `set_segment` is called before beginning to enumerate
/// on this segment.
fn set_segment(
fn for_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()>;
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
) -> Result<Self::Child>;
/// Returns true iff the collector requires scores to be computed for documents.
fn requires_scoring(&self) -> bool;
/// Search works as follows:
///
/// First the weight object associated to the query is created.
///
/// Then, the query loops over the segments and, for each segment:
/// - sets up the collector and informs it that the segment being processed has changed.
/// - creates a SegmentCollector for collecting documents associated to the segment
/// - creates a `Scorer` object for this segment
/// - iterates through the matched documents and pushes them to the segment collector.
/// - turns the segment collector into a Combinable segment result
///
/// Combining all of the segment results gives a single Child::CollectionResult, which is returned.
///
/// The result will be Ok(None) if the searcher has no segments.
fn search(&mut self, searcher: &Searcher, query: &Query) -> Result<Option<<Self::Child as SegmentCollector>::CollectionResult>> {
let scoring_enabled = self.requires_scoring();
let weight = query.weight(searcher, scoring_enabled)?;
let mut results = Vec::new();
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
let mut child: Self::Child = self.for_segment(segment_ord as SegmentLocalId, segment_reader)?;
let mut scorer = weight.scorer(segment_reader)?;
scorer.collect(&mut child, segment_reader.delete_bitset());
results.push(child.finalize());
}
Ok(results.into_iter().fold1(|x,y| {
x.combine_into(y);
x
}))
}
}
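As a rough usage sketch of the flow documented above (my own example, assuming the `CountCollector` from this diff and a searcher with loaded segments; the API is still work in progress and may change):

use tantivy::collector::{Collector, CountCollector};
use tantivy::query::Query;
use tantivy::{Result, Searcher};

// search() builds one SegmentCollector per segment, feeds it the matching
// documents, finalizes it, and folds the per-segment results together.
fn count_matches(searcher: &Searcher, query: &Query) -> Result<usize> {
    let mut count_collector = CountCollector::default();
    let merged = count_collector.search(searcher, query)?;
    // None only happens when the searcher has no segments at all.
    Ok(merged.map(|c| c.count()).unwrap_or(0))
}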
pub trait Combinable {
fn combine_into(&mut self, other: Self);
}
impl Combinable for () {
fn combine_into(&mut self, other: Self) {
()
}
}
impl<T> Combinable for Vec<T> {
fn combine_into(&mut self, other: Self) {
self.extend(other.into_iter());
}
}
impl<L: Combinable, R: Combinable> Combinable for (L, R) {
fn combine_into(&mut self, other: Self) {
self.0.combine_into(other.0);
self.1.combine_into(other.1);
}
}
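To make the folding concrete, a tiny self-contained illustration of how these `Combinable` impls compose (my own example, not taken from the diff, and assuming the trait is exported from the collector module as shown above):

use tantivy::collector::Combinable;

fn demo_combine() {
    // Vec results extend, () results are a no-op, and tuples merge element-wise,
    // which is exactly how two segments' results get folded into one.
    let mut left: (Vec<u32>, ()) = (vec![1, 2], ());
    let right: (Vec<u32>, ()) = (vec![3], ());
    left.combine_into(right);
    assert_eq!(left.0, vec![1, 2, 3]);
}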
pub trait SegmentCollector: downcast::Any + 'static {
type CollectionResult: Combinable + downcast::Any + 'static;
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
/// Turn into the final result
fn finalize(self) -> Self::CollectionResult;
}
impl<'a, C: Collector> Collector for &'a mut C {
fn set_segment(
&mut self,
type Child = C::Child;
fn for_segment(
&mut self, // TODO Ask Jason : why &mut self here!?
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
(*self).set_segment(segment_local_id, segment)
}
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score) {
C::collect(self, doc, score)
) -> Result<C::Child> {
(*self).for_segment(segment_local_id, segment)
}
fn requires_scoring(&self) -> bool {
@@ -85,6 +147,61 @@ impl<'a, C: Collector> Collector for &'a mut C {
}
}
pub struct CollectorWrapper<'a, TCollector: 'a + Collector>(&'a mut TCollector);
impl<'a, T: 'a + Collector> CollectorWrapper<'a, T> {
pub fn new(collector: &'a mut T) -> CollectorWrapper<'a, T> {
CollectorWrapper(collector)
}
}
impl<'a, T: 'a + Collector> Collector for CollectorWrapper<'a, T> {
type Child = T::Child;
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<T::Child> {
self.0.for_segment(segment_local_id, segment)
}
fn requires_scoring(&self) -> bool {
self.0.requires_scoring()
}
}
trait UntypedCollector {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>>;
}
impl<'a, TCollector:'a + Collector> UntypedCollector for CollectorWrapper<'a, TCollector> {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>> {
let segment_collector = self.0.for_segment(segment_local_id, segment)?;
Ok(Box::new(segment_collector))
}
}
trait UntypedSegmentCollector {
fn finalize(self) -> Box<UntypedCombinable>;
}
trait UntypedCombinable {
fn combine_into(&mut self, other: Box<UntypedCombinable>);
}
pub struct CombinableWrapper<'a, T: 'a + Combinable>(&'a mut T);
impl<'a, T: 'a + Combinable> CombinableWrapper<'a, T> {
pub fn new(combinable: &'a mut T) -> CombinableWrapper<'a, T> {
CombinableWrapper(combinable)
}
}
impl<'a, T: 'a + Combinable> Combinable for CombinableWrapper<'a, T> {
fn combine_into(&mut self, other: Self) {
self.0.combine_into(*::downcast::Downcast::<T>::downcast(other).unwrap())
}
}
#[cfg(test)]
pub mod tests {
@@ -102,8 +219,13 @@ pub mod tests {
/// It is unusable in practice, as it does not store
/// the segment ordinals
pub struct TestCollector {
next_offset: DocId,
docs: Vec<DocId>,
scores: Vec<Score>,
}
pub struct TestSegmentCollector {
offset: DocId,
segment_max_doc: DocId,
docs: Vec<DocId>,
scores: Vec<Score>,
}
@@ -122,8 +244,7 @@ pub mod tests {
impl Default for TestCollector {
fn default() -> TestCollector {
TestCollector {
offset: 0,
segment_max_doc: 0,
next_offset: 0,
docs: Vec::new(),
scores: Vec::new(),
}
@@ -131,19 +252,33 @@ pub mod tests {
}
impl Collector for TestCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.offset += self.segment_max_doc;
self.segment_max_doc = reader.max_doc();
Ok(())
type Child = TestSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<TestSegmentCollector> {
let offset = self.next_offset;
self.next_offset += reader.max_doc();
Ok(TestSegmentCollector {
offset,
docs: Vec::new(),
scores: Vec::new(),
})
}
fn requires_scoring(&self) -> bool {
true
}
}
impl SegmentCollector for TestSegmentCollector {
type CollectionResult = Vec<TestSegmentCollector>;
fn collect(&mut self, doc: DocId, score: Score) {
self.docs.push(doc + self.offset);
self.scores.push(score);
}
fn requires_scoring(&self) -> bool {
true
fn finalize(self) -> Vec<TestSegmentCollector> {
vec![self]
}
}
@@ -152,17 +287,26 @@ pub mod tests {
///
/// This collector is mainly useful for tests.
pub struct FastFieldTestCollector {
vals: Vec<u64>,
next_counter: usize,
field: Field,
ff_reader: Option<FastFieldReader<u64>>,
}
#[derive(Default)]
pub struct FastFieldSegmentCollectorState {
counter: usize,
vals: Vec<u64>,
}
pub struct FastFieldSegmentCollector {
state: FastFieldSegmentCollectorState,
reader: FastFieldReader<u64>,
}
impl FastFieldTestCollector {
pub fn for_field(field: Field) -> FastFieldTestCollector {
FastFieldTestCollector {
vals: Vec::new(),
next_counter: 0,
field,
ff_reader: None,
}
}
@@ -172,20 +316,35 @@ pub mod tests {
}
impl Collector for FastFieldTestCollector {
fn set_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<()> {
self.ff_reader = Some(reader.fast_field_reader(self.field)?);
Ok(())
type Child = FastFieldSegmentCollector;
fn for_segment(&mut self, _: SegmentLocalId, reader: &SegmentReader) -> Result<FastFieldSegmentCollector> {
let counter = self.next_counter;
self.next_counter += 1;
Ok(FastFieldSegmentCollector {
state: FastFieldSegmentCollectorState::default(),
reader: reader.fast_field_reader(self.field)?,
})
}
fn collect(&mut self, doc: DocId, _score: Score) {
let val = self.ff_reader.as_ref().unwrap().get(doc);
self.vals.push(val);
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for FastFieldSegmentCollector {
type CollectionResult = Vec<FastFieldSegmentCollectorState>;
fn collect(&mut self, doc: DocId, _score: Score) {
let val = self.reader.get(doc);
self.vals.push(val);
}
fn finalize(self) -> Vec<FastFieldSegmentCollectorState> {
vec![self.state]
}
}
/// Collects in order all of the fast field bytes for all of the
/// docs in the `DocSet`
///
@@ -193,7 +352,11 @@ pub mod tests {
pub struct BytesFastFieldTestCollector {
vals: Vec<u8>,
field: Field,
ff_reader: Option<BytesFastFieldReader>,
}
pub struct BytesFastFieldSegmentCollector {
vals: Vec<u8>,
reader: BytesFastFieldReader,
}
impl BytesFastFieldTestCollector {
@@ -201,7 +364,6 @@ pub mod tests {
BytesFastFieldTestCollector {
vals: Vec::new(),
field,
ff_reader: None,
}
}
@@ -211,20 +373,32 @@ pub mod tests {
}
impl Collector for BytesFastFieldTestCollector {
fn set_segment(&mut self, _segment_local_id: u32, segment: &SegmentReader) -> Result<()> {
self.ff_reader = Some(segment.bytes_fast_field_reader(self.field)?);
Ok(())
}
type Child = BytesFastFieldSegmentCollector;
fn collect(&mut self, doc: u32, _score: f32) {
let val = self.ff_reader.as_ref().unwrap().get_val(doc);
self.vals.extend(val);
fn for_segment(&mut self, _segment_local_id: u32, segment: &SegmentReader) -> Result<BytesFastFieldSegmentCollector> {
Ok(BytesFastFieldSegmentCollector {
vals: Vec::new(),
reader: segment.bytes_fast_field_reader(self.field)?,
})
}
fn requires_scoring(&self) -> bool {
false
}
}
impl SegmentCollector for BytesFastFieldSegmentCollector {
type CollectionResult = Vec<Vec<u8>>;
fn collect(&mut self, doc: u32, _score: f32) {
let val = self.reader.get_val(doc);
self.vals.extend(val);
}
fn finalize(self) -> Vec<Vec<u8>> {
vec![self.vals]
}
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -1,67 +1,122 @@
use super::Collector;
use super::SegmentCollector;
use DocId;
use Result;
use Score;
use Result;
use SegmentLocalId;
use SegmentReader;
use downcast::Downcast;
/// Multicollector makes it possible to collect on more than one collector.
/// It should only be used for use cases where the Collector types are unknown
/// at compile time.
/// If the type of the collectors is known, you should prefer to use `ChainedCollector`.
pub struct MultiCollector<'a> {
collectors: Vec<&'a mut Collector>,
collector_wrappers: Vec<Box<UntypedCollector + 'a>>
}
impl<'a> MultiCollector<'a> {
/// Constructor
pub fn from(collectors: Vec<&'a mut Collector>) -> MultiCollector {
MultiCollector { collectors }
pub fn new() -> MultiCollector<'a> {
MultiCollector {
collector_wrappers: Vec::new()
}
}
pub fn add_collector<TCollector: 'a + Collector>(&mut self, collector: &'a mut TCollector) {
let collector_wrapper = CollectorWrapper(collector);
self.collector_wrappers.push(Box::new(collector_wrapper));
}
}
impl<'a> Collector for MultiCollector<'a> {
fn set_segment(
&mut self,
segment_local_id: SegmentLocalId,
segment: &SegmentReader,
) -> Result<()> {
for collector in &mut self.collectors {
collector.set_segment(segment_local_id, segment)?;
}
Ok(())
type Child = MultiCollectorChild;
fn for_segment(&mut self, segment_local_id: SegmentLocalId, segment: &SegmentReader) -> Result<MultiCollectorChild> {
let children = self.collector_wrappers
.iter_mut()
.map(|collector_wrapper| {
collector_wrapper.for_segment(segment_local_id, segment)
})
.collect::<Result<Vec<_>>>()?;
Ok(MultiCollectorChild {
children
})
}
fn collect(&mut self, doc: DocId, score: Score) {
for collector in &mut self.collectors {
collector.collect(doc, score);
fn requires_scoring(&self) -> bool {
self.collector_wrappers
.iter()
.any(|c| c.requires_scoring())
}
fn merge_children(&mut self, children: Vec<MultiCollectorChild>) {
let mut per_collector_children: Vec<Vec<Box<SegmentCollector>>> =
(0..self.collector_wrappers.len())
.map(|_| Vec::with_capacity(children.len()))
.collect::<Vec<_>>();
for child in children {
for (idx, segment_collector) in child.children.into_iter().enumerate() {
per_collector_children[idx].push(segment_collector);
}
}
for (collector, children) in self.collector_wrappers.iter_mut().zip(per_collector_children) {
collector.merge_children_anys(children);
}
}
fn requires_scoring(&self) -> bool {
self.collectors
.iter()
.any(|collector| collector.requires_scoring())
}
pub struct MultiCollectorChild {
children: Vec<Box<SegmentCollector>>
}
impl SegmentCollector for MultiCollectorChild {
fn collect(&mut self, doc: DocId, score: Score) {
for child in &mut self.children {
child.collect(doc, score);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use collector::{Collector, CountCollector, TopCollector};
use schema::{TEXT, SchemaBuilder};
use query::TermQuery;
use Index;
use Term;
use schema::IndexRecordOption;
#[test]
fn test_multi_collector() {
let mut schema_builder = SchemaBuilder::new();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(text=>"abc"));
index_writer.add_document(doc!(text=>"abc abc abc"));
index_writer.add_document(doc!(text=>"abc abc"));
index_writer.commit().unwrap();
index_writer.add_document(doc!(text=>""));
index_writer.add_document(doc!(text=>"abc abc abc abc"));
index_writer.add_document(doc!(text=>"abc"));
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let term = Term::from_field_text(text, "abc");
let query = TermQuery::new(term, IndexRecordOption::Basic);
let mut top_collector = TopCollector::with_limit(2);
let mut count_collector = CountCollector::default();
{
let mut collectors =
MultiCollector::from(vec![&mut top_collector, &mut count_collector]);
collectors.collect(1, 0.2);
collectors.collect(2, 0.1);
collectors.collect(3, 0.5);
let mut collectors = MultiCollector::new();
collectors.add_collector(&mut top_collector);
collectors.add_collector(&mut count_collector);
collectors.search(&*searcher, &query).unwrap();
}
assert_eq!(count_collector.count(), 3);
assert!(top_collector.at_capacity());
assert_eq!(count_collector.count(), 5);
}
}

View File

@@ -7,6 +7,8 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
// Rust heap is a max-heap and we need a min heap.
#[derive(Clone, Copy)]
@@ -99,11 +101,34 @@ impl TopCollector {
}
impl Collector for TopCollector {
fn set_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<()> {
self.segment_id = segment_id;
Ok(())
type Child = TopCollector;
fn for_segment(&mut self, segment_id: SegmentLocalId, _: &SegmentReader) -> Result<TopCollector> {
Ok(TopCollector {
limit: self.limit,
heap: BinaryHeap::new(),
segment_id,
})
}
fn requires_scoring(&self) -> bool {
true
}
}
impl Combinable for TopCollector {
// TODO: I think this could be a bit better
fn combine_into(&mut self, other: Self) {
self.segment_id = other.segment_id;
while let Some(doc) = other.heap.pop() {
self.collect(doc.doc_address.doc(), doc.score);
}
}
}
impl SegmentCollector for TopCollector {
type CollectionResult = TopCollector;
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
@@ -126,8 +151,8 @@ impl Collector for TopCollector {
}
}
fn requires_scoring(&self) -> bool {
true
fn finalize(self) -> TopCollector {
self
}
}
@@ -135,7 +160,6 @@ impl Collector for TopCollector {
mod tests {
use super::*;
use collector::Collector;
use DocId;
use Score;

View File

@@ -73,7 +73,7 @@ impl Searcher {
/// Runs a query on the segment readers wrapped by the searcher
pub fn search<C: Collector>(&self, query: &Query, collector: &mut C) -> Result<()> {
query.search(self, collector)
collector.search(self, query)
}
/// Return the field searcher associated to a `Field`.

View File

@@ -414,8 +414,8 @@ impl<'a> Iterator for SegmentReaderAliveDocsIterator<'a> {
#[cfg(test)]
mod test {
use core::Index;
use schema::{SchemaBuilder, Term, STORED, TEXT};
use DocId;
use schema::{TEXT, STORED, Term, SchemaBuilder};
#[test]
fn test_alive_docs_iterator() {
@@ -448,6 +448,6 @@ mod test {
index.load_searchers().unwrap();
let searcher = index.searcher();
let docs: Vec<DocId> = searcher.segment_reader(0).doc_ids_alive().collect();
assert_eq!(vec![0u32, 2u32], docs);
assert_eq!(vec![0u32, 2u32], docs);
}
}

View File

@@ -1138,125 +1138,126 @@ mod tests {
}
}
#[test]
fn test_merge_facets() {
let mut schema_builder = schema::SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet");
let index = Index::create_in_ram(schema_builder.build());
use schema::Facet;
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
let mut doc = Document::default();
for facet in doc_facets {
doc.add_facet(facet_field, Facet::from(facet));
}
index_writer.add_document(doc);
};
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
index_doc(&mut index_writer, &["/top/a", "/top/b"]);
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b", "/top/d"]);
index_doc(&mut index_writer, &["/top/d"]);
index_doc(&mut index_writer, &["/top/e"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b"]);
index_doc(&mut index_writer, &["/top/c"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/e", "/top/f"]);
index_writer.commit().expect("committed");
}
index.load_searchers().unwrap();
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
let searcher = index.searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top"));
use collector::{CountCollector, MultiCollector};
let mut count_collector = CountCollector::default();
{
let mut multi_collectors =
MultiCollector::from(vec![&mut count_collector, &mut facet_collector]);
searcher.search(&AllQuery, &mut multi_collectors).unwrap();
}
assert_eq!(count_collector.count(), expected_num_docs);
let facet_counts = facet_collector.harvest();
let facets: Vec<(String, u64)> = facet_counts
.get("/top")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
assert_eq!(
facets,
expected
.iter()
.map(|&(facet_str, count)| (String::from(facet_str), count))
.collect::<Vec<_>>()
);
};
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
// Merging the segments
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
index.load_searchers().unwrap();
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
// Deleting one term
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
test_searcher(
9,
&[
("/top/a", 3),
("/top/b", 3),
("/top/c", 1),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
}
// #[test]
// fn test_merge_facets() {
// let mut schema_builder = schema::SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet");
// let index = Index::create_in_ram(schema_builder.build());
// use schema::Facet;
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
// let mut doc = Document::default();
// for facet in doc_facets {
// doc.add_facet(facet_field, Facet::from(facet));
// }
// index_writer.add_document(doc);
// };
//
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
// index_doc(&mut index_writer, &["/top/a", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a"]);
//
// index_doc(&mut index_writer, &["/top/b", "/top/d"]);
// index_doc(&mut index_writer, &["/top/d"]);
// index_doc(&mut index_writer, &["/top/e"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/a"]);
// index_doc(&mut index_writer, &["/top/b"]);
// index_doc(&mut index_writer, &["/top/c"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/e", "/top/f"]);
// index_writer.commit().expect("committed");
// }
// index.load_searchers().unwrap();
// let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
// let searcher = index.searcher();
// let mut facet_collector = FacetCollector::for_field(facet_field);
// facet_collector.add_facet(Facet::from("/top"));
// use collector::{CountCollector, MultiCollector};
// let mut count_collector = CountCollector::default();
// {
// let mut multi_collectors = MultiCollector::new();
// multi_collectors.add_collector(&mut count_collector);
// multi_collectors.add_collector(&mut facet_collector);
// searcher.search(&AllQuery, &mut multi_collectors).unwrap();
// }
// assert_eq!(count_collector.count(), expected_num_docs);
// let facet_counts = facet_collector.harvest();
// let facets: Vec<(String, u64)> = facet_counts
// .get("/top")
// .map(|(facet, count)| (facet.to_string(), count))
// .collect();
// assert_eq!(
// facets,
// expected
// .iter()
// .map(|&(facet_str, count)| (String::from(facet_str), count))
// .collect::<Vec<_>>()
// );
// };
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
//
// // Merging the segments
// {
// let segment_ids = index
// .searchable_segment_ids()
// .expect("Searchable segments failed.");
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// index_writer
// .merge(&segment_ids)
// .wait()
// .expect("Merging failed");
// index_writer.wait_merging_threads().unwrap();
//
// index.load_searchers().unwrap();
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
//
// // Deleting one term
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
// let facet_term = Term::from_facet(facet_field, &facet);
// index_writer.delete_term(facet_term);
// index_writer.commit().unwrap();
// index.load_searchers().unwrap();
// test_searcher(
// 9,
// &[
// ("/top/a", 3),
// ("/top/b", 3),
// ("/top/c", 1),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
// }
#[test]
fn test_merge_multivalued_int_fields_all_deleted() {

View File

@@ -1,34 +1,25 @@
use common::{BinarySerializable, FixedSize};
use std::io;
/// `TermInfo` contains all of the information
/// associated to terms in the `.term` file.
///
/// It consists of
/// * `doc_freq` : the number of document in the segment
/// containing this term. It is also the length of the
/// posting list associated to this term
/// * `postings_offset` : an offset in the `.idx` file
/// addressing the start of the posting list associated
/// to this term.
/// `TermInfo` wraps the metadata associated to a Term.
/// It is segment-local.
#[derive(Debug, Default, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
/// Offset within the postings (`.idx`) file.
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Offset within the position (`.pos`) file.
/// Start offset of the first block within the position (`.pos`) file.
pub positions_offset: u64,
/// Offset within the position block.
/// Start offset within this position block.
pub positions_inner_offset: u8,
}
impl FixedSize for TermInfo {
/// Size required for the binary serialization of `TermInfo`.
/// This is large, but in practise, all `TermInfo` but the first one
/// of the block are bitpacked.
///
/// See `TermInfoStore`.
/// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practice, `TermInfo`s are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo`s are delta-encoded and bitpacked.
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES + u8::SIZE_IN_BYTES;
}

View File

@@ -9,7 +9,7 @@ use Score;
/// Query that matches all of the documents.
///
/// All of the documents get the score 1f32.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct AllQuery;
impl Query for AllQuery {

View File

@@ -23,6 +23,16 @@ pub struct BooleanQuery {
subqueries: Vec<(Occur, Box<Query>)>,
}
impl Clone for BooleanQuery {
fn clone(&self) -> Self {
self.subqueries
.iter()
.map(|(x, y)| (x.clone(), y.box_clone()))
.collect::<Vec<_>>()
.into()
}
}
impl From<Vec<(Occur, Box<Query>)>> for BooleanQuery {
fn from(subqueries: Vec<(Occur, Box<Query>)>) -> BooleanQuery {
BooleanQuery { subqueries }
@@ -55,4 +65,9 @@ impl BooleanQuery {
.collect();
BooleanQuery::from(occur_term_queries)
}
/// Deconstructed view of the clauses making up this query.
pub fn clauses(&self) -> &[(Occur, Box<Query>)] {
&self.subqueries[..]
}
}

View File

@@ -1,146 +0,0 @@
use common::BitSet;
use core::SegmentReader;
use fst::Automaton;
use levenshtein_automata::{LevenshteinAutomatonBuilder, DFA};
use query::BitSetDocSet;
use query::ConstScorer;
use query::{Query, Scorer, Weight};
use schema::{Field, IndexRecordOption, Term};
use termdict::{TermDictionary, TermStreamer};
use std::collections::HashMap;
use Result;
use Searcher;
lazy_static! {
static ref LEV_BUILDER: HashMap<(u8, bool), LevenshteinAutomatonBuilder> = {
let mut lev_builder_cache = HashMap::new();
// TODO make population lazy on a `(distance, val)` basis
for distance in 0..3 {
for &transposition in [false, true].iter() {
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(distance, transposition);
lev_builder_cache.insert((distance, transposition), lev_automaton_builder);
}
}
lev_builder_cache
};
}
/// A Fuzzy Query matches all of the documents
/// containing a specific term that is with in
/// Levenshtein distance
#[derive(Debug)]
pub struct FuzzyQuery {
term: Term,
distance: u8,
// TODO handle transposition optionally
}
impl FuzzyQuery {
/// Creates a new Fuzzy Query
pub fn new(term: Term, distance: u8) -> FuzzyQuery {
FuzzyQuery { term, distance }
}
pub fn specialized_weight(&self) -> Result<AutomatonWeight<DFA>> {
let automaton = LEV_BUILDER.get(&(self.distance, false))
.unwrap() // TODO return an error
.build_dfa(self.term.text());
Ok(AutomatonWeight {
term: self.term.clone(),
field: self.term.field(),
automaton,
})
}
}
impl Query for FuzzyQuery {
fn weight(&self, _searcher: &Searcher, _scoring_enabled: bool) -> Result<Box<Weight>> {
Ok(Box::new(self.specialized_weight()?))
}
}
pub struct AutomatonWeight<A>
where A: Automaton
{
term: Term,
field: Field,
automaton: A,
}
impl<A> AutomatonWeight<A>
where
A: Automaton,
{
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
let term_stream_builder = term_dict.search(&self.automaton);
term_stream_builder.into_stream()
}
}
impl<A> Weight for AutomatonWeight<A>
where
A: Automaton,
{
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);
let inverted_index = reader.inverted_index(self.field);
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict);
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
while block_segment_postings.advance() {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);
Ok(Box::new(ConstScorer::new(doc_bitset)))
}
}
#[cfg(test)]
mod test {
use super::FuzzyQuery;
use schema::SchemaBuilder;
use Index;
use collector::TopCollector;
use schema::TEXT;
use Term;
use tests::assert_nearly_equals;
#[test]
pub fn test_automaton_weight() {
let mut schema_builder = SchemaBuilder::new();
let country_field = schema_builder.add_text_field("country", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(
country_field => "Japan",
));
index_writer.add_document(doc!(
country_field => "Korea",
));
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let searcher = index.searcher();
{
let mut collector = TopCollector::with_limit(2);
let term = Term::from_field_text(country_field, "Japon");
let fuzzy_query = FuzzyQuery::new(term, 2);
searcher.search(&fuzzy_query, &mut collector).unwrap();
let scored_docs = collector.score_docs();
assert_eq!(scored_docs.len(), 1);
let (score, _) = scored_docs[0];
assert_nearly_equals(0.77802235, score);
}
}
}

View File

@@ -7,7 +7,6 @@ mod bitset;
mod bm25;
mod boolean_query;
mod exclude;
mod fuzzy_query;
mod intersection;
mod occur;
mod phrase_query;

View File

@@ -21,7 +21,7 @@ use Result;
/// Using a `PhraseQuery` on a field requires positions
/// to be indexed for this field.
///
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct PhraseQuery {
field: Field,
phrase_terms: Vec<Term>,
@@ -47,6 +47,16 @@ impl PhraseQuery {
phrase_terms: terms,
}
}
/// The `Field` this `PhraseQuery` is targeting.
pub fn field(&self) -> Field {
self.field
}
/// The `Term`s in the phrase making up this `PhraseQuery`.
pub fn phrase_terms(&self) -> &[Term] {
&self.phrase_terms[..]
}
}
impl Query for PhraseQuery {

View File

@@ -1,9 +1,8 @@
use super::Weight;
use collector::Collector;
use core::searcher::Searcher;
use downcast;
use std::fmt;
use Result;
use SegmentLocalId;
/// The `Query` trait defines a set of documents and a scoring method
/// for those documents.
@@ -38,7 +37,7 @@ use SegmentLocalId;
///
/// When implementing a new type of `Query`, it is normal to implement a
/// dedicated `Query`, `Weight` and `Scorer`.
pub trait Query: fmt::Debug {
pub trait Query: QueryClone + downcast::Any + fmt::Debug {
/// Create the weight associated to a query.
///
/// If scoring is not required, setting `scoring_enabled` to `false`
@@ -56,24 +55,22 @@ pub trait Query: fmt::Debug {
}
Ok(result)
}
}
/// Search works as follows :
///
/// First the weight object associated to the query is created.
///
/// Then, the query loops over the segments and for each segment :
/// - setup the collector and informs it that the segment being processed has changed.
/// - creates a `Scorer` object associated for this segment
/// - iterate throw the matched documents and push them to the collector.
///
fn search(&self, searcher: &Searcher, collector: &mut Collector) -> Result<()> {
let scoring_enabled = collector.requires_scoring();
let weight = self.weight(searcher, scoring_enabled)?;
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
collector.set_segment(segment_ord as SegmentLocalId, segment_reader)?;
let mut scorer = weight.scorer(segment_reader)?;
scorer.collect(collector, segment_reader.delete_bitset());
}
Ok(())
pub trait QueryClone {
fn box_clone(&self) -> Box<Query>;
}
impl<T> QueryClone for T
where
T: 'static + Query + Clone,
{
fn box_clone(&self) -> Box<Query> {
Box::new(self.clone())
}
}
#[allow(missing_docs)]
mod downcast_impl {
downcast!(super::Query);
}
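A small, hypothetical usage sketch of why the `QueryClone` blanket impl above is handy: any concrete query that derives `Clone` (as `TermQuery` and others do later in this diff) can be duplicated through the type-erased trait object.

use tantivy::query::Query;

// Type-erased queries can now be cloned without knowing their concrete type.
fn duplicate(query: &Query) -> Box<Query> {
    query.box_clone()
}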

View File

@@ -12,14 +12,14 @@ use std::ops::Range;
use termdict::{TermDictionary, TermStreamer};
use Result;
fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
bound: Bound<TFrom>,
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
bound: &Bound<TFrom>,
transform: &Transform,
) -> Bound<Vec<u8>> {
) -> Bound<TTo> {
use self::Bound::*;
match bound {
Excluded(from_val) => Excluded(transform(from_val)),
Included(from_val) => Included(transform(from_val)),
Excluded(ref from_val) => Excluded(transform(from_val)),
Included(ref from_val) => Included(transform(from_val)),
Unbounded => Unbounded,
}
}
@@ -41,8 +41,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
/// # extern crate tantivy;
/// # use tantivy::Index;
/// # use tantivy::schema::{SchemaBuilder, INT_INDEXED};
/// # use tantivy::collector::CountCollector;
/// # use tantivy::query::Query;
/// # use tantivy::collector::{Collector, CountCollector};
/// # use tantivy::Result;
/// # use tantivy::query::RangeQuery;
/// #
@@ -68,7 +67,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
/// let docs_in_the_sixties = RangeQuery::new_u64(year_field, 1960..1970);
///
/// let mut count_collector = CountCollector::default();
/// docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
/// count_collector.search(&*searcher, &docs_in_the_sixties)?;
///
/// let num_60s_books = count_collector.count();
///
@@ -80,7 +79,7 @@ fn map_bound<TFrom, Transform: Fn(TFrom) -> Vec<u8>>(
/// # run().unwrap()
/// # }
/// ```
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct RangeQuery {
field: Field,
value_type: Type,
@@ -113,12 +112,12 @@ impl RangeQuery {
left_bound: Bound<i64>,
right_bound: Bound<i64>,
) -> RangeQuery {
let make_term_val = |val: i64| Term::from_field_i64(field, val).value_bytes().to_owned();
let make_term_val = |val: &i64| Term::from_field_i64(field, *val).value_bytes().to_owned();
RangeQuery {
field,
value_type: Type::I64,
left_bound: map_bound(left_bound, &make_term_val),
right_bound: map_bound(right_bound, &make_term_val),
left_bound: map_bound(&left_bound, &make_term_val),
right_bound: map_bound(&right_bound, &make_term_val),
}
}
@@ -134,12 +133,12 @@ impl RangeQuery {
left_bound: Bound<u64>,
right_bound: Bound<u64>,
) -> RangeQuery {
let make_term_val = |val: u64| Term::from_field_u64(field, val).value_bytes().to_owned();
let make_term_val = |val: &u64| Term::from_field_u64(field, *val).value_bytes().to_owned();
RangeQuery {
field,
value_type: Type::U64,
left_bound: map_bound(left_bound, &make_term_val),
right_bound: map_bound(right_bound, &make_term_val),
left_bound: map_bound(&left_bound, &make_term_val),
right_bound: map_bound(&right_bound, &make_term_val),
}
}
@@ -167,12 +166,12 @@ impl RangeQuery {
left: Bound<&'b str>,
right: Bound<&'b str>,
) -> RangeQuery {
let make_term_val = |val: &str| val.as_bytes().to_vec();
let make_term_val = |val: &&str| val.as_bytes().to_vec();
RangeQuery {
field,
value_type: Type::Str,
left_bound: map_bound(left, &make_term_val),
right_bound: map_bound(right, &make_term_val),
left_bound: map_bound(&left, &make_term_val),
right_bound: map_bound(&right, &make_term_val),
}
}
@@ -187,6 +186,21 @@ impl RangeQuery {
Bound::Excluded(range.end),
)
}
/// Field to search over
pub fn field(&self) -> Field {
self.field
}
/// Lower bound of range
pub fn left_bound(&self) -> Bound<Term> {
map_bound(&self.left_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
}
/// Upper bound of range
pub fn right_bound(&self) -> Bound<Term> {
map_bound(&self.right_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
}
}
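A brief, hypothetical sketch of the new introspection accessors above (added for the "Expose parameters of RangeQuery for external usage" commit); it assumes the bounds come back as `Term`s exactly as defined in this hunk:

use std::collections::Bound;
use tantivy::query::RangeQuery;
use tantivy::schema::Field;

fn inspect(year_field: Field) {
    let query = RangeQuery::new_u64(year_field, 1960..1970);
    assert_eq!(query.field(), year_field);
    // The lower bound is rebuilt as a Term holding the big-endian u64 encoding of 1960.
    if let Bound::Included(term) = query.left_bound() {
        let _lower_bytes: &[u8] = term.value_bytes();
    }
}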
impl Query for RangeQuery {
@@ -259,8 +273,7 @@ impl Weight for RangeWeight {
mod tests {
use super::RangeQuery;
use collector::CountCollector;
use query::Query;
use collector::{Collector, CountCollector};
use schema::{Document, Field, SchemaBuilder, INT_INDEXED};
use std::collections::Bound;
use Index;
@@ -291,7 +304,7 @@ mod tests {
// ... or `1960..=1969` if inclusive range is enabled.
let mut count_collector = CountCollector::default();
docs_in_the_sixties.search(&*searcher, &mut count_collector)?;
count_collector.search(&*searcher, &docs_in_the_sixties)?;
assert_eq!(count_collector.count(), 2285);
Ok(())
}
@@ -328,9 +341,7 @@ mod tests {
let searcher = index.searcher();
let count_multiples = |range_query: RangeQuery| {
let mut count_collector = CountCollector::default();
range_query
.search(&*searcher, &mut count_collector)
.unwrap();
count_collector.search(&*searcher, &range_query).unwrap();
count_collector.count()
};

View File

@@ -1,4 +1,4 @@
use collector::Collector;
use collector::SegmentCollector;
use common::BitSet;
use docset::{DocSet, SkipResult};
use downcast;
@@ -18,7 +18,7 @@ pub trait Scorer: downcast::Any + DocSet + 'static {
/// Consumes the complete `DocSet` and
/// pushes the scored documents to the collector.
fn collect(&mut self, collector: &mut Collector, delete_bitset_opt: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset_opt: Option<&DeleteBitSet>) {
if let Some(delete_bitset) = delete_bitset_opt {
while self.advance() {
let doc = self.doc();
@@ -44,7 +44,7 @@ impl Scorer for Box<Scorer> {
self.deref_mut().score()
}
fn collect(&mut self, collector: &mut Collector, delete_bitset: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset: Option<&DeleteBitSet>) {
let scorer = self.deref_mut();
scorer.collect(collector, delete_bitset);
}

View File

@@ -16,7 +16,7 @@ use Term;
/// * `idf` - inverse document frequency.
/// * `term_freq` - number of occurrences of the term in the field
/// * `field norm` - number of tokens in the field.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TermQuery {
term: Term,
index_record_option: IndexRecordOption,
@@ -31,6 +31,11 @@ impl TermQuery {
}
}
/// The `Term` this query is built out of.
pub fn term(&self) -> &Term {
&self.term
}
/// Returns a weight object.
///
/// While `.weight(...)` returns a boxed trait object,

View File

@@ -6,7 +6,7 @@ use Result;
/// for a given set of segments.
///
/// See [`Query`](./trait.Query.html).
pub trait Weight {
pub trait Weight: Send + Sync + 'static {
/// Returns the scorer for the given segment.
/// See [`Query`](./trait.Query.html).
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>>;
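A hypothetical sketch of what the new `Send + Sync + 'static` bound enables (my own example, in line with the "Make Weight Send+Sync for parallelization purposes" commit): one weight shared across per-segment worker threads.

use std::sync::Arc;
use std::thread;
use tantivy::query::Weight;

// With Weight: Send + Sync + 'static, a boxed weight can be shared across
// threads, one per segment, which is the point of the bound.
fn spawn_per_segment(weight: Arc<Box<Weight>>, num_segments: usize) {
    let handles: Vec<_> = (0..num_segments)
        .map(|segment_ord| {
            let weight = Arc::clone(&weight);
            thread::spawn(move || {
                // Each worker would call weight.scorer(segment_reader) here.
                let _ = (segment_ord, &weight);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}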

View File

@@ -109,6 +109,12 @@ impl Term {
self.0.extend(bytes);
}
pub(crate) fn from_field_bytes(field: Field, bytes: &[u8]) -> Term {
let mut term = Term::for_field(field);
term.set_bytes(bytes);
term
}
/// Set the texts only, keeping the field untouched.
pub fn set_text(&mut self, text: &str) {
self.set_bytes(text.as_bytes());

View File

@@ -1,50 +1,20 @@
/*!
The term dictionary is one of the key data structures of
tantivy. It associates sorted `terms` to a `TermInfo` struct
that serves as an address to their respective posting list.
The term dictionary's main role is to associate the sorted [`Term`s](../struct.Term.html) to
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
about the term.
The term dictionary API makes it possible to iterate through
a range of keys in a sorted manner.
Internally, the term dictionary relies on the `fst` crate to store
a sorted mapping that associates each term with its rank in lexicographical order.
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
the `TermOrdinal`s are respectively `0`, `1`, `2`, and `3`.
For `u64`-terms, tantivy explicitly uses a `BigEndian` representation to ensure that the
lexicographical order matches the natural order of integers.
# Implementations
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.
There are currently two implementations of the term dictionary.
## Default implementation : `fstdict`
The default one relies heavily on the `fst` crate.
It associate each term's `&[u8]` representation to a `u64`
that is in fact an address in a buffer. The value is then accessible
via deserializing the value at this address.
## Stream implementation : `streamdict`
The `fstdict` is a tiny bit slow when streaming all of
the terms.
For some use case (analytics engine), it is preferrable
to use the `streamdict`, that offers better streaming
performance, to the detriment of `lookup` performance.
`streamdict` can be enabled by adding the `streamdict`
feature when compiling `tantivy`.
`streamdict` encodes each term relatively to the precedent
as follows.
- number of bytes that needs to be popped.
- number of bytes that needs to be added.
- sequence of bytes that is to be added
- value.
Because such a structure does not allow for lookups,
it comes with a `fst` that indexes 1 out of `1024`
terms in this structure.
A `lookup` therefore consists in a lookup in the `fst`
followed by a streaming through at most `1024` elements in the
term `stream`.
A second data structure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
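To make the `val ⟶ val - i64::min_value()` remark above concrete, here is an illustrative, free-standing sketch of the order-preserving mapping (my own helper name, not tantivy's actual API):

// Shifting by i64::min_value() (i.e. adding 2^63 modulo 2^64) maps i64s to u64s
// so that the unsigned big-endian byte order matches the signed numeric order.
fn i64_to_u64_for_illustration(val: i64) -> u64 {
    (val as u64).wrapping_add(1u64 << 63)
}

fn main() {
    assert_eq!(i64_to_u64_for_illustration(i64::min_value()), 0);
    assert!(i64_to_u64_for_illustration(-1) < i64_to_u64_for_illustration(0));
    assert_eq!(i64_to_u64_for_illustration(i64::max_value()), u64::max_value());
}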
/// Position of the term in the sorted list of terms.

View File

@@ -203,7 +203,7 @@ impl TermDictionary {
/// Returns a search builder, to stream all of the terms
/// within the Automaton
pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A> {
pub fn search<'a, A: Automaton>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A> {
let stream_builder = self.fst_index.search(automaton);
TermStreamerBuilder::<A>::new(self, stream_builder)
}

View File

@@ -25,73 +25,76 @@ type StopWordHashSet = HashSet<String, StopWordHasher>;
/// `TokenFilter` that removes stop words from a token stream
#[derive(Clone)]
pub struct StopWordFilter {
words: StopWordHashSet,
words: StopWordHashSet,
}
impl StopWordFilter {
/// Creates a `StopWordFilter` given a list of words to remove
pub fn remove(words: Vec<String>) -> StopWordFilter {
let mut set = StopWordHashSet::default();
/// Creates a `StopWordFilter` given a list of words to remove
pub fn remove(words: Vec<String>) -> StopWordFilter {
let mut set = StopWordHashSet::default();
for word in words {
set.insert(word);
for word in words {
set.insert(word);
}
StopWordFilter { words: set }
}
StopWordFilter { words: set }
}
}
pub struct StopWordFilterStream<TailTokenStream>
where
TailTokenStream: TokenStream,
TailTokenStream: TokenStream,
{
words: StopWordHashSet,
tail: TailTokenStream,
words: StopWordHashSet,
tail: TailTokenStream,
}
impl<TailTokenStream> TokenFilter<TailTokenStream> for StopWordFilter
where
TailTokenStream: TokenStream,
TailTokenStream: TokenStream,
{
type ResultTokenStream = StopWordFilterStream<TailTokenStream>;
type ResultTokenStream = StopWordFilterStream<TailTokenStream>;
fn transform(&self, token_stream: TailTokenStream) -> Self::ResultTokenStream {
StopWordFilterStream::wrap(self.words.clone(), token_stream)
}
fn transform(&self, token_stream: TailTokenStream) -> Self::ResultTokenStream {
StopWordFilterStream::wrap(self.words.clone(), token_stream)
}
}
impl<TailTokenStream> StopWordFilterStream<TailTokenStream>
where
TailTokenStream: TokenStream,
TailTokenStream: TokenStream,
{
fn predicate(&self, token: &Token) -> bool {
!self.words.contains(&token.text)
}
fn predicate(&self, token: &Token) -> bool {
!self.words.contains(&token.text)
}
fn wrap(words: StopWordHashSet, tail: TailTokenStream) -> StopWordFilterStream<TailTokenStream> {
StopWordFilterStream { words, tail }
}
fn wrap(
words: StopWordHashSet,
tail: TailTokenStream,
) -> StopWordFilterStream<TailTokenStream> {
StopWordFilterStream { words, tail }
}
}
impl<TailTokenStream> TokenStream for StopWordFilterStream<TailTokenStream>
where
TailTokenStream: TokenStream,
TailTokenStream: TokenStream,
{
fn token(&self) -> &Token {
self.tail.token()
}
fn token_mut(&mut self) -> &mut Token {
self.tail.token_mut()
}
fn advance(&mut self) -> bool {
while self.tail.advance() {
if self.predicate(self.tail.token()) {
return true;
}
fn token(&self) -> &Token {
self.tail.token()
}
false
}
fn token_mut(&mut self) -> &mut Token {
self.tail.token_mut()
}
fn advance(&mut self) -> bool {
while self.tail.advance() {
if self.predicate(self.tail.token()) {
return true;
}
}
false
}
}