bug/4 Added docstamp in segment manager to make it possible for index_writer to wait for the segmetn manager to be up to date

This commit is contained in:
Paul Masurel
2016-10-13 10:45:46 +09:00
parent 3bc9978d95
commit b499d76f91
4 changed files with 43 additions and 18 deletions

View File

@@ -197,18 +197,19 @@ impl Index {
/// This needs to be called when a new segment has been
/// published or after a merge.
pub fn load_searchers(&self,) -> Result<()>{
let res_searchers: Result<Vec<Searcher>> = (0..NUM_SEARCHERS)
.map(|_| {
let segment_readers: Vec<SegmentReader> = try!(
self.searchable_segments()
let searchable_segments = self.searchable_segments();
let mut searchers = Vec::new();
for _ in 0..NUM_SEARCHERS {
let searchable_segments_clone = searchable_segments.clone();
let segment_readers: Vec<SegmentReader> = try!(
searchable_segments_clone
.into_iter()
.map(SegmentReader::open)
.collect()
);
Ok(Searcher::from(segment_readers))
})
.collect();
let searchers = try!(res_searchers);
);
let searcher = Searcher::from(segment_readers);
searchers.push(searcher);
}
self.searcher_pool.publish_new_generation(searchers);
Ok(())
}

View File

@@ -41,12 +41,11 @@ impl<T> Pool<T> {
}
pub fn inc_generation(&self,) {
self.generation.fetch_add(1, Ordering::Release);
self.generation.fetch_add(1, Ordering::SeqCst);
}
pub fn acquire(&self,) -> LeasedItem<T> {
let generation = self.generation.load(Ordering::Acquire);
println("generation {}");
loop {
let gen_item = self.queue.pop();
if gen_item.generation >= generation {

View File

@@ -23,6 +23,7 @@ use chan;
use core::SegmentMeta;
use core::IndexMeta;
use core::META_FILEPATH;
use std::time::Duration;
use super::super::core::index::get_segment_manager;
use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit};
use Result;
@@ -148,7 +149,7 @@ pub enum SegmentUpdate {
CancelGeneration,
NewGeneration,
Terminate,
Commit,
Commit(u64),
}
impl SegmentUpdate {
@@ -193,8 +194,8 @@ impl SegmentUpdate {
// indexing new documents.
*is_cancelled_generation = false;
}
SegmentUpdate::Commit => {
segment_manager.commit();
SegmentUpdate::Commit(docstamp) => {
segment_manager.commit(docstamp);
}
SegmentUpdate::Terminate => {
return true;
@@ -597,8 +598,24 @@ impl IndexWriter {
// add a new worker for the next generation.
try!(self.add_indexing_worker());
}
self.segment_update_sender.send(SegmentUpdate::Commit);
// here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
//
// No document belonging to the next generation have been
// pushed too, because add_document can only happen
// on this thread.
// This will move uncommitted segments to the state of
// committed segments.
self.segment_update_sender.send(SegmentUpdate::Commit(self.committed_docstamp));
// wait for the segment update thread to have processed the info
let segment_manager = get_segment_manager(&self.index);
while segment_manager.docstamp() != self.committed_docstamp {
println!("wait");
thread::sleep(Duration::from_millis(100));
}
// super::super::core::index::commit(&mut self.index, commit_docstamp);
try!(self.on_change());

View File

@@ -7,6 +7,7 @@ use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
struct SegmentRegisters {
docstamp: u64,
uncommitted: SegmentRegister,
committed: SegmentRegister,
}
@@ -21,6 +22,7 @@ pub enum CommitState {
impl Default for SegmentRegisters {
fn default() -> SegmentRegisters {
SegmentRegisters {
docstamp: 0u64,
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::default()
}
@@ -77,10 +79,15 @@ impl SegmentManager {
CommitState::Missing
}
}
pub fn docstamp(&self,) -> u64 {
self.read().docstamp
}
pub fn from_segments(segment_metas: Vec<SegmentMeta>) -> SegmentManager {
SegmentManager {
registers: RwLock::new( SegmentRegisters {
docstamp: 0u64, // TODO put the actual value
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::from(segment_metas),
}),
@@ -113,13 +120,14 @@ impl SegmentManager {
segment_ids
}
pub fn commit(&self,) {
pub fn commit(&self, docstamp: u64) {
let mut registers_lock = self.write();
let segment_entries = registers_lock.uncommitted.segment_entries();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
registers_lock.uncommitted.clear();
registers_lock.docstamp = docstamp;
registers_lock.uncommitted.clear();
}
pub fn add_segment(&self, segment_meta: SegmentMeta) {