baby step 2

This commit is contained in:
Paul Masurel
2017-03-11 16:14:20 +09:00
parent 77c61ddab2
commit 7c971b5d3b
7 changed files with 139 additions and 86 deletions

View File

@@ -15,6 +15,13 @@ pub struct DeleteCursor {
operations: InnerDeleteQueue,
}
impl DeleteCursor {
    /// Moves the cursor to the end of the underlying operation list.
    ///
    /// The cursor is set to the length the list has at the time of the call,
    /// as observed through `read()`.
    ///
    /// # Panics
    ///
    /// Panics if `read()` on the underlying queue returns an error.
    pub fn go_to_tail(&mut self) {
        let tail_position = self.operations.read().unwrap().len();
        self.cursor = tail_position;
    }
}
// TODO remove copy
impl Iterator for DeleteCursor {
@@ -37,6 +44,11 @@ impl Iterator for DeleteCursor {
pub struct DeleteQueue(InnerDeleteQueue);
impl DeleteQueue {
pub fn new() -> DeleteQueue {
DeleteQueue::default()
}
/// Appends `delete_operation` to the wrapped operation list.
///
/// # Panics
///
/// Panics if `write()` on the wrapped queue returns an error.
pub fn push(&self, delete_operation: DeleteOperation) {
    let mut pending_operations = self.0.write().unwrap();
    pending_operations.push(delete_operation);
}
@@ -61,7 +73,7 @@ mod tests {
#[test]
fn test_deletequeue() {
let delete_queue = DeleteQueue::default();
let delete_queue = DeleteQueue::new();
let make_op = |i: usize| {
let field = Field(1u8);

View File

@@ -10,10 +10,9 @@ use datastruct::stacker::Heap;
use Error;
use Directory;
use fastfield::delete::write_delete_bitset;
use indexer::delete_queue::DeleteCursor;
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
use futures::Canceled;
use futures::Future;
use indexer::delete_queue::DeleteQueue;
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use indexer::MergePolicy;
use indexer::operation::DeleteOperation;
@@ -117,9 +116,9 @@ pub fn open_index_writer(index: &Index,
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::default();
let delete_queue = DeleteQueue::new();
let segment_updater = SegmentUpdater::new(index.clone())?;
let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?;
let mut index_writer = IndexWriter {
@@ -156,12 +155,12 @@ pub fn open_index_writer(index: &Index,
// TODO skip delete operations before the
// last delete opstamp
pub fn advance_deletes(
segment: &mut Segment,
delete_cursor: DeleteCursor,
doc_opstamps: &DocToOpstampMapping) -> Result<SegmentMeta> {
pub fn advance_deletes(mut segment: Segment, segment_entry: &mut SegmentEntry) -> Result<()> {
{
let doc_opstamps = segment_entry.reset_doc_to_stamp();
let delete_cursor = segment_entry.delete_cursor();
let segment_reader = SegmentReader::open(segment.clone())?;
let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
@@ -172,6 +171,8 @@ pub fn advance_deletes(
for delete_op in delete_cursor {
println!("opstamp {:?}", delete_op.opstamp);
// let's skip delete operations that have already been applied.
if let Some(previous_delete_opstamp) = previous_delete_opstamp_opt {
if delete_op.opstamp <= previous_delete_opstamp {
@@ -211,8 +212,10 @@ pub fn advance_deletes(
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
Ok(segment.meta().clone())
Ok(())
}
fn index_documents(heap: &mut Heap,
@@ -220,7 +223,8 @@ fn index_documents(heap: &mut Heap,
schema: &Schema,
generation: usize,
document_iterator: &mut Iterator<Item=AddOperation>,
segment_updater: &mut SegmentUpdater)
segment_updater: &mut SegmentUpdater,
delete_cursor: DeleteCursor)
-> Result<bool> {
heap.clear();
let segment_id = segment.id();
@@ -245,9 +249,9 @@ fn index_documents(heap: &mut Heap,
let mut segment_meta = SegmentMeta::new(segment_id);
segment_meta.set_max_doc(num_docs);
let mut segment_entry = SegmentEntry::new(segment_meta);
let mut segment_entry = SegmentEntry::new(segment_meta, delete_cursor);
segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps));
segment_updater
.add_segment(generation, segment_entry)
.wait()
@@ -292,6 +296,8 @@ impl IndexWriter {
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let generation = self.generation;
let mut delete_cursor = self.delete_queue.cursor();
let join_handle: JoinHandle<Result<()>> =
thread::Builder::new()
@@ -299,9 +305,14 @@ impl IndexWriter {
.spawn(move || {
loop {
let mut document_iterator = document_receiver_clone.clone()
.into_iter()
.peekable();
// we consume all previous delete operations.
delete_cursor.go_to_tail();
// the peeking here is to avoid
// creating a new segment's files
@@ -317,7 +328,8 @@ impl IndexWriter {
&schema,
generation,
&mut document_iterator,
&mut segment_updater)?;
&mut segment_updater,
delete_cursor.clone())?;
}
else {
// No more documents.
@@ -480,7 +492,7 @@ impl IndexWriter {
// wait for the segment update thread to have processed the info
self.segment_updater
.commit(self.committed_opstamp, self.delete_queue.cursor())
.commit(self.committed_opstamp)
.wait()?;
self.delete_queue.clear();

View File

@@ -494,6 +494,7 @@ mod tests {
let searcher = index.searcher();
assert_eq!(searcher.segment_readers().len(), 2);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 1);
assert_eq!(searcher.segment_readers()[0].max_doc(), 3);
assert_eq!(searcher.segment_readers()[1].num_docs(), 2);

View File

@@ -1,7 +1,9 @@
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use core::SegmentMeta;
use indexer::delete_queue::DeleteCursor;
use core::SegmentId;
use std::fmt;
use std::mem;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum SegmentState {
@@ -23,20 +25,32 @@ pub struct SegmentEntry {
meta: SegmentMeta,
state: SegmentState,
doc_to_opstamp: DocToOpstampMapping,
delete_cursor: DeleteCursor,
}
impl SegmentEntry {
pub fn new(segment_meta: SegmentMeta) -> SegmentEntry {
pub fn new(segment_meta: SegmentMeta,
delete_cursor: DeleteCursor) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
doc_to_opstamp: DocToOpstampMapping::None,
delete_cursor: delete_cursor,
}
}
pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping {
&self.doc_to_opstamp
pub fn reset_doc_to_stamp(&mut self,) -> DocToOpstampMapping {
mem::replace(&mut self.doc_to_opstamp, DocToOpstampMapping::None)
}
/// Replaces the `SegmentMeta` stored in this entry with `segment_meta`.
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
self.meta = segment_meta;
}
/// Returns a mutable reference to the `DeleteCursor` held by this entry,
/// so that callers can advance it in place.
pub fn delete_cursor(&mut self) -> &mut DeleteCursor {
&mut self.delete_cursor
}
pub fn state(&self) -> SegmentState {

View File

@@ -8,6 +8,7 @@ use std::path::PathBuf;
use std::collections::hash_set::HashSet;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use std::fmt::{self, Debug, Formatter};
use indexer::delete_queue::DeleteCursor;
#[derive(Default)]
struct SegmentRegisters {
@@ -49,11 +50,11 @@ pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec<SegmentMeta>, Vec
impl SegmentManager {
pub fn from_segments(segment_metas: Vec<SegmentMeta>) -> SegmentManager {
pub fn from_segments(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteCursor) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas),
committed: SegmentRegister::new(segment_metas, delete_cursor),
writing: HashSet::new(),
}),
}
@@ -127,22 +128,19 @@ impl SegmentManager {
segment_ids
}
pub fn commit(&self, segment_metas: Vec<SegmentMeta>) {
let committed_segment_entries = segment_metas
.into_iter()
.map(|segment_meta| {
let segment_id = segment_meta.id();
let mut segment_entry = SegmentEntry::new(segment_meta);
if let Some(state) = self.segment_state(&segment_id) {
segment_entry.set_state(state);
}
segment_entry
})
.collect::<Vec<_>>();
pub fn commit(&self, mut segment_entries: Vec<SegmentEntry>) {
// TODO: is this still relevant?
// restore the state of the segment_entries
for segment_entry in &mut segment_entries {
let segment_id = segment_entry.segment_id();
if let Some(state) = self.segment_state(&segment_id) {
segment_entry.set_state(state);
}
}
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();
for segment_entry in committed_segment_entries {
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
}
@@ -175,21 +173,23 @@ impl SegmentManager {
registers_lock.uncommitted.add_segment_entry(segment_entry);
}
pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_meta: SegmentMeta) {
pub fn end_merge(&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
let merged_segment_ids: Vec<SegmentId> = merged_segment_metas.iter().map(|meta| meta.id()).collect();
let merged_segment_entry = SegmentEntry::new(merged_segment_meta);
if registers_lock.uncommitted.contains_all(&merged_segment_ids) {
for segment_id in &merged_segment_ids {
if registers_lock.uncommitted.contains_all(&before_merge_segment_ids) {
for segment_id in before_merge_segment_ids {
registers_lock.uncommitted.remove_segment(segment_id);
}
registers_lock.uncommitted.add_segment_entry(merged_segment_entry);
registers_lock.uncommitted.add_segment_entry(after_merge_segment_entry);
}
else if registers_lock.committed.contains_all(&merged_segment_ids) {
for segment_id in &merged_segment_ids {
else if registers_lock.committed.contains_all(&before_merge_segment_ids) {
for segment_id in before_merge_segment_ids {
registers_lock.committed.remove_segment(segment_id);
}
registers_lock.committed.add_segment_entry(merged_segment_entry);
registers_lock.committed.add_segment_entry(after_merge_segment_entry);
} else {
warn!("couldn't find segment in SegmentManager");
}

View File

@@ -4,6 +4,7 @@ use core::SegmentMeta;
use std::fmt;
use std::fmt::{Debug, Formatter};
use indexer::segment_entry::SegmentEntry;
use indexer::delete_queue::DeleteCursor;
/// The segment register keeps track
/// of the list of segment, their size as well
@@ -95,16 +96,15 @@ impl SegmentRegister {
.start_merge();
}
pub fn new(segment_metas: Vec<SegmentMeta>) -> SegmentRegister {
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: DeleteCursor) -> SegmentRegister {
let mut segment_states = HashMap::new();
for segment_meta in segment_metas {
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
segment_states.insert(segment_id, segment_entry);
}
SegmentRegister {
segment_states: segment_metas
.into_iter()
.map(|segment_meta| {
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta );
(segment_id, segment_entry)
})
.collect(),
segment_states: segment_states
}
}
}
@@ -115,10 +115,13 @@ mod tests {
use indexer::SegmentState;
use core::SegmentId;
use core::SegmentMeta;
use indexer::delete_queue::*;
use super::*;
#[test]
fn test_segment_register() {
let delete_queue = DeleteQueue::new();
let mut segment_register = SegmentRegister::default();
let segment_id_a = SegmentId::generate_random();
let segment_id_b = SegmentId::generate_random();
@@ -126,14 +129,14 @@ mod tests {
{
let segment_meta = SegmentMeta::new(segment_id_a);
let segment_entry = SegmentEntry::new(segment_meta);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state(), SegmentState::Ready);
assert_eq!(segment_register.segment_ids(), vec!(segment_id_a));
{
let segment_meta = SegmentMeta::new(segment_id_b);
let segment_entry = SegmentEntry::new(segment_meta);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state(), SegmentState::Ready);
@@ -145,7 +148,7 @@ mod tests {
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged);
let segment_entry = SegmentEntry::new(segment_meta_merged);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor());
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged));

View File

@@ -103,9 +103,9 @@ struct InnerSegmentUpdater {
impl SegmentUpdater {
pub fn new(index: Index) -> Result<SegmentUpdater> {
pub fn new(index: Index, delete_cursor: DeleteCursor) -> Result<SegmentUpdater> {
let segments = index.segments()?;
let segment_manager = SegmentManager::from_segments(segments);
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
Ok(
SegmentUpdater(Arc::new(InnerSegmentUpdater {
pool: CpuPool::new(1),
@@ -168,23 +168,22 @@ impl SegmentUpdater {
}
}
fn purge_deletes(&self, delete_cursor: DeleteCursor) -> Result<Vec<SegmentMeta>> {
let mut segment_metas = vec!();
for segment_entry in self.0.segment_manager.segment_entries() {
let mut segment = self.0.index.segment(segment_entry.meta().clone());
let delete_cursor = delete_cursor.clone();
// TODO delete cursor skip...
let segment_meta = advance_deletes(&mut segment, delete_cursor, segment_entry.doc_to_opstamp())?;
segment_metas.push(segment_meta);
fn purge_deletes(&self) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry)?;
}
Ok(segment_metas)
Ok(segment_entries)
}
pub fn commit(&self, opstamp: u64, delete_cursor: DeleteCursor) -> impl Future<Item=(), Error=Error> {
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
self.run_async(move |segment_updater| {
let segment_metas = segment_updater.purge_deletes(delete_cursor).expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_metas);
let segment_entries = segment_updater
.purge_deletes()
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
let mut index = segment_updater.0.index.clone();
{
let directory = index.directory();
@@ -226,22 +225,20 @@ impl SegmentUpdater {
let ref index = segment_updater_clone.0.index;
let schema = index.schema();
let mut segment_metas = vec!();
let mut segment_entries = vec!();
for segment_id in &segment_ids_vec {
if let Some(segment_entry) = segment_updater_clone.0
if let Some(mut segment_entry) = segment_updater_clone.0
.segment_manager
.segment_entry(segment_id) {
// TODOS make sure that the segment are in the same
// position with regard to deletes.
// let mut segment = index.segment(segment_entry.meta().clone());
// let segment_meta = advance_deletes(
// &mut segment,
// &delete_operations,
// segment_entry.doc_to_opstamp())?;
let segment_meta = segment_entry.meta().clone();
segment_metas.push(segment_meta);
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, &mut segment_entry)?;
segment_entries.push(segment_entry);
}
else {
error!("Error, had to abort merge as some of the segment is not managed anymore.a");
@@ -249,10 +246,13 @@ impl SegmentUpdater {
}
}
let segments: Vec<Segment> = segment_metas
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment> = segment_entries
.iter()
.cloned()
.map(|segment_meta| index.segment(segment_meta))
.map(|segment_entry| {
index.segment(segment_entry.meta().clone())
})
.collect();
// An IndexMerger is like a "view" of our merged segments.
@@ -262,16 +262,27 @@ impl SegmentUpdater {
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
let segment_serializer =
SegmentSerializer
::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
let before_merged_segment_ids = segment_entries
.iter()
.map(|segment_entry| segment_entry.segment_id())
.collect::<Vec<_>>();
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor);
segment_updater_clone
.end_merge(segment_metas.clone(), segment_meta.clone())
.end_merge(before_merged_segment_ids, after_merge_segment_entry)
.wait()
.unwrap();
merging_future_send.complete(segment_meta);
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(())
@@ -296,11 +307,11 @@ impl SegmentUpdater {
fn end_merge(&self,
merged_segment_metas: Vec<SegmentMeta>,
segment_meta: SegmentMeta) -> impl Future<Item=(), Error=Error> {
before_merge_segment_ids: Vec<SegmentId>,
after_merge_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=Error> {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.end_merge(&merged_segment_metas, segment_meta);
segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
let mut directory = segment_updater.0.index.directory().box_clone();
let segment_metas = segment_updater.0.segment_manager.committed_segment_metas();
save_metas(