Committing for shared discussion

This commit is contained in:
Jason Wolfe
2018-05-19 14:03:38 +09:00
parent 9628413386
commit c9d8031664
8 changed files with 283 additions and 256 deletions

View File

@@ -5,7 +5,7 @@ use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::multi_collector::CollectorWrapper;
use collector::CollectorWrapper;
/// Collector that does nothing.
/// This is used in the chain Collector and will hopefully
@@ -21,13 +21,17 @@ impl Collector for DoNothingCollector {
fn requires_scoring(&self) -> bool {
false
}
#[inline]
fn merge_children(&mut self, _children: Vec<DoNothingCollector>) {}
}
/// Segment-level behaviour of the do-nothing collector: every document is
/// ignored and the per-segment result is the unit value.
impl SegmentCollector for DoNothingCollector {
    /// Nothing is gathered, so the result carries no data.
    type CollectionResult = ();

    /// Discards the document and its score.
    #[inline]
    fn collect(&mut self, _doc: DocId, _score: Score) {}

    /// Consumes the collector and yields the unit result.
    fn finalize(self) {}
}
/// Zero-cost abstraction used to collect on multiple collectors.
@@ -69,26 +73,19 @@ impl<Left: Collector, Right: Collector> Collector for ChainedCollector<Left, Rig
fn requires_scoring(&self) -> bool {
self.left.requires_scoring() || self.right.requires_scoring()
}
fn merge_children(&mut self, children: Vec<Self::Child>) {
let mut lefts = Vec::new();
let mut rights = Vec::new();
for child in children.into_iter() {
lefts.push(child.left);
rights.push(child.right);
}
self.left.merge_children(lefts);
self.right.merge_children(rights);
}
}
/// Segment-level collector for a chained pair: each document is forwarded to
/// the left collector first and then to the right one.
impl<Left, Right> SegmentCollector for ChainedSegmentCollector<Left, Right>
where
    Left: SegmentCollector,
    Right: SegmentCollector,
{
    /// Pair of the two per-segment results, left first.
    type CollectionResult = (Left::CollectionResult, Right::CollectionResult);

    /// Passes `doc` and `score` to `left`, then to `right`.
    fn collect(&mut self, doc: DocId, score: Score) {
        Left::collect(&mut self.left, doc, score);
        Right::collect(&mut self.right, doc, score);
    }

    /// Finalizes `left`, then `right`, and returns both results as a tuple.
    fn finalize(self) -> Self::CollectionResult {
        let left_result = self.left.finalize();
        let right_result = self.right.finalize();
        (left_result, right_result)
    }
}
/// Creates a `ChainedCollector`

View File

@@ -5,6 +5,7 @@ use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
/// `CountCollector` collector only counts how many
/// documents match the query.
@@ -31,18 +32,24 @@ impl Collector for CountCollector {
fn requires_scoring(&self) -> bool {
false
}
}
fn merge_children(&mut self, children: Vec<CountCollector>) {
for child in children.into_iter() {
self.count += child.count;
}
/// Two counts are combined by summing them.
impl Combinable for CountCollector {
    /// Adds the count held by `other` to this collector's count.
    fn combine_into(&mut self, other: Self) {
        let extra = other.count;
        self.count += extra;
    }
}
/// Per-segment counting: the collector is its own result.
impl SegmentCollector for CountCollector {
    /// The collector itself is handed back once the segment is done.
    type CollectionResult = Self;

    /// Counts one more matching document; the doc id and score are ignored.
    fn collect(&mut self, _doc: DocId, _score: Score) {
        self.count += 1;
    }

    /// Returns the collector unchanged.
    fn finalize(self) -> Self {
        self
    }
}
#[cfg(test)]

View File

@@ -402,15 +402,11 @@ impl Collector for FacetCollector {
fn requires_scoring(&self) -> bool {
false
}
fn merge_children(&mut self, children: Vec<FacetSegmentCollector>) {
for child in children.into_iter() {
self.segment_counters.push(child.into_segment_facet_counter());
}
}
}
impl SegmentCollector for FacetSegmentCollector {
type CollectionResult = Vec<SegmentFacetCounter>;
fn collect(&mut self, doc: DocId, _: Score) {
self.reader.facet_ords(doc, &mut self.facet_ords_buf);
let mut previous_collapsed_ord: usize = usize::MAX;
@@ -424,6 +420,10 @@ impl SegmentCollector for FacetSegmentCollector {
previous_collapsed_ord = collapsed_ord;
}
}
fn finalize(self) -> Vec<SegmentFacetCounter> {
vec![self.into_segment_facet_counter()]
}
}
/// Intermediary result of the `FacetCollector` that stores

View File

@@ -14,8 +14,8 @@ use downcast;
mod count_collector;
pub use self::count_collector::CountCollector;
mod multi_collector;
pub use self::multi_collector::MultiCollector;
//mod multi_collector;
//pub use self::multi_collector::MultiCollector;
mod top_collector;
pub use self::top_collector::TopCollector;
@@ -68,8 +68,6 @@ pub trait Collector {
/// Returns true iff the collector requires to compute scores for documents.
fn requires_scoring(&self) -> bool;
fn merge_children(&mut self, children: Vec<Self::Child>);
/// Search works as follows :
///
/// First the weight object associated to the query is created.
@@ -78,33 +76,59 @@ pub trait Collector {
/// - setup the collector and informs it that the segment being processed has changed.
/// - creates a SegmentCollector for collecting documents associated to the segment
/// - creates a `Scorer` object associated for this segment
/// - iterate throw the matched documents and push them to the segment collector.
/// - iterate through the matched documents and push them to the segment collector.
/// - turn the segment collector into a Combinable segment result
///
/// Finally, the Collector merges each of the child collectors into itself for result usability
/// by the caller.
fn search(&mut self, searcher: &Searcher, query: &Query) -> Result<()> {
/// Combining all of the segment results gives a single Child::CollectionResult, which is returned.
///
/// The result will be Ok(None) in case of having no segments.
fn search(&mut self, searcher: &Searcher, query: &Query) -> Result<Option<<Self::Child as SegmentCollector>::CollectionResult>> {
let scoring_enabled = self.requires_scoring();
let weight = query.weight(searcher, scoring_enabled)?;
let mut children = Vec::new();
let mut results = Vec::new();
for (segment_ord, segment_reader) in searcher.segment_readers().iter().enumerate() {
let mut child: Self::Child = self.for_segment(segment_ord as SegmentLocalId, segment_reader)?;
let mut scorer = weight.scorer(segment_reader)?;
scorer.collect(&mut child, segment_reader.delete_bitset());
children.push(child);
results.push(child.finalize());
}
self.merge_children(children);
Ok(())
Ok(results.into_iter().fold1(|x,y| {
x.combine_into(y);
x
}))
}
}
/// A value that can absorb another value of the same type.
///
/// Implemented by per-segment collection results so that several of them can
/// be folded into a single one.
pub trait Combinable {
    /// Merges `other` into `self`, consuming `other`.
    fn combine_into(&mut self, other: Self);
}

/// The unit value carries no data, so combining it is a no-op.
impl Combinable for () {
    fn combine_into(&mut self, _other: Self) {}
}

/// Vectors combine by appending: the elements of `other` are added after the
/// existing elements of `self`, keeping their order.
impl<T> Combinable for Vec<T> {
    fn combine_into(&mut self, other: Self) {
        // `Vec<T>` is already `IntoIterator`, so no explicit `.into_iter()`.
        self.extend(other);
    }
}

/// Pairs combine component-wise: first with first, second with second.
impl<L: Combinable, R: Combinable> Combinable for (L, R) {
    fn combine_into(&mut self, other: Self) {
        let (other_left, other_right) = other;
        self.0.combine_into(other_left);
        self.1.combine_into(other_right);
    }
}
pub trait SegmentCollector: downcast::Any + 'static {
type CollectionResult: Combinable + downcast::Any + 'static;
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
}
#[allow(missing_docs)]
mod downcast_impl {
downcast!(super::SegmentCollector);
/// Turn into the final result
fn finalize(self) -> Self::CollectionResult;
}
impl<'a, C: Collector> Collector for &'a mut C {
@@ -121,9 +145,59 @@ impl<'a, C: Collector> Collector for &'a mut C {
fn requires_scoring(&self) -> bool {
C::requires_scoring(self)
}
}
fn merge_children(&mut self, children: Vec<C::Child>) {
(*self).merge_children(children);
/// Thin wrapper around a mutable reference to a `Collector`.
pub struct CollectorWrapper<'a, TCollector: 'a + Collector>(&'a mut TCollector);

impl<'a, T: 'a + Collector> CollectorWrapper<'a, T> {
    /// Wraps the given collector reference.
    pub fn new(collector: &'a mut T) -> Self {
        CollectorWrapper(collector)
    }
}

/// Every `Collector` method is delegated to the wrapped collector.
impl<'a, T: 'a + Collector> Collector for CollectorWrapper<'a, T> {
    type Child = T::Child;

    /// Asks the wrapped collector for its segment collector.
    fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<T::Child> {
        T::for_segment(&mut *self.0, segment_local_id, segment)
    }

    /// Reports whether the wrapped collector requires scoring.
    fn requires_scoring(&self) -> bool {
        T::requires_scoring(&*self.0)
    }
}
/// Variant of the collector interface whose per-segment collector is returned
/// as a boxed trait object instead of through an associated type.
trait UntypedCollector {
/// Creates the boxed segment collector for the given segment.
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>>;
}
/// Exposes a wrapped `Collector` through `UntypedCollector` by boxing the
/// segment collector it produces.
impl<'a, TCollector:'a + Collector> UntypedCollector for CollectorWrapper<'a, TCollector> {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<UntypedSegmentCollector>> {
let segment_collector = self.0.for_segment(segment_local_id, segment)?;
// NOTE(review): converting this box into `Box<UntypedSegmentCollector>`
// needs `TCollector::Child` to implement `UntypedSegmentCollector`; no such
// impl is visible here — confirm one exists elsewhere.
Ok(Box::new(segment_collector))
}
}
/// Boxed-trait-object counterpart of `SegmentCollector::finalize`.
trait UntypedSegmentCollector {
// NOTE(review): `finalize` takes `self` by value; verify it can actually be
// called on a `Box<UntypedSegmentCollector>` — TODO confirm.
fn finalize(self) -> Box<UntypedCombinable>;
}
/// Boxed-trait-object counterpart of `Combinable`.
trait UntypedCombinable {
/// Merges the boxed `other` into `self`.
fn combine_into(&mut self, other: Box<UntypedCombinable>);
}
/// Thin wrapper around a mutable reference to a `Combinable` value.
pub struct CombinableWrapper<'a, T: 'a + Combinable>(&'a mut T);
impl<'a, T: 'a + Combinable> CombinableWrapper<'a, T> {
/// Wraps the given combinable reference.
pub fn new(combinable: &'a mut T) -> CombinableWrapper<'a, T> {
CombinableWrapper(combinable)
}
}
/// Forwards `combine_into` to the wrapped value.
impl<'a, T: 'a + Combinable> Combinable for CombinableWrapper<'a, T> {
fn combine_into(&mut self, other: Self) {
// NOTE(review): `other` is already typed as `Self` here, yet it is passed
// through `Downcast::<T>::downcast` and dereferenced as if it were a boxed
// trait object. This looks like in-progress code — confirm the intended
// parameter type before relying on it.
self.0.combine_into(*::downcast::Downcast::<T>::downcast(other).unwrap())
}
}
@@ -193,21 +267,19 @@ pub mod tests {
fn requires_scoring(&self) -> bool {
true
}
fn merge_children(&mut self, mut children: Vec<TestSegmentCollector>) {
children.sort_by_key(|x| x.offset);
for child in children.into_iter() {
self.docs.extend(child.docs);
self.scores.extend(child.scores);
}
}
}
/// Per-segment half of the test collector.
impl SegmentCollector for TestSegmentCollector {
    /// Each finished segment collector is returned inside a one-element vector.
    type CollectionResult = Vec<Self>;

    /// Records the doc id shifted by this collector's `offset`, along with
    /// its score.
    fn collect(&mut self, doc: DocId, score: Score) {
        let shifted_doc = doc + self.offset;
        self.docs.push(shifted_doc);
        self.scores.push(score);
    }

    /// Wraps the collector itself in a single-element vector.
    fn finalize(self) -> Self::CollectionResult {
        vec![self]
    }
}
/// Collects in order all of the fast fields for all of the
@@ -216,13 +288,17 @@ pub mod tests {
/// This collector is mainly useful for tests.
pub struct FastFieldTestCollector {
next_counter: usize,
vals: Vec<u64>,
field: Field,
}
pub struct FastFieldSegmentCollector {
#[derive(Default)]
pub struct FastFieldSegmentCollectorState {
counter: usize,
vals: Vec<u64>,
}
pub struct FastFieldSegmentCollector {
state: FastFieldSegmentCollectorState,
reader: FastFieldReader<u64>,
}
@@ -230,7 +306,6 @@ pub mod tests {
pub fn for_field(field: Field) -> FastFieldTestCollector {
FastFieldTestCollector {
next_counter: 0,
vals: Vec::new(),
field,
}
}
@@ -247,8 +322,7 @@ pub mod tests {
let counter = self.next_counter;
self.next_counter += 1;
Ok(FastFieldSegmentCollector {
counter,
vals: Vec::new(),
state: FastFieldSegmentCollectorState::default(),
reader: reader.fast_field_reader(self.field)?,
})
}
@@ -256,20 +330,19 @@ pub mod tests {
fn requires_scoring(&self) -> bool {
false
}
fn merge_children(&mut self, mut children: Vec<FastFieldSegmentCollector>) {
children.sort_by_key(|x| x.counter);
for child in children.into_iter() {
self.vals.extend(child.vals);
}
}
}
/// Per-segment half of the fast-field test collector.
impl SegmentCollector for FastFieldSegmentCollector {
    /// The collector's state is returned inside a one-element vector.
    type CollectionResult = Vec<FastFieldSegmentCollectorState>;

    /// Reads the fast-field value of `doc` and appends it to the state.
    fn collect(&mut self, doc: DocId, _score: Score) {
        let val = self.reader.get(doc);
        // The collected values live in `state`; the collector no longer has a
        // `vals` field of its own.
        self.state.vals.push(val);
    }

    /// Hands back the accumulated state, dropping the reader.
    fn finalize(self) -> Vec<FastFieldSegmentCollectorState> {
        vec![self.state]
    }
}
/// Collects in order all of the fast field bytes for all of the
@@ -312,19 +385,19 @@ pub mod tests {
fn requires_scoring(&self) -> bool {
false
}
fn merge_children(&mut self, children: Vec<<Self as Collector>::Child>) {
for child in children.into_iter() {
self.vals.extend(child.vals);
}
}
}
/// Per-segment half of the bytes fast-field test collector.
impl SegmentCollector for BytesFastFieldSegmentCollector {
    /// The collected bytes are returned inside a one-element vector.
    type CollectionResult = Vec<Vec<u8>>;

    /// Appends whatever the reader returns for `doc` to the collected bytes.
    fn collect(&mut self, doc: u32, _score: f32) {
        let doc_bytes = self.reader.get_val(doc);
        self.vals.extend(doc_bytes);
    }

    /// Wraps the collected bytes in a single-element vector.
    fn finalize(self) -> Self::CollectionResult {
        vec![self.vals]
    }
}
}

View File

@@ -7,59 +7,6 @@ use SegmentLocalId;
use SegmentReader;
use downcast::Downcast;
pub struct CollectorWrapper<'a, TCollector: 'a + Collector>(&'a mut TCollector);
impl<'a, T: 'a + Collector> CollectorWrapper<'a, T> {
pub fn new(collector: &'a mut T) -> CollectorWrapper<'a, T> {
CollectorWrapper(collector)
}
}
impl<'a, T: 'a + Collector> Collector for CollectorWrapper<'a, T> {
type Child = T::Child;
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<T::Child> {
self.0.for_segment(segment_local_id, segment)
}
fn requires_scoring(&self) -> bool {
self.0.requires_scoring()
}
fn merge_children(&mut self, children: Vec<T::Child>) {
self.0.merge_children(children)
}
}
trait UntypedCollector {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<SegmentCollector>>;
fn requires_scoring(&self) -> bool;
fn merge_children_anys(&mut self, childrens: Vec<Box<SegmentCollector>>);
}
impl<'a, TCollector:'a + Collector> UntypedCollector for CollectorWrapper<'a, TCollector> {
fn for_segment(&mut self, segment_local_id: u32, segment: &SegmentReader) -> Result<Box<SegmentCollector>> {
let segment_collector = self.0.for_segment(segment_local_id, segment)?;
Ok(Box::new(segment_collector))
}
fn requires_scoring(&self) -> bool {
self.0.requires_scoring()
}
fn merge_children_anys(&mut self, childrens: Vec<Box<SegmentCollector>>) {
let typed_children: Vec<TCollector::Child> = childrens.into_iter()
.map(|untyped_child_collector| {
*Downcast::<TCollector::Child>::downcast(untyped_child_collector).unwrap()
}).collect();
self.0.merge_children(typed_children);
}
}
pub struct MultiCollector<'a> {
collector_wrappers: Vec<Box<UntypedCollector + 'a>>
}
@@ -116,10 +63,6 @@ impl<'a> Collector for MultiCollector<'a> {
}
trait UntypedSegmentCollector {
fn collect();
}
pub struct MultiCollectorChild {
children: Vec<Box<SegmentCollector>>
}

View File

@@ -8,6 +8,7 @@ use Score;
use SegmentLocalId;
use SegmentReader;
use collector::SegmentCollector;
use collector::Combinable;
// Rust heap is a max-heap and we need a min heap.
#[derive(Clone, Copy)]
@@ -113,19 +114,21 @@ impl Collector for TopCollector {
fn requires_scoring(&self) -> bool {
true
}
}
fn merge_children(&mut self, children: Vec<TopCollector>) {
// TODO: Could this be much better?
for mut child in children.into_iter() {
self.segment_id = child.segment_id;
while let Some(doc) = child.heap.pop() {
self.collect(doc.doc_address.doc(), doc.score)
}
/// Allows two `TopCollector`s to be folded into one.
impl Combinable for TopCollector {
    /// Pops every entry off `other`'s heap and re-collects it into `self`.
    ///
    /// `self.segment_id` is overwritten with `other.segment_id` before the
    /// entries are re-collected and keeps that value afterwards.
    // TODO: I think this could be a bit better
    fn combine_into(&mut self, mut other: Self) {
        // `other` must be bound mutably: popping from its heap mutates it.
        self.segment_id = other.segment_id;
        while let Some(doc) = other.heap.pop() {
            self.collect(doc.doc_address.doc(), doc.score);
        }
    }
}
impl SegmentCollector for TopCollector {
type CollectionResult = TopCollector;
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
@@ -147,6 +150,10 @@ impl SegmentCollector for TopCollector {
self.heap.push(wrapped_doc);
}
}
fn finalize(self) -> TopCollector {
self
}
}
#[cfg(test)]

View File

@@ -1138,126 +1138,126 @@ mod tests {
}
}
#[test]
fn test_merge_facets() {
let mut schema_builder = schema::SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet");
let index = Index::create_in_ram(schema_builder.build());
use schema::Facet;
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
let mut doc = Document::default();
for facet in doc_facets {
doc.add_facet(facet_field, Facet::from(facet));
}
index_writer.add_document(doc);
};
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
index_doc(&mut index_writer, &["/top/a", "/top/b"]);
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b", "/top/d"]);
index_doc(&mut index_writer, &["/top/d"]);
index_doc(&mut index_writer, &["/top/e"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/a"]);
index_doc(&mut index_writer, &["/top/b"]);
index_doc(&mut index_writer, &["/top/c"]);
index_writer.commit().expect("committed");
index_doc(&mut index_writer, &["/top/e", "/top/f"]);
index_writer.commit().expect("committed");
}
index.load_searchers().unwrap();
let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
let searcher = index.searcher();
let mut facet_collector = FacetCollector::for_field(facet_field);
facet_collector.add_facet(Facet::from("/top"));
use collector::{CountCollector, MultiCollector};
let mut count_collector = CountCollector::default();
{
let mut multi_collectors = MultiCollector::new();
multi_collectors.add_collector(&mut count_collector);
multi_collectors.add_collector(&mut facet_collector);
searcher.search(&AllQuery, &mut multi_collectors).unwrap();
}
assert_eq!(count_collector.count(), expected_num_docs);
let facet_counts = facet_collector.harvest();
let facets: Vec<(String, u64)> = facet_counts
.get("/top")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
assert_eq!(
facets,
expected
.iter()
.map(|&(facet_str, count)| (String::from(facet_str), count))
.collect::<Vec<_>>()
);
};
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
// Merging the segments
{
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer
.merge(&segment_ids)
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
index.load_searchers().unwrap();
test_searcher(
11,
&[
("/top/a", 5),
("/top/b", 5),
("/top/c", 2),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
// Deleting one term
{
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
let facet_term = Term::from_facet(facet_field, &facet);
index_writer.delete_term(facet_term);
index_writer.commit().unwrap();
index.load_searchers().unwrap();
test_searcher(
9,
&[
("/top/a", 3),
("/top/b", 3),
("/top/c", 1),
("/top/d", 2),
("/top/e", 2),
("/top/f", 1),
],
);
}
}
// #[test]
// fn test_merge_facets() {
// let mut schema_builder = schema::SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet");
// let index = Index::create_in_ram(schema_builder.build());
// use schema::Facet;
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| {
// let mut doc = Document::default();
// for facet in doc_facets {
// doc.add_facet(facet_field, Facet::from(facet));
// }
// index_writer.add_document(doc);
// };
//
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a/firstdoc", "/top/b", "/top/c"]);
// index_doc(&mut index_writer, &["/top/a", "/top/b"]);
// index_doc(&mut index_writer, &["/top/a"]);
//
// index_doc(&mut index_writer, &["/top/b", "/top/d"]);
// index_doc(&mut index_writer, &["/top/d"]);
// index_doc(&mut index_writer, &["/top/e"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/a"]);
// index_doc(&mut index_writer, &["/top/b"]);
// index_doc(&mut index_writer, &["/top/c"]);
// index_writer.commit().expect("committed");
//
// index_doc(&mut index_writer, &["/top/e", "/top/f"]);
// index_writer.commit().expect("committed");
// }
// index.load_searchers().unwrap();
// let test_searcher = |expected_num_docs: usize, expected: &[(&str, u64)]| {
// let searcher = index.searcher();
// let mut facet_collector = FacetCollector::for_field(facet_field);
// facet_collector.add_facet(Facet::from("/top"));
// use collector::{CountCollector, MultiCollector};
// let mut count_collector = CountCollector::default();
// {
// let mut multi_collectors = MultiCollector::new();
// multi_collectors.add_collector(&mut count_collector);
// multi_collectors.add_collector(&mut facet_collector);
// searcher.search(&AllQuery, &mut multi_collectors).unwrap();
// }
// assert_eq!(count_collector.count(), expected_num_docs);
// let facet_counts = facet_collector.harvest();
// let facets: Vec<(String, u64)> = facet_counts
// .get("/top")
// .map(|(facet, count)| (facet.to_string(), count))
// .collect();
// assert_eq!(
// facets,
// expected
// .iter()
// .map(|&(facet_str, count)| (String::from(facet_str), count))
// .collect::<Vec<_>>()
// );
// };
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
//
// // Merging the segments
// {
// let segment_ids = index
// .searchable_segment_ids()
// .expect("Searchable segments failed.");
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// index_writer
// .merge(&segment_ids)
// .wait()
// .expect("Merging failed");
// index_writer.wait_merging_threads().unwrap();
//
// index.load_searchers().unwrap();
// test_searcher(
// 11,
// &[
// ("/top/a", 5),
// ("/top/b", 5),
// ("/top/c", 2),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
//
// // Deleting one term
// {
// let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
// let facet = Facet::from_path(vec!["top", "a", "firstdoc"]);
// let facet_term = Term::from_facet(facet_field, &facet);
// index_writer.delete_term(facet_term);
// index_writer.commit().unwrap();
// index.load_searchers().unwrap();
// test_searcher(
// 9,
// &[
// ("/top/a", 3),
// ("/top/b", 3),
// ("/top/c", 1),
// ("/top/d", 2),
// ("/top/e", 2),
// ("/top/f", 1),
// ],
// );
// }
// }
#[test]
fn test_merge_multivalued_int_fields_all_deleted() {

View File

@@ -18,7 +18,7 @@ pub trait Scorer: downcast::Any + DocSet + 'static {
/// Consumes the complete `DocSet` and
/// push the scored documents to the collector.
fn collect(&mut self, collector: &mut SegmentCollector, delete_bitset_opt: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset_opt: Option<&DeleteBitSet>) {
if let Some(delete_bitset) = delete_bitset_opt {
while self.advance() {
let doc = self.doc();
@@ -44,7 +44,7 @@ impl Scorer for Box<Scorer> {
self.deref_mut().score()
}
fn collect(&mut self, collector: &mut SegmentCollector, delete_bitset: Option<&DeleteBitSet>) {
fn collect<T>(&mut self, collector: &mut SegmentCollector<CollectionResult = T>, delete_bitset: Option<&DeleteBitSet>) {
let scorer = self.deref_mut();
scorer.collect(collector, delete_bitset);
}