diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 52e120cc9..8a9970e0f 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -256,7 +256,7 @@ impl TermBuckets { }); entry.doc_count += 1; if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.collect(doc, &sub_aggregation)?; + sub_aggregations.collect(doc, sub_aggregation)?; } } bucket_count.validate_bucket_count()?; diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs index 02f30f85c..3ff1368f8 100644 --- a/src/collector/count_collector.rs +++ b/src/collector/count_collector.rs @@ -70,6 +70,11 @@ impl SegmentCollector for SegmentCountCollector { Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.count += docs.len(); + Ok(()) + } + fn harvest(self) -> usize { self.count } diff --git a/src/collector/custom_score_top_collector.rs b/src/collector/custom_score_top_collector.rs index d597727ed..0f981c20a 100644 --- a/src/collector/custom_score_top_collector.rs +++ b/src/collector/custom_score_top_collector.rs @@ -8,7 +8,8 @@ pub(crate) struct CustomScoreTopCollector { } impl CustomScoreTopCollector -where TScore: Clone + PartialOrd +where + TScore: Clone + PartialOrd, { pub(crate) fn new( custom_scorer: TCustomScorer, @@ -96,6 +97,14 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, _score) in docs { + let score = self.segment_scorer.score(*doc); + self.segment_collector.collect(*doc, score); + } + Ok(()) + } + fn harvest(self) -> Vec<(TScore, DocAddress)> { self.segment_collector.harvest() } @@ -114,7 +123,8 @@ where } impl CustomSegmentScorer for F -where F: 'static + FnMut(DocId) -> TScore +where + F: 'static + FnMut(DocId) -> TScore, { fn score(&mut self, doc: DocId) -> TScore { (self)(doc) diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 6d600bb48..7e6d43f7b 100644 --- 
a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -172,19 +172,33 @@ pub trait Collector: Sync + Send { ) -> crate::Result<::Fruit> { let mut segment_collector = self.for_segment(segment_ord as u32, reader)?; + let mut cache_pos = 0; + let mut cache = [(0, 0.0); 64]; + if let Some(alive_bitset) = reader.alive_bitset() { weight.for_each(reader, &mut |doc, score| { if alive_bitset.is_alive(doc) { - segment_collector.collect(doc, score)?; + cache[cache_pos] = (doc, score); + cache_pos += 1; + if cache_pos == 64 { + segment_collector.collect_block(&cache)?; + cache_pos = 0; + } } Ok(()) })?; } else { weight.for_each(reader, &mut |doc, score| { - segment_collector.collect(doc, score)?; + cache[cache_pos] = (doc, score); + cache_pos += 1; + if cache_pos == 64 { + segment_collector.collect_block(&cache)?; + cache_pos = 0; + } Ok(()) })?; } + segment_collector.collect_block(&cache[..cache_pos])?; Ok(segment_collector.harvest()) } } @@ -258,6 +272,14 @@ pub trait SegmentCollector: 'static { /// The query pushes the scored document to the collector via this method. fn collect(&mut self, doc: DocId, score: Score) -> crate::Result<()>; + /// The query pushes a block of scored documents to the collector via this method. + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, score) in docs { + self.collect(*doc, *score)?; + } + Ok(()) + } + /// Extract the fruit of the collection from the `SegmentCollector`. 
fn harvest(self) -> Self::Fruit; } @@ -317,6 +339,12 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { (self.0.harvest(), self.1.harvest()) } @@ -383,6 +411,13 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + self.2.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { (self.0.harvest(), self.1.harvest(), self.2.harvest()) } @@ -459,6 +494,14 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + self.2.collect_block(docs)?; + self.3.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { ( self.0.harvest(), diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 7b119ad86..a7a8ed169 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -57,6 +57,11 @@ impl SegmentCollector for Box { Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.as_mut().collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> Box { BoxableSegmentCollector::harvest_from_box(self) } @@ -64,6 +69,7 @@ impl SegmentCollector for Box { pub trait BoxableSegmentCollector { fn collect(&mut self, doc: u32, score: Score) -> crate::Result<()>; + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()>; fn harvest_from_box(self: Box) -> Box; } @@ -76,6 +82,11 @@ impl BoxableSegmentCollector self.0.collect(doc, score) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + Ok(()) + } + fn harvest_from_box(self: Box) -> Box { Box::new(self.0.harvest()) } @@ -236,6 +247,13 @@ impl SegmentCollector for MultiCollectorChild { Ok(()) } + fn 
collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for child in &mut self.children { + child.collect_block(docs)?; + } + Ok(()) + } + fn harvest(self) -> MultiFruit { MultiFruit { sub_fruits: self diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs index e0e3aeb9d..6074d088a 100644 --- a/src/collector/top_score_collector.rs +++ b/src/collector/top_score_collector.rs @@ -704,6 +704,14 @@ impl SegmentCollector for TopScoreSegmentCollector { Ok(()) } + #[inline] + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, score) in docs { + self.0.collect(*doc, *score); + } + Ok(()) + } + fn harvest(self) -> Vec<(Score, DocAddress)> { self.0.harvest() }