bug/4 Introduce segment_updater

This commit is contained in:
Paul Masurel
2016-10-15 12:16:30 +09:00
parent 746d6284d9
commit 0f246ba908
11 changed files with 332 additions and 202 deletions

View File

@@ -10,7 +10,10 @@ use Score;
// Rust's BinaryHeap is a max-heap, but we need a min-heap.
#[derive(Clone, Copy)]
struct GlobalScoredDoc(Score, DocAddress);
struct GlobalScoredDoc {
score: Score,
doc_address: DocAddress
}
impl PartialOrd for GlobalScoredDoc {
fn partial_cmp(&self, other: &GlobalScoredDoc) -> Option<Ordering> {
@@ -21,9 +24,9 @@ impl PartialOrd for GlobalScoredDoc {
impl Ord for GlobalScoredDoc {
#[inline]
fn cmp(&self, other: &GlobalScoredDoc) -> Ordering {
other.0.partial_cmp(&self.0)
other.score.partial_cmp(&self.score)
.unwrap_or(
other.1.cmp(&self.1)
other.doc_address.cmp(&self.doc_address)
)
}
}
@@ -87,7 +90,7 @@ impl TopCollector {
.collect();
scored_docs.sort();
scored_docs.into_iter()
.map(|GlobalScoredDoc(score, doc_address)| (score, doc_address))
.map(|GlobalScoredDoc {score, doc_address}| (score, doc_address))
.collect()
}
@@ -110,13 +113,17 @@ impl Collector for TopCollector {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
let limit_doc: GlobalScoredDoc = *self.heap.peek().expect("Top collector with size 0 is forbidden");
if limit_doc.0 < scored_doc.score() {
let wrapped_doc = GlobalScoredDoc(scored_doc.score(), DocAddress(self.segment_id, scored_doc.doc()));
self.heap.replace(wrapped_doc);
if limit_doc.score < scored_doc.score() {
let mut mut_head = self.heap.peek_mut().unwrap();
mut_head.score = scored_doc.score();
mut_head.doc_address = DocAddress(self.segment_id, scored_doc.doc());
}
}
else {
let wrapped_doc = GlobalScoredDoc(scored_doc.score(), DocAddress(self.segment_id, scored_doc.doc()));
let wrapped_doc = GlobalScoredDoc {
score: scored_doc.score(),
doc_address: DocAddress(self.segment_id, scored_doc.doc())
};
self.heap.push(wrapped_doc);
}
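
The net effect of the hunks above: GlobalScoredDoc now carries named fields, the Ord impl compares other against self so that Rust's max-BinaryHeap behaves as a min-heap (the weakest kept hit sits at the root), and that root is overwritten in place through peek_mut() instead of being popped and re-pushed. A minimal standalone sketch of the same pattern, using a hypothetical MinScore type that is not part of this commit:

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct MinScore(u64);

impl PartialOrd for MinScore {
    fn partial_cmp(&self, other: &MinScore) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MinScore {
    // Reversed comparison: the smallest score ends up at the heap's root.
    fn cmp(&self, other: &MinScore) -> Ordering {
        other.0.cmp(&self.0)
    }
}

fn main() {
    let limit = 3;
    let mut heap = BinaryHeap::new();
    for score in vec![3u64, 7, 1, 9, 5] {
        if heap.len() < limit {
            heap.push(MinScore(score));
        } else if heap.peek().map(|head| head.0 < score).unwrap_or(false) {
            // Overwrite the weakest kept score in place; the heap re-sifts
            // when the PeekMut guard is dropped at the end of the statement.
            heap.peek_mut().unwrap().0 = score;
        }
    }
    let mut kept: Vec<u64> = heap.into_iter().map(|MinScore(s)| s).collect();
    kept.sort();
    assert_eq!(kept, vec![5, 7, 9]); // only the top-3 scores survived
}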

View File

@@ -0,0 +1,71 @@
use std::sync::atomic::{Ordering, AtomicUsize};
use std::sync::Arc;
pub struct LivingCounterLatch {
counter: Arc<AtomicUsize>,
}
impl Default for LivingCounterLatch {
fn default() -> LivingCounterLatch {
LivingCounterLatch {
counter: Arc::new(AtomicUsize::default()),
}
}
}
impl LivingCounterLatch {
/// Returns true if all of the clones of this
/// `LivingCounterLatch` (apart from `self`) have been dropped.
pub fn is_zero(&self,) -> bool {
self.counter.load(Ordering::Acquire) == 0
}
}
impl Clone for LivingCounterLatch {
fn clone(&self,) -> LivingCounterLatch {
self.counter.fetch_add(1, Ordering::SeqCst);
LivingCounterLatch {
counter: self.counter.clone(),
}
}
}
impl Drop for LivingCounterLatch {
fn drop(&mut self,) {
self.counter.fetch_sub(1, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{Ordering, AtomicUsize};
use std::sync::Arc;
use std::thread;
use std::mem::drop;
const NUM_THREADS: usize = 10;
const COUNT_PER_THREAD: usize = 100;
#[test]
fn test_living_counter_latch() {
let counter = Arc::new(AtomicUsize::default());
let living_counter_latch = LivingCounterLatch::default();
// spawn NUM_THREADS threads
for _ in 0..NUM_THREADS {
let living_counter_latch_clone = living_counter_latch.clone();
let counter_clone = counter.clone();
thread::spawn(move || {
for _ in 0..COUNT_PER_THREAD {
counter_clone.fetch_add(1, Ordering::SeqCst);
}
drop(living_counter_latch_clone);
});
}
while !living_counter_latch.is_zero() {};
assert_eq!(counter.load(Ordering::Acquire), NUM_THREADS * COUNT_PER_THREAD)
}
}
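
Beyond the unit test above, the intended use of this latch (see segment_updater.rs further down) is to hand one clone to every merge thread and spin until all of them have been dropped. A hedged sketch of that pattern, assuming LivingCounterLatch is in scope and using a hypothetical worker body:

use std::thread;

fn wait_for_workers() {
    let latch = LivingCounterLatch::default();
    for worker_id in 0..4 {
        let latch_clone = latch.clone(); // Clone increments the shared counter.
        thread::spawn(move || {
            // ... do some work, e.g. merge a set of segments ...
            println!("worker {} done", worker_id);
            drop(latch_clone); // Drop decrements the counter again.
        });
    }
    // Busy-wait until every clone has been dropped, i.e. all workers finished.
    while !latch.is_zero() {}
}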

View File

@@ -1,12 +1,14 @@
mod serialize;
mod timer;
mod vint;
mod living_counter_latch;
pub use self::serialize::BinarySerializable;
pub use self::timer::Timing;
pub use self::timer::TimerTree;
pub use self::timer::OpenTimer;
pub use self::vint::VInt;
pub use self::living_counter_latch::LivingCounterLatch;
use std::io;
@@ -24,4 +26,4 @@ pub trait HasLen {
fn is_empty(&self,) -> bool {
self.len() == 0
}
}
}

View File

@@ -16,7 +16,6 @@ use core::SegmentReader;
use super::pool::Pool;
use super::pool::LeasedItem;
use indexer::SegmentManager;
use indexer::{MergePolicy, SimpleMergePolicy};
use core::IndexMeta;
use core::META_FILEPATH;
use super::segment::create_segment;
@@ -41,9 +40,9 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
Ok(loaded_meta)
}
pub fn set_metas(index: &mut Index, docstamp: u64) {
index.docstamp = docstamp;
}
// pub fn set_metas(index: &mut Index, docstamp: u64) {
// index.docstamp = docstamp;
// }
/// Tantivy's Search Index
pub struct Index {
@@ -228,12 +227,6 @@ impl Index {
self.searcher_pool.acquire()
}
pub fn get_merge_policy(&self,) -> Box<MergePolicy> {
// TODO load that from conf.
Box::new(
SimpleMergePolicy::default()
)
}
}

View File

@@ -11,6 +11,9 @@ use core::Index;
use std::result;
use directory::error::{FileError, OpenWriteError};
/// A segment is a piece of the index.
#[derive(Clone)]
pub struct Segment {
index: Index,

View File

@@ -1,18 +1,16 @@
use schema::Schema;
use schema::Document;
use indexer::SegmentSerializer;
use core::SerializableSegment;
use core::Index;
use Directory;
use core::SerializableSegment;
use core::Segment;
use std::thread::JoinHandle;
use rustc_serialize::json;
use indexer::SegmentWriter;
use indexer::MergeCandidate;
use std::clone::Clone;
use std::io;
use std::io::Write;
use indexer::MergePolicy;
use std::thread;
use std::mem;
use indexer::merger::IndexMerger;
@@ -23,9 +21,10 @@ use chan;
use core::SegmentMeta;
use core::IndexMeta;
use core::META_FILEPATH;
use super::segment_updater::{SegmentUpdater, SegmentUpdate, SegmentUpdateSender};
use std::time::Duration;
use super::super::core::index::get_segment_manager;
use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit};
use super::segment_manager::{CommitState, SegmentManager};
use Result;
use Error;
@@ -43,9 +42,6 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type DocumentSender = chan::Sender<Document>;
type DocumentReceiver = chan::Receiver<Document>;
type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
fn create_metas(segment_manager: &SegmentManager,
@@ -98,7 +94,7 @@ pub struct IndexWriter {
document_receiver: DocumentReceiver,
document_sender: DocumentSender,
segment_update_sender: SegmentUpdateSender,
segment_update_thread: JoinHandle<()>,
@@ -142,171 +138,14 @@ fn index_documents(heap: &mut Heap,
}
#[derive(Debug)]
pub enum SegmentUpdate {
AddSegment(SegmentMeta),
EndMerge(Vec<SegmentId>, SegmentMeta),
CancelGeneration,
NewGeneration,
Terminate,
Commit(u64),
}
impl SegmentUpdate {
// Process a single segment update.
pub fn process(
self,
index: &Index,
segment_manager: &SegmentManager,
is_cancelled_generation: &mut bool) -> bool {
info!("Segment update: {:?}", self);
match self {
SegmentUpdate::AddSegment(segment_meta) => {
if !*is_cancelled_generation {
segment_manager.add_segment(segment_meta);
}
else {
// rollback has been called and this
// segment actually belongs to the
// documents that have been dropped.
//
// Let's just remove its files.
index.delete_segment(segment_meta.segment_id);
}
}
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
segment_manager.end_merge(&segment_ids, &segment_meta);
for segment_id in segment_ids {
index.delete_segment(segment_id);
}
}
SegmentUpdate::CancelGeneration => {
// Called during rollback. Segments
// that arrive will be ignored
// until a NewGeneration update arrives.
*is_cancelled_generation = true;
}
SegmentUpdate::NewGeneration => {
// After rollback, we can resume
// indexing new documents.
*is_cancelled_generation = false;
}
SegmentUpdate::Commit(docstamp) => {
segment_manager.commit(docstamp);
}
SegmentUpdate::Terminate => {
return true;
}
}
return false;
}
}
fn consider_merge_options(segment_manager: &SegmentManager, merge_policy: &MergePolicy) -> Vec<MergeCandidate> {
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
// Committed segments cannot be merged with uncommitted segments.
// We therefore consider merges over these two sets of segments independently.
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
merge_candidates.extend_from_slice(&merge_policy.compute_merge_candidates(&committed_segments)[..]);
merge_candidates
}
// Consumes the `segment_update_receiver` channel
// for segment updates and applies them.
//
// Using a channel ensures that all of the updates
// happen in the same thread, and makes
// the implementation of rollback and commit
// trivial.
fn process_segment_updates(mut index: Index,
segment_manager: &SegmentManager,
segment_update_receiver: SegmentUpdateReceiver,
segment_update_sender: SegmentUpdateSender) {
let mut option_segment_update_sender = Some(segment_update_sender);
let mut is_cancelled_generation = false;
let mut generation = segment_manager.generation();
let merge_policy = index.get_merge_policy();
for segment_update in segment_update_receiver {
if segment_update.process(
&index,
segment_manager,
&mut is_cancelled_generation) {
option_segment_update_sender = None;
};
let new_generation = segment_manager.generation();
// We check the generation number as if it were a
// dirty bit. If the value is different
// from our generation, then the segment_manager has
// been updated and we need to:
// - save meta.json
// - update the searchers
// - consider possible segment merge
if generation != new_generation {
generation = new_generation;
// saving the meta file.
save_metas(
segment_manager,
index.schema(),
index.docstamp(),
index.directory_mut()).expect("Could not save metas.");
// update the searchers so that they will eventually
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
index.load_searchers().expect("Could not load new searchers.");
if let Some(ref segment_update_sender) = option_segment_update_sender {
for MergeCandidate(segment_ids) in consider_merge_options(&segment_manager, &*merge_policy) {
segment_manager.start_merge(&segment_ids);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger into our new segment
// to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
}).expect("Failed to spawn merge thread");
}
}
}
}
}
impl IndexWriter {
/// The index writer
pub fn wait_merging_threads(mut self) -> Result<()> {
self.segment_update_sender.send(SegmentUpdate::Terminate);
drop(self.segment_update_sender);
// this will stop the indexing thread,
@@ -316,7 +155,12 @@ impl IndexWriter {
let mut v = Vec::new();
mem::swap(&mut v, &mut self.workers_join_handle);
for join_handle in v {
join_handle.join().expect("Indexer has failed");
try!(
join_handle
.join()
.expect("Indexing Worker thread panicked")
.map_err(|e| Error::ErrorInThread(format!("Error in indexing worker thread. {:?}", e)))
);
}
drop(self.workers_join_handle);
self.segment_update_thread
@@ -333,10 +177,9 @@ impl IndexWriter {
fn add_indexing_worker(&mut self,) -> Result<()> {
let index = self.index.clone();
let schema = self.index.schema();
let document_receiver_clone = self.document_receiver.clone();
let mut segment_update_sender = self.segment_update_sender.clone();
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<Result<()>> = try!(thread::Builder::new()
@@ -400,16 +243,13 @@ impl IndexWriter {
panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT));
}
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0);
let segment_manager = get_segment_manager(index);
let index_clone = index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
let segment_update_thread = try!(thread::Builder::new().name("segment_update".to_string()).spawn(move || {
process_segment_updates(index_clone, &*segment_manager, segment_update_receiver, segment_update_sender_clone)
}));
let segment_updater = SegmentUpdater::new(index.clone());
let segment_update_sender = segment_updater.update_channel().expect("This should never happen"); // TODO remove expect
let segment_update_thread = segment_updater.start();
let mut index_writer = IndexWriter {
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),

View File

@@ -1,10 +1,11 @@
use core::SegmentId;
use core::SegmentMeta;
use std::marker;
#[derive(Debug, Clone)]
pub struct MergeCandidate(pub Vec<SegmentId>);
pub trait MergePolicy {
pub trait MergePolicy: marker::Send {
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate>;
}
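
The marker::Send supertrait added here is what lets a Box<MergePolicy> live inside the SegmentUpdater and be moved onto its processing thread. A hedged sketch of an implementation, using a hypothetical MergeWhenMany policy that is not part of this commit and assuming MergePolicy, MergeCandidate and SegmentMeta are in scope:

struct MergeWhenMany {
    min_segments: usize,
}

impl MergePolicy for MergeWhenMany {
    fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
        if segments.len() >= self.min_segments {
            // Merge everything into a single segment once enough have piled up.
            let segment_ids = segments.iter().map(|meta| meta.segment_id).collect();
            vec![MergeCandidate(segment_ids)]
        } else {
            Vec::new()
        }
    }
}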

View File

@@ -7,7 +7,7 @@ mod simple_merge_policy;
mod segment_register;
mod segment_writer;
mod segment_manager;
mod segment_updater;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;

View File

@@ -0,0 +1,214 @@
use chan;
use common::LivingCounterLatch;
use core::Index;
use core::Segment;
use core::SegmentId;
use core::SegmentMeta;
use core::SerializableSegment;
use indexer::{MergePolicy, SimpleMergePolicy};
use indexer::index_writer::save_metas;
use indexer::MergeCandidate;
use indexer::merger::IndexMerger;
use indexer::SegmentSerializer;
use std::thread;
use std::thread::JoinHandle;
use std::sync::Arc;
use super::segment_manager::{SegmentManager, get_segment_ready_for_commit};
use super::super::core::index::get_segment_manager;
pub type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
pub type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
#[derive(Debug)]
pub enum SegmentUpdate {
AddSegment(SegmentMeta),
EndMerge(Vec<SegmentId>, SegmentMeta),
CancelGeneration,
NewGeneration,
Terminate,
Commit(u64),
}
/// The segment updater is in charge of
/// receiving the different `SegmentUpdate`s:
/// - indexing threads send new segments
/// - merging threads send merge operations
/// - the index writer sends "terminate"
pub struct SegmentUpdater {
index: Index,
is_cancelled_generation: bool,
segment_update_receiver: SegmentUpdateReceiver,
option_segment_update_sender: Option<SegmentUpdateSender>,
segment_manager_arc: Arc<SegmentManager>,
merge_policy: Box<MergePolicy>,
}
impl SegmentUpdater {
pub fn new(index: Index) -> SegmentUpdater {
let segment_manager_arc = get_segment_manager(&index);
let (segment_update_sender, segment_update_receiver): (SegmentUpdateSender, SegmentUpdateReceiver) = chan::sync(0);
SegmentUpdater {
index: index,
is_cancelled_generation: false,
option_segment_update_sender: Some(segment_update_sender),
segment_update_receiver: segment_update_receiver,
segment_manager_arc: segment_manager_arc,
merge_policy: Box::new(SimpleMergePolicy::default()), // TODO make that configurable
}
}
pub fn update_channel(&self,) -> Option<SegmentUpdateSender> {
self.option_segment_update_sender.clone()
}
fn consider_merge_options(&self,) -> Vec<MergeCandidate> {
let segment_manager = self.segment_manager();
let (committed_segments, uncommitted_segments) = get_segment_ready_for_commit(segment_manager);
// Committed segments cannot be merged with uncommitted segments.
// We therefore consider merges over these two sets of segments independently.
let mut merge_candidates = self.merge_policy.compute_merge_candidates(&uncommitted_segments);
let committed_merge_candidates = self.merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
merge_candidates
}
fn segment_manager(&self,) -> &SegmentManager {
&*self.segment_manager_arc
}
pub fn start(self,) -> JoinHandle<()> {
thread::Builder::new()
.name("segment_update".to_string())
.spawn(move || {
self.process();
})
.expect("Failed to start segment updater thread.")
}
fn process(mut self,) {
let segment_manager = self.segment_manager_arc.clone();
let living_threads = LivingCounterLatch::default();
let segment_updates = self.segment_update_receiver.clone();
for segment_update in segment_updates {
// We check the generation number as if it were a
// dirty bit. If the value is different
// from our generation, then the segment_manager has
// been updated and we need to:
// - save meta.json
// - update the searchers
// - consider possible segment merge
let generation_before_update = segment_manager.generation();
self.process_one(segment_update);
if generation_before_update != segment_manager.generation() {
// saving the meta file.
save_metas(
&*segment_manager,
self.index.schema(),
self.index.docstamp(),
self.index.directory_mut()).expect("Could not save metas.");
// update the searchers so that they will eventually
// use the new segments.
// TODO eventually have this work through watching meta.json
// so that an external process stays up to date as well.
self.index.load_searchers().expect("Could not load new searchers.");
if let Some(ref segment_update_sender) = self.option_segment_update_sender {
for MergeCandidate(segment_ids) in self.consider_merge_options() {
segment_manager.start_merge(&segment_ids);
let living_threads_clone = living_threads.clone();
let index_clone = self.index.clone();
let segment_update_sender_clone = segment_update_sender.clone();
thread::Builder::new().name(format!("merge_thread_{:?}", segment_ids[0])).spawn(move || {
info!("Start merge: {:?}", segment_ids);
let schema = index_clone.schema();
let segments: Vec<Segment> = segment_ids
.iter()
.map(|&segment_id| index_clone.segment(segment_id))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).unwrap();
let mut merged_segment = index_clone.new_segment();
// ... we just serialize this index merger into our new segment
// to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).unwrap();
let num_docs = merger.write(segment_serializer).unwrap();
let segment_meta = SegmentMeta {
segment_id: merged_segment.id(),
num_docs: num_docs,
};
let segment_update = SegmentUpdate::EndMerge(segment_ids, segment_meta);
segment_update_sender_clone.send(segment_update);
drop(living_threads_clone);
}).expect("Failed to spawn merge thread");
}
}
}
}
}
// Process a single segment update.
pub fn process_one(
&mut self,
segment_update: SegmentUpdate) {
info!("Segment update: {:?}", segment_update);
match segment_update {
SegmentUpdate::AddSegment(segment_meta) => {
if !self.is_cancelled_generation {
self.segment_manager().add_segment(segment_meta);
}
else {
// rollback has been called and this
// segment actually belongs to the
// documents that have been dropped.
//
// Let's just remove its files.
self.index.delete_segment(segment_meta.segment_id);
}
}
SegmentUpdate::EndMerge(segment_ids, segment_meta) => {
self.segment_manager().end_merge(&segment_ids, &segment_meta);
for segment_id in segment_ids {
self.index.delete_segment(segment_id);
}
}
SegmentUpdate::CancelGeneration => {
// Called during rollback. Segments
// that arrive will be ignored
// until a NewGeneration update arrives.
self.is_cancelled_generation = true;
}
SegmentUpdate::NewGeneration => {
// After rollback, we can resume
// indexing new documents.
self.is_cancelled_generation = false;
}
SegmentUpdate::Commit(docstamp) => {
self.segment_manager().commit(docstamp);
}
SegmentUpdate::Terminate => {
self.option_segment_update_sender = None;
}
}
}
}
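
For reference, this is roughly how index_writer.rs wires the updater up (a hedged sketch mirroring the calls above; index, segment_meta and docstamp are assumed to already be in scope):

let segment_updater = SegmentUpdater::new(index.clone());
// Grab a sender before start() consumes the updater.
let segment_update_sender = segment_updater.update_channel()
    .expect("sender still available");
let segment_update_thread = segment_updater.start();

// Indexing threads push finished segments, commit() pushes a docstamp,
// and wait_merging_threads() pushes Terminate before joining.
segment_update_sender.send(SegmentUpdate::AddSegment(segment_meta));
segment_update_sender.send(SegmentUpdate::Commit(docstamp));
segment_update_sender.send(SegmentUpdate::Terminate);
drop(segment_update_sender);
segment_update_thread.join().expect("segment updater thread panicked");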

View File

@@ -2,7 +2,6 @@
#![allow(module_inception)]
#![feature(optin_builtin_traits)]
#![feature(binary_heap_extras)]
#![feature(conservative_impl_trait)]
#![cfg_attr(test, feature(test))]
#![cfg_attr(test, feature(step_by))]

View File

@@ -198,7 +198,7 @@ impl<TPostings: Postings, TAccumulator: MultiTermAccumulator> DocSet for DAATMul
}
}
self.advance_head();
while let Some(&HeapItem { doc: doc, ord: ord}) = self.queue.peek() {
while let Some(&HeapItem {doc, ord}) = self.queue.peek() {
if doc == self.doc {
let peek_ord: usize = ord as usize;
let peek_tf = self.term_frequencies[peek_ord];