issue/43 refactoring in order to remove the segment updater nonsense in favor of simpler futures

This commit is contained in:
Paul Masurel
2017-01-25 08:33:21 +09:00
parent 20eb586660
commit 0ec492dcf2
5 changed files with 181 additions and 193 deletions

View File

@@ -37,7 +37,10 @@ uuid = { version = "0.4", features = ["v4", "rustc-serialize"] }
chan = "0.1"
version = "2"
crossbeam = "0.2"
eventual = "0.1.7"
futures = "0.1.9"
futures-cpupool = "0.1.2"
[dev-dependencies]
rand = "0.3"

View File

@@ -5,6 +5,7 @@ use indexer::SegmentSerializer;
use core::SerializableSegment;
use core::Index;
use core::Segment;
use core::SegmentId;
use schema::Term;
use indexer::SegmentEntry;
use std::thread::JoinHandle;
@@ -63,7 +64,6 @@ pub struct IndexWriter {
_merge_policy: Arc<Mutex<Box<MergePolicy>>>,
index: Index,
segment_manager: Arc<SegmentManager>,
heap_size_in_bytes_per_thread: usize,
@@ -72,7 +72,7 @@ pub struct IndexWriter {
document_receiver: DocumentReceiver,
document_sender: DocumentSender,
segment_update_manager: SegmentUpdater,
segment_updater: SegmentUpdater,
worker_id: usize,
@@ -93,7 +93,7 @@ fn index_documents(heap: &mut Heap,
mut segment: Segment,
schema: &Schema,
document_iterator: &mut Iterator<Item=AddOperation>,
segment_update_manager: &mut SegmentUpdater,
segment_updater: &mut SegmentUpdater,
delete_cursor: &mut DeleteQueueCursor)
-> Result<()> {
heap.clear();
@@ -134,7 +134,7 @@ fn index_documents(heap: &mut Heap,
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
try!(segment_writer.finalize());
segment_update_manager.send(SegmentUpdate::AddSegment(segment_entry));
segment_updater.add_segment(segment_entry);
Ok(())
}
@@ -144,10 +144,10 @@ impl IndexWriter {
/// The index writer
pub fn wait_merging_threads(mut self) -> Result<()> {
let future = self.segment_update_manager.send(SegmentUpdate::Terminate);
let future = self.segment_updater.terminate();
// this will stop the indexing thread,
// dropping the last reference to the segment_update_manager.
// dropping the last reference to the segment_updater.
drop(self.document_sender);
let mut v = Vec::new();
@@ -172,7 +172,7 @@ impl IndexWriter {
let index = self.index.clone();
let schema = self.index.schema();
let document_receiver_clone = self.document_receiver.clone();
let mut segment_update_manager = self.segment_update_manager.clone();
let mut segment_updater = self.segment_updater.clone();
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
// TODO fix this. the cursor might be too advanced
@@ -201,7 +201,7 @@ impl IndexWriter {
segment,
&schema,
&mut document_iterator,
&mut segment_update_manager,
&mut segment_updater,
&mut delete_cursor_clone));
} else {
// No more documents.
@@ -250,11 +250,8 @@ impl IndexWriter {
let delete_queue = DeleteQueue::default();
let committed_segments = index.committed_segments()?;
let segment_manager = Arc::new(SegmentManager::from_segments(committed_segments, delete_queue.cursor()));
let segment_update_manager = SegmentUpdater::new(index.clone(), segment_manager.clone(), merge_policy.clone());
let segment_updater = SegmentUpdater::create(index.clone(), delete_queue.cursor(), merge_policy.clone())?;
let mut index_writer = IndexWriter {
@@ -264,12 +261,11 @@ impl IndexWriter {
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),
segment_manager: segment_manager,
document_receiver: document_receiver,
document_sender: document_sender,
segment_update_manager: segment_update_manager,
segment_updater: segment_updater,
workers_join_handle: Vec::new(),
num_threads: num_threads,
@@ -303,72 +299,8 @@ impl IndexWriter {
}
/// Merges a given list of segments
pub fn merge(&mut self, segments: &[Segment]) -> Result<()> {
if segments.len() < 2 {
// no segments or one segment? nothing to do.
return Ok(());
}
let ref segment_manager = self.segment_manager;
{
// let's check that all these segments are in the same
// committed/uncommitted state.
let first_commit_state = segment_manager.is_committed(segments[0].id());
for segment in segments {
let commit_state = segment_manager.is_committed(segment.id());
if commit_state == CommitState::Missing {
return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index",
segments[0].id())));
}
if commit_state != first_commit_state {
return Err(Error::InvalidArgument(String::from("You may not merge segments \
that are heterogenously in \
committed and uncommited.")));
}
}
}
let schema = self.index.schema();
// An IndexMerger is like a "view" of our merged segments.
let merger = try!(IndexMerger::open(schema, segments));
let mut merged_segment = self.index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment));
let num_docs = try!(merger.write(segment_serializer));
let merged_segment_ids = segments
.iter()
.map(|segment| segment.id())
.collect::<Vec<_>>();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
num_deleted_docs: 0,
};
// TODO fix this!!!
let delete_queue = DeleteQueue::default();
let delete_cursor = delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor);
let segment_update = SegmentUpdate::EndMerge(
None,
merged_segment_ids,
segment_entry
);
self.segment_update_manager.send(segment_update);
// self.segment_updater.(segment_ids, segment_entry);
//segment_manager.end_merge(&merged_segment_ids, segment_entry);
Ok(())
pub fn merge(&mut self, segments: &[SegmentId]) -> impl Async<Value=(), Error=&'static str> {
self.segment_updater.start_merge(segments.to_vec())
}
/// Closes the current document channel send.
@@ -397,7 +329,7 @@ impl IndexWriter {
/// The docstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<u64> {
self.segment_update_manager.send(SegmentUpdate::CancelGeneration);
self.segment_updater.cancel_generation();
// we cannot drop segment ready receiver yet
// as it would block the workers.
@@ -430,15 +362,20 @@ impl IndexWriter {
//
// We can now open a new generation and reaccept segments
// from now on.
self.segment_update_manager.send(SegmentUpdate::NewGeneration);
self.segment_updater.new_generation();
// TODO Send rollback.
//let rollbacked_segments = self.segment_manager.rollback();
let rollbacked_segments = self.segment_manager.rollback();
for segment_id in rollbacked_segments {
// for segment_id in rollbacked_segments {
// // TODO all delete must happen after saving
// // meta.json
// self.index.delete_segment(segment_id);
// }
panic!("aaaa");
// TODO all delete must happen after saving
// meta.json
self.index.delete_segment(segment_id);
}
// reset the docstamp
self.uncommitted_docstamp = self.committed_docstamp;
@@ -490,7 +427,7 @@ impl IndexWriter {
// This will move uncommitted segments to the state of
// committed segments.
let future = self.segment_update_manager.send(SegmentUpdate::Commit(self.committed_docstamp));
let future = self.segment_updater.commit(self.committed_docstamp);
// wait for the segment update thread to have processed the info
// TODO remove unwrap

View File

@@ -1,6 +1,7 @@
use Result;
use core::SegmentReader;
use core::Segment;
use core::SegmentId;
use DocId;
use core::SerializableSegment;
use indexer::SegmentSerializer;
@@ -205,6 +206,7 @@ mod tests {
use collector::tests::TestCollector;
use query::BooleanQuery;
use schema::TextIndexingOptions;
use eventual::Async;
#[test]
fn test_index_merger() {
@@ -260,9 +262,11 @@ mod tests {
}
}
{
let segments = index.searchable_segments().expect("Searchable segments failed.");
let segment_ids = index.searchable_segment_ids().expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer.merge(&segments).expect("Merging failed");
index_writer.merge(&segment_ids)
.await()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
}
{

View File

@@ -20,7 +20,7 @@ use std::thread::JoinHandle;
use std::sync::Arc;
use std::collections::HashMap;
use rustc_serialize::json;
use indexer::delete_queue::DeleteQueue;
use indexer::delete_queue::{DeleteQueueCursor, DeleteQueue};
use Result;
use core::IndexMeta;
use core::META_FILEPATH;
@@ -91,6 +91,9 @@ pub enum SegmentUpdate {
/// Created by the indexing worker thread
AddSegment(SegmentEntry),
StartMerge(Vec<SegmentId>),
/// A merge is ended.
/// Remove the merged segment and record the new
/// large merged segment.
@@ -119,8 +122,6 @@ pub enum SegmentUpdate {
}
// TODO Rename
#[derive(Clone)]
pub struct SegmentUpdater {
@@ -130,21 +131,56 @@ pub struct SegmentUpdater {
impl SegmentUpdater {
pub fn new(
pub fn create(
index: Index,
segment_manager: Arc<SegmentManager>,
merge_policy: Arc<Mutex<Box<MergePolicy>>>) -> SegmentUpdater {
delete_cursor: DeleteQueueCursor,
merge_policy: Arc<Mutex<Box<MergePolicy>>>) -> Result<SegmentUpdater> {
let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async();
let segment_update_manager = SegmentUpdater {
let segment_updater = SegmentUpdater {
channel: segment_update_sender,
};
let committed_segments = index.committed_segments()?;
let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor);
SegmentUpdateRunner::new(
index,
segment_manager,
merge_policy,
segment_update_manager.clone(),
segment_updater.clone(),
segment_update_receiver).start();
segment_update_manager
Ok(segment_updater)
}
/// Submits a segment entry to the segment updater.
///
/// The entry is wrapped in a `SegmentUpdate::AddSegment` message and handed
/// to `send`; the future returned by `send` is returned unchanged.
pub fn add_segment(&self, segment_entry: SegmentEntry) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::AddSegment(segment_entry);
    self.send(update)
}
/// Requests a commit at the given docstamp.
///
/// Builds a `SegmentUpdate::Commit` carrying `committed_docstamp` and
/// forwards it through `send`, returning the future `send` produces.
pub fn commit(&self, committed_docstamp: u64) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::Commit(committed_docstamp);
    self.send(update)
}
/// Requests a merge of the segments identified by `segment_ids`.
///
/// Builds a `SegmentUpdate::StartMerge` message and forwards it through
/// `send`, returning the future `send` produces.
pub fn start_merge(&self, segment_ids: Vec<SegmentId>) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::StartMerge(segment_ids);
    self.send(update)
}
/// Sends a `SegmentUpdate::NewGeneration` message through `send` and
/// returns the resulting future.
pub fn new_generation(&self) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::NewGeneration;
    self.send(update)
}
/// Sends a `SegmentUpdate::CancelGeneration` message through `send` and
/// returns the resulting future.
pub fn cancel_generation(&self) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::CancelGeneration;
    self.send(update)
}
/// Reports that a merge has finished.
///
/// Packs the optional merge thread id, the ids of the segments that were
/// merged, and the entry describing the resulting segment into a
/// `SegmentUpdate::EndMerge` message, then forwards it through `send` and
/// returns the future `send` produces.
pub fn end_merge(&self,
                 merge_thread_id: Option<usize>,
                 merged_segment_ids: Vec<SegmentId>,
                 resulting_segment_entry: SegmentEntry) -> impl Async<Value=(), Error=&'static str> {
    self.send(SegmentUpdate::EndMerge(
        merge_thread_id,
        merged_segment_ids,
        resulting_segment_entry,
    ))
}
/// Sends a `SegmentUpdate::Terminate` message through `send` and returns
/// the resulting future.
pub fn terminate(&self) -> impl Async<Value=(), Error=&'static str> {
    let update = SegmentUpdate::Terminate;
    self.send(update)
}
pub fn send(&self, segment_update: SegmentUpdate) -> impl Async<Value=(), Error=&'static str> {
@@ -170,8 +206,8 @@ pub struct SegmentUpdateRunner {
index: Index,
is_cancelled_generation: bool,
segment_update_receiver: SegmentUpdateReceiver,
segment_update_manager: SegmentUpdater,
segment_manager: Arc<SegmentManager>,
segment_updater: SegmentUpdater,
segment_manager: SegmentManager,
merge_policy: Arc<Mutex<Box<MergePolicy>>>,
merging_thread_id: usize,
merging_threads: HashMap<usize, JoinHandle<(Vec<SegmentId>, SegmentEntry)> >,
@@ -180,14 +216,14 @@ pub struct SegmentUpdateRunner {
impl SegmentUpdateRunner {
fn new(index: Index,
segment_manager: Arc<SegmentManager>,
segment_manager: SegmentManager,
merge_policy: Arc<Mutex<Box<MergePolicy>>>,
segment_update_manager: SegmentUpdater,
segment_updater: SegmentUpdater,
segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner {
SegmentUpdateRunner {
index: index,
is_cancelled_generation: false,
segment_update_manager: segment_update_manager,
segment_updater: segment_updater,
segment_update_receiver: segment_update_receiver,
segment_manager: segment_manager,
merge_policy: merge_policy,
@@ -206,10 +242,10 @@ impl SegmentUpdateRunner {
&mut self,
segment_ids: Vec<SegmentId>,
segment_entry: SegmentEntry) {
let segment_manager = self.segment_manager.clone();
segment_manager.end_merge(&segment_ids, segment_entry);
self.segment_manager.end_merge(&segment_ids, segment_entry);
save_metas(
&*segment_manager,
&self.segment_manager,
self.index.schema(),
self.index.docstamp(),
self.index.directory_mut()).expect("Could not save metas.");
@@ -221,60 +257,64 @@ impl SegmentUpdateRunner {
self.index.load_searchers().unwrap();
}
/// Spawns a dedicated thread that merges `segment_ids` into one new segment.
///
/// Before spawning, a new merging thread id is allocated and the segment ids
/// are registered with the segment manager through its `start_merge`.
///
/// The spawned thread (named `merge_thread_<id>`):
/// 1. opens the segments and serializes their merge into a new segment,
/// 2. posts a `SegmentUpdate::EndMerge` to the segment updater,
/// 3. completes `complete_opt` if it is `Some`,
/// 4. returns the merged segment ids together with the new segment entry.
///
/// The thread's join handle is stored in `merging_threads` under the
/// allocated merging thread id.
///
/// # Panics
///
/// Panics if the merge thread cannot be spawned. Failures inside the merge
/// thread (opening the merger, creating the serializer, writing the merged
/// segment) panic that thread via `expect`.
fn start_merge(&mut self, segment_ids: Vec<SegmentId>, complete_opt: Option<Complete<(), &'static str>>) {
    let merging_thread_id = self.new_merging_thread_id();
    self.segment_manager.start_merge(&segment_ids);
    let index_clone = self.index.clone();
    let segment_updater_clone = self.segment_updater.clone();
    let merge_thread_handle = thread::Builder::new()
        .name(format!("merge_thread_{:?}", merging_thread_id))
        .spawn(move || {
            info!("Start merge: {:?}", segment_ids);
            let schema = index_clone.schema();
            let segments: Vec<Segment> = segment_ids
                .iter()
                .map(|&segment_id| index_clone.segment(segment_id))
                .collect();
            // An IndexMerger is like a "view" of our merged segments.
            // TODO unwrap
            let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
            let mut merged_segment = index_clone.new_segment();
            // ... we just serialize this index merger in our new segment
            // to merge the two segments.
            let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
            let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
            let segment_meta = SegmentMeta {
                segment_id: merged_segment.id(),
                num_docs: num_docs,
                num_deleted_docs: 0u32,
            };
            // TODO fix delete cursor
            let delete_queue = DeleteQueue::default();
            let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
            // The ids and the entry are cloned because they are also the
            // thread's return value; the update itself is moved into `send`.
            let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone());
            segment_updater_clone.send(segment_update);
            // NOTE(review): the future returned by `send` is not awaited, so
            // presumably `complete` can fire before the EndMerge update has
            // been processed by the updater thread. Confirm this is intended.
            if let Some(complete) = complete_opt {
                complete.complete(());
            }
            (segment_ids, segment_entry)
        })
        .expect("Failed to spawn merge thread");
    self.merging_threads.insert(merging_thread_id, merge_thread_handle);
}
fn start_merges(&mut self) {
let merge_candidates = self.consider_merge_options();
for MergeCandidate(segment_ids) in merge_candidates {
let merging_thread_id = self.new_merging_thread_id();
self.segment_manager().start_merge(&segment_ids);
let index_clone = self.index.clone();
let segment_update_manager_clone = self.segment_update_manager.clone();
let merge_thread_handle = thread::Builder::new()
.name(format!("merge_thread_{:?}", merging_thread_id))
.spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
num_deleted_docs: 0u32,
};
// TODO fix delete cursor
let delete_queue = DeleteQueue::default();
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone());
segment_update_manager_clone.send(segment_update.clone());
(segment_ids, segment_entry)
})
.expect("Failed to spawn merge thread");
self.merging_threads.insert(merging_thread_id, merge_thread_handle);
self.start_merge(segment_ids, None);
}
}
fn consider_merge_options(&self,) -> Vec<MergeCandidate> {
let segment_manager = self.segment_manager();
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy_lock = self.merge_policy.lock().unwrap();
@@ -284,11 +324,6 @@ impl SegmentUpdateRunner {
merge_candidates
}
fn segment_manager(&self,) -> &SegmentManager {
&*self.segment_manager
}
pub fn start(self) -> JoinHandle<()> {
thread::Builder::new()
.name("segment_update".to_string())
@@ -299,9 +334,7 @@ impl SegmentUpdateRunner {
}
fn process(mut self) {
let segment_manager = self.segment_manager.clone();
let mut complete_option = None;
for (complete, segment_update) in self.segment_update_receiver.clone() {
@@ -316,40 +349,46 @@ impl SegmentUpdateRunner {
// dirty-bit. If the value is different
// to our generation, then the segment_manager has
// been updated.
let generation_before_update = segment_manager.generation();
let generation_before_update = self.segment_manager.generation();
self.process_one(segment_update);
if generation_before_update != segment_manager.generation() {
// The segment manager has changed, we need to
// - save meta.json
save_metas(
&*segment_manager,
self.index.schema(),
self.index.docstamp(),
self.index.directory_mut()).expect("Could not save metas.");
if let SegmentUpdate::StartMerge(segment_ids) = segment_update {
self.start_merge(segment_ids, Some(complete));
}
else {
self.process_one(segment_update);
if generation_before_update != self.segment_manager.generation() {
// The segment manager has changed, we need to
// - save meta.json
save_metas(
&self.segment_manager,
self.index.schema(),
self.index.docstamp(),
self.index.directory_mut()).expect("Could not save metas.");
// - update the searchers
// update the searchers so that they eventually will
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
match self.index.load_searchers() {
Ok(()) => {
}
Err(e) => {
error!("Failure while loading new searchers {:?}", e);
panic!(format!("Failure while loading new searchers {:?}", e));
// - update the searchers
// update the searchers so that they eventually will
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
match self.index.load_searchers() {
Ok(()) => {}
Err(e) => {
error!("Failure while loading new searchers {:?}", e);
panic!(format!("Failure while loading new searchers {:?}", e));
}
}
// - start merges if required
self.start_merges();
}
// - start merges if required
self.start_merges();
complete.complete(());
}
complete.complete(());
}
@@ -381,12 +420,12 @@ impl SegmentUpdateRunner {
segment_update: SegmentUpdate) {
info!("Segment update: {:?}", segment_update);
use self::SegmentUpdate::*;
match segment_update {
AddSegment(segment_entry) => {
if !self.is_cancelled_generation {
self.segment_manager().add_segment(segment_entry);
self.segment_manager.add_segment(segment_entry);
}
else {
// rollback has been called and this
@@ -397,6 +436,9 @@ impl SegmentUpdateRunner {
self.index.delete_segment(segment_entry.segment_id());
}
}
StartMerge(segment_ids) => {
panic!("this should have been handled somewhere else");
}
EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => {
self.end_merge(
segment_ids,
@@ -417,7 +459,7 @@ impl SegmentUpdateRunner {
self.is_cancelled_generation = false;
}
Commit(docstamp) => {
self.segment_manager().commit(docstamp);
self.segment_manager.commit(docstamp);
}
Terminate => {
panic!("We should have left the loop before processing it.");

View File

@@ -50,6 +50,8 @@ extern crate crossbeam;
extern crate bit_set;
extern crate notify;
extern crate eventual;
extern crate futures;
extern crate futures_cpupool;
#[cfg(feature="simdcompression")]
extern crate libc;