From ca977fb17b0072af63cd0a398d9bb87134e93c47 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 30 Jan 2017 23:47:56 +0900 Subject: [PATCH] issue/43 Refactoring of SegmentUpdater --- Cargo.toml | 2 - src/core/term_iterator.rs | 1 + src/indexer/index_writer.rs | 91 +++--- src/indexer/log_merge_policy.rs | 8 +- src/indexer/merger.rs | 6 +- src/indexer/segment_manager.rs | 28 +- src/indexer/segment_register.rs | 9 +- src/indexer/segment_updater.rs | 555 +++++++++++--------------------- src/lib.rs | 6 +- src/postings/mod.rs | 3 +- src/query/phrase_query/mod.rs | 1 + 11 files changed, 250 insertions(+), 460 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6941ae63d..e65a5b608 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,8 +38,6 @@ chan = "0.1" version = "2" crossbeam = "0.2" -eventual = "0.1.7" - futures = "0.1.9" futures-cpupool = "0.1.2" diff --git a/src/core/term_iterator.rs b/src/core/term_iterator.rs index 3a5e259f7..53311e1cd 100644 --- a/src/core/term_iterator.rs +++ b/src/core/term_iterator.rs @@ -170,6 +170,7 @@ mod tests { index_writer.commit().unwrap(); } } + index.load_searchers().unwrap(); let searcher = index.searcher(); let mut term_it = searcher.terms(); let mut terms = String::new(); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 298d1b6d6..e4578c51f 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,17 +1,14 @@ use schema::Schema; use schema::Document; use super::operation::AddOperation; -use indexer::SegmentSerializer; -use core::SerializableSegment; use core::Index; use core::Segment; use core::SegmentId; use schema::Term; use indexer::SegmentEntry; use std::thread::JoinHandle; -use indexer::{MergePolicy, DefaultMergePolicy}; +use indexer::MergePolicy; use indexer::SegmentWriter; -use indexer::SegmentManager; use core::SegmentComponent; use super::directory_lock::DirectoryLock; use futures::Future; @@ -19,16 +16,14 @@ use std::clone::Clone; use std::io; use fastfield::delete; use std::thread; +use futures::Canceled; use std::mem; -use indexer::merger::IndexMerger; use datastruct::stacker::Heap; use std::mem::swap; -use std::sync::{Arc, Mutex}; use chan; use core::SegmentMeta; use super::delete_queue::{DeleteQueue, DeleteQueueCursor}; use super::segment_updater::SegmentUpdater; -use super::segment_manager::CommitState; use Result; use Error; @@ -96,7 +91,7 @@ fn index_documents(heap: &mut Heap, document_iterator: &mut Iterator, segment_updater: &mut SegmentUpdater, delete_cursor: &mut DeleteQueueCursor) - -> Result<()> { + -> Result { heap.clear(); let segment_id = segment.id(); let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment.clone(), &schema)); @@ -133,10 +128,13 @@ fn index_documents(heap: &mut Heap, }; let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); - + try!(segment_writer.finalize()); - segment_updater.add_segment(generation, segment_entry); - Ok(()) + + segment_updater + .add_segment(generation, segment_entry) + .wait() + .map_err(|_| Error::ErrorInThread("Could not add segment.".to_string())) } @@ -145,7 +143,7 @@ impl IndexWriter { /// The index writer pub fn wait_merging_threads(mut self) -> Result<()> { - let future = self.segment_updater.terminate(); + // let future = self.segment_updater.terminate(); // this will stop the indexing thread, // dropping the last reference to the segment_updater. @@ -162,7 +160,12 @@ impl IndexWriter { } drop(self.workers_join_handle); - future.wait().unwrap(); // TODO do something with the result. + self.segment_updater + .wait_merging_thread() + .map_err(|_| + Error::ErrorInThread("Failed to join merging thread.".to_string()) + )?; + // future.wait().unwrap(); // TODO do something with the result. Ok(()) } @@ -201,13 +204,16 @@ impl IndexWriter { // peeked document now belongs to // our local iterator. if document_iterator.peek().is_some() { - try!(index_documents(&mut heap, + let valid_generation = try!(index_documents(&mut heap, segment, &schema, generation, &mut document_iterator, &mut segment_updater, &mut delete_cursor_clone)); + if valid_generation { + return Ok(()); + } } else { // No more documents. // Happens when there is a commit, or if the `IndexWriter` @@ -254,8 +260,7 @@ impl IndexWriter { let delete_queue = DeleteQueue::default(); - let merge_policy = box DefaultMergePolicy::default(); - let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor(), merge_policy)?; + let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?; let mut index_writer = IndexWriter { @@ -285,9 +290,14 @@ impl IndexWriter { Ok(index_writer) } + + pub fn get_merge_policy(&self) -> Box { + self.segment_updater.get_merge_policy() + } + /// Set the merge policy. pub fn set_merge_policy(&self, merge_policy: Box) { - // *self._merge_policy.lock().unwrap() = merge_policy; + self.segment_updater.set_merge_policy(merge_policy); } fn start_workers(&mut self) -> Result<()> { @@ -298,7 +308,7 @@ impl IndexWriter { } /// Merges a given list of segments - pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future { + pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future { self.segment_updater.start_merge(segments.to_vec()) } @@ -328,8 +338,11 @@ impl IndexWriter { /// The docstamp at the last commit is returned. pub fn rollback(&mut self) -> Result { - self.segment_updater.cancel_generation(); - + // by updating the generation in the segment updater, + // pending add segment commands will be dismissed. + self.generation += 1; + let rollback_future = self.segment_updater.new_generation(self.generation); + // we cannot drop segment ready receiver yet // as it would block the workers. let document_receiver = self.recreate_document_channel(); @@ -341,7 +354,7 @@ impl IndexWriter { let mut former_workers_join_handle = Vec::new(); swap(&mut former_workers_join_handle, &mut self.workers_join_handle); - + // wait for all the worker to finish their work // (it should be fast since we consumed all pending documents) for worker_handle in former_workers_join_handle { @@ -355,27 +368,15 @@ impl IndexWriter { // All of our indexing workers for the rollbacked generation have // been terminated. + // // Our document receiver pipe was drained. // No new document have been added in the meanwhile because `IndexWriter` // is not shared by different threads. - // - // We can now open a new generation and reaccept segments - // from now on. - self.segment_updater.new_generation(); - - // TODO Send rollback. - //let rollbacked_segments = self.segment_manager.rollback(); + rollback_future.wait().map_err(|_| + Error::ErrorInThread("Error while waiting for rollback.".to_string()) + )?; - // for segment_id in rollbacked_segments { - // // TODO all delete must happen after saving - // // meta.json - // self.index.delete_segment(segment_id); - // } - - panic!("aaaa"); - - // reset the docstamp self.uncommitted_docstamp = self.committed_docstamp; Ok(self.committed_docstamp) @@ -474,12 +475,11 @@ impl IndexWriter { #[cfg(test)] mod tests { - + use indexer::NoMergePolicy; use schema::{self, Document}; use Index; use Term; use Error; - use indexer::NoMergePolicy; #[test] fn test_lockfile_stops_duplicates() { @@ -497,10 +497,10 @@ mod tests { let schema_builder = schema::SchemaBuilder::default(); let index = Index::create_in_ram(schema_builder.build()); let index_writer = index.writer(40_000_000).unwrap(); - // assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }"); - // let merge_policy = box NoMergePolicy::default(); - // index_writer.set_merge_policy(merge_policy); - // assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy"); + assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }"); + let merge_policy = box NoMergePolicy::default(); + index_writer.set_merge_policy(merge_policy); + assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy"); } #[test] @@ -521,7 +521,6 @@ mod tests { let text_field = schema_builder.add_text_field("text", schema::TEXT); let index = Index::create_in_ram(schema_builder.build()); - let num_docs_containing = |s: &str| { let searcher = index.searcher(); let term_a = Term::from_field_text(text_field, s); @@ -550,11 +549,12 @@ mod tests { index_writer.add_document(doc).unwrap(); } assert_eq!(index_writer.commit().unwrap(), 2u64); - + index.load_searchers().unwrap(); assert_eq!(num_docs_containing("a"), 0); assert_eq!(num_docs_containing("b"), 1); assert_eq!(num_docs_containing("c"), 1); } + index.load_searchers().unwrap(); index.searcher(); } @@ -586,6 +586,7 @@ mod tests { // this should create 8 segments and trigger a merge. index_writer.commit().expect("commit failed"); index_writer.wait_merging_threads().expect("waiting merging thread failed"); + index.load_searchers().unwrap(); assert_eq!(num_docs_containing("a"), 200); assert_eq!(index.searchable_segments().unwrap().len(), 1); } diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 5ca049e4c..3eebdf78f 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -48,7 +48,6 @@ impl LogMergePolicy { impl MergePolicy for LogMergePolicy { fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec { - if segments.is_empty() { return Vec::new(); } @@ -75,16 +74,15 @@ impl MergePolicy for LogMergePolicy { levels.last_mut().unwrap().push(ind); } - let result = levels.iter() + levels + .iter() .filter(|level| level.len() >= self.min_merge_size) .map(|ind_vec| { MergeCandidate(ind_vec.iter() .map(|&ind| segments[ind].segment_id) .collect()) }) - .collect(); - - result + .collect() } fn box_clone(&self) -> Box { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 28068e880..0f51d0652 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,7 +1,6 @@ use Result; use core::SegmentReader; use core::Segment; -use core::SegmentId; use DocId; use core::SerializableSegment; use indexer::SegmentSerializer; @@ -14,7 +13,6 @@ use fastfield::FastFieldSerializer; use store::StoreWriter; use postings::ChainedPostings; use postings::HasLen; -use futures::Future; use postings::OffsetPostings; use core::SegmentInfo; use std::cmp::{min, max}; @@ -207,7 +205,6 @@ mod tests { use collector::tests::TestCollector; use query::BooleanQuery; use schema::TextIndexingOptions; - use eventual::Async; use futures::Future; #[test] @@ -243,7 +240,7 @@ mod tests { doc.add_u32(score_field, 7); index_writer.add_document(doc).unwrap(); } - index_writer.commit().unwrap(); + index_writer.commit().expect("committed"); } { @@ -272,6 +269,7 @@ mod tests { index_writer.wait_merging_threads().unwrap(); } { + index.load_searchers().unwrap(); let searcher = index.searcher(); let get_doc_ids = |terms: Vec| { let mut collector = TestCollector::default(); diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 61f7b6fc1..20ccfcdfa 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -6,7 +6,6 @@ use indexer::SegmentEntry; use indexer::delete_queue::DeleteQueueCursor; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; -use std::sync::atomic::{AtomicUsize, Ordering}; struct SegmentRegisters { docstamp: u64, @@ -14,13 +13,6 @@ struct SegmentRegisters { committed: SegmentRegister, } -#[derive(Eq, PartialEq)] -pub enum CommitState { - Committed, - Uncommitted, - Missing, -} - impl Default for SegmentRegisters { fn default() -> SegmentRegisters { SegmentRegisters { @@ -54,28 +46,14 @@ impl Debug for SegmentManager { /// /// For instance, a segment will not appear in both committed and uncommitted /// segments -pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec, Vec) { +pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec, Vec) { let registers_lock = segment_manager.read(); - (registers_lock.committed.get_segment_ready_for_commit(), - registers_lock.uncommitted.get_segment_ready_for_commit()) + (registers_lock.committed.get_segments(), + registers_lock.uncommitted.get_segments()) } impl SegmentManager { - /// Returns whether a segment is committed, uncommitted or missing. - pub fn is_committed(&self, segment_id: SegmentId) -> CommitState { - let lock = self.read(); - if lock.uncommitted.contains(segment_id) { - CommitState::Uncommitted - } - else if lock.committed.contains(segment_id) { - CommitState::Committed - } - else { - CommitState::Missing - } - } - pub fn from_segments(segment_metas: Vec, delete_cursor: DeleteQueueCursor) -> SegmentManager { SegmentManager { registers: RwLock::new( SegmentRegisters { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index f1dc37d24..8f8f20611 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -89,7 +89,7 @@ impl SegmentRegister { self.segment_states.clear(); } - pub fn get_segment_ready_for_commit(&self,) -> Vec { + pub fn get_segments(&self,) -> Vec { self.segment_states .values() .filter(|segment_entry| segment_entry.is_ready()) @@ -126,12 +126,7 @@ impl SegmentRegister { .get(&segment_id) .map(|segment_entry| segment_entry.clone()) } - - pub fn contains(&self, segment_id: SegmentId) -> bool { - self.segment_states.contains_key(&segment_id) - } - - + pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool { segment_ids .iter() diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 14394d3b4..2622d97a1 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -1,21 +1,25 @@ #![allow(for_kv_map)] -use chan; use core::Index; -use std::sync::Mutex; use core::Segment; +use indexer::{MergePolicy, DefaultMergePolicy}; use core::SegmentId; use core::SegmentMeta; use std::mem; -use futures::Future; +use std::sync::atomic::Ordering; +use std::ops::DerefMut; +use futures::{Future, future}; +use futures::oneshot; +use futures::Canceled; +use std::thread; use std::sync::atomic::AtomicUsize; +use std::sync::RwLock; use core::SerializableSegment; -use indexer::MergePolicy; use indexer::MergeCandidate; use indexer::merger::IndexMerger; +use std::borrow::BorrowMut; use indexer::SegmentSerializer; use indexer::SegmentEntry; -use std::thread; use schema::Schema; use directory::Directory; use std::thread::JoinHandle; @@ -28,8 +32,7 @@ use futures_cpupool::CpuPool; use core::IndexMeta; use core::META_FILEPATH; use std::io::Write; -use eventual::*; -use super::segment_manager::{SegmentManager, get_segment_ready_for_commit}; +use super::segment_manager::{SegmentManager, get_segments}; fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta { @@ -84,135 +87,6 @@ pub fn save_metas(segment_manager: &SegmentManager, } -// #[derive(Clone, Debug)] -// pub enum SegmentUpdate { - -// /// New segment added. -// /// Created by the indexing worker thread -// AddSegment(usize, SegmentEntry), - - -// StartMerge(Vec), - -// /// A merge is ended. -// /// Remove the merged segment and record the new -// /// large merged segment. -// EndMerge(Option, Vec, SegmentEntry), - -// /// Happens when rollback is called. -// /// The current generation of segments is cancelled. -// CancelGeneration, - -// /// Starts a new generation... This -// /// happens at the end of Rollback. -// NewGeneration, - -// /// Just dropping the Segment updater object -// /// is safe, but some merge might be happening in -// /// the background and the user may want to wait for these -// /// threads to terminate. -// /// -// /// When receiving the Terminate signal, the segment updater stops -// /// receiving segment updates and just waits for the merging threads -// /// to terminate. -// Terminate, - -// /// Commit marks uncommmitted segments as committed. -// Commit(u64), -// } - -#[derive(Clone)] -pub struct SegmentUpdater(Arc); - - -struct InnerSegmentUpdater { - pool: CpuPool, - segment_manager: SegmentManager, - merge_policy: Box, - merging_thread_id: AtomicUsize, - merging_threads: HashMap, SegmentEntry)> >, -} - -impl SegmentUpdater { - - pub fn new( - index: Index, - delete_cursor: DeleteQueueCursor, - merge_policy: Box) - -> Result - { - let committed_segments = index.committed_segments()?; - let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); - Ok( - SegmentUpdater(Arc::new(InnerSegmentUpdater { - pool: CpuPool::new(1), - segment_manager: segment_manager, - merge_policy: merge_policy, - merging_thread_id: AtomicUsize::new(0), - merging_threads: HashMap::new(), - })) - ) - } - - pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) { - } - - pub fn commit(&self, committed_docstamp: u64) -> impl Future { - self.0.pool.spawn_fn(|| { - Ok(()) - }) - } - - pub fn start_merge(&self, segment_ids: Vec) -> impl Future { - self.0.pool.spawn_fn(|| { - Ok(()) - }) - } - - pub fn new_generation(&self) { - } - - pub fn cancel_generation(&self) { - } - - pub fn end_merge(&self, - merge_thread_id: Option, - merged_segment_ids: Vec, - resulting_segment_entry: SegmentEntry) { - } - - pub fn terminate(&self) -> impl Future { - self.0.pool.spawn_fn(|| { - Ok(()) - }) - } - -} - - -// impl SegmentUpdater { - -// pub fn create( -// index: Index, -// delete_cursor: DeleteQueueCursor, -// merge_policy: Box) -> Result { -// let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async(); -// let segment_updater = SegmentUpdater { -// channel: segment_update_sender, -// }; -// let committed_segments = index.committed_segments()?; -// let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); -// SegmentUpdateRunner::new( -// index, -// segment_manager, -// merge_policy, -// segment_updater.clone(), -// segment_update_receiver).start(); -// Ok(segment_updater) -// } - - -// } // The segment update runner is in charge of processing all @@ -220,248 +94,191 @@ impl SegmentUpdater { // // All this processing happens on a single thread // consuming a common queue. -// -// The segment updates producers are : -// - indexing threads are sending new segments -// - merging threads are sending merge operations -// - the index writer sends "terminate" -// pub struct SegmentUpdateRunner { -// index: Index, -// is_cancelled_generation: bool, -// segment_update_receiver: SegmentUpdateReceiver, -// segment_updater: SegmentUpdater, -// segment_manager: SegmentManager, -// merge_policy: Box, -// merging_thread_id: usize, -// merging_threads: HashMap, SegmentEntry)> >, -// } - -// impl SegmentUpdateRunner { - -// fn new(index: Index, -// segment_manager: SegmentManager, -// merge_policy: Box, -// segment_updater: SegmentUpdater, -// segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner { -// SegmentUpdateRunner { -// index: index, -// is_cancelled_generation: false, -// segment_updater: segment_updater, -// segment_update_receiver: segment_update_receiver, -// segment_manager: segment_manager, -// merge_policy: merge_policy, -// merging_thread_id: 0, -// merging_threads: HashMap::new(), -// } -// } - -// fn new_merging_thread_id(&mut self,) -> usize { -// self.merging_thread_id += 1; -// self.merging_thread_id -// } - - -// fn end_merge( -// &mut self, -// segment_ids: Vec, -// segment_entry: SegmentEntry) { - -// self.segment_manager.end_merge(&segment_ids, segment_entry); -// save_metas( -// &self.segment_manager, -// self.index.schema(), -// self.index.docstamp(), -// self.index.directory_mut()).expect("Could not save metas."); - -// for segment_id in segment_ids { -// self.index.delete_segment(segment_id); -// } - -// self.index.load_searchers().unwrap(); -// } +#[derive(Clone)] +pub struct SegmentUpdater(Arc); -// fn start_merge(&mut self, segment_ids: Vec, complete_opt: Option>) { - -// let merging_thread_id = self.new_merging_thread_id(); -// self.segment_manager.start_merge(&segment_ids); - -// let index_clone = self.index.clone(); -// let segment_updater_clone = self.segment_updater.clone(); - -// let merge_thread_handle = thread::Builder::new() -// .name(format!("merge_thread_{:?}", merging_thread_id)) -// .spawn(move || { -// info!("Start merge: {:?}", segment_ids); -// let schema = index_clone.schema(); -// let segments: Vec = segment_ids -// .iter() -// .map(|&segment_id| index_clone.segment(segment_id)) -// .collect(); -// // An IndexMerger is like a "view" of our merged segments. -// // TODO unwrap -// let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); -// let mut merged_segment = index_clone.new_segment(); -// // ... we just serialize this index merger in our new segment -// // to merge the two segments. -// let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); -// let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); -// let segment_meta = SegmentMeta { -// segment_id: merged_segment.id(), -// num_docs: num_docs, -// num_deleted_docs: 0u32, -// }; +struct InnerSegmentUpdater { + pool: CpuPool, + index: Index, + segment_manager: SegmentManager, + merge_policy: RwLock>, + merging_thread_id: AtomicUsize, + merging_threads: RwLock>>, + generation: AtomicUsize, +} -// // TODO fix delete cursor -// let delete_queue = DeleteQueue::default(); - -// let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); +impl SegmentUpdater { -// let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone()); -// // segment_updater_clone.send(segment_update.clone()); -// if let Some(complete) = complete_opt { -// complete.complete(()); -// } -// (segment_ids, segment_entry) -// }) -// .expect("Failed to spawn merge thread"); + pub fn new( + index: Index, + delete_cursor: DeleteQueueCursor) + -> Result + { + let committed_segments = index.committed_segments()?; + let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); + Ok( + SegmentUpdater(Arc::new(InnerSegmentUpdater { + pool: CpuPool::new(1), + index: index, + segment_manager: segment_manager, + merge_policy: RwLock::new(box DefaultMergePolicy::default()), + merging_thread_id: AtomicUsize::default(), + merging_threads: RwLock::new(HashMap::new()), + generation: AtomicUsize::default(), + })) + ) + } + + pub fn get_merge_policy(&self) -> Box { + self.0.merge_policy.read().unwrap().box_clone() + } + + pub fn set_merge_policy(&self, merge_policy: Box) { + *self.0.merge_policy.write().unwrap()= merge_policy; + } + + fn get_merging_thread_id(&self) -> usize { + self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst) + } + + + fn run_async T>(&self, f: F) -> impl Future { + let me_clone = self.clone(); + self.0.pool.spawn_fn(move || { + Ok(f(me_clone)) + }) + } + + pub fn new_generation(&mut self, generation: usize) -> impl Future { + self.0.generation.store(generation, Ordering::Release); + self.run_async(|segment_updater| { + segment_updater.0.segment_manager.rollback(); + }) + } + + pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future { + if generation >= self.0.generation.load(Ordering::Acquire) { + future::Either::A(self.run_async(|segment_updater| { + segment_updater.0.segment_manager.add_segment(segment_entry); + segment_updater.consider_merge_options(); + true + })) + } + else { + future::Either::B(future::ok(false)) + } + } + + pub fn commit(&self, opstamp: u64) -> impl Future { + self.run_async(move |segment_updater| { + segment_updater.0.segment_manager.commit(opstamp); + let mut directory = segment_updater.0.index.directory().box_clone(); + save_metas( + &segment_updater.0.segment_manager, + segment_updater.0.index.schema(), + segment_updater.0.index.docstamp(), + directory.borrow_mut()).expect("Could not save metas."); + segment_updater.consider_merge_options(); + }) + } + + + pub fn start_merge(&self, segment_ids: Vec) -> impl Future { -// self.merging_threads.insert(merging_thread_id, merge_thread_handle); -// } + self.0.segment_manager.start_merge(&segment_ids); + let segment_updater_clone = self.clone(); -// fn start_merges(&mut self) { -// let merge_candidates = self.consider_merge_options(); -// for MergeCandidate(segment_ids) in merge_candidates { -// self.start_merge(segment_ids, None); -// } -// } - -// fn consider_merge_options(&self,) -> Vec { -// let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager); -// // Committed segments cannot be merged with uncommitted_segments. -// // We therefore consider merges using these two sets of segments independantly. -// let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments); -// let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments); -// merge_candidates.extend_from_slice(&committed_merge_candidates[..]); -// merge_candidates -// } - -// pub fn start(self) -> JoinHandle<()> { -// thread::Builder::new() -// .name("segment_update".to_string()) -// .spawn(move || { -// self.process(); -// }) -// .expect("Failed to start segment updater thread.") -// } - -// fn process(mut self) { - -// let mut complete_option = None; - -// for (complete, segment_update) in self.segment_update_receiver.clone() { + let merging_thread_id = self.get_merging_thread_id(); + let (merging_future_send, merging_future_recv) = oneshot(); + let merging_join_handle = thread::spawn(move || { -// if let SegmentUpdate::Terminate = segment_update { -// complete_option = Some(complete); -// break; -// } + info!("Start merge: {:?}", segment_ids); -// if let SegmentUpdate::StartMerge(segment_ids) = segment_update { -// self.start_merge(segment_ids, Some(complete)); -// } -// else { -// self.process_one(segment_update); - -// // - start merges if required -// self.start_merges(); -// complete.complete(()); -// } -// } - -// let mut merging_threads = HashMap::new(); -// mem::swap(&mut merging_threads, &mut self.merging_threads); -// for (_, merging_thread_handle) in merging_threads { -// match merging_thread_handle.join() { -// Ok((segment_ids, segment_entry)) => { -// self.end_merge(segment_ids, segment_entry); -// } -// Err(e) => { -// error!("Error in merging thread {:?}", e); -// break; -// } -// } -// } + let ref index = segment_updater_clone.0.index; + let schema = index.schema(); + let segments: Vec = segment_ids + .iter() + .map(|&segment_id| index.segment(segment_id)) + .collect(); + + // An IndexMerger is like a "view" of our merged segments. + // TODO unwrap + let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); + let mut merged_segment = index.new_segment(); + + // ... we just serialize this index merger in our new segment + // to merge the two segments. + let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); + let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); + let segment_meta = SegmentMeta { + segment_id: merged_segment.id(), + num_docs: num_docs, + num_deleted_docs: 0u32, + }; -// if let Some(complete) = complete_option { -// complete.complete(()); -// } -// } + // TODO fix delete cursor + let delete_queue = DeleteQueue::default(); + + let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); + segment_updater_clone + .end_merge(segment_ids.clone(), segment_entry.clone()) + .wait() + .unwrap(); + merging_future_send.complete(segment_entry.clone()); + segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id); + segment_entry + }); + self.0.merging_threads.write().unwrap().insert(merging_thread_id, merging_join_handle); + merging_future_recv + } + + + fn consider_merge_options(&self) { + let (committed_segments, uncommitted_segments) = get_segments(&self.0.segment_manager); + // Committed segments cannot be merged with uncommitted_segments. + // We therefore consider merges using these two sets of segments independently. + let merge_policy = self.get_merge_policy(); + let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments); + let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments); + merge_candidates.extend_from_slice(&committed_merge_candidates[..]); + for MergeCandidate(segment_ids) in merge_candidates { + self.start_merge(segment_ids); + } + } + fn end_merge(&self, + merged_segment_ids: Vec, + resulting_segment_entry: SegmentEntry) -> impl Future { + + self.run_async(move |segment_updater| { + segment_updater.0.segment_manager.end_merge(&merged_segment_ids, resulting_segment_entry); + let mut directory = segment_updater.0.index.directory().box_clone(); + save_metas( + &segment_updater.0.segment_manager, + segment_updater.0.index.schema(), + segment_updater.0.index.docstamp(), + directory.borrow_mut()).expect("Could not save metas."); + for segment_id in merged_segment_ids { + segment_updater.0.index.delete_segment(segment_id); + } + }) + + } - -// // Process a single segment update. -// pub fn process_one( -// &mut self, -// segment_update: SegmentUpdate) { - -// info!("Segment update: {:?}", segment_update); - -// use self::SegmentUpdate::*; -// match segment_update { -// AddSegment(generation, segment_entry) => { -// if !self.is_cancelled_generation { -// self.segment_manager.add_segment(segment_entry); -// } -// else { -// // rollback has been called and this -// // segment actually belong to the -// // documents that have been dropped. -// // -// // Let's just remove its files. -// self.index.delete_segment(segment_entry.segment_id()); -// } -// } -// StartMerge(segment_ids) => { -// panic!("this should have been handled somewhere else"); -// } -// EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => { -// self.end_merge( -// segment_ids, -// segment_entry); -// if let Some(merging_thread_id) = merging_thread_id_opt { -// self.merging_threads.remove(&merging_thread_id); -// } -// } -// CancelGeneration => { -// // Called during rollback. The segment -// // that will arrive will be ignored -// // until a NewGeneration is update arrives. -// self.is_cancelled_generation = true; -// } -// NewGeneration => { -// // After rollback, we can resume -// // indexing new documents. -// self.is_cancelled_generation = false; -// } -// Commit(docstamp) => { -// self.segment_manager.commit(docstamp); -// save_metas( -// &self.segment_manager, -// self.index.schema(), -// self.index.docstamp(), -// self.index.directory_mut()).expect("Could not save metas."); -// match self.index.load_searchers() { -// Ok(()) => {} -// Err(e) => { -// error!("Failure while loading new searchers {:?}", e); -// panic!(format!("Failure while loading new searchers {:?}", e)); -// } -// } -// } -// Terminate => { -// panic!("We should have left the loop before processing it."); -// } -// } -// } -// } + pub fn wait_merging_thread(&self) -> thread::Result<()> { + let mut new_merging_threads = HashMap::new(); + { + let mut merging_threads = self.0.merging_threads.write().unwrap(); + mem::swap(&mut new_merging_threads, merging_threads.deref_mut()); + } + for (_, merging_thread_handle) in new_merging_threads { + merging_thread_handle + .join() + .map(|_| ())? + } + Ok(()) + } + +} diff --git a/src/lib.rs b/src/lib.rs index ed0d726fa..96d65e639 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,7 +49,6 @@ extern crate chan; extern crate crossbeam; extern crate bit_set; extern crate notify; -extern crate eventual; extern crate futures; extern crate futures_cpupool; @@ -245,6 +244,7 @@ mod tests { index_writer.commit().unwrap(); } { + index.load_searchers().unwrap(); let searcher = index.searcher(); let term_a = Term::from_field_text(text_field, "a"); assert_eq!(searcher.doc_freq(&term_a), 3); @@ -280,7 +280,7 @@ mod tests { index_writer.commit().unwrap(); } { - + index.load_searchers().unwrap(); let searcher = index.searcher(); let segment_reader: &SegmentReader = searcher.segment_reader(0); let fieldnorms_reader = segment_reader.get_fieldnorms_reader(text_field).unwrap(); @@ -306,6 +306,7 @@ mod tests { index_writer.commit().unwrap(); } { + index.load_searchers().unwrap(); let searcher = index.searcher(); let reader = searcher.segment_reader(0); assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none()); @@ -342,6 +343,7 @@ mod tests { index_writer.commit().unwrap(); } { + index.load_searchers().unwrap(); let searcher = index.searcher(); let get_doc_ids = |terms: Vec| { let query = BooleanQuery::new_multiterms_query(terms); diff --git a/src/postings/mod.rs b/src/postings/mod.rs index ca70512b8..f9898b9fc 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -189,6 +189,7 @@ mod tests { } assert!(index_writer.commit().is_ok()); } + index.load_searchers().unwrap(); let term_query = TermQuery::new(Term::from_field_text(text_field, "a"), SegmentPostingsOption::NoFreq); let searcher = index.searcher(); let mut term_weight = term_query.specialized_weight(&*searcher); @@ -256,6 +257,7 @@ mod tests { } assert!(index_writer.commit().is_ok()); } + index.load_searchers().unwrap(); index }; } @@ -275,7 +277,6 @@ mod tests { fn bench_segment_intersection(b: &mut Bencher) { let searcher = INDEX.searcher(); let segment_reader = searcher.segment_reader(0); - b.iter(|| { let segment_postings_a = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap(); let segment_postings_b = segment_reader.read_postings(&*TERM_B, SegmentPostingsOption::NoFreq).unwrap(); diff --git a/src/query/phrase_query/mod.rs b/src/query/phrase_query/mod.rs index 6983ff65e..e01743eb3 100644 --- a/src/query/phrase_query/mod.rs +++ b/src/query/phrase_query/mod.rs @@ -48,6 +48,7 @@ mod tests { assert!(index_writer.commit().is_ok()); } + index.load_searchers().unwrap(); let searcher = index.searcher(); let test_query = |texts: Vec<&str>| { let mut test_collector = TestCollector::default();