From e8ecb68f00eacc10023cfb0efc0a1802878bce73 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 26 Jan 2017 00:06:07 +0900 Subject: [PATCH] issue/43 switching for futures --- Cargo.toml | 1 + src/indexer/index_writer.rs | 51 ++- src/indexer/merge_policy.rs | 2 +- src/indexer/merger.rs | 4 +- src/indexer/segment_manager.rs | 13 - src/indexer/segment_updater.rs | 622 ++++++++++++++++----------------- 6 files changed, 340 insertions(+), 353 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 998d5b48c..6941ae63d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ version = "2" crossbeam = "0.2" eventual = "0.1.7" + futures = "0.1.9" futures-cpupool = "0.1.2" diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 4a4377e74..298d1b6d6 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -14,7 +14,7 @@ use indexer::SegmentWriter; use indexer::SegmentManager; use core::SegmentComponent; use super::directory_lock::DirectoryLock; -use eventual::Async; +use futures::Future; use std::clone::Clone; use std::io; use fastfield::delete; @@ -27,7 +27,7 @@ use std::sync::{Arc, Mutex}; use chan; use core::SegmentMeta; use super::delete_queue::{DeleteQueue, DeleteQueueCursor}; -use super::segment_updater::{SegmentUpdate, SegmentUpdater}; +use super::segment_updater::SegmentUpdater; use super::segment_manager::CommitState; use Result; use Error; @@ -61,8 +61,6 @@ pub struct IndexWriter { // lifetime of the lock with that of the IndexWriter. _directory_lock: DirectoryLock, - _merge_policy: Arc>>, - index: Index, heap_size_in_bytes_per_thread: usize, @@ -78,6 +76,8 @@ pub struct IndexWriter { num_threads: usize, + generation: usize, + delete_queue: DeleteQueue, uncommitted_docstamp: u64, @@ -92,6 +92,7 @@ impl !Sync for IndexWriter {} fn index_documents(heap: &mut Heap, mut segment: Segment, schema: &Schema, + generation: usize, document_iterator: &mut Iterator, segment_updater: &mut SegmentUpdater, delete_cursor: &mut DeleteQueueCursor) @@ -134,7 +135,7 @@ fn index_documents(heap: &mut Heap, let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); try!(segment_writer.finalize()); - segment_updater.add_segment(segment_entry); + segment_updater.add_segment(generation, segment_entry); Ok(()) } @@ -161,7 +162,7 @@ impl IndexWriter { } drop(self.workers_join_handle); - future.await().unwrap(); // TODO do something with the result. + future.wait().unwrap(); // TODO do something with the result. Ok(()) } @@ -179,9 +180,12 @@ impl IndexWriter { // at this point. let delete_cursor = self.delete_queue.cursor(); + let generation = self.generation; + let join_handle: JoinHandle> = try!(thread::Builder::new() - .name(format!("indexing_thread_{}", self.worker_id)) + .name(format!("indexing thread {} for gen {}", self.worker_id, generation)) .spawn(move || { + let mut delete_cursor_clone = delete_cursor.clone(); loop { let segment = index.new_segment(); @@ -200,6 +204,7 @@ impl IndexWriter { try!(index_documents(&mut heap, segment, &schema, + generation, &mut document_iterator, &mut segment_updater, &mut delete_cursor_clone)); @@ -245,20 +250,17 @@ impl IndexWriter { let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - - let merge_policy: Arc>> = Arc::new(Mutex::new(box DefaultMergePolicy::default())); + let delete_queue = DeleteQueue::default(); - - let segment_updater = SegmentUpdater::create(index.clone(), delete_queue.cursor(), merge_policy.clone())?; + let merge_policy = box DefaultMergePolicy::default(); + let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor(), merge_policy)?; let mut index_writer = IndexWriter { _directory_lock: directory_lock, - _merge_policy: merge_policy, - heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), @@ -274,21 +276,18 @@ impl IndexWriter { committed_docstamp: index.docstamp(), uncommitted_docstamp: index.docstamp(), + + generation: 0, + worker_id: 0, }; try!(index_writer.start_workers()); Ok(index_writer) } - - /// Returns a clone of the index_writer merge policy. - pub fn get_merge_policy(&self) -> Box { - self._merge_policy.lock().unwrap().box_clone() - } - /// Set the merge policy. pub fn set_merge_policy(&self, merge_policy: Box) { - *self._merge_policy.lock().unwrap() = merge_policy; + // *self._merge_policy.lock().unwrap() = merge_policy; } fn start_workers(&mut self) -> Result<()> { @@ -299,7 +298,7 @@ impl IndexWriter { } /// Merges a given list of segments - pub fn merge(&mut self, segments: &[SegmentId]) -> impl Async { + pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future { self.segment_updater.start_merge(segments.to_vec()) } @@ -431,7 +430,7 @@ impl IndexWriter { // wait for the segment update thread to have processed the info // TODO remove unwrap - future.await().unwrap(); + future.wait().unwrap(); Ok(self.committed_docstamp) } @@ -498,10 +497,10 @@ mod tests { let schema_builder = schema::SchemaBuilder::default(); let index = Index::create_in_ram(schema_builder.build()); let index_writer = index.writer(40_000_000).unwrap(); - assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }"); - let merge_policy = box NoMergePolicy::default(); - index_writer.set_merge_policy(merge_policy); - assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy"); + // assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }"); + // let merge_policy = box NoMergePolicy::default(); + // index_writer.set_merge_policy(merge_policy); + // assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy"); } #[test] diff --git a/src/indexer/merge_policy.rs b/src/indexer/merge_policy.rs index 22a767042..ae1064355 100644 --- a/src/indexer/merge_policy.rs +++ b/src/indexer/merge_policy.rs @@ -13,7 +13,7 @@ pub struct MergeCandidate(pub Vec); /// /// Every time a the list of segments changes, the segment updater /// asks the merge policy if some segments should be merged. -pub trait MergePolicy: marker::Send + Debug { +pub trait MergePolicy: marker::Send + marker::Sync + Debug { /// Given the list of segment metas, returns the list of merge candidates. /// /// This call happens on the segment updater thread, and will block diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index a41f211f0..28068e880 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -14,6 +14,7 @@ use fastfield::FastFieldSerializer; use store::StoreWriter; use postings::ChainedPostings; use postings::HasLen; +use futures::Future; use postings::OffsetPostings; use core::SegmentInfo; use std::cmp::{min, max}; @@ -207,6 +208,7 @@ mod tests { use query::BooleanQuery; use schema::TextIndexingOptions; use eventual::Async; + use futures::Future; #[test] fn test_index_merger() { @@ -265,7 +267,7 @@ mod tests { let segment_ids = index.searchable_segment_ids().expect("Searchable segments failed."); let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer.merge(&segment_ids) - .await() + .wait() .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 6fcdb253d..61f7b6fc1 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -39,12 +39,6 @@ impl Default for SegmentRegisters { /// changes (merges especially) pub struct SegmentManager { registers: RwLock, - // generation is an ever increasing counter that - // is incremented whenever we modify - // the segment manager. It can be useful for debugging - // purposes, and it also acts as a "dirty" marker, - // to detect when the `meta.json` should be written. - generation: AtomicUsize, } impl Debug for SegmentManager { @@ -89,7 +83,6 @@ impl SegmentManager { uncommitted: SegmentRegister::default(), committed: SegmentRegister::new(segment_metas, delete_cursor), }), - generation: AtomicUsize::default(), } } @@ -101,14 +94,9 @@ impl SegmentManager { } fn write(&self,) -> RwLockWriteGuard { - self.generation.fetch_add(1, Ordering::Release); self.registers.write().expect("Failed to acquire write lock on SegmentManager.") } - pub fn generation(&self,) -> usize { - self.generation.load(Ordering::Acquire) - } - /// Removes all of the uncommitted segments /// and returns them. pub fn rollback(&self,) -> Vec { @@ -180,7 +168,6 @@ impl Default for SegmentManager { uncommitted: SegmentRegister::default(), committed: SegmentRegister::default(), }), - generation: AtomicUsize::default(), } } } \ No newline at end of file diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 53ab821c6..14394d3b4 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -7,6 +7,8 @@ use core::Segment; use core::SegmentId; use core::SegmentMeta; use std::mem; +use futures::Future; +use std::sync::atomic::AtomicUsize; use core::SerializableSegment; use indexer::MergePolicy; use indexer::MergeCandidate; @@ -22,15 +24,13 @@ use std::collections::HashMap; use rustc_serialize::json; use indexer::delete_queue::{DeleteQueueCursor, DeleteQueue}; use Result; +use futures_cpupool::CpuPool; use core::IndexMeta; use core::META_FILEPATH; use std::io::Write; use eventual::*; use super::segment_manager::{SegmentManager, get_segment_ready_for_commit}; -type SegmentUpdateSender = chan::Sender<(Complete<(), &'static str>, SegmentUpdate)>; -type SegmentUpdateReceiver = chan::Receiver<(Complete<(), &'static str>, SegmentUpdate)>; - fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta { let (committed_segments, uncommitted_segments) = segment_manager.segment_metas(); @@ -84,386 +84,384 @@ pub fn save_metas(segment_manager: &SegmentManager, } -#[derive(Clone, Debug)] -pub enum SegmentUpdate { +// #[derive(Clone, Debug)] +// pub enum SegmentUpdate { - /// New segment added. - /// Created by the indexing worker thread - AddSegment(SegmentEntry), +// /// New segment added. +// /// Created by the indexing worker thread +// AddSegment(usize, SegmentEntry), - StartMerge(Vec), +// StartMerge(Vec), - /// A merge is ended. - /// Remove the merged segment and record the new - /// large merged segment. - EndMerge(Option, Vec, SegmentEntry), +// /// A merge is ended. +// /// Remove the merged segment and record the new +// /// large merged segment. +// EndMerge(Option, Vec, SegmentEntry), - /// Happens when rollback is called. - /// The current generation of segments is cancelled. - CancelGeneration, +// /// Happens when rollback is called. +// /// The current generation of segments is cancelled. +// CancelGeneration, - /// Starts a new generation... This - /// happens at the end of Rollback. - NewGeneration, +// /// Starts a new generation... This +// /// happens at the end of Rollback. +// NewGeneration, - /// Just dropping the Segment updater object - /// is safe, but some merge might be happening in - /// the background and the user may want to wait for these - /// threads to terminate. - /// - /// When receiving the Terminate signal, the segment updater stops - /// receiving segment updates and just waits for the merging threads - /// to terminate. - Terminate, +// /// Just dropping the Segment updater object +// /// is safe, but some merge might be happening in +// /// the background and the user may want to wait for these +// /// threads to terminate. +// /// +// /// When receiving the Terminate signal, the segment updater stops +// /// receiving segment updates and just waits for the merging threads +// /// to terminate. +// Terminate, - /// Commit marks uncommmitted segments as committed. - Commit(u64), -} +// /// Commit marks uncommmitted segments as committed. +// Commit(u64), +// } - -// TODO Rename #[derive(Clone)] -pub struct SegmentUpdater { - channel: SegmentUpdateSender, -} +pub struct SegmentUpdater(Arc); +struct InnerSegmentUpdater { + pool: CpuPool, + segment_manager: SegmentManager, + merge_policy: Box, + merging_thread_id: AtomicUsize, + merging_threads: HashMap, SegmentEntry)> >, +} + impl SegmentUpdater { - pub fn create( + pub fn new( index: Index, delete_cursor: DeleteQueueCursor, - merge_policy: Arc>>) -> Result { - let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async(); - let segment_updater = SegmentUpdater { - channel: segment_update_sender, - }; + merge_policy: Box) + -> Result + { let committed_segments = index.committed_segments()?; let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); - SegmentUpdateRunner::new( - index, - segment_manager, - merge_policy, - segment_updater.clone(), - segment_update_receiver).start(); - Ok(segment_updater) + Ok( + SegmentUpdater(Arc::new(InnerSegmentUpdater { + pool: CpuPool::new(1), + segment_manager: segment_manager, + merge_policy: merge_policy, + merging_thread_id: AtomicUsize::new(0), + merging_threads: HashMap::new(), + })) + ) } - pub fn add_segment(&self, segment_entry: SegmentEntry) -> impl Async { - self.send(SegmentUpdate::AddSegment(segment_entry)) + pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) { } - pub fn commit(&self, committed_docstamp: u64) -> impl Async { - self.send(SegmentUpdate::Commit(committed_docstamp)) + pub fn commit(&self, committed_docstamp: u64) -> impl Future { + self.0.pool.spawn_fn(|| { + Ok(()) + }) } - pub fn start_merge(&self, segment_ids: Vec) -> impl Async { - self.send(SegmentUpdate::StartMerge(segment_ids)) + pub fn start_merge(&self, segment_ids: Vec) -> impl Future { + self.0.pool.spawn_fn(|| { + Ok(()) + }) } - pub fn new_generation(&self) -> impl Async { - self.send(SegmentUpdate::NewGeneration) + pub fn new_generation(&self) { } - pub fn cancel_generation(&self) -> impl Async { - self.send(SegmentUpdate::CancelGeneration) + pub fn cancel_generation(&self) { } - pub fn end_merge(&self, merge_thread_id: Option, merged_segment_ids: Vec, - resulting_segment_entry: SegmentEntry) -> impl Async { - let segment_update = SegmentUpdate::EndMerge(merge_thread_id, merged_segment_ids, resulting_segment_entry); - self.send(segment_update) + resulting_segment_entry: SegmentEntry) { } - pub fn terminate(&self) -> impl Async { - self.send(SegmentUpdate::Terminate) - } - - pub fn send(&self, segment_update: SegmentUpdate) -> impl Async { - let (fullfiller, future) = Future::<(), &'static str>::pair(); - self.channel.send((fullfiller, segment_update)); - future + pub fn terminate(&self) -> impl Future { + self.0.pool.spawn_fn(|| { + Ok(()) + }) } } -/// The segment update runner is in charge of processing all -/// of the `SegmentUpdate`s. -/// -/// All this processing happens on a single thread -/// consuming a common queue. -/// -/// The segment updates producers are : -/// - indexing threads are sending new segments -/// - merging threads are sending merge operations -/// - the index writer sends "terminate" -pub struct SegmentUpdateRunner { - index: Index, - is_cancelled_generation: bool, - segment_update_receiver: SegmentUpdateReceiver, - segment_updater: SegmentUpdater, - segment_manager: SegmentManager, - merge_policy: Arc>>, - merging_thread_id: usize, - merging_threads: HashMap, SegmentEntry)> >, -} +// impl SegmentUpdater { -impl SegmentUpdateRunner { +// pub fn create( +// index: Index, +// delete_cursor: DeleteQueueCursor, +// merge_policy: Box) -> Result { +// let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async(); +// let segment_updater = SegmentUpdater { +// channel: segment_update_sender, +// }; +// let committed_segments = index.committed_segments()?; +// let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor); +// SegmentUpdateRunner::new( +// index, +// segment_manager, +// merge_policy, +// segment_updater.clone(), +// segment_update_receiver).start(); +// Ok(segment_updater) +// } + + +// } + + +// The segment update runner is in charge of processing all +// of the `SegmentUpdate`s. +// +// All this processing happens on a single thread +// consuming a common queue. +// +// The segment updates producers are : +// - indexing threads are sending new segments +// - merging threads are sending merge operations +// - the index writer sends "terminate" +// pub struct SegmentUpdateRunner { +// index: Index, +// is_cancelled_generation: bool, +// segment_update_receiver: SegmentUpdateReceiver, +// segment_updater: SegmentUpdater, +// segment_manager: SegmentManager, +// merge_policy: Box, +// merging_thread_id: usize, +// merging_threads: HashMap, SegmentEntry)> >, +// } + +// impl SegmentUpdateRunner { - fn new(index: Index, - segment_manager: SegmentManager, - merge_policy: Arc>>, - segment_updater: SegmentUpdater, - segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner { - SegmentUpdateRunner { - index: index, - is_cancelled_generation: false, - segment_updater: segment_updater, - segment_update_receiver: segment_update_receiver, - segment_manager: segment_manager, - merge_policy: merge_policy, - merging_thread_id: 0, - merging_threads: HashMap::new(), - } - } +// fn new(index: Index, +// segment_manager: SegmentManager, +// merge_policy: Box, +// segment_updater: SegmentUpdater, +// segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner { +// SegmentUpdateRunner { +// index: index, +// is_cancelled_generation: false, +// segment_updater: segment_updater, +// segment_update_receiver: segment_update_receiver, +// segment_manager: segment_manager, +// merge_policy: merge_policy, +// merging_thread_id: 0, +// merging_threads: HashMap::new(), +// } +// } - fn new_merging_thread_id(&mut self,) -> usize { - self.merging_thread_id += 1; - self.merging_thread_id - } +// fn new_merging_thread_id(&mut self,) -> usize { +// self.merging_thread_id += 1; +// self.merging_thread_id +// } - fn end_merge( - &mut self, - segment_ids: Vec, - segment_entry: SegmentEntry) { +// fn end_merge( +// &mut self, +// segment_ids: Vec, +// segment_entry: SegmentEntry) { - self.segment_manager.end_merge(&segment_ids, segment_entry); - save_metas( - &self.segment_manager, - self.index.schema(), - self.index.docstamp(), - self.index.directory_mut()).expect("Could not save metas."); +// self.segment_manager.end_merge(&segment_ids, segment_entry); +// save_metas( +// &self.segment_manager, +// self.index.schema(), +// self.index.docstamp(), +// self.index.directory_mut()).expect("Could not save metas."); - for segment_id in segment_ids { - self.index.delete_segment(segment_id); - } +// for segment_id in segment_ids { +// self.index.delete_segment(segment_id); +// } - self.index.load_searchers().unwrap(); - } +// self.index.load_searchers().unwrap(); +// } - fn start_merge(&mut self, segment_ids: Vec, complete_opt: Option>) { +// fn start_merge(&mut self, segment_ids: Vec, complete_opt: Option>) { - let merging_thread_id = self.new_merging_thread_id(); - self.segment_manager.start_merge(&segment_ids); +// let merging_thread_id = self.new_merging_thread_id(); +// self.segment_manager.start_merge(&segment_ids); - let index_clone = self.index.clone(); - let segment_updater_clone = self.segment_updater.clone(); +// let index_clone = self.index.clone(); +// let segment_updater_clone = self.segment_updater.clone(); - let merge_thread_handle = thread::Builder::new() - .name(format!("merge_thread_{:?}", merging_thread_id)) - .spawn(move || { - info!("Start merge: {:?}", segment_ids); - let schema = index_clone.schema(); - let segments: Vec = segment_ids - .iter() - .map(|&segment_id| index_clone.segment(segment_id)) - .collect(); - // An IndexMerger is like a "view" of our merged segments. - // TODO unwrap - let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); - let mut merged_segment = index_clone.new_segment(); - // ... we just serialize this index merger in our new segment - // to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); - let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); - let segment_meta = SegmentMeta { - segment_id: merged_segment.id(), - num_docs: num_docs, - num_deleted_docs: 0u32, - }; +// let merge_thread_handle = thread::Builder::new() +// .name(format!("merge_thread_{:?}", merging_thread_id)) +// .spawn(move || { +// info!("Start merge: {:?}", segment_ids); +// let schema = index_clone.schema(); +// let segments: Vec = segment_ids +// .iter() +// .map(|&segment_id| index_clone.segment(segment_id)) +// .collect(); +// // An IndexMerger is like a "view" of our merged segments. +// // TODO unwrap +// let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); +// let mut merged_segment = index_clone.new_segment(); +// // ... we just serialize this index merger in our new segment +// // to merge the two segments. +// let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed"); +// let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed"); +// let segment_meta = SegmentMeta { +// segment_id: merged_segment.id(), +// num_docs: num_docs, +// num_deleted_docs: 0u32, +// }; - // TODO fix delete cursor - let delete_queue = DeleteQueue::default(); +// // TODO fix delete cursor +// let delete_queue = DeleteQueue::default(); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); +// let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); - let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone()); - segment_updater_clone.send(segment_update.clone()); - if let Some(complete) = complete_opt { - complete.complete(()); - } - (segment_ids, segment_entry) - }) - .expect("Failed to spawn merge thread"); +// let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone()); +// // segment_updater_clone.send(segment_update.clone()); +// if let Some(complete) = complete_opt { +// complete.complete(()); +// } +// (segment_ids, segment_entry) +// }) +// .expect("Failed to spawn merge thread"); - self.merging_threads.insert(merging_thread_id, merge_thread_handle); - } +// self.merging_threads.insert(merging_thread_id, merge_thread_handle); +// } - fn start_merges(&mut self) { - let merge_candidates = self.consider_merge_options(); - for MergeCandidate(segment_ids) in merge_candidates { - self.start_merge(segment_ids, None); - } - } +// fn start_merges(&mut self) { +// let merge_candidates = self.consider_merge_options(); +// for MergeCandidate(segment_ids) in merge_candidates { +// self.start_merge(segment_ids, None); +// } +// } - fn consider_merge_options(&self,) -> Vec { - let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager); - // Committed segments cannot be merged with uncommitted_segments. - // We therefore consider merges using these two sets of segments independantly. - let merge_policy_lock = self.merge_policy.lock().unwrap(); - let mut merge_candidates = merge_policy_lock.compute_merge_candidates(&uncommitted_segments); - let committed_merge_candidates = merge_policy_lock.compute_merge_candidates(&committed_segments); - merge_candidates.extend_from_slice(&committed_merge_candidates[..]); - merge_candidates - } +// fn consider_merge_options(&self,) -> Vec { +// let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager); +// // Committed segments cannot be merged with uncommitted_segments. +// // We therefore consider merges using these two sets of segments independantly. +// let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments); +// let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments); +// merge_candidates.extend_from_slice(&committed_merge_candidates[..]); +// merge_candidates +// } - pub fn start(self) -> JoinHandle<()> { - thread::Builder::new() - .name("segment_update".to_string()) - .spawn(move || { - self.process(); - }) - .expect("Failed to start segment updater thread.") - } +// pub fn start(self) -> JoinHandle<()> { +// thread::Builder::new() +// .name("segment_update".to_string()) +// .spawn(move || { +// self.process(); +// }) +// .expect("Failed to start segment updater thread.") +// } - fn process(mut self) { +// fn process(mut self) { - let mut complete_option = None; +// let mut complete_option = None; - for (complete, segment_update) in self.segment_update_receiver.clone() { +// for (complete, segment_update) in self.segment_update_receiver.clone() { - if let SegmentUpdate::Terminate = segment_update { - complete_option = Some(complete); - break; - } +// if let SegmentUpdate::Terminate = segment_update { +// complete_option = Some(complete); +// break; +// } - - // we check the generation number as if it was - // dirty-bit. If the value is different - // to our generation, then the segment_manager has - // been update updated. - let generation_before_update = self.segment_manager.generation(); - - if let SegmentUpdate::StartMerge(segment_ids) = segment_update { - self.start_merge(segment_ids, Some(complete)); - } - else { - self.process_one(segment_update); - if generation_before_update != self.segment_manager.generation() { - // The segment manager has changed, we need to - // - save meta.json - save_metas( - &self.segment_manager, - self.index.schema(), - self.index.docstamp(), - self.index.directory_mut()).expect("Could not save metas."); - - - // - update the searchers - - // update the searchers so that they eventually will - // use the new segments. - // TODO eventually have this work through watching meta.json - // so that an external process stays up to date as well. - match self.index.load_searchers() { - Ok(()) => {} - Err(e) => { - error!("Failure while loading new searchers {:?}", e); - panic!(format!("Failure while loading new searchers {:?}", e)); - } - } +// if let SegmentUpdate::StartMerge(segment_ids) = segment_update { +// self.start_merge(segment_ids, Some(complete)); +// } +// else { +// self.process_one(segment_update); - // - start merges if required - self.start_merges(); - } - complete.complete(()); - } - - - - - - } +// // - start merges if required +// self.start_merges(); +// complete.complete(()); +// } +// } - let mut merging_threads = HashMap::new(); - mem::swap(&mut merging_threads, &mut self.merging_threads); - for (_, merging_thread_handle) in merging_threads { - match merging_thread_handle.join() { - Ok((segment_ids, segment_entry)) => { - self.end_merge(segment_ids, segment_entry); - } - Err(e) => { - error!("Error in merging thread {:?}", e); - break; - } - } - } +// let mut merging_threads = HashMap::new(); +// mem::swap(&mut merging_threads, &mut self.merging_threads); +// for (_, merging_thread_handle) in merging_threads { +// match merging_thread_handle.join() { +// Ok((segment_ids, segment_entry)) => { +// self.end_merge(segment_ids, segment_entry); +// } +// Err(e) => { +// error!("Error in merging thread {:?}", e); +// break; +// } +// } +// } - if let Some(complete) = complete_option { - complete.complete(()); - } - } +// if let Some(complete) = complete_option { +// complete.complete(()); +// } +// } - // Process a single segment update. - pub fn process_one( - &mut self, - segment_update: SegmentUpdate) { +// // Process a single segment update. +// pub fn process_one( +// &mut self, +// segment_update: SegmentUpdate) { - info!("Segment update: {:?}", segment_update); +// info!("Segment update: {:?}", segment_update); - use self::SegmentUpdate::*; - match segment_update { - AddSegment(segment_entry) => { - if !self.is_cancelled_generation { - self.segment_manager.add_segment(segment_entry); - } - else { - // rollback has been called and this - // segment actually belong to the - // documents that have been dropped. - // - // Let's just remove its files. - self.index.delete_segment(segment_entry.segment_id()); - } - } - StartMerge(segment_ids) => { - panic!("this should have been handled somewhere else"); - } - EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => { - self.end_merge( - segment_ids, - segment_entry); - if let Some(merging_thread_id) = merging_thread_id_opt { - self.merging_threads.remove(&merging_thread_id); - } - } - CancelGeneration => { - // Called during rollback. The segment - // that will arrive will be ignored - // until a NewGeneration is update arrives. - self.is_cancelled_generation = true; - } - NewGeneration => { - // After rollback, we can resume - // indexing new documents. - self.is_cancelled_generation = false; - } - Commit(docstamp) => { - self.segment_manager.commit(docstamp); - } - Terminate => { - panic!("We should have left the loop before processing it."); - } - } - } -} +// use self::SegmentUpdate::*; +// match segment_update { +// AddSegment(generation, segment_entry) => { +// if !self.is_cancelled_generation { +// self.segment_manager.add_segment(segment_entry); +// } +// else { +// // rollback has been called and this +// // segment actually belong to the +// // documents that have been dropped. +// // +// // Let's just remove its files. +// self.index.delete_segment(segment_entry.segment_id()); +// } +// } +// StartMerge(segment_ids) => { +// panic!("this should have been handled somewhere else"); +// } +// EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => { +// self.end_merge( +// segment_ids, +// segment_entry); +// if let Some(merging_thread_id) = merging_thread_id_opt { +// self.merging_threads.remove(&merging_thread_id); +// } +// } +// CancelGeneration => { +// // Called during rollback. The segment +// // that will arrive will be ignored +// // until a NewGeneration is update arrives. +// self.is_cancelled_generation = true; +// } +// NewGeneration => { +// // After rollback, we can resume +// // indexing new documents. +// self.is_cancelled_generation = false; +// } +// Commit(docstamp) => { +// self.segment_manager.commit(docstamp); +// save_metas( +// &self.segment_manager, +// self.index.schema(), +// self.index.docstamp(), +// self.index.directory_mut()).expect("Could not save metas."); +// match self.index.load_searchers() { +// Ok(()) => {} +// Err(e) => { +// error!("Failure while loading new searchers {:?}", e); +// panic!(format!("Failure while loading new searchers {:?}", e)); +// } +// } +// } +// Terminate => { +// panic!("We should have left the loop before processing it."); +// } +// } +// } +// }