issue/43 Refactoring of SegmentUpdater

This commit is contained in:
Paul Masurel
2017-01-30 23:47:56 +09:00
parent e8ecb68f00
commit ca977fb17b
11 changed files with 250 additions and 460 deletions

View File

@@ -38,8 +38,6 @@ chan = "0.1"
version = "2"
crossbeam = "0.2"
eventual = "0.1.7"
futures = "0.1.9"
futures-cpupool = "0.1.2"

View File

@@ -170,6 +170,7 @@ mod tests {
index_writer.commit().unwrap();
}
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let mut term_it = searcher.terms();
let mut terms = String::new();

View File

@@ -1,17 +1,14 @@
use schema::Schema;
use schema::Document;
use super::operation::AddOperation;
use indexer::SegmentSerializer;
use core::SerializableSegment;
use core::Index;
use core::Segment;
use core::SegmentId;
use schema::Term;
use indexer::SegmentEntry;
use std::thread::JoinHandle;
use indexer::{MergePolicy, DefaultMergePolicy};
use indexer::MergePolicy;
use indexer::SegmentWriter;
use indexer::SegmentManager;
use core::SegmentComponent;
use super::directory_lock::DirectoryLock;
use futures::Future;
@@ -19,16 +16,14 @@ use std::clone::Clone;
use std::io;
use fastfield::delete;
use std::thread;
use futures::Canceled;
use std::mem;
use indexer::merger::IndexMerger;
use datastruct::stacker::Heap;
use std::mem::swap;
use std::sync::{Arc, Mutex};
use chan;
use core::SegmentMeta;
use super::delete_queue::{DeleteQueue, DeleteQueueCursor};
use super::segment_updater::SegmentUpdater;
use super::segment_manager::CommitState;
use Result;
use Error;
@@ -96,7 +91,7 @@ fn index_documents(heap: &mut Heap,
document_iterator: &mut Iterator<Item=AddOperation>,
segment_updater: &mut SegmentUpdater,
delete_cursor: &mut DeleteQueueCursor)
-> Result<()> {
-> Result<bool> {
heap.clear();
let segment_id = segment.id();
let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment.clone(), &schema));
@@ -133,10 +128,13 @@ fn index_documents(heap: &mut Heap,
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
try!(segment_writer.finalize());
segment_updater.add_segment(generation, segment_entry);
Ok(())
segment_updater
.add_segment(generation, segment_entry)
.wait()
.map_err(|_| Error::ErrorInThread("Could not add segment.".to_string()))
}
@@ -145,7 +143,7 @@ impl IndexWriter {
/// The index writer
pub fn wait_merging_threads(mut self) -> Result<()> {
let future = self.segment_updater.terminate();
// let future = self.segment_updater.terminate();
// this will stop the indexing thread,
// dropping the last reference to the segment_updater.
@@ -162,7 +160,12 @@ impl IndexWriter {
}
drop(self.workers_join_handle);
future.wait().unwrap(); // TODO do something with the result.
self.segment_updater
.wait_merging_thread()
.map_err(|_|
Error::ErrorInThread("Failed to join merging thread.".to_string())
)?;
// future.wait().unwrap(); // TODO do something with the result.
Ok(())
}
@@ -201,13 +204,16 @@ impl IndexWriter {
// peeked document now belongs to
// our local iterator.
if document_iterator.peek().is_some() {
try!(index_documents(&mut heap,
let valid_generation = try!(index_documents(&mut heap,
segment,
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone));
if valid_generation {
return Ok(());
}
} else {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
@@ -254,8 +260,7 @@ impl IndexWriter {
let delete_queue = DeleteQueue::default();
let merge_policy = box DefaultMergePolicy::default();
let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor(), merge_policy)?;
let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?;
let mut index_writer = IndexWriter {
@@ -285,9 +290,14 @@ impl IndexWriter {
Ok(index_writer)
}
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
self.segment_updater.get_merge_policy()
}
/// Set the merge policy.
pub fn set_merge_policy(&self, merge_policy: Box<MergePolicy>) {
// *self._merge_policy.lock().unwrap() = merge_policy;
self.segment_updater.set_merge_policy(merge_policy);
}
fn start_workers(&mut self) -> Result<()> {
@@ -298,7 +308,7 @@ impl IndexWriter {
}
/// Merges a given list of segments
pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future<Item=(), Error=&'static str> {
pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future<Item=SegmentEntry, Error=Canceled> {
self.segment_updater.start_merge(segments.to_vec())
}
@@ -328,8 +338,11 @@ impl IndexWriter {
/// The docstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<u64> {
self.segment_updater.cancel_generation();
// by updating the generation in the segment updater,
// pending add segment commands will be dismissed.
self.generation += 1;
let rollback_future = self.segment_updater.new_generation(self.generation);
// we cannot drop segment ready receiver yet
// as it would block the workers.
let document_receiver = self.recreate_document_channel();
@@ -341,7 +354,7 @@ impl IndexWriter {
let mut former_workers_join_handle = Vec::new();
swap(&mut former_workers_join_handle,
&mut self.workers_join_handle);
// wait for all the worker to finish their work
// (it should be fast since we consumed all pending documents)
for worker_handle in former_workers_join_handle {
@@ -355,27 +368,15 @@ impl IndexWriter {
// All of our indexing workers for the rollbacked generation have
// been terminated.
//
// Our document receiver pipe was drained.
// No new document have been added in the meanwhile because `IndexWriter`
// is not shared by different threads.
//
// We can now open a new generation and reaccept segments
// from now on.
self.segment_updater.new_generation();
// TODO Send rollback.
//let rollbacked_segments = self.segment_manager.rollback();
rollback_future.wait().map_err(|_|
Error::ErrorInThread("Error while waiting for rollback.".to_string())
)?;
// for segment_id in rollbacked_segments {
// // TODO all delete must happen after saving
// // meta.json
// self.index.delete_segment(segment_id);
// }
panic!("aaaa");
// reset the docstamp
self.uncommitted_docstamp = self.committed_docstamp;
Ok(self.committed_docstamp)
@@ -474,12 +475,11 @@ impl IndexWriter {
#[cfg(test)]
mod tests {
use indexer::NoMergePolicy;
use schema::{self, Document};
use Index;
use Term;
use Error;
use indexer::NoMergePolicy;
#[test]
fn test_lockfile_stops_duplicates() {
@@ -497,10 +497,10 @@ mod tests {
let schema_builder = schema::SchemaBuilder::default();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(40_000_000).unwrap();
// assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }");
// let merge_policy = box NoMergePolicy::default();
// index_writer.set_merge_policy(merge_policy);
// assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy");
assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "LogMergePolicy { min_merge_size: 8, min_layer_size: 10000, level_log_size: 0.75 }");
let merge_policy = box NoMergePolicy::default();
index_writer.set_merge_policy(merge_policy);
assert_eq!(format!("{:?}", index_writer.get_merge_policy()), "NoMergePolicy");
}
#[test]
@@ -521,7 +521,6 @@ mod tests {
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let num_docs_containing = |s: &str| {
let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
@@ -550,11 +549,12 @@ mod tests {
index_writer.add_document(doc).unwrap();
}
assert_eq!(index_writer.commit().unwrap(), 2u64);
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 1);
assert_eq!(num_docs_containing("c"), 1);
}
index.load_searchers().unwrap();
index.searcher();
}
@@ -586,6 +586,7 @@ mod tests {
// this should create 8 segments and trigger a merge.
index_writer.commit().expect("commit failed");
index_writer.wait_merging_threads().expect("waiting merging thread failed");
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 200);
assert_eq!(index.searchable_segments().unwrap().len(), 1);
}

View File

@@ -48,7 +48,6 @@ impl LogMergePolicy {
impl MergePolicy for LogMergePolicy {
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
if segments.is_empty() {
return Vec::new();
}
@@ -75,16 +74,15 @@ impl MergePolicy for LogMergePolicy {
levels.last_mut().unwrap().push(ind);
}
let result = levels.iter()
levels
.iter()
.filter(|level| level.len() >= self.min_merge_size)
.map(|ind_vec| {
MergeCandidate(ind_vec.iter()
.map(|&ind| segments[ind].segment_id)
.collect())
})
.collect();
result
.collect()
}
fn box_clone(&self) -> Box<MergePolicy> {

View File

@@ -1,7 +1,6 @@
use Result;
use core::SegmentReader;
use core::Segment;
use core::SegmentId;
use DocId;
use core::SerializableSegment;
use indexer::SegmentSerializer;
@@ -14,7 +13,6 @@ use fastfield::FastFieldSerializer;
use store::StoreWriter;
use postings::ChainedPostings;
use postings::HasLen;
use futures::Future;
use postings::OffsetPostings;
use core::SegmentInfo;
use std::cmp::{min, max};
@@ -207,7 +205,6 @@ mod tests {
use collector::tests::TestCollector;
use query::BooleanQuery;
use schema::TextIndexingOptions;
use eventual::Async;
use futures::Future;
#[test]
@@ -243,7 +240,7 @@ mod tests {
doc.add_u32(score_field, 7);
index_writer.add_document(doc).unwrap();
}
index_writer.commit().unwrap();
index_writer.commit().expect("committed");
}
{
@@ -272,6 +269,7 @@ mod tests {
index_writer.wait_merging_threads().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let mut collector = TestCollector::default();

View File

@@ -6,7 +6,6 @@ use indexer::SegmentEntry;
use indexer::delete_queue::DeleteQueueCursor;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
struct SegmentRegisters {
docstamp: u64,
@@ -14,13 +13,6 @@ struct SegmentRegisters {
committed: SegmentRegister,
}
#[derive(Eq, PartialEq)]
pub enum CommitState {
Committed,
Uncommitted,
Missing,
}
impl Default for SegmentRegisters {
fn default() -> SegmentRegisters {
SegmentRegisters {
@@ -54,28 +46,14 @@ impl Debug for SegmentManager {
///
/// For instance, a segment will not appear in both committed and uncommitted
/// segments
pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(registers_lock.committed.get_segment_ready_for_commit(),
registers_lock.uncommitted.get_segment_ready_for_commit())
(registers_lock.committed.get_segments(),
registers_lock.uncommitted.get_segments())
}
impl SegmentManager {
/// Returns whether a segment is committed, uncommitted or missing.
pub fn is_committed(&self, segment_id: SegmentId) -> CommitState {
let lock = self.read();
if lock.uncommitted.contains(segment_id) {
CommitState::Uncommitted
}
else if lock.committed.contains(segment_id) {
CommitState::Committed
}
else {
CommitState::Missing
}
}
pub fn from_segments(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteQueueCursor) -> SegmentManager {
SegmentManager {
registers: RwLock::new( SegmentRegisters {

View File

@@ -89,7 +89,7 @@ impl SegmentRegister {
self.segment_states.clear();
}
pub fn get_segment_ready_for_commit(&self,) -> Vec<SegmentMeta> {
pub fn get_segments(&self,) -> Vec<SegmentMeta> {
self.segment_states
.values()
.filter(|segment_entry| segment_entry.is_ready())
@@ -126,12 +126,7 @@ impl SegmentRegister {
.get(&segment_id)
.map(|segment_entry| segment_entry.clone())
}
pub fn contains(&self, segment_id: SegmentId) -> bool {
self.segment_states.contains_key(&segment_id)
}
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
segment_ids
.iter()

View File

@@ -1,21 +1,25 @@
#![allow(for_kv_map)]
use chan;
use core::Index;
use std::sync::Mutex;
use core::Segment;
use indexer::{MergePolicy, DefaultMergePolicy};
use core::SegmentId;
use core::SegmentMeta;
use std::mem;
use futures::Future;
use std::sync::atomic::Ordering;
use std::ops::DerefMut;
use futures::{Future, future};
use futures::oneshot;
use futures::Canceled;
use std::thread;
use std::sync::atomic::AtomicUsize;
use std::sync::RwLock;
use core::SerializableSegment;
use indexer::MergePolicy;
use indexer::MergeCandidate;
use indexer::merger::IndexMerger;
use std::borrow::BorrowMut;
use indexer::SegmentSerializer;
use indexer::SegmentEntry;
use std::thread;
use schema::Schema;
use directory::Directory;
use std::thread::JoinHandle;
@@ -28,8 +32,7 @@ use futures_cpupool::CpuPool;
use core::IndexMeta;
use core::META_FILEPATH;
use std::io::Write;
use eventual::*;
use super::segment_manager::{SegmentManager, get_segment_ready_for_commit};
use super::segment_manager::{SegmentManager, get_segments};
fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta {
@@ -84,135 +87,6 @@ pub fn save_metas(segment_manager: &SegmentManager,
}
// #[derive(Clone, Debug)]
// pub enum SegmentUpdate {
// /// New segment added.
// /// Created by the indexing worker thread
// AddSegment(usize, SegmentEntry),
// StartMerge(Vec<SegmentId>),
// /// A merge is ended.
// /// Remove the merged segment and record the new
// /// large merged segment.
// EndMerge(Option<usize>, Vec<SegmentId>, SegmentEntry),
// /// Happens when rollback is called.
// /// The current generation of segments is cancelled.
// CancelGeneration,
// /// Starts a new generation... This
// /// happens at the end of Rollback.
// NewGeneration,
// /// Just dropping the Segment updater object
// /// is safe, but some merge might be happening in
// /// the background and the user may want to wait for these
// /// threads to terminate.
// ///
// /// When receiving the Terminate signal, the segment updater stops
// /// receiving segment updates and just waits for the merging threads
// /// to terminate.
// Terminate,
// /// Commit marks uncommmitted segments as committed.
// Commit(u64),
// }
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
struct InnerSegmentUpdater {
pool: CpuPool,
segment_manager: SegmentManager,
merge_policy: Box<MergePolicy>,
merging_thread_id: AtomicUsize,
merging_threads: HashMap<usize, JoinHandle<(Vec<SegmentId>, SegmentEntry)> >,
}
impl SegmentUpdater {
pub fn new(
index: Index,
delete_cursor: DeleteQueueCursor,
merge_policy: Box<MergePolicy>)
-> Result<SegmentUpdater>
{
let committed_segments = index.committed_segments()?;
let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor);
Ok(
SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
segment_manager: segment_manager,
merge_policy: merge_policy,
merging_thread_id: AtomicUsize::new(0),
merging_threads: HashMap::new(),
}))
)
}
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) {
}
pub fn commit(&self, committed_docstamp: u64) -> impl Future<Item=(), Error=&'static str> {
self.0.pool.spawn_fn(|| {
Ok(())
})
}
pub fn start_merge(&self, segment_ids: Vec<SegmentId>) -> impl Future<Item=(), Error=&'static str> {
self.0.pool.spawn_fn(|| {
Ok(())
})
}
pub fn new_generation(&self) {
}
pub fn cancel_generation(&self) {
}
pub fn end_merge(&self,
merge_thread_id: Option<usize>,
merged_segment_ids: Vec<SegmentId>,
resulting_segment_entry: SegmentEntry) {
}
pub fn terminate(&self) -> impl Future<Item=(), Error=&'static str> {
self.0.pool.spawn_fn(|| {
Ok(())
})
}
}
// impl SegmentUpdater {
// pub fn create(
// index: Index,
// delete_cursor: DeleteQueueCursor,
// merge_policy: Box<MergePolicy>) -> Result<SegmentUpdater> {
// let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::async();
// let segment_updater = SegmentUpdater {
// channel: segment_update_sender,
// };
// let committed_segments = index.committed_segments()?;
// let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor);
// SegmentUpdateRunner::new(
// index,
// segment_manager,
// merge_policy,
// segment_updater.clone(),
// segment_update_receiver).start();
// Ok(segment_updater)
// }
// }
// The segment update runner is in charge of processing all
@@ -220,248 +94,191 @@ impl SegmentUpdater {
//
// All this processing happens on a single thread
// consuming a common queue.
//
// The segment updates producers are :
// - indexing threads are sending new segments
// - merging threads are sending merge operations
// - the index writer sends "terminate"
// pub struct SegmentUpdateRunner {
// index: Index,
// is_cancelled_generation: bool,
// segment_update_receiver: SegmentUpdateReceiver,
// segment_updater: SegmentUpdater,
// segment_manager: SegmentManager,
// merge_policy: Box<MergePolicy>,
// merging_thread_id: usize,
// merging_threads: HashMap<usize, JoinHandle<(Vec<SegmentId>, SegmentEntry)> >,
// }
// impl SegmentUpdateRunner {
// fn new(index: Index,
// segment_manager: SegmentManager,
// merge_policy: Box<MergePolicy>,
// segment_updater: SegmentUpdater,
// segment_update_receiver: SegmentUpdateReceiver) -> SegmentUpdateRunner {
// SegmentUpdateRunner {
// index: index,
// is_cancelled_generation: false,
// segment_updater: segment_updater,
// segment_update_receiver: segment_update_receiver,
// segment_manager: segment_manager,
// merge_policy: merge_policy,
// merging_thread_id: 0,
// merging_threads: HashMap::new(),
// }
// }
// fn new_merging_thread_id(&mut self,) -> usize {
// self.merging_thread_id += 1;
// self.merging_thread_id
// }
// fn end_merge(
// &mut self,
// segment_ids: Vec<SegmentId>,
// segment_entry: SegmentEntry) {
// self.segment_manager.end_merge(&segment_ids, segment_entry);
// save_metas(
// &self.segment_manager,
// self.index.schema(),
// self.index.docstamp(),
// self.index.directory_mut()).expect("Could not save metas.");
// for segment_id in segment_ids {
// self.index.delete_segment(segment_id);
// }
// self.index.load_searchers().unwrap();
// }
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
// fn start_merge(&mut self, segment_ids: Vec<SegmentId>, complete_opt: Option<Complete<(), &'static str>>) {
// let merging_thread_id = self.new_merging_thread_id();
// self.segment_manager.start_merge(&segment_ids);
// let index_clone = self.index.clone();
// let segment_updater_clone = self.segment_updater.clone();
// let merge_thread_handle = thread::Builder::new()
// .name(format!("merge_thread_{:?}", merging_thread_id))
// .spawn(move || {
// info!("Start merge: {:?}", segment_ids);
// let schema = index_clone.schema();
// let segments: Vec<Segment> = segment_ids
// .iter()
// .map(|&segment_id| index_clone.segment(segment_id))
// .collect();
// // An IndexMerger is like a "view" of our merged segments.
// // TODO unwrap
// let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
// let mut merged_segment = index_clone.new_segment();
// // ... we just serialize this index merger in our new segment
// // to merge the two segments.
// let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
// let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
// let segment_meta = SegmentMeta {
// segment_id: merged_segment.id(),
// num_docs: num_docs,
// num_deleted_docs: 0u32,
// };
struct InnerSegmentUpdater {
pool: CpuPool,
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Box<MergePolicy>>,
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<SegmentEntry>>>,
generation: AtomicUsize,
}
// // TODO fix delete cursor
// let delete_queue = DeleteQueue::default();
// let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
impl SegmentUpdater {
// let segment_update = SegmentUpdate::EndMerge(Some(merging_thread_id), segment_ids.clone(), segment_entry.clone());
// // segment_updater_clone.send(segment_update.clone());
// if let Some(complete) = complete_opt {
// complete.complete(());
// }
// (segment_ids, segment_entry)
// })
// .expect("Failed to spawn merge thread");
pub fn new(
index: Index,
delete_cursor: DeleteQueueCursor)
-> Result<SegmentUpdater>
{
let committed_segments = index.committed_segments()?;
let segment_manager = SegmentManager::from_segments(committed_segments, delete_cursor);
Ok(
SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
index: index,
segment_manager: segment_manager,
merge_policy: RwLock::new(box DefaultMergePolicy::default()),
merging_thread_id: AtomicUsize::default(),
merging_threads: RwLock::new(HashMap::new()),
generation: AtomicUsize::default(),
}))
)
}
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
self.0.merge_policy.read().unwrap().box_clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<MergePolicy>) {
*self.0.merge_policy.write().unwrap()= merge_policy;
}
fn get_merging_thread_id(&self) -> usize {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=&'static str> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || {
Ok(f(me_clone))
})
}
pub fn new_generation(&mut self, generation: usize) -> impl Future<Item=(), Error=&'static str> {
self.0.generation.store(generation, Ordering::Release);
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.rollback();
})
}
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=&'static str> {
if generation >= self.0.generation.load(Ordering::Acquire) {
future::Either::A(self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
}))
}
else {
future::Either::B(future::ok(false))
}
}
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.commit(opstamp);
let mut directory = segment_updater.0.index.directory().box_clone();
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.index.schema(),
segment_updater.0.index.docstamp(),
directory.borrow_mut()).expect("Could not save metas.");
segment_updater.consider_merge_options();
})
}
pub fn start_merge(&self, segment_ids: Vec<SegmentId>) -> impl Future<Item=SegmentEntry, Error=Canceled> {
// self.merging_threads.insert(merging_thread_id, merge_thread_handle);
// }
self.0.segment_manager.start_merge(&segment_ids);
let segment_updater_clone = self.clone();
// fn start_merges(&mut self) {
// let merge_candidates = self.consider_merge_options();
// for MergeCandidate(segment_ids) in merge_candidates {
// self.start_merge(segment_ids, None);
// }
// }
// fn consider_merge_options(&self,) -> Vec<MergeCandidate> {
// let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&self.segment_manager);
// // Committed segments cannot be merged with uncommitted_segments.
// // We therefore consider merges using these two sets of segments independantly.
// let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments);
// let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments);
// merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
// merge_candidates
// }
// pub fn start(self) -> JoinHandle<()> {
// thread::Builder::new()
// .name("segment_update".to_string())
// .spawn(move || {
// self.process();
// })
// .expect("Failed to start segment updater thread.")
// }
// fn process(mut self) {
// let mut complete_option = None;
// for (complete, segment_update) in self.segment_update_receiver.clone() {
let merging_thread_id = self.get_merging_thread_id();
let (merging_future_send, merging_future_recv) = oneshot();
let merging_join_handle = thread::spawn(move || {
// if let SegmentUpdate::Terminate = segment_update {
// complete_option = Some(complete);
// break;
// }
info!("Start merge: {:?}", segment_ids);
// if let SegmentUpdate::StartMerge(segment_ids) = segment_update {
// self.start_merge(segment_ids, Some(complete));
// }
// else {
// self.process_one(segment_update);
// // - start merges if required
// self.start_merges();
// complete.complete(());
// }
// }
// let mut merging_threads = HashMap::new();
// mem::swap(&mut merging_threads, &mut self.merging_threads);
// for (_, merging_thread_handle) in merging_threads {
// match merging_thread_handle.join() {
// Ok((segment_ids, segment_entry)) => {
// self.end_merge(segment_ids, segment_entry);
// }
// Err(e) => {
// error!("Error in merging thread {:?}", e);
// break;
// }
// }
// }
let ref index = segment_updater_clone.0.index;
let schema = index.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
let mut merged_segment = index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
num_deleted_docs: 0u32,
};
// if let Some(complete) = complete_option {
// complete.complete(());
// }
// }
// TODO fix delete cursor
let delete_queue = DeleteQueue::default();
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
segment_updater_clone
.end_merge(segment_ids.clone(), segment_entry.clone())
.wait()
.unwrap();
merging_future_send.complete(segment_entry.clone());
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
segment_entry
});
self.0.merging_threads.write().unwrap().insert(merging_thread_id, merging_join_handle);
merging_future_recv
}
fn consider_merge_options(&self) {
let (committed_segments, uncommitted_segments) = get_segments(&self.0.segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_ids) in merge_candidates {
self.start_merge(segment_ids);
}
}
fn end_merge(&self,
merged_segment_ids: Vec<SegmentId>,
resulting_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.end_merge(&merged_segment_ids, resulting_segment_entry);
let mut directory = segment_updater.0.index.directory().box_clone();
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.index.schema(),
segment_updater.0.index.docstamp(),
directory.borrow_mut()).expect("Could not save metas.");
for segment_id in merged_segment_ids {
segment_updater.0.index.delete_segment(segment_id);
}
})
}
// // Process a single segment update.
// pub fn process_one(
// &mut self,
// segment_update: SegmentUpdate) {
// info!("Segment update: {:?}", segment_update);
// use self::SegmentUpdate::*;
// match segment_update {
// AddSegment(generation, segment_entry) => {
// if !self.is_cancelled_generation {
// self.segment_manager.add_segment(segment_entry);
// }
// else {
// // rollback has been called and this
// // segment actually belong to the
// // documents that have been dropped.
// //
// // Let's just remove its files.
// self.index.delete_segment(segment_entry.segment_id());
// }
// }
// StartMerge(segment_ids) => {
// panic!("this should have been handled somewhere else");
// }
// EndMerge(merging_thread_id_opt, segment_ids, segment_entry) => {
// self.end_merge(
// segment_ids,
// segment_entry);
// if let Some(merging_thread_id) = merging_thread_id_opt {
// self.merging_threads.remove(&merging_thread_id);
// }
// }
// CancelGeneration => {
// // Called during rollback. The segment
// // that will arrive will be ignored
// // until a NewGeneration is update arrives.
// self.is_cancelled_generation = true;
// }
// NewGeneration => {
// // After rollback, we can resume
// // indexing new documents.
// self.is_cancelled_generation = false;
// }
// Commit(docstamp) => {
// self.segment_manager.commit(docstamp);
// save_metas(
// &self.segment_manager,
// self.index.schema(),
// self.index.docstamp(),
// self.index.directory_mut()).expect("Could not save metas.");
// match self.index.load_searchers() {
// Ok(()) => {}
// Err(e) => {
// error!("Failure while loading new searchers {:?}", e);
// panic!(format!("Failure while loading new searchers {:?}", e));
// }
// }
// }
// Terminate => {
// panic!("We should have left the loop before processing it.");
// }
// }
// }
// }
pub fn wait_merging_thread(&self) -> thread::Result<()> {
let mut new_merging_threads = HashMap::new();
{
let mut merging_threads = self.0.merging_threads.write().unwrap();
mem::swap(&mut new_merging_threads, merging_threads.deref_mut());
}
for (_, merging_thread_handle) in new_merging_threads {
merging_thread_handle
.join()
.map(|_| ())?
}
Ok(())
}
}

View File

@@ -49,7 +49,6 @@ extern crate chan;
extern crate crossbeam;
extern crate bit_set;
extern crate notify;
extern crate eventual;
extern crate futures;
extern crate futures_cpupool;
@@ -245,6 +244,7 @@ mod tests {
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, "a");
assert_eq!(searcher.doc_freq(&term_a), 3);
@@ -280,7 +280,7 @@ mod tests {
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_reader: &SegmentReader = searcher.segment_reader(0);
let fieldnorms_reader = segment_reader.get_fieldnorms_reader(text_field).unwrap();
@@ -306,6 +306,7 @@ mod tests {
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let reader = searcher.segment_reader(0);
assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none());
@@ -342,6 +343,7 @@ mod tests {
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let searcher = index.searcher();
let get_doc_ids = |terms: Vec<Term>| {
let query = BooleanQuery::new_multiterms_query(terms);

View File

@@ -189,6 +189,7 @@ mod tests {
}
assert!(index_writer.commit().is_ok());
}
index.load_searchers().unwrap();
let term_query = TermQuery::new(Term::from_field_text(text_field, "a"), SegmentPostingsOption::NoFreq);
let searcher = index.searcher();
let mut term_weight = term_query.specialized_weight(&*searcher);
@@ -256,6 +257,7 @@ mod tests {
}
assert!(index_writer.commit().is_ok());
}
index.load_searchers().unwrap();
index
};
}
@@ -275,7 +277,6 @@ mod tests {
fn bench_segment_intersection(b: &mut Bencher) {
let searcher = INDEX.searcher();
let segment_reader = searcher.segment_reader(0);
b.iter(|| {
let segment_postings_a = segment_reader.read_postings(&*TERM_A, SegmentPostingsOption::NoFreq).unwrap();
let segment_postings_b = segment_reader.read_postings(&*TERM_B, SegmentPostingsOption::NoFreq).unwrap();

View File

@@ -48,6 +48,7 @@ mod tests {
assert!(index_writer.commit().is_ok());
}
index.load_searchers().unwrap();
let searcher = index.searcher();
let test_query = |texts: Vec<&str>| {
let mut test_collector = TestCollector::default();