bug/4 Bugfix, and made unit test way faster

This commit is contained in:
Paul Masurel
2016-10-11 08:59:41 +09:00
parent 513b44eb93
commit fe905ff18b
6 changed files with 266 additions and 100 deletions

View File

@@ -70,10 +70,10 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
Ok(loaded_meta)
}
pub fn commit(index: &mut Index, docstamp: u64) {
index.docstamp = docstamp;
index.segment_manager.commit();
}
// pub fn commit(index: &mut Index, docstamp: u64) {
// index.docstamp = docstamp;
// index.segment_manager.commit();
// }
/// Tantivy's Search Index
pub struct Index {

View File

@@ -1,8 +1,6 @@
use std::cell::UnsafeCell;
use std::mem;
use std::ptr;
use std::iter;
/// `BytesRef` refers to a slice in tantivy's custom `Heap`.
#[derive(Copy, Clone)]
@@ -103,11 +101,23 @@ struct InnerHeap {
next_heap: Option<Box<InnerHeap>>,
}
/// initializing a long Vec<u8> is crazy slow in
/// debug mode.
fn allocate_fast(num_bytes: usize) -> Vec<u8> {
let mut scavenged = Vec::with_capacity(num_bytes);
unsafe {
let ptr = scavenged.as_mut_ptr();
mem::forget(scavenged);
Vec::from_raw_parts(ptr, num_bytes, num_bytes)
}
}
impl InnerHeap {
pub fn with_capacity(num_bytes: usize) -> InnerHeap {
let buffer: Vec<u8> = allocate_fast(num_bytes);
InnerHeap {
buffer: iter::repeat(0u8).take(num_bytes).collect(),
buffer: buffer,
buffer_len: num_bytes as u32,
next_heap: None,
used: 0u32,

View File

@@ -11,6 +11,7 @@ use std::clone::Clone;
use std::io;
use indexer::MergePolicy;
use std::thread;
use std::mem;
use indexer::merger::IndexMerger;
use core::SegmentId;
use datastruct::stacker::Heap;
@@ -18,7 +19,7 @@ use std::mem::swap;
use chan;
use core::SegmentMeta;
use super::super::core::index::get_segment_manager;
use super::segment_manager::{SegmentManager, get_segment_ready_for_commit};
use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit};
use Result;
use Error;
@@ -63,6 +64,10 @@ pub struct IndexWriter {
docstamp: u64,
}
// IndexWriter cannot be sent to another thread.
impl !Send for IndexWriter {}
impl !Sync for IndexWriter {}
fn index_documents(heap: &mut Heap,
segment: Segment,
@@ -97,65 +102,73 @@ pub enum SegmentUpdate {
EndMerge(Vec<SegmentId>, SegmentMeta),
CancelGeneration,
NewGeneration,
Terminate,
Commit,
}
// Process a single segment update.
//
// If the segment manager has been changed a result,
// return true. (else return false)
fn process_segment_update(
impl SegmentUpdate {
// Process a single segment update.
pub fn process(
self,
index: &Index,
segment_manager: &SegmentManager,
segment_update: SegmentUpdate,
is_cancelled_generation: &mut bool) -> bool {
info!("Segment update: {:?}", self);
info!("Segment update: {:?}", segment_update);
match segment_update {
SegmentUpdate::AddSegment(segment_meta) => {
if !*is_cancelled_generation {
segment_manager.add_segment(segment_meta);
match self {
SegmentUpdate::AddSegment(segment_meta) => {
if !*is_cancelled_generation {
segment_manager.add_segment(segment_meta);
}
else {
// rollback has been called and this
// segment actually belong to the
// documents that have been dropped.
//
// Let's just remove its files.
index.delete_segment(segment_meta.segment_id);
}
}
else {
index.delete_segment(segment_meta.segment_id);
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
segment_manager.end_merge(&segment_ids, &segment_meta);
for segment_id in segment_ids {
index.delete_segment(segment_id);
}
}
true
},
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
segment_manager.end_merge(&segment_ids, &segment_meta);
for segment_id in segment_ids {
index.delete_segment(segment_id);
SegmentUpdate::CancelGeneration => {
// Called during rollback. The segment
// that will arrive will be ignored
// until a NewGeneration is update arrives.
*is_cancelled_generation = true;
}
SegmentUpdate::NewGeneration => {
// After rollback, we can resume
// indexing new documents.
*is_cancelled_generation = false;
}
SegmentUpdate::Commit => {
segment_manager.commit();
}
SegmentUpdate::Terminate => {
return true;
}
true
},
SegmentUpdate::CancelGeneration => {
*is_cancelled_generation = true;
false
},
SegmentUpdate::NewGeneration => {
*is_cancelled_generation = false;
false
}
return false;
}
}
fn consider_merge_options(index: &Index, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
let segment_manager = get_segment_manager(index);
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(&*segment_manager);
// committed segments cannot be merged with uncommitted_segments.
let mut merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&uncommitted_segments)[..]);
fn consider_merge_options(segment_manager: &SegmentManager, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independantly.
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&committed_segments)[..]);
merge_candidates
}
fn on_segment_change(index: &mut Index) -> Result<()> {
// saving the meta file.
try!(index.save_metas());
// update the searcher so that they eventually will
// use the new segments.
try!(index.load_searchers());
Ok(())
}
// Consumes the `segment_update_receiver` channel
// for segment updates and apply them.
@@ -168,55 +181,94 @@ fn process_segment_updates(mut index: Index,
segment_manager: &SegmentManager,
segment_update_receiver: SegmentUpdateReceiver,
segment_update_sender: SegmentUpdateSender) {
let mut option_segment_update_sender = Some(segment_update_sender);
let mut is_cancelled_generation = false;
let mut generation = segment_manager.generation();
let merge_policy = index.get_merge_policy();
for segment_update in segment_update_receiver {
let has_changed = process_segment_update(
&index,
segment_manager,
segment_update,
&mut is_cancelled_generation);
if has_changed {
on_segment_change(&mut index);
if segment_update.process(
&index,
segment_manager,
&mut is_cancelled_generation) {
option_segment_update_sender = None;
};
let segment_manager = get_segment_manager(&index);
let new_generation = segment_manager.generation();
// we check the generation number as if it was
// dirty-bit. If the value is different
// to our generation, then the segment_manager has
// been update updated and we need to
// - save meta.json
// - update the searchers
// - consider possible segment merge
for MergeCandidate(segment_ids) in consider_merge_options(&index, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
}).expect("Failed to spawn merge thread");
if generation != new_generation {
generation = new_generation;
// saving the meta file.
index.save_metas().expect("Could not save metas.");
// update the searchers so that they eventually will
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
index.load_searchers().expect("Could not load new searchers.");
if let Some(ref segment_update_sender) = option_segment_update_sender {
for MergeCandidate(segment_ids) in consider_merge_options(&segment_manager, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
}).expect("Failed to spawn merge thread");
}
}
}
}
}
impl IndexWriter {
pub fn wait_merging_threads(self) -> Result<()> {
pub fn wait_merging_threads(mut self) -> Result<()> {
self.segment_update_sender.send(SegmentUpdate::Terminate);
drop(self.segment_update_sender);
info!("Joining update thread");
// this will stop the indexing thread,
// dropping the last reference to the segment_update_sender.
drop(self.document_sender);
let mut v = Vec::new();
mem::swap(&mut v, &mut self.workers_join_handle);
for join_handle in v {
join_handle.join().expect("Indexer has failed");
}
drop(self.workers_join_handle);
self.segment_update_thread
.join()
.map_err(|err| {
@@ -329,11 +381,37 @@ impl IndexWriter {
/// Merges a given list of segments
pub fn merge(&mut self, segments: &[Segment]) -> Result<()> {
// TODO fix commit or uncommited?
if segments.len() < 2 {
// no segments or one segment? nothing to do.
return Ok(());
}
let segment_manager = get_segment_manager(&self.index);
{
// let's check that all these segments are in the same
// committed/uncommited state.
let first_commit_state = segment_manager.is_committed(segments[0].id());
for segment in segments {
let commit_state = segment_manager.is_committed(segment.id());
if commit_state == CommitState::Missing {
return Err(Error::InvalidArgument(format!("Segment {:?} is not in the index", segments[0].id())));
}
if commit_state != first_commit_state {
return Err(Error::InvalidArgument(String::from("You may not merge segments that are heterogenously in committed and uncommited.")));
}
}
}
let schema = self.index.schema();
// An IndexMerger is like a "view" of our merged segments.
let merger = try!(IndexMerger::open(schema, segments));
let mut merged_segment = self.index.new_segment();
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = try!(SegmentSerializer::for_segment(&mut merged_segment));
@@ -343,7 +421,7 @@ impl IndexWriter {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_manager = get_segment_manager(&self.index);
segment_manager.end_merge(&merged_segment_ids, &segment_meta);
try!(self.index.load_searchers());
Ok(())
@@ -351,13 +429,13 @@ impl IndexWriter {
/// Closes the current document channel send.
/// and replace all the channels by new ones.
///
///
/// The current workers will keep on indexing
/// the pending document and stop
/// when no documents are remaining.
///
/// Returns the former segment_ready channel.
fn recreate_channel(&mut self,) -> DocumentReceiver {
fn recreate_document_channel(&mut self,) -> DocumentReceiver {
let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
swap(&mut self.document_sender, &mut document_sender);
swap(&mut self.document_receiver, &mut document_receiver);
@@ -378,7 +456,7 @@ impl IndexWriter {
// we cannot drop segment ready receiver yet
// as it would block the workers.
let document_receiver = self.recreate_channel();
let document_receiver = self.recreate_document_channel();
// Drains the document receiver pipeline :
// Workers don't need to index the pending documents.
@@ -442,8 +520,9 @@ impl IndexWriter {
///
pub fn commit(&mut self,) -> Result<u64> {
// this will drop the current channel
self.recreate_channel();
// this will drop the current document channel
// and recreate a new one channels.
self.recreate_document_channel();
// Docstamp of the last document in this commit.
let commit_docstamp = self.docstamp;
@@ -461,7 +540,9 @@ impl IndexWriter {
try!(self.add_indexing_worker());
}
super::super::core::index::commit(&mut self.index, commit_docstamp);
self.segment_update_sender.send(SegmentUpdate::Commit);
// super::super::core::index::commit(&mut self.index, commit_docstamp);
try!(self.on_change());
Ok(commit_docstamp)
}
@@ -546,4 +627,39 @@ mod tests {
index.searcher();
}
#[test]
fn test_with_merges() {
let mut schema_builder = schema::SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let num_docs_containing = |s: &str| {
let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
searcher.doc_freq(&term_a)
};
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
// create 10 segments with 100 tiny docs
for _doc in 0..100 {
let mut doc = Document::default();
doc.add_text(text_field, "a");
index_writer.add_document(doc).unwrap();
}
index_writer.commit().expect("commit failed");
for _doc in 0..100 {
let mut doc = Document::default();
doc.add_text(text_field, "a");
index_writer.add_document(doc).unwrap();
}
// this should create 8 segments and trigger a merge.
index_writer.commit().expect("commit failed");
index_writer.wait_merging_threads().expect("waiting merging thread failed");
assert_eq!(num_docs_containing("a"), 200);
assert_eq!(index.searchable_segments().len(), 1);
}
}
}

View File

@@ -4,17 +4,25 @@ use core::SegmentMeta;
use core::SegmentId;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
}
#[derive(Eq, PartialEq)]
pub enum CommitState {
Committed,
Uncommitted,
Missing,
}
impl Default for SegmentRegisters {
fn default() -> SegmentRegisters {
SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::default(),
committed: SegmentRegister::default()
}
}
}
@@ -27,6 +35,12 @@ impl Default for SegmentRegisters {
/// changes (merges especially)
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
// generation is an ever increasing counter that
// is incremented whenever we modify
// the segment manager. It can be useful for debugging
// purposes, and it also acts as a "dirty" marker,
// to detect when the `meta.json` should be written.
generation: AtomicUsize,
}
impl Debug for SegmentManager {
@@ -49,27 +63,47 @@ pub fn get_segment_ready_for_commit(segment_manager: &SegmentManager,) -> (Vec<S
}
impl SegmentManager {
/// Returns whether a segment is committed, uncommitted or missing.
pub fn is_committed(&self, segment_id: SegmentId) -> CommitState {
let lock = self.read();
if lock.uncommitted.contains(segment_id) {
CommitState::Uncommitted
}
else if lock.committed.contains(segment_id) {
CommitState::Committed
}
else {
CommitState::Missing
}
}
pub fn from_segments(segment_metas: Vec<SegmentMeta>) -> SegmentManager {
SegmentManager {
registers: RwLock::new( SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::from(segment_metas),
})
}),
generation: AtomicUsize::default(),
}
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
// and the operations cannot panic.
fn read(&self,) -> RwLockReadGuard<SegmentRegisters> {
self.registers.read().expect("Failed to acquire read lock on SegmentManager.")
}
fn write(&self,) -> RwLockWriteGuard<SegmentRegisters> {
self.generation.fetch_add(1, Ordering::Release);
self.registers.write().expect("Failed to acquire write lock on SegmentManager.")
}
pub fn generation(&self,) -> usize {
self.generation.load(Ordering::Acquire)
}
/// Removes all of the uncommitted segments
/// and returns them.
pub fn rollback(&self,) -> Vec<SegmentId> {

View File

@@ -4,7 +4,7 @@ use core::SegmentMeta;
use std::fmt;
use std::fmt::{Debug, Formatter};
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum SegmentState {
Ready,
InMerge,
@@ -105,6 +105,11 @@ impl SegmentRegister {
.map(|segment_entry| segment_entry.clone())
}
pub fn contains(&self, segment_id: SegmentId) -> bool {
self.segment_states.contains_key(&segment_id)
}
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
segment_ids
.iter()

View File

@@ -1,6 +1,7 @@
#![allow(unknown_lints)] // for the clippy lint options
#![allow(module_inception)]
#![feature(optin_builtin_traits)]
#![feature(binary_heap_extras)]
#![feature(conservative_impl_trait)]
#![cfg_attr(test, feature(test))]