From b499d76f9114dd95a43d90cc3206abaa4f90bb83 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 13 Oct 2016 10:45:46 +0900 Subject: [PATCH] bug/4 Added docstamp in segment manager to make it possible for index_writer to wait for the segmetn manager to be up to date --- src/core/index.rs | 19 ++++++++++--------- src/core/pool.rs | 3 +-- src/indexer/index_writer.rs | 27 ++++++++++++++++++++++----- src/indexer/segment_manager.rs | 12 ++++++++++-- 4 files changed, 43 insertions(+), 18 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index 64c9d89be..06ac705f0 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -197,18 +197,19 @@ impl Index { /// This needs to be called when a new segment has been /// published or after a merge. pub fn load_searchers(&self,) -> Result<()>{ - let res_searchers: Result> = (0..NUM_SEARCHERS) - .map(|_| { - let segment_readers: Vec = try!( - self.searchable_segments() + let searchable_segments = self.searchable_segments(); + let mut searchers = Vec::new(); + for _ in 0..NUM_SEARCHERS { + let searchable_segments_clone = searchable_segments.clone(); + let segment_readers: Vec = try!( + searchable_segments_clone .into_iter() .map(SegmentReader::open) .collect() - ); - Ok(Searcher::from(segment_readers)) - }) - .collect(); - let searchers = try!(res_searchers); + ); + let searcher = Searcher::from(segment_readers); + searchers.push(searcher); + } self.searcher_pool.publish_new_generation(searchers); Ok(()) } diff --git a/src/core/pool.rs b/src/core/pool.rs index b06b6110a..864deb24d 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -41,12 +41,11 @@ impl Pool { } pub fn inc_generation(&self,) { - self.generation.fetch_add(1, Ordering::Release); + self.generation.fetch_add(1, Ordering::SeqCst); } pub fn acquire(&self,) -> LeasedItem { let generation = self.generation.load(Ordering::Acquire); - println("generation {}"); loop { let gen_item = self.queue.pop(); if gen_item.generation >= generation { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 37c9072b1..744506989 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -23,6 +23,7 @@ use chan; use core::SegmentMeta; use core::IndexMeta; use core::META_FILEPATH; +use std::time::Duration; use super::super::core::index::get_segment_manager; use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit}; use Result; @@ -148,7 +149,7 @@ pub enum SegmentUpdate { CancelGeneration, NewGeneration, Terminate, - Commit, + Commit(u64), } impl SegmentUpdate { @@ -193,8 +194,8 @@ impl SegmentUpdate { // indexing new documents. *is_cancelled_generation = false; } - SegmentUpdate::Commit => { - segment_manager.commit(); + SegmentUpdate::Commit(docstamp) => { + segment_manager.commit(docstamp); } SegmentUpdate::Terminate => { return true; @@ -597,8 +598,24 @@ impl IndexWriter { // add a new worker for the next generation. try!(self.add_indexing_worker()); } - - self.segment_update_sender.send(SegmentUpdate::Commit); + // here, because we join all of the worker threads, + // all of the segment update for this commit have been + // sent. + // + // No document belonging to the next generation have been + // pushed too, because add_document can only happen + // on this thread. + + // This will move uncommitted segments to the state of + // committed segments. + self.segment_update_sender.send(SegmentUpdate::Commit(self.committed_docstamp)); + + // wait for the segment update thread to have processed the info + let segment_manager = get_segment_manager(&self.index); + while segment_manager.docstamp() != self.committed_docstamp { + println!("wait"); + thread::sleep(Duration::from_millis(100)); + } // super::super::core::index::commit(&mut self.index, commit_docstamp); try!(self.on_change()); diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index d6a3a8915..411490aa2 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -7,6 +7,7 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::{AtomicUsize, Ordering}; struct SegmentRegisters { + docstamp: u64, uncommitted: SegmentRegister, committed: SegmentRegister, } @@ -21,6 +22,7 @@ pub enum CommitState { impl Default for SegmentRegisters { fn default() -> SegmentRegisters { SegmentRegisters { + docstamp: 0u64, uncommitted: SegmentRegister::default(), committed: SegmentRegister::default() } @@ -77,10 +79,15 @@ impl SegmentManager { CommitState::Missing } } + + pub fn docstamp(&self,) -> u64 { + self.read().docstamp + } pub fn from_segments(segment_metas: Vec) -> SegmentManager { SegmentManager { registers: RwLock::new( SegmentRegisters { + docstamp: 0u64, // TODO put the actual value uncommitted: SegmentRegister::default(), committed: SegmentRegister::from(segment_metas), }), @@ -113,13 +120,14 @@ impl SegmentManager { segment_ids } - pub fn commit(&self,) { + pub fn commit(&self, docstamp: u64) { let mut registers_lock = self.write(); let segment_entries = registers_lock.uncommitted.segment_entries(); for segment_entry in segment_entries { registers_lock.committed.add_segment_entry(segment_entry); } - registers_lock.uncommitted.clear(); + registers_lock.docstamp = docstamp; + registers_lock.uncommitted.clear(); } pub fn add_segment(&self, segment_meta: SegmentMeta) {