This commit is contained in:
Paul Masurel
2016-10-01 11:38:24 +09:00
parent 0c9b0daf70
commit 5e030ac6bb
3 changed files with 137 additions and 65 deletions

View File

@@ -12,11 +12,12 @@ use std::thread;
use indexer::merger::IndexMerger;
use core::SegmentId;
use datastruct::stacker::Heap;
use std::sync::Arc;
use std::mem::swap;
use chan;
use core::SegmentMeta;
use indexer::SegmentAppender;
use super::super::core::index::get_segment_manager;
use super::segment_manager::SegmentManager;
use Result;
use Error;
@@ -34,8 +35,8 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type DocumentSender = chan::Sender<Document>;
type DocumentReceiver = chan::Receiver<Document>;
type NewSegmentSender = chan::Sender<Result<SegmentMeta>>;
type NewSegmentReceiver = chan::Receiver<Result<SegmentMeta>>;
type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
/// `IndexWriter` is the user entry-point to add document to an index.
///
@@ -46,10 +47,15 @@ type NewSegmentReceiver = chan::Receiver<Result<SegmentMeta>>;
pub struct IndexWriter {
index: Index,
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<Result<()>>>,
segment_appender: SegmentAppender,
document_receiver: DocumentReceiver,
document_sender: DocumentSender,
segment_update_sender: SegmentUpdateSender,
segment_update_worker: JoinHandle<Result<()>>,
num_threads: usize,
docstamp: u64,
}
@@ -59,7 +65,7 @@ fn index_documents(heap: &mut Heap,
segment: Segment,
schema: &Schema,
document_iterator: &mut Iterator<Item=Document>,
segment_appender: &mut SegmentAppender) -> Result<bool> {
segment_update_sender: &mut SegmentUpdateSender) -> Result<()> {
heap.clear();
let segment_id = segment.id();
let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema));
@@ -77,10 +83,95 @@ fn index_documents(heap: &mut Heap,
};
try!(segment_writer.finalize());
segment_appender.add_segment(segment_meta)
segment_update_sender.send(SegmentUpdate::AddSegment(segment_meta));
Ok(())
}
#[derive(Debug)]
pub enum SegmentUpdate {
AddSegment(SegmentMeta),
StartMerge(Vec<SegmentId>),
EndMerge(Vec<SegmentId>, SegmentMeta),
CancelGeneration,
NewGeneration,
}
// Process a single segment update.
//
// If the segment manager has been changed a result,
// return true. (else return false)
fn process_segment_update(segment_manager: &SegmentManager,
segment_update: SegmentUpdate,
is_cancelled_generation: &mut bool) -> Result<bool> {
match segment_update {
SegmentUpdate::AddSegment(segment_meta) => {
if !*is_cancelled_generation {
try!(segment_manager.add_segment(segment_meta));
}
Ok(true)
},
SegmentUpdate::StartMerge(segment_ids) => {
if !*is_cancelled_generation {
try!(segment_manager.start_merge(&segment_ids));
// TODO spawn a segment merge thread
}
Ok(false)
},
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
try!(segment_manager.end_merge(&segment_ids, &segment_meta));
Ok(true)
},
SegmentUpdate::CancelGeneration => {
*is_cancelled_generation = true;
Ok(false)
},
SegmentUpdate::NewGeneration => {
*is_cancelled_generation = false;
Ok(false)
}
}
}
fn on_segment_change() {
// TODO
// - save meta.json,
// - merge opportunities
// - update searchers
}
// Consumes the `segment_update_receiver` channel
// for segment updates and apply them.
//
// Using a channel ensures that all of the updates
// happen in the same thread, and makes
// the implementation of rollback and commit
// trivial.
fn process_segment_updates(segment_manager: &SegmentManager,
segment_update_receiver: SegmentUpdateReceiver) -> Result<()> {
let mut segment_update_it = segment_update_receiver.into_iter();
let mut is_cancelled_generation = false;
loop {
if let Some(segment_update) = segment_update_it.next() {
let has_changed = try!(
process_segment_update(
segment_manager,
segment_update,
&mut is_cancelled_generation)
);
if has_changed {
on_segment_change();
}
}
else {
// somehow, the channel was dropped.
return Ok(());
}
}
Ok(())
}
impl IndexWriter {
/// Spawns a new worker thread for indexing.
@@ -89,9 +180,10 @@ impl IndexWriter {
fn add_indexing_worker(&mut self,) -> Result<()> {
let index = self.index.clone();
let schema = self.index.schema();
let mut segment_appender_clone = self.segment_appender.clone();
let document_receiver_clone = self.document_receiver.clone();
let document_receiver_clone = self.document_receiver.clone();
let mut segment_update_sender = self.segment_update_sender.clone();
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<Result<()>> = thread::spawn(move || {
loop {
@@ -104,15 +196,11 @@ impl IndexWriter {
// creating a new segment's files
// if no document are available.
if document_iterator.peek().is_some() {
if !try!(
index_documents(&mut heap,
segment,
&schema,
&mut document_iterator,
&mut segment_appender_clone)
) {
return Ok(());
}
try!(index_documents(&mut heap,
segment,
&schema,
&mut document_iterator,
&mut segment_update_sender));
}
else {
// No more documents.
@@ -123,6 +211,7 @@ impl IndexWriter {
}
});
self.workers_join_handle.push(join_handle);
Ok(())
}
@@ -140,19 +229,28 @@ impl IndexWriter {
pub fn open(index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
if heap_size_in_byt es_per_thread <= HEAP_SIZE_LIMIT as usize {
panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT));
}
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0);
let segment_manager = get_segment_manager(index);
let segment_appender = SegmentAppender::for_manager(segment_manager);
let segment_update_worker = thread::spawn(move || {
process_segment_updates(&*segment_manager, segment_update_receiver)
});
let mut index_writer = IndexWriter {
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),
segment_appender: segment_appender,
document_receiver: document_receiver,
document_sender: document_sender,
segment_update_sender: segment_update_sender,
segment_update_worker: segment_update_worker,
workers_join_handle: Vec::new(),
num_threads: num_threads,
docstamp: index.docstamp(),
@@ -203,7 +301,6 @@ impl IndexWriter {
swap(&mut self.document_sender, &mut document_sender);
swap(&mut self.document_receiver, &mut document_receiver);
let segment_manager = get_segment_manager(&self.index);
self.segment_appender = SegmentAppender::for_manager(segment_manager);
document_receiver
}
@@ -217,11 +314,8 @@ impl IndexWriter {
/// The docstamp at the last commit is returned.
pub fn rollback(&mut self,) -> Result<u64> {
// the current generation is killed...
// if some pending segment are still in the pipe,
// they won't be added to our index.
self.segment_appender.close();
self.segment_update_sender.send(SegmentUpdate::CancelGeneration);
// we cannot drop segment ready receiver yet
// as it would block the workers.
let document_receiver = self.recreate_channel();
@@ -235,16 +329,29 @@ impl IndexWriter {
// wait for all the worker to finish their work
// (it should be fast since we consumed all pending documents)
let num_workers = former_workers_join_handle.len();
for worker_handle in former_workers_join_handle {
// we stop one worker at a time ...
try!(try!(
worker_handle
.join()
.map_err(|e| Error::ErrorInThread(format!("{:?}", e)))
));
// add a new worker for the next generation.
// ... and recreate a new one right away
// to work on the next generation.
try!(self.add_indexing_worker());
}
// All of our indexing workers for the rollbacked generation have
// been terminated.
// Our document receiver pipe was drained.
// No new document have been added in the meanwhile because `IndexWriter`
// is not shared by different threads.
//
// We can now open a new generation and reaccept segments
// from now on.
self.segment_update_sender.send(SegmentUpdate::NewGeneration);
get_segment_manager(&self.index).rollback();
// reset the docstamp to what it was before

View File

@@ -11,4 +11,4 @@ pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
pub use self::index_writer::IndexWriter;
pub use self::merge_policy::MergePolicy;
pub use self::segment_manager::{SegmentManager, SegmentAppender};
pub use self::segment_manager::SegmentManager;

View File

@@ -3,8 +3,6 @@ use std::sync::RwLock;
use core::SegmentMeta;
use error::Result;
use core::SegmentId;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
struct SegmentRegisters {
uncommitted: SegmentRegister,
@@ -22,6 +20,7 @@ impl Default for SegmentRegisters {
/// The segment manager stores the list of segments
/// as well as their state.
///
@@ -58,7 +57,7 @@ impl SegmentManager {
Ok(())
}
fn add_segment(&self, segment_meta: SegmentMeta) -> Result<()> {
pub fn add_segment(&self, segment_meta: SegmentMeta) -> Result<()> {
let mut registers_lock = try!(self.registers.write());
registers_lock.uncommitted.add_segment(segment_meta);
Ok(())
@@ -107,37 +106,3 @@ impl SegmentManager {
}
}
#[derive(Clone)]
pub struct SegmentAppender {
is_open: Arc<AtomicBool>,
manager: Arc<SegmentManager>,
}
impl SegmentAppender {
pub fn for_manager(manager: Arc<SegmentManager>) -> SegmentAppender {
SegmentAppender {
is_open: Arc::new(AtomicBool::new(true)),
manager: manager,
}
}
pub fn is_open(&self,) -> bool {
self.is_open.load(Ordering::Acquire)
}
pub fn add_segment(&mut self, segment_meta: SegmentMeta) -> Result<bool> {
if self.is_open() {
try!(self.manager.add_segment(segment_meta));
Ok(true)
}
else {
Ok(false)
}
}
pub fn close(&mut self,) {
self.is_open.store(false, Ordering::Release);
}
}