Merge branch 'bug/4' of github.com:fulmicoton/tantivy into bug/4

This commit is contained in:
Paul Masurel
2016-10-10 16:06:37 +09:00
5 changed files with 123 additions and 62 deletions

View File

@@ -41,6 +41,10 @@ impl SegmentId {
SegmentId(create_uuid())
}
pub fn short_uuid_string(&self,) -> String {
String::from(&self.0.to_simple_string()[..8])
}
pub fn uuid_string(&self,) -> String {
self.0.to_simple_string()
}

View File

@@ -136,6 +136,7 @@ impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, FileError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = try!(
@@ -176,6 +177,7 @@ impl Directory for MmapDirectory {
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
let open_res = OpenOptions::new()
@@ -206,6 +208,7 @@ impl Directory for MmapDirectory {
}
fn delete(&self, path: &Path) -> result::Result<(), FileError> {
debug!("Delete {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = try!(self.mmap_cache
.write()
@@ -228,6 +231,7 @@ impl Directory for MmapDirectory {
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let full_path = self.resolve_path(path);
let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite);
try!(meta_file.write(|f| {

View File

@@ -55,7 +55,10 @@ pub struct IndexWriter {
document_sender: DocumentSender,
segment_update_sender: SegmentUpdateSender,
segment_update_thread: JoinHandle<()>,
worker_id: usize,
num_threads: usize,
docstamp: u64,
}
@@ -165,59 +168,63 @@ fn process_segment_updates(mut index: Index,
segment_manager: &SegmentManager,
segment_update_receiver: SegmentUpdateReceiver,
segment_update_sender: SegmentUpdateSender) {
let mut segment_update_it = segment_update_receiver.into_iter();
let mut is_cancelled_generation = false;
let merge_policy = index.get_merge_policy();
loop {
if let Some(segment_update) = segment_update_it.next() {
let has_changed = process_segment_update(
&index,
segment_manager,
segment_update,
&mut is_cancelled_generation);
if has_changed {
on_segment_change(&mut index);
for segment_update in segment_update_receiver {
let has_changed = process_segment_update(
&index,
segment_manager,
segment_update,
&mut is_cancelled_generation);
if has_changed {
on_segment_change(&mut index);
let segment_manager = get_segment_manager(&index);
let segment_manager = get_segment_manager(&index);
for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
});
}
for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
}).expect("Failed to spawn merge thread");
}
}
else {
// somehow, the channel was dropped.
return;
}
}
}
impl IndexWriter {
pub fn wait_merging_threads(self) -> Result<()> {
drop(self.segment_update_sender);
info!("Joining update thread");
self.segment_update_thread
.join()
.map_err(|err| {
error!("Error in the merging thread {:?}", err);
Error::ErrorInThread(format!("{:?}", err))
})
}
/// Spawns a new worker thread for indexing.
/// The thread consumes documents from the pipeline.
///
@@ -228,8 +235,11 @@ impl IndexWriter {
let document_receiver_clone = self.document_receiver.clone();
let mut segment_update_sender = self.segment_update_sender.clone();
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<Result<()>> = thread::spawn(move || {
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<Result<()>> = try!(thread::Builder::new()
.name(format!("indexing_thread_{}", self.worker_id))
.spawn(move || {
loop {
let segment = index.new_segment();
let mut document_iterator = document_receiver_clone
@@ -256,7 +266,8 @@ impl IndexWriter {
return Ok(());
}
}
});
}));
self.worker_id += 1;
self.workers_join_handle.push(join_handle);
Ok(())
@@ -285,9 +296,9 @@ impl IndexWriter {
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::spawn(move || {
let segment_update_thread = try!(thread::Builder::new().name("segment_update".to_string()).spawn(move || {
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone)
});
}));
let mut index_writer = IndexWriter {
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
@@ -297,11 +308,13 @@ impl IndexWriter {
document_sender: document_sender,
segment_update_sender: segment_update_sender,
segment_update_thread: segment_update_thread,
workers_join_handle: Vec::new(),
num_threads: num_threads,
docstamp: index.docstamp(),
worker_id: 0,
};
try!(index_writer.start_workers());
Ok(index_writer)

View File

@@ -3,6 +3,7 @@ use std::sync::RwLock;
use core::SegmentMeta;
use core::SegmentId;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
struct SegmentRegisters {
uncommitted: SegmentRegister,
@@ -28,16 +29,21 @@ pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
}
impl Debug for SegmentManager {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
let lock = self.read();
write!(f, "{{ uncommitted: {:?}, committed: {:?} }}", lock.uncommitted, lock.committed)
}
}
/// Returns the segment_metas for (committed segment, uncommitted segments).
/// The result is consistent with other transactions.
///
/// For instance, a segment will not appear in both committed and uncommitted
/// segments
pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager
.registers
.read()
.expect("Segment manager lock is poisoned");
let registers_lock = segment_manager.read();
(registers_lock.committed.get_segment_ready_for_commit(),
registers_lock.uncommitted.get_segment_ready_for_commit())
}
@@ -75,9 +81,9 @@ impl SegmentManager {
pub fn commit(&self,) {
let mut registers_lock = self.write();
let segment_metas = registers_lock.uncommitted.segment_metas();
for segment_meta in segment_metas {
registers_lock.committed.add_segment(segment_meta.clone());
let segment_entries = registers_lock.uncommitted.segment_entries();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
registers_lock.uncommitted.clear();
}
@@ -114,6 +120,8 @@ impl SegmentManager {
registers_lock.committed.remove_segment(segment_id);
}
registers_lock.committed.add_segment(merged_segment_meta.clone());
} else {
warn!("couldn't find segment in SegmentManager");
}
}

View File

@@ -1,13 +1,24 @@
use core::SegmentId;
use std::collections::HashMap;
use core::SegmentMeta;
use std::fmt;
use std::fmt::{Debug, Formatter};
#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Clone, PartialEq, Eq)]
pub enum SegmentState {
Ready,
InMerge,
}
impl SegmentState {
fn letter_code(&self,) -> char {
match *self {
SegmentState::InMerge => 'M',
SegmentState::Ready => 'R',
}
}
}
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
@@ -38,6 +49,17 @@ pub struct SegmentRegister {
segment_states: HashMap<SegmentId, SegmentEntry>,
}
impl Debug for SegmentRegister {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
try!(write!(f, "SegmentRegister("));
for (ref k, ref v) in &self.segment_states {
try!(write!(f, "{}:{}, ", k.short_uuid_string(), v.state.letter_code()));
}
try!(write!(f, ")"));
Ok(())
}
}
impl SegmentRegister {
pub fn clear(&mut self,) {
@@ -51,7 +73,14 @@ impl SegmentRegister {
.map(|segment_entry| segment_entry.meta.clone())
.collect()
}
pub fn segment_entries(&self,) -> Vec<SegmentEntry>{
self.segment_states
.values()
.cloned()
.collect()
}
pub fn segment_metas(&self,) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
.values()
@@ -82,13 +111,16 @@ impl SegmentRegister {
.all(|segment_id| self.segment_states.contains_key(segment_id))
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
let segment_id = segment_entry.meta.segment_id;
self.segment_states.insert(segment_id, segment_entry);
}
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
let segment_id = segment_meta.segment_id.clone();
let segment_entry = SegmentEntry {
self.add_segment_entry(SegmentEntry {
meta: segment_meta.clone(),
state: SegmentState::Ready,
};
self.segment_states.insert(segment_id, segment_entry);
});
}
pub fn remove_segment(&mut self, segment_id: &SegmentId) {