From 0f246ba908a16d5bbdb0290b4e50ae0cc018d5c6 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Sat, 15 Oct 2016 12:16:30 +0900
Subject: [PATCH] bug/4 Introduce segment_updater

---
 src/collector/top_collector.rs     |  23 ++--
 src/common/living_counter_latch.rs |  71 ++++++++++
 src/common/mod.rs                  |   4 +-
 src/core/index.rs                  |  13 +-
 src/core/segment.rs                |   3 +
 src/indexer/index_writer.rs        | 198 +++----------------------
 src/indexer/merge_policy.rs        |   3 +-
 src/indexer/mod.rs                 |   2 +-
 src/indexer/segment_updater.rs     | 214 +++++++++++++++++++++++++++++
 src/lib.rs                         |   1 -
 src/query/daat_multiterm_scorer.rs |   2 +-
 11 files changed, 332 insertions(+), 202 deletions(-)
 create mode 100644 src/common/living_counter_latch.rs
 create mode 100644 src/indexer/segment_updater.rs

diff --git a/src/collector/top_collector.rs b/src/collector/top_collector.rs
index c04c82774..63f9aa112 100644
--- a/src/collector/top_collector.rs
+++ b/src/collector/top_collector.rs
@@ -10,7 +10,10 @@ use Score;
 
 // Rust heap is a max-heap and we need a min heap.
 #[derive(Clone, Copy)]
-struct GlobalScoredDoc(Score, DocAddress);
+struct GlobalScoredDoc {
+    score: Score,
+    doc_address: DocAddress
+}
 
 impl PartialOrd for GlobalScoredDoc {
     fn partial_cmp(&self, other: &GlobalScoredDoc) -> Option<Ordering> {
@@ -21,9 +24,9 @@ impl PartialOrd for GlobalScoredDoc {
 impl Ord for GlobalScoredDoc {
     #[inline]
     fn cmp(&self, other: &GlobalScoredDoc) -> Ordering {
-        other.0.partial_cmp(&self.0)
+        other.score.partial_cmp(&self.score)
         .unwrap_or(
-            other.1.cmp(&self.1)
+            other.doc_address.cmp(&self.doc_address)
         )
     }
 }
@@ -87,7 +90,7 @@ impl TopCollector {
             .collect();
         scored_docs.sort();
         scored_docs.into_iter()
-            .map(|GlobalScoredDoc(score, doc_address)| (score, doc_address))
+            .map(|GlobalScoredDoc {score, doc_address}| (score, doc_address))
             .collect()
     }
 
@@ -110,13 +113,17 @@ impl Collector for TopCollector {
         if self.at_capacity() {
             // It's ok to unwrap as long as a limit of 0 is forbidden.
             let limit_doc: GlobalScoredDoc = *self.heap.peek().expect("Top collector with size 0 is forbidden");
-            if limit_doc.0 < scored_doc.score() {
-                let wrapped_doc = GlobalScoredDoc(scored_doc.score(), DocAddress(self.segment_id, scored_doc.doc()));
-                self.heap.replace(wrapped_doc);
+            if limit_doc.score < scored_doc.score() {
+                let mut mut_head = self.heap.peek_mut().unwrap();
+                mut_head.score = scored_doc.score();
+                mut_head.doc_address = DocAddress(self.segment_id, scored_doc.doc());
             }
         }
         else {
-            let wrapped_doc = GlobalScoredDoc(scored_doc.score(), DocAddress(self.segment_id, scored_doc.doc()));
+            let wrapped_doc = GlobalScoredDoc {
+                score: scored_doc.score(),
+                doc_address: DocAddress(self.segment_id, scored_doc.doc())
+            };
             self.heap.push(wrapped_doc);
         }
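The collector change above leans on two properties of std's BinaryHeap that are easy to miss: the heap is a max-heap, so TopCollector obtains a min-heap (head = current worst hit) by reversing the comparison inside Ord; and the stable peek_mut() guard lets it overwrite that head in place (the heap re-sifts when the guard drops), which replaces heap.replace() from the unstable binary_heap_extras feature whose gate is removed from src/lib.rs at the bottom of this patch. A minimal, self-contained sketch of the same pattern; MinScore and collect_top are illustrative names, not tantivy APIs:

    // Self-contained sketch (not tantivy code): keep the K best scores in a
    // min-heap derived from std's max-heap by reversing `Ord`, and update
    // the heap's head in place through `peek_mut`, as the collector does.
    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    #[derive(PartialEq)]
    struct MinScore(f32);

    impl Eq for MinScore {}

    impl PartialOrd for MinScore {
        fn partial_cmp(&self, other: &MinScore) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for MinScore {
        fn cmp(&self, other: &MinScore) -> Ordering {
            // Reversed comparison: the max-heap's "greatest" element is
            // now the smallest score, i.e. the current worst hit.
            other.0.partial_cmp(&self.0).unwrap_or(Ordering::Equal)
        }
    }

    fn collect_top(scores: &[f32], k: usize) -> Vec<f32> {
        let mut heap: BinaryHeap<MinScore> = BinaryHeap::new();
        for &score in scores {
            if heap.len() < k {
                heap.push(MinScore(score));
            } else if heap.peek().map(|head| head.0 < score).unwrap_or(false) {
                // Overwrite the worst hit in place; the heap re-sifts when
                // the `PeekMut` guard goes out of scope.
                heap.peek_mut().unwrap().0 = score;
            }
        }
        // Ascending by the reversed `Ord` means best score first.
        heap.into_sorted_vec().into_iter().map(|MinScore(s)| s).collect()
    }

    fn main() {
        assert_eq!(collect_top(&[0.5, 2.0, 1.0, 3.0], 2), vec![3.0, 2.0]);
    }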
diff --git a/src/common/living_counter_latch.rs b/src/common/living_counter_latch.rs
new file mode 100644
index 000000000..efd244484
--- /dev/null
+++ b/src/common/living_counter_latch.rs
@@ -0,0 +1,71 @@
+use std::sync::atomic::{Ordering, AtomicUsize};
+use std::sync::Arc;
+
+pub struct LivingCounterLatch {
+    counter: Arc<AtomicUsize>,
+}
+
+
+impl Default for LivingCounterLatch {
+    fn default() -> LivingCounterLatch {
+        LivingCounterLatch {
+            counter: Arc::new(AtomicUsize::default()),
+        }
+    }
+}
+
+impl LivingCounterLatch {
+    /// Returns true if all of the clones of this
+    /// `LivingCounterLatch` (apart from `self`) have been dropped.
+    pub fn is_zero(&self,) -> bool {
+        self.counter.load(Ordering::Acquire) == 0
+    }
+}
+
+impl Clone for LivingCounterLatch {
+    fn clone(&self,) -> LivingCounterLatch {
+        self.counter.fetch_add(1, Ordering::SeqCst);
+        LivingCounterLatch {
+            counter: self.counter.clone(),
+        }
+    }
+}
+
+impl Drop for LivingCounterLatch {
+    fn drop(&mut self,) {
+        self.counter.fetch_sub(1, Ordering::SeqCst);
+    }
+}
+
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use std::sync::atomic::{Ordering, AtomicUsize};
+    use std::sync::Arc;
+    use std::thread;
+    use std::mem::drop;
+
+    const NUM_THREADS: usize = 10;
+    const COUNT_PER_THREAD: usize = 100;
+
+    #[test]
+    fn test_living_counter_latch() {
+        let counter = Arc::new(AtomicUsize::default());
+        let living_counter_latch = LivingCounterLatch::default();
+        // spawn NUM_THREADS threads
+        for _ in 0..NUM_THREADS {
+            let living_counter_latch_clone = living_counter_latch.clone();
+            let counter_clone = counter.clone();
+            thread::spawn(move || {
+                for _ in 0..COUNT_PER_THREAD {
+                    counter_clone.fetch_add(1, Ordering::SeqCst);
+                }
+                drop(living_counter_latch_clone);
+            });
+        }
+        while !living_counter_latch.is_zero() {}
+        assert_eq!(counter.load(Ordering::Acquire), NUM_THREADS * COUNT_PER_THREAD)
+    }
+}
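LivingCounterLatch is a tiny completion barrier: the shared counter starts at zero, every clone() increments it and every drop() decrements it, so a thread holding the original handle can poll is_zero() until all handed-out clones have died. Only clones ever increment the counter, so the original handle must outlive the poll loop. A short usage sketch under that assumption; run_workers is illustrative, not part of the patch:

    // Usage sketch: wait for a batch of worker threads to finish without
    // keeping (or joining) their `JoinHandle`s.
    use std::thread;

    fn run_workers(latch: &LivingCounterLatch) {
        for worker_id in 0..4 {
            let guard = latch.clone(); // +1 while this worker is alive
            thread::spawn(move || {
                // ... do some indexing work ...
                println!("worker {} done", worker_id);
                drop(guard); // -1; letting it fall out of scope would do the same
            });
        }
        // Spin until every clone has been dropped.
        while !latch.is_zero() {
            thread::yield_now();
        }
    }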
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e4322d27e..918a7402d 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -1,12 +1,14 @@
 mod serialize;
 mod timer;
 mod vint;
+mod living_counter_latch;
 
 pub use self::serialize::BinarySerializable;
 pub use self::timer::Timing;
 pub use self::timer::TimerTree;
 pub use self::timer::OpenTimer;
 pub use self::vint::VInt;
+pub use self::living_counter_latch::LivingCounterLatch;
 
 use std::io;
 
@@ -24,4 +26,4 @@ pub trait HasLen {
     fn is_empty(&self,) -> bool {
         self.len() == 0
     }
-}
\ No newline at end of file
+}
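For reference, the HasLen trait touched in common/mod.rs above only requires len(); is_empty() comes for free from the default method. A quick illustration with a hypothetical implementor (PostingsBlock is not a type in this patch):

    // `HasLen` as declared in src/common/mod.rs, plus a hypothetical
    // implementor to show the inherited default method.
    pub trait HasLen {
        fn len(&self,) -> usize;
        fn is_empty(&self,) -> bool {
            self.len() == 0
        }
    }

    struct PostingsBlock {
        doc_ids: Vec<u32>,
    }

    impl HasLen for PostingsBlock {
        fn len(&self,) -> usize {
            self.doc_ids.len()
        }
        // `is_empty` is inherited from the trait's default method.
    }

    fn main() {
        let block = PostingsBlock { doc_ids: vec![1, 5, 8] };
        assert_eq!(block.len(), 3);
        assert!(!block.is_empty());
    }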
diff --git a/src/core/index.rs b/src/core/index.rs
index 06ac705f0..398d38e11 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -16,7 +16,6 @@ use core::SegmentReader;
 use super::pool::Pool;
 use super::pool::LeasedItem;
 use indexer::SegmentManager;
-use indexer::{MergePolicy, SimpleMergePolicy};
 use core::IndexMeta;
 use core::META_FILEPATH;
 use super::segment::create_segment;
@@ -41,9 +40,9 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
     Ok(loaded_meta)
 }
 
-pub fn set_metas(index: &mut Index, docstamp: u64) {
-    index.docstamp = docstamp;
-}
+// pub fn set_metas(index: &mut Index, docstamp: u64) {
+//     index.docstamp = docstamp;
+// }
 
 /// Tantivy's Search Index
 pub struct Index {
@@ -228,12 +227,6 @@ impl Index {
         self.searcher_pool.acquire()
     }
 
-    pub fn get_merge_policy(&self,) -> Box<MergePolicy> {
-        // TODO load that from conf.
-        Box::new(
-            SimpleMergePolicy::default()
-        )
-    }
 }

diff --git a/src/core/segment.rs b/src/core/segment.rs
index 2b03ab2de..1eed63e53 100644
--- a/src/core/segment.rs
+++ b/src/core/segment.rs
@@ -11,6 +11,9 @@ use core::Index;
 use std::result;
 use directory::error::{FileError, OpenWriteError};
 
+
+
+/// A segment is a piece of the index.
 #[derive(Clone)]
 pub struct Segment {
     index: Index,

diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 744506989..3b7312ce9 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -1,18 +1,16 @@
 use schema::Schema;
 use schema::Document;
 use indexer::SegmentSerializer;
+use core::SerializableSegment;
 use core::Index;
 use Directory;
-use core::SerializableSegment;
 use core::Segment;
 use std::thread::JoinHandle;
 use rustc_serialize::json;
 use indexer::SegmentWriter;
-use indexer::MergeCandidate;
 use std::clone::Clone;
 use std::io;
 use std::io::Write;
-use indexer::MergePolicy;
 use std::thread;
 use std::mem;
 use indexer::merger::IndexMerger;
@@ -23,9 +21,10 @@
 use chan;
 use core::SegmentMeta;
 use core::IndexMeta;
 use core::META_FILEPATH;
+use super::segment_updater::{SegmentUpdater, SegmentUpdate, SegmentUpdateSender};
 use std::time::Duration;
 use super::super::core::index::get_segment_manager;
-use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit};
+use super::segment_manager::{CommitState, SegmentManager};
 use Result;
 use Error;
@@ -43,9 +42,6 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
 type DocumentSender = chan::Sender<Document>;
 type DocumentReceiver = chan::Receiver<Document>;
 
-type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
-type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
-
 
 fn create_metas(segment_manager: &SegmentManager,
@@ -98,7 +94,7 @@ pub struct IndexWriter {
 
     document_receiver: DocumentReceiver,
     document_sender: DocumentSender,
-    
+
     segment_update_sender: SegmentUpdateSender,
     segment_update_thread: JoinHandle<()>,
@@ -142,171 +138,14 @@ fn index_documents(heap: &mut Heap,
 }
 
 
-#[derive(Debug)]
-pub enum SegmentUpdate {
-    AddSegment(SegmentMeta),
-    EndMerge(Vec<SegmentId>, SegmentMeta),
-    CancelGeneration,
-    NewGeneration,
-    Terminate,
-    Commit(u64),
-}
-
-impl SegmentUpdate {
-
-    // Process a single segment update.
-    pub fn process(
-        self,
-        index: &Index,
-        segment_manager: &SegmentManager,
-        is_cancelled_generation: &mut bool) -> bool {
-
-        info!("Segment update: {:?}", self);
-
-        match self {
-            SegmentUpdate::AddSegment(segment_meta) => {
-                if !*is_cancelled_generation {
-                    segment_manager.add_segment(segment_meta);
-                }
-                else {
-                    // rollback has been called and this
-                    // segment actually belongs to documents
-                    // that have been dropped.
-                    //
-                    // Let's just remove its files.
-                    index.delete_segment(segment_meta.segment_id);
-                }
-            }
-            SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
-                segment_manager.end_merge(&segment_ids, &segment_meta);
-                for segment_id in segment_ids {
-                    index.delete_segment(segment_id);
-                }
-            }
-            SegmentUpdate::CancelGeneration => {
-                // Called during rollback. Any segment
-                // that arrives will be ignored until
-                // a NewGeneration update arrives.
-                *is_cancelled_generation = true;
-            }
-            SegmentUpdate::NewGeneration => {
-                // After rollback, we can resume
-                // indexing new documents.
-                *is_cancelled_generation = false;
-            }
-            SegmentUpdate::Commit(docstamp) => {
-                segment_manager.commit(docstamp);
-            }
-            SegmentUpdate::Terminate => {
-                return true;
-            }
-        }
-        return false;
-    }
-}
-
-
-fn consider_merge_options(segment_manager: &SegmentManager, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
-    let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
-    // Committed segments cannot be merged with uncommitted segments.
-    // We therefore consider merges on these two sets of segments independently.
-    let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
-    merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&committed_segments)[..]);
-    merge_candidates
-}
-
-
-// Consumes the `segment_update_receiver` channel
-// for segment updates and applies them.
-//
-// Using a channel ensures that all of the updates
-// happen in the same thread, and makes
-// the implementation of rollback and commit
-// trivial.
-fn process_segment_updates(mut index: Index,
-                           segment_manager: &SegmentManager,
-                           segment_update_receiver: SegmentUpdateReceiver,
-                           segment_update_sender: SegmentUpdateSender) {
-    let mut option_segment_update_sender = Some(segment_update_sender);
-    let mut is_cancelled_generation = false;
-    let mut generation = segment_manager.generation();
-    let merge_policy = index.get_merge_policy();
-
-    for segment_update in segment_update_receiver {
-        if segment_update.process(
-            &index,
-            segment_manager,
-            &mut is_cancelled_generation) {
-            option_segment_update_sender = None;
-        };
-
-        let new_generation = segment_manager.generation();
-
-        // We check the generation number as if it were a
-        // dirty-bit. If the value is different
-        // from our generation, then the segment_manager has
-        // been updated and we need to
-        // - save meta.json
-        // - update the searchers
-        // - consider possible segment merges
-
-        if generation != new_generation {
-            generation = new_generation;
-
-            // saving the meta file.
-            save_metas(
-                segment_manager,
-                index.schema(),
-                index.docstamp(),
-                index.directory_mut()).expect("Could not save metas.");
-
-            // update the searchers so that they eventually will
-            // use the new segments.
-            // TODO eventually have this work through watching meta.json
-            // so that an external process stays up to date as well.
-            index.load_searchers().expect("Could not load new searchers.");
-
-            if let Some(ref segment_update_sender) = option_segment_update_sender {
-                for MergeCandidate(segment_ids) in consider_merge_options(&segment_manager, &*merge_policy) {
-                    segment_manager.start_merge(&segment_ids);
-                    let index_clone = index.clone();
-                    let segment_update_sender_clone = segment_update_sender.clone();
-                    thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
-                        info!("Start merge: {:?}", segment_ids);
-                        let schema = index_clone.schema();
-                        let segments: Vec<Segment> = segment_ids
-                            .iter()
-                            .map(|&segment_id| index_clone.segment(segment_id))
-                            .collect();
-                        // An IndexMerger is like a "view" of our merged segments.
-                        // TODO unwrap
-                        let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
-                        let mut merged_segment = index_clone.new_segment();
-                        // ... we just serialize this index merger in our new segment
-                        // to merge the segments.
-                        let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
-                        let num_docs = merger.write(segment_serializer).unwrap();
-                        let segment_meta = SegmentMeta {
-                            segment_id: merged_segment.id(),
-                            num_docs: num_docs,
-                        };
-                        let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
-                        segment_update_sender_clone.send(segment_update);
-                    }).expect("Failed to spawn merge thread");
-                }
-            }
-
-        }
-    }
-
-}
-
 impl IndexWriter {
 
+
+    /// Waits for the merging threads to terminate.
     pub fn wait_merging_threads(mut self) -> Result<()> {
 
         self.segment_update_sender.send(SegmentUpdate::Terminate);
-        
+
         drop(self.segment_update_sender);
 
         // this will stop the indexing thread,
@@ -316,7 +155,12 @@ impl IndexWriter {
         let mut v = Vec::new();
         mem::swap(&mut v, &mut self.workers_join_handle);
         for join_handle in v {
-            join_handle.join().expect("Indexer has failed");
+            try!(
+                join_handle
+                    .join()
+                    .expect("Indexing Worker thread panicked")
+                    .map_err(|e| Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e)))
+            );
         }
         drop(self.workers_join_handle);
         self.segment_update_thread
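wait_merging_threads above distinguishes two failure modes when draining the workers: a panicked thread (join() returns Err, treated as a bug and surfaced via expect) and an indexing error returned by the worker itself, which is mapped into Error::ErrorInThread and propagated. The same pattern in a standalone sketch with illustrative types; ? is the modern spelling of the try! used in the patch:

    use std::thread::{self, JoinHandle};

    #[derive(Debug)]
    enum IndexError {
        ErrorInThread(String),
    }

    // Join every worker: a panic aborts loudly, a worker error is converted
    // and propagated to the caller, mirroring wait_merging_threads above.
    fn drain_workers(handles: Vec<JoinHandle<Result<(), String>>>) -> Result<(), IndexError> {
        for handle in handles {
            handle
                .join()
                .expect("indexing worker thread panicked")
                .map_err(|e| IndexError::ErrorInThread(format!("error in worker thread: {:?}", e)))?;
        }
        Ok(())
    }

    fn main() {
        let handles: Vec<JoinHandle<Result<(), String>>> = (0..4)
            .map(|i| {
                thread::spawn(move || -> Result<(), String> {
                    // ... index a batch of documents ...
                    let _ = i;
                    Ok(())
                })
            })
            .collect();
        assert!(drain_workers(handles).is_ok());
    }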
@@ -333,10 +177,9 @@ impl IndexWriter {
     fn add_indexing_worker(&mut self,) -> Result<()> {
         let index = self.index.clone();
         let schema = self.index.schema();
-
         let document_receiver_clone = self.document_receiver.clone();
         let mut segment_update_sender = self.segment_update_sender.clone();
-        
+
         let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
 
         let join_handle: JoinHandle<Result<()>> = try!(thread::Builder::new()
@@ -400,16 +243,13 @@ impl IndexWriter {
             panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT));
         }
         let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
-        let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0);
-        let segment_manager = get_segment_manager(index);
-
-        let index_clone = index.clone();
-        let segment_update_sender_clone = segment_update_sender.clone();
-        let segment_update_thread = try!(thread::Builder::new().name("segment_update".to_string()).spawn(move || {
-            process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone)
-        }));
+        let segment_updater = SegmentUpdater::new(index.clone());
+        let segment_update_sender = segment_updater.update_channel().expect("This should never happen"); // TODO remove expect
+
+        let segment_update_thread = segment_updater.start();
+
         let mut index_writer = IndexWriter {
             heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
             index: index.clone(),

diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs
index 1d77c3fd7..b13daa7ea 100644
--- a/src/indexer/merge_policy.rs
+++ b/src/indexer/merge_policy.rs
@@ -1,10 +1,11 @@
 use core::SegmentId;
 use core::SegmentMeta;
+use std::marker;
 
 #[derive(Debug, Clone)]
 pub struct MergeCandidate(pub Vec<SegmentId>);
 
-pub trait MergePolicy {
+pub trait MergePolicy: marker::Send {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate>;
 }
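MergePolicy now requires marker::Send because the policy is owned by the SegmentUpdater thread introduced below, instead of being built on demand by Index::get_merge_policy (deleted above). Implementing a policy takes a single method. A hypothetical example in the spirit of SimpleMergePolicy; BatchMergePolicy is illustrative, and it assumes SegmentId is Copy, which its by-value use elsewhere in this patch suggests:

    // Hypothetical policy: propose one merge per full batch of
    // MERGE_BATCH_SIZE segments. A unit struct has no fields, so the
    // `MergePolicy: marker::Send` bound is satisfied automatically.
    use core::SegmentMeta;
    use indexer::{MergeCandidate, MergePolicy};

    const MERGE_BATCH_SIZE: usize = 8;

    pub struct BatchMergePolicy;

    impl MergePolicy for BatchMergePolicy {
        fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
            segments
                .chunks(MERGE_BATCH_SIZE)
                .filter(|batch| batch.len() == MERGE_BATCH_SIZE) // only full batches
                .map(|batch| MergeCandidate(batch.iter().map(|meta| meta.segment_id).collect()))
                .collect()
        }
    }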
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index 3d22c61ae..276973e1b 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -7,7 +7,7 @@ mod simple_merge_policy;
 mod segment_register;
 mod segment_writer;
 mod segment_manager;
-
+mod segment_updater;
 
 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;

diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
new file mode 100644
index 000000000..80315f92c
--- /dev/null
+++ b/src/indexer/segment_updater.rs
@@ -0,0 +1,214 @@
+use chan;
+use common::LivingCounterLatch;
+use core::Index;
+use core::Segment;
+use core::SegmentId;
+use core::SegmentMeta;
+use core::SerializableSegment;
+use indexer::{MergePolicy, SimpleMergePolicy};
+use indexer::index_writer::save_metas;
+use indexer::MergeCandidate;
+use indexer::merger::IndexMerger;
+use indexer::SegmentSerializer;
+use std::thread;
+use std::thread::JoinHandle;
+use std::sync::Arc;
+use super::segment_manager::{SegmentManager, get_segment_ready_for_commit};
+use super::super::core::index::get_segment_manager;
+
+pub type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
+pub type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
+
+
+#[derive(Debug)]
+pub enum SegmentUpdate {
+    AddSegment(SegmentMeta),
+    EndMerge(Vec<SegmentId>, SegmentMeta),
+    CancelGeneration,
+    NewGeneration,
+    Terminate,
+    Commit(u64),
+}
+
+
+
+
+
+/// The segment updater is in charge of
+/// receiving and processing all of the `SegmentUpdate`s:
+/// - indexing threads send new segments,
+/// - merging threads send merge operations,
+/// - the index writer sends "terminate".
+pub struct SegmentUpdater {
+    index: Index,
+    is_cancelled_generation: bool,
+    segment_update_receiver: SegmentUpdateReceiver,
+    option_segment_update_sender: Option<SegmentUpdateSender>,
+    segment_manager_arc: Arc<SegmentManager>,
+    merge_policy: Box<MergePolicy>,
+}
+
+
+impl SegmentUpdater {
+
+    pub fn new(index: Index) -> SegmentUpdater {
+        let segment_manager_arc = get_segment_manager(&index);
+        let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0);
+        SegmentUpdater {
+            index: index,
+            is_cancelled_generation: false,
+            option_segment_update_sender: Some(segment_update_sender),
+            segment_update_receiver: segment_update_receiver,
+            segment_manager_arc: segment_manager_arc,
+            merge_policy: Box::new(SimpleMergePolicy::default()), // TODO make that configurable
+        }
+    }
+
+    pub fn update_channel(&self,) -> Option<SegmentUpdateSender> {
+        self.option_segment_update_sender.clone()
+    }
+
+
+    fn consider_merge_options(&self,) -> Vec<MergeCandidate> {
+        let segment_manager = self.segment_manager();
+        let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
+        // Committed segments cannot be merged with uncommitted segments.
+        // We therefore consider merges on these two sets of segments independently.
+        let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments);
+        let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments);
+        merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
+        merge_candidates
+    }
+
+
+    fn segment_manager(&self,) -> &SegmentManager {
+        &*self.segment_manager_arc
+    }
+
+    pub fn start(self,) -> JoinHandle<()> {
+        thread::Builder::new()
+            .name("segment_update".to_string())
+            .spawn(move || {
+                self.process();
+            })
+            .expect("Failed to start segment updater thread.")
+    }
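All producers talk to the updater through the rendezvous channel (chan::sync(0)) handed out by update_channel(), so a send blocks until the single updater thread has taken the message. The producer side, condensed into two illustrative helpers (not part of the patch), written as if it lived next to index_writer.rs inside the indexer module:

    use core::SegmentMeta;
    use super::segment_updater::{SegmentUpdate, SegmentUpdateSender};

    // An indexing worker hands its freshly serialized segment to the updater.
    fn publish_segment(sender: &SegmentUpdateSender, segment_meta: SegmentMeta) {
        sender.send(SegmentUpdate::AddSegment(segment_meta));
    }

    // On commit, the index writer stamps the new generation with the docstamp.
    fn publish_commit(sender: &SegmentUpdateSender, docstamp: u64) {
        sender.send(SegmentUpdate::Commit(docstamp));
    }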
+
+    fn process(mut self,) {
+
+        let segment_manager = self.segment_manager_arc.clone();
+
+        let living_threads = LivingCounterLatch::default();
+
+        let segment_updates = self.segment_update_receiver.clone();
+        for segment_update in segment_updates {
+            // We check the generation number as if it were a
+            // dirty-bit. If the value is different
+            // from our generation, then the segment_manager has
+            // been updated and we need to
+            // - save meta.json
+            // - update the searchers
+            // - consider possible segment merges
+            let generation_before_update = segment_manager.generation();
+
+            self.process_one(segment_update);
+
+            if generation_before_update != segment_manager.generation() {
+
+                // saving the meta file.
+                save_metas(
+                    &*segment_manager,
+                    self.index.schema(),
+                    self.index.docstamp(),
+                    self.index.directory_mut()).expect("Could not save metas.");
+
+                // update the searchers so that they eventually will
+                // use the new segments.
+                // TODO eventually have this work through watching meta.json
+                // so that an external process stays up to date as well.
+                self.index.load_searchers().expect("Could not load new searchers.");
+
+                if let Some(ref segment_update_sender) = self.option_segment_update_sender {
+                    for MergeCandidate(segment_ids) in self.consider_merge_options() {
+                        segment_manager.start_merge(&segment_ids);
+                        let living_threads_clone = living_threads.clone();
+                        let index_clone = self.index.clone();
+                        let segment_update_sender_clone = segment_update_sender.clone();
+                        thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
+                            info!("Start merge: {:?}", segment_ids);
+                            let schema = index_clone.schema();
+                            let segments: Vec<Segment> = segment_ids
+                                .iter()
+                                .map(|&segment_id| index_clone.segment(segment_id))
+                                .collect();
+                            // An IndexMerger is like a "view" of our merged segments.
+                            // TODO unwrap
+                            let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
+                            let mut merged_segment = index_clone.new_segment();
+                            // ... we just serialize this index merger in our new segment
+                            // to merge the segments.
+                            let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
+                            let num_docs = merger.write(segment_serializer).unwrap();
+                            let segment_meta = SegmentMeta {
+                                segment_id: merged_segment.id(),
+                                num_docs: num_docs,
+                            };
+                            let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
+                            segment_update_sender_clone.send(segment_update);
+                            drop(living_threads_clone);
+                        }).expect("Failed to spawn merge thread");
+                    }
+                }
+
+            }
+        }
+    }
+
+
+    // Process a single segment update.
+    pub fn process_one(
+        &mut self,
+        segment_update: SegmentUpdate) {
+
+        info!("Segment update: {:?}", segment_update);
+
+        match segment_update {
+            SegmentUpdate::AddSegment(segment_meta) => {
+                if !self.is_cancelled_generation {
+                    self.segment_manager().add_segment(segment_meta);
+                }
+                else {
+                    // rollback has been called and this
+                    // segment actually belongs to documents
+                    // that have been dropped.
+                    //
+                    // Let's just remove its files.
+                    self.index.delete_segment(segment_meta.segment_id);
+                }
+            }
+            SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
+                self.segment_manager().end_merge(&segment_ids, &segment_meta);
+                for segment_id in segment_ids {
+                    self.index.delete_segment(segment_id);
+                }
+            }
+            SegmentUpdate::CancelGeneration => {
+                // Called during rollback. Any segment
+                // that arrives will be ignored until
+                // a NewGeneration update arrives.
+                self.is_cancelled_generation = true;
+            }
+            SegmentUpdate::NewGeneration => {
+                // After rollback, we can resume
+                // indexing new documents.
+                self.is_cancelled_generation = false;
+            }
+            SegmentUpdate::Commit(docstamp) => {
+                self.segment_manager().commit(docstamp);
+            }
+            SegmentUpdate::Terminate => {
+                self.option_segment_update_sender = None;
+            }
+        }
+    }
+}
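Rollback is expressed entirely as messages on the same channel: CancelGeneration flips the flag so that in-flight AddSegment updates get their files deleted instead of being registered, and NewGeneration resumes normal indexing. A condensed sketch of the driver side; the rollback helper is illustrative, the patch only defines the protocol:

    use super::segment_updater::{SegmentUpdate, SegmentUpdateSender};

    // Between CancelGeneration and NewGeneration, every AddSegment still in
    // flight is discarded by the updater and its files are deleted.
    fn rollback(sender: &SegmentUpdateSender) {
        sender.send(SegmentUpdate::CancelGeneration);
        // ... drop the queued documents and reset the indexing workers ...
        sender.send(SegmentUpdate::NewGeneration);
    }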
diff --git a/src/lib.rs b/src/lib.rs
index 808743ce5..ebcaa2d5b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,7 +2,6 @@
 #![allow(module_inception)]
 
 #![feature(optin_builtin_traits)]
-#![feature(binary_heap_extras)]
 #![feature(conservative_impl_trait)]
 #![cfg_attr(test, feature(test))]
 #![cfg_attr(test, feature(step_by))]

diff --git a/src/query/daat_multiterm_scorer.rs b/src/query/daat_multiterm_scorer.rs
index 8f0d7f5b5..a3abcfb0a 100644
--- a/src/query/daat_multiterm_scorer.rs
+++ b/src/query/daat_multiterm_scorer.rs
@@ -198,7 +198,7 @@ impl DocSet for DAATMul
             }
         }
         self.advance_head();
-        while let Some(&HeapItem { doc: doc, ord: ord}) = self.queue.peek() {
+        while let Some(&HeapItem {doc, ord}) = self.queue.peek() {
             if doc == self.doc {
                 let peek_ord: usize = ord as usize;
                 let peek_tf = self.term_frequencies[peek_ord];
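The last hunk is a pure cleanup: in patterns, the field-binding shorthand HeapItem { doc, ord } binds the same variables as HeapItem { doc: doc, ord: ord }. For the record, with a stand-in HeapItem (not the scorer's actual type):

    #[derive(Clone, Copy)]
    struct HeapItem {
        doc: u32,
        ord: u32,
    }

    fn main() {
        let item = HeapItem { doc: 7, ord: 2 };
        // Long form and shorthand bind identically:
        let HeapItem { doc: doc_long, ord: ord_long } = item;
        let HeapItem { doc, ord } = item;
        assert_eq!((doc_long, ord_long), (doc, ord));
    }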