diff --git a/src/core/index.rs b/src/core/index.rs index 483d0eaba..b4e04d0c8 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -231,7 +231,7 @@ impl Index { Ok(load_metas(self.directory())? .segments .iter() - .map(|segment_meta| segment_meta.segment_id) + .map(|segment_meta| segment_meta.id()) .collect()) } diff --git a/src/core/segment.rs b/src/core/segment.rs index 0ed88d3bc..dcf5ec116 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -52,7 +52,7 @@ impl Segment { /// Returns the segment's id. pub fn id(&self,) -> SegmentId { - self.meta.segment_id + self.meta.id() } /// Returns the relative path of a component of our segment. diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index 2f5cb9b97..40142c1a4 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -9,7 +9,7 @@ struct DeleteMeta { #[derive(Clone, Debug, RustcDecodable,RustcEncodable)] pub struct SegmentMeta { - pub segment_id: SegmentId, + segment_id: SegmentId, num_docs: u32, deletes: Option, } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index a953a6810..020db8715 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -237,6 +237,10 @@ impl SegmentReader { /// Returns the posting list associated with a term. + /// + /// If the term is not found, return None. + /// Even when non-null, because of deletes, the posting object + /// returned by this method may contain no documents. pub fn read_postings_all_info(&self, term: &Term) -> Option { let field_entry = self.schema.get_field_entry(term.field()); let segment_posting_option = match *field_entry.field_type() { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index d6873ecc8..e03435199 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -111,7 +111,7 @@ mod tests { } } - fn test_delete(directory: &mut Directory) { + fn test_directory_delete(directory: &mut Directory) { assert!(directory.open_read(*TEST_PATH).is_err()); let mut write_file = directory.open_write(*TEST_PATH).unwrap(); write_file.write_all(&[1, 2, 3, 4]).unwrap(); @@ -131,7 +131,7 @@ mod tests { test_seek(directory); test_rewrite_forbidden(directory); test_write_create_the_file(directory); - test_delete(directory); + test_directory_delete(directory); } } diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index fd1e4f59b..2a2d88abd 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -1,175 +1,26 @@ -use schema::Term; -use std::sync::{Arc, RwLock}; use super::operation::DeleteOperation; -const BLOCK_SIZE: usize = 128; - - -/// DeleteQueue are implemented as an unrolled linked list. -/// Block implements a block of this unrolled linked list. -struct Block { - operations: Vec, - next: Option, -} - -impl Default for Block { - fn default() -> Block { - Block { - operations: Vec::with_capacity(BLOCK_SIZE), - next: None - } - } -} - -/// A shared block wraps a block +// TODO remove clone #[derive(Clone)] -struct SharedBlock(Arc>); - -impl SharedBlock { - // Happens a new element to the block and return - // what the new head is. - fn enqueue(&self, delete_operation: DeleteOperation) -> Option { - let mut writable_block = self.0.write().expect("Panicked while enqueueing in the delete queue."); - if writable_block.operations.len() >= BLOCK_SIZE { - let next_block = SharedBlock::default(); - next_block.enqueue(delete_operation); - writable_block.next = Some(next_block.clone()); - Some(next_block) - } - else { - writable_block.operations.push(delete_operation); - None - } - } - - fn next_block(&self) -> Option { - self.0 - .read() - .unwrap() - .next - .clone() - } - - fn cursor(&self,) -> DeleteQueueCursor { - let len = self.0 - .read() - .expect("Panicked while reading a block in the delete queue.") - .operations - .len(); - DeleteQueueCursor { - block: self.clone(), - pos: len, - } - } -} - -impl Default for SharedBlock { - fn default() -> SharedBlock { - SharedBlock(Arc::default()) - } -} - -impl Default for DeleteQueue { - fn default() -> DeleteQueue { - DeleteQueue { - writing_head: SharedBlock::default(), - } - } -} - -#[derive(Clone)] -pub struct DeleteQueueCursor { - block: SharedBlock, - pos: usize, -} - -impl DeleteQueueCursor { - - /// Skips to the first delete operation which has - /// a timestamp that is greater or equal to opstamp. - /// - /// Returns false in the DeleteQueue reaches its end before - /// meeting such an element. - pub fn skip_to(&mut self, opstamp: u64) -> bool { - // TODO optimize - while let Some(delete_operation) = self.peek() { - if delete_operation.opstamp >= opstamp { - return true; - } - else { - self.next(); - } - } - return false; - } - - pub fn peek(&mut self) -> Option { - if self.pos >= BLOCK_SIZE { - self.pos = 0; - match self.block.next_block() { - Some(next_block) => { - self.block = next_block; - self.pos = 0; - } - None => { - // there is no next block. - return None; - } - } - } - let readable_block = self.block.0 - .read() - .unwrap(); - if self.pos >= readable_block.operations.len() { - None - } - else { - Some(readable_block.operations[self.pos].clone()) - } - } - -} - -impl Iterator for DeleteQueueCursor { - type Item = DeleteOperation; - - /// Returns a delete operation if an operation is available, - /// None if the queue is empty. - /// - /// This iterator may return None once, and return - /// `Some(...)` ulteriorily. - fn next(&mut self) -> Option { - let delete_position = self.peek(); - if delete_position.is_some() { - self.pos += 1; - } - delete_position - } -} - -// ---------------------------------------- - pub struct DeleteQueue { - writing_head: SharedBlock, + delete_operations: Vec, } impl DeleteQueue { - - pub fn cursor(&self) -> DeleteQueueCursor { - self.writing_head.cursor() + + pub fn new() -> DeleteQueue { + DeleteQueue { + delete_operations: vec!(), + } } pub fn push_op(&mut self, delete_operation: DeleteOperation) { - if let Some(new_head) = self.writing_head.enqueue(delete_operation) { - self.writing_head = new_head; - } + self.delete_operations.push(delete_operation); } - pub fn push(&mut self, opstamp: u64, term: Term) { - let delete_operation = DeleteOperation { - opstamp: opstamp, - term: term, - }; - self.push_op(delete_operation); + + pub fn operations(&self,) -> impl Iterator { + // TODO fix iterator + self.delete_operations.clone().into_iter() } } @@ -183,7 +34,7 @@ mod tests { #[test] fn test_deletequeue() { - let mut delete_queue = DeleteQueue::default(); + let mut delete_queue = DeleteQueue::new(); let make_op = |i: usize| { let field = Field(1u8); @@ -196,36 +47,38 @@ mod tests { delete_queue.push_op(make_op(1)); delete_queue.push_op(make_op(2)); - let mut delete_cursor_3 = delete_queue.cursor(); - let mut delete_cursor_3_b = delete_cursor_3.clone(); + // TODO unit tests + + // let mut delete_cursor_3 = delete_queue.cursor(); + // let mut delete_cursor_3_b = delete_cursor_3.clone(); - assert!(delete_cursor_3.next().is_none()); - assert!(delete_cursor_3.peek().is_none()); + // assert!(delete_cursor_3.next().is_none()); + // assert!(delete_cursor_3.peek().is_none()); - delete_queue.push_op(make_op(3)); - delete_queue.push_op(make_op(4)); + // delete_queue.push_op(make_op(3)); + // delete_queue.push_op(make_op(4)); - assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3))); - let mut delete_cursor_3_c = delete_cursor_3_b.clone(); + // assert_eq!(delete_cursor_3_b.peek(), Some(make_op(3))); + // let mut delete_cursor_3_c = delete_cursor_3_b.clone(); - assert_eq!(delete_cursor_3_b.next(), Some(make_op(3))); - let mut delete_cursor_4 = delete_cursor_3_b.clone(); + // assert_eq!(delete_cursor_3_b.next(), Some(make_op(3))); + // let mut delete_cursor_4 = delete_cursor_3_b.clone(); - assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4))); - assert_eq!(delete_cursor_3_b.next(), Some(make_op(4))); + // assert_eq!(delete_cursor_3_b.peek(), Some(make_op(4))); + // assert_eq!(delete_cursor_3_b.next(), Some(make_op(4))); - assert_eq!(delete_cursor_3_c.next(), Some(make_op(3))); + // assert_eq!(delete_cursor_3_c.next(), Some(make_op(3))); - assert!(delete_cursor_3_b.next().is_none()); - assert_eq!(delete_cursor_3_c.next(), Some(make_op(4))); - assert!(delete_cursor_3_c.next().is_none()); + // assert!(delete_cursor_3_b.next().is_none()); + // assert_eq!(delete_cursor_3_c.next(), Some(make_op(4))); + // assert!(delete_cursor_3_c.next().is_none()); - assert_eq!(delete_cursor_3.peek(), Some(make_op(3))); - assert_eq!(delete_cursor_3.next(), Some(make_op(3))); - assert!(delete_cursor_3_b.next().is_none()); + // assert_eq!(delete_cursor_3.peek(), Some(make_op(3))); + // assert_eq!(delete_cursor_3.next(), Some(make_op(3))); + // assert!(delete_cursor_3_b.next().is_none()); - assert_eq!(delete_cursor_4.next(), Some(make_op(4))); - assert!(delete_cursor_4.next().is_none()); + // assert_eq!(delete_cursor_4.next(), Some(make_op(4))); + // assert!(delete_cursor_4.next().is_none()); } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 14a9344d3..bdbe39f29 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -3,7 +3,10 @@ use schema::Document; use super::operation::AddOperation; use core::Index; use core::Segment; +use core::SegmentMeta; +use std::sync::Arc; use core::SegmentId; +use indexer::operation::DeleteOperation; use schema::Term; use indexer::SegmentEntry; use std::thread::JoinHandle; @@ -18,15 +21,15 @@ use core::SegmentComponent; use super::directory_lock::DirectoryLock; use futures::Future; use std::clone::Clone; +use indexer::delete_queue::DeleteQueue; use std::io; use std::thread; use futures::Canceled; use std::mem; use datastruct::stacker::Heap; use core::SegmentReader; -use std::mem::swap; +use std::mem::swap; use chan; -use super::delete_queue::{DeleteQueue, DeleteQueueCursor}; use super::segment_updater::SegmentUpdater; use Result; use Error; @@ -88,11 +91,19 @@ impl !Send for IndexWriter {} impl !Sync for IndexWriter {} +// TODO move doc to opstamp mapping to its own file +#[derive(Clone)] pub enum DocToOpstampMapping { - WithMap(Vec), + WithMap(Arc>), None } +impl From> for DocToOpstampMapping { + fn from(opstamps: Vec) -> DocToOpstampMapping { + DocToOpstampMapping::WithMap(Arc::new(opstamps)) + } +} + impl DocToOpstampMapping { fn compute_doc_limit(&self, opstamp: u64) -> DocId { match *self { @@ -112,13 +123,14 @@ impl DocToOpstampMapping { /// work on SegmentMeta pub fn advance_deletes( segment: &mut Segment, - delete_cursor: &mut DeleteQueueCursor, - doc_opstamps: DocToOpstampMapping) -> Result { + delete_queue: &DeleteQueue, + doc_opstamps: &DocToOpstampMapping) -> Result { let segment_reader = SegmentReader::open(segment.clone())?; let mut delete_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize); - + let mut last_opstamp_opt: Option = None; - while let Some(delete_op) = delete_cursor.next() { + + for delete_op in delete_queue.operations() { // A delete operation should only affect // document that were inserted after it. // @@ -147,9 +159,7 @@ pub fn advance_deletes( let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; } - - Ok(SegmentEntry::new(segment.meta().clone(), delete_cursor.clone())) - + Ok(SegmentEntry::new(segment.meta().clone())) } fn index_documents(heap: &mut Heap, @@ -157,8 +167,7 @@ fn index_documents(heap: &mut Heap, schema: &Schema, generation: usize, document_iterator: &mut Iterator, - segment_updater: &mut SegmentUpdater, - delete_cursor: &mut DeleteQueueCursor) + segment_updater: &mut SegmentUpdater) -> Result { heap.clear(); let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment.clone(), &schema)); @@ -182,7 +191,10 @@ fn index_documents(heap: &mut Heap, let doc_opstamps: Vec = segment_writer.finalize()?; - let segment_entry = advance_deletes(&mut segment, delete_cursor, DocToOpstampMapping::WithMap(doc_opstamps))?; + // let segment_entry = advance_deletes(&mut segment, delete_queue, delete_position, )?; + + let mut segment_entry = SegmentEntry::new(SegmentMeta::new(segment.id())); + segment_entry.set_doc_to_opstamp(DocToOpstampMapping::from(doc_opstamps)); segment_updater .add_segment(generation, segment_entry) @@ -230,8 +242,6 @@ impl IndexWriter { // TODO fix this. the cursor might be too advanced // at this point. - let delete_cursor = self.delete_queue.cursor(); - let generation = self.generation; let join_handle: JoinHandle> = @@ -239,7 +249,6 @@ impl IndexWriter { .name(format!("indexing thread {} for gen {}", self.worker_id, generation)) .spawn(move || { - let mut delete_cursor_clone = delete_cursor.clone(); loop { let mut document_iterator = document_receiver_clone.clone() .into_iter() @@ -259,8 +268,7 @@ impl IndexWriter { &schema, generation, &mut document_iterator, - &mut segment_updater, - &mut delete_cursor_clone)?; + &mut segment_updater)?; } else { // No more documents. @@ -308,9 +316,9 @@ impl IndexWriter { chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); - let delete_queue = DeleteQueue::default(); + let delete_queue = DeleteQueue::new(); - let segment_updater = SegmentUpdater::new(index.clone(), delete_queue.cursor())?; + let segment_updater = SegmentUpdater::new(index.clone())?; let mut index_writer = IndexWriter { @@ -429,6 +437,8 @@ impl IndexWriter { Error::ErrorInThread("Error while waiting for rollback.".to_string()) )?; + self.delete_queue = DeleteQueue::new(); + // reset the opstamp self.uncommitted_opstamp = self.committed_opstamp; Ok(self.committed_opstamp) @@ -478,9 +488,10 @@ impl IndexWriter { // committed segments. self.committed_opstamp = self.stamp(); - let new_delete_queue = DeleteQueue::default(); + let new_delete_queue = DeleteQueue::new(); - let future = self.segment_updater.commit(self.committed_opstamp, new_delete_queue.cursor()); + // TODO remove clone + let future = self.segment_updater.commit(self.delete_queue.clone(), self.committed_opstamp); // wait for the segment update thread to have processed the info // TODO remove unwrap @@ -493,7 +504,11 @@ impl IndexWriter { pub fn delete_term(&mut self, term: Term) { let opstamp = self.stamp(); - self.delete_queue.push(opstamp, term); + let delete_operation = DeleteOperation { + opstamp: opstamp, + term: term, + }; + self.delete_queue.push_op(delete_operation); } fn stamp(&mut self) -> u64 { diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 210d82ed3..413964767 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -79,7 +79,7 @@ impl MergePolicy for LogMergePolicy { .filter(|level| level.len() >= self.min_merge_size) .map(|ind_vec| { MergeCandidate(ind_vec.iter() - .map(|&ind| segments[ind].segment_id) + .map(|&ind| segments[ind].id()) .collect()) }) .collect() diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 550b75382..8d01dee1a 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -3,7 +3,6 @@ use std::sync::RwLock; use core::SegmentMeta; use core::SegmentId; use indexer::SegmentEntry; -use indexer::delete_queue::DeleteQueueCursor; use std::sync::{RwLockReadGuard, RwLockWriteGuard}; use std::fmt::{self, Debug, Formatter}; @@ -52,11 +51,11 @@ pub fn get_segments(segment_manager: &SegmentManager,) -> (Vec, Vec impl SegmentManager { - pub fn from_segments(segment_metas: Vec, delete_cursor: DeleteQueueCursor) -> SegmentManager { + pub fn from_segments(segment_metas: Vec) -> SegmentManager { SegmentManager { - registers: RwLock::new( SegmentRegisters { + registers: RwLock::new(SegmentRegisters { uncommitted: SegmentRegister::default(), - committed: SegmentRegister::new(segment_metas, delete_cursor), + committed: SegmentRegister::new(segment_metas), }), } } @@ -131,7 +130,7 @@ impl SegmentManager { pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_entry: SegmentEntry) { let mut registers_lock = self.write(); - let merged_segment_ids: Vec = merged_segment_metas.iter().map(|meta| meta.segment_id).collect(); + let merged_segment_ids: Vec = merged_segment_metas.iter().map(|meta| meta.id()).collect(); if registers_lock.uncommitted.contains_all(&merged_segment_ids) { for segment_id in &merged_segment_ids { registers_lock.uncommitted.remove_segment(segment_id); diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index ac1205b0c..fedb53ac1 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -1,9 +1,10 @@ use core::SegmentId; use std::collections::HashMap; use core::SegmentMeta; +use indexer::index_writer::DocToOpstampMapping; use std::fmt; use std::fmt::{Debug, Formatter}; -use indexer::delete_queue::DeleteQueueCursor; + #[derive(Clone, PartialEq, Eq, Debug)] pub enum SegmentState { @@ -24,17 +25,29 @@ impl SegmentState { pub struct SegmentEntry { meta: SegmentMeta, state: SegmentState, - delete_cursor: DeleteQueueCursor, + doc_to_opstamp: DocToOpstampMapping, } impl SegmentEntry { - pub fn segment_id(&self) -> SegmentId { - self.meta.segment_id + pub fn new(segment_meta: SegmentMeta) -> SegmentEntry { + SegmentEntry { + meta: segment_meta, + state: SegmentState::Ready, + doc_to_opstamp: DocToOpstampMapping::None, + } } - pub fn delete_cursor(&mut self) -> &mut DeleteQueueCursor { - &mut self.delete_cursor + pub fn doc_to_opstamp(&self) -> &DocToOpstampMapping { + &self.doc_to_opstamp + } + + pub fn set_doc_to_opstamp(&mut self, doc_to_opstamp: DocToOpstampMapping) { + self.doc_to_opstamp = doc_to_opstamp; + } + + pub fn segment_id(&self) -> SegmentId { + self.meta.id() } pub fn meta(&self) -> &SegmentMeta { @@ -48,15 +61,6 @@ impl SegmentEntry { fn is_ready(&self,) -> bool { self.state == SegmentState::Ready } - - pub fn new(segment_meta: SegmentMeta, - delete_cursor: DeleteQueueCursor) -> SegmentEntry { - SegmentEntry { - meta: segment_meta, - state: SegmentState::Ready, - delete_cursor: delete_cursor, - } - } } impl Debug for SegmentEntry { @@ -117,14 +121,14 @@ impl SegmentRegister { .values() .map(|segment_entry| segment_entry.meta.clone()) .collect(); - segment_ids.sort_by_key(|meta| meta.segment_id); + segment_ids.sort_by_key(|meta| meta.id()); segment_ids } pub fn segment_ids(&self,) -> Vec { self.segment_metas() .into_iter() - .map(|segment_meta| segment_meta.segment_id) + .map(|segment_meta| segment_meta.id()) .collect() } @@ -141,7 +145,7 @@ impl SegmentRegister { } pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) { - let segment_id = segment_entry.meta.segment_id; + let segment_id = segment_entry.meta.id(); self.segment_states.insert(segment_id, segment_entry); } @@ -156,13 +160,13 @@ impl SegmentRegister { .start_merge(); } - pub fn new(segment_metas: Vec, delete_cursor: DeleteQueueCursor) -> SegmentRegister { + pub fn new(segment_metas: Vec) -> SegmentRegister { SegmentRegister { segment_states: segment_metas .into_iter() .map(|segment_meta| { - let segment_id = segment_meta.segment_id; - let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone()); + let segment_id = segment_meta.id(); + let segment_entry = SegmentEntry::new(segment_meta ); (segment_id, segment_entry) }) .collect(), @@ -180,15 +184,12 @@ impl Default for SegmentRegister { #[cfg(test)] mod tests { - use core::SegmentId; use core::SegmentMeta; - use indexer::delete_queue::DeleteQueue; use super::*; #[test] fn test_segment_register() { - let delete_queue = DeleteQueue::default(); let mut segment_register = SegmentRegister::default(); let segment_id_a = SegmentId::generate_random(); let segment_id_b = SegmentId::generate_random(); @@ -196,14 +197,14 @@ mod tests { { let segment_meta = SegmentMeta::new(segment_id_a); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); + let segment_entry = SegmentEntry::new(segment_meta); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_a).unwrap().state, SegmentState::Ready); assert_eq!(segment_register.segment_ids(), vec!(segment_id_a)); { let segment_meta = SegmentMeta::new(segment_id_b); - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); + let segment_entry = SegmentEntry::new(segment_meta); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_entry(&segment_id_b).unwrap().state, SegmentState::Ready); @@ -215,7 +216,7 @@ mod tests { segment_register.remove_segment(&segment_id_b); { let segment_meta_merged = SegmentMeta::new(segment_id_merged); - let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor()); + let segment_entry = SegmentEntry::new(segment_meta_merged); segment_register.add_segment_entry(segment_entry); } assert_eq!(segment_register.segment_ids(), vec!(segment_id_merged)); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index c77e3fc24..28c9b0c84 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -22,13 +22,13 @@ use std::borrow::BorrowMut; use indexer::SegmentSerializer; use indexer::SegmentEntry; use schema::Schema; -use indexer::index_writer::{advance_deletes, DocToOpstampMapping}; +use indexer::index_writer::advance_deletes; use directory::Directory; use std::thread::JoinHandle; use std::sync::Arc; use std::collections::HashMap; use rustc_serialize::json; -use indexer::delete_queue::{DeleteQueueCursor, DeleteQueue}; +use indexer::delete_queue::DeleteQueue; use Result; use futures_cpupool::CpuPool; use core::IndexMeta; @@ -37,13 +37,6 @@ use std::io::Write; use super::segment_manager::{SegmentManager, get_segments}; -fn create_metas(metas: Vec, schema: Schema, opstamp: u64) -> IndexMeta { - IndexMeta { - segments: metas, - schema: schema, - opstamp: opstamp, - } -} /// Save the index meta file. @@ -78,7 +71,11 @@ pub fn save_metas(segment_metas: Vec, opstamp: u64, directory: &mut Directory) -> Result<()> { - let metas = create_metas(segment_metas, schema, opstamp); + let metas = IndexMeta { + segments: segment_metas, + schema: schema, + opstamp: opstamp, + }; let mut w = Vec::new(); try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas))); Ok(directory @@ -109,13 +106,10 @@ struct InnerSegmentUpdater { impl SegmentUpdater { - pub fn new( - index: Index, - delete_cursor: DeleteQueueCursor) - -> Result + pub fn new(index: Index) -> Result { let segments = index.segments()?; - let segment_manager = SegmentManager::from_segments(segments, delete_cursor); + let segment_manager = SegmentManager::from_segments(segments); Ok( SegmentUpdater(Arc::new(InnerSegmentUpdater { pool: CpuPool::new(1), @@ -149,7 +143,7 @@ impl SegmentUpdater { }) } - pub fn new_generation(&mut self, generation: usize) -> impl Future { + pub fn rollback(&mut self, generation: usize) -> impl Future { self.0.generation.store(generation, Ordering::Release); self.run_async(|segment_updater| { segment_updater.0.segment_manager.rollback(); @@ -169,26 +163,24 @@ impl SegmentUpdater { } } - fn purge_deletes(&self) -> Result> { - let segment_entries = self.0.segment_manager.segment_entries(); - segment_entries + fn purge_deletes(&self, delete_queue: &DeleteQueue) -> Result> { + self.0.segment_manager + .segment_entries() .into_iter() - .map(|mut segment_entry| { + .map(|segment_entry| { let mut segment = self.0.index.segment(segment_entry.meta().clone()); - advance_deletes(&mut segment, segment_entry.delete_cursor(), DocToOpstampMapping::None) + advance_deletes(&mut segment, delete_queue, segment_entry.doc_to_opstamp()) .map(|entry| entry.meta().clone()) }) .collect() } - pub fn commit(&self, opstamp: u64, new_delete_queue: DeleteQueueCursor) -> impl Future { + pub fn commit(&self, delete_queue: DeleteQueue, opstamp: u64) -> impl Future { self.run_async(move |segment_updater| { - let segment_metas = segment_updater.purge_deletes().expect("Failed purge deletes"); - - let segment_entries = segment_metas.into_iter() - .map(|segment_meta| - SegmentEntry::new(segment_meta, new_delete_queue.clone()) - ) + let segment_metas = segment_updater.purge_deletes(&delete_queue).expect("Failed purge deletes"); + let segment_entries = segment_metas + .into_iter() + .map(SegmentEntry::new) .collect::>(); segment_updater.0.segment_manager.commit(segment_entries); let mut directory = segment_updater.0.index.directory().box_clone(); @@ -241,8 +233,6 @@ impl SegmentUpdater { // An IndexMerger is like a "view" of our merged segments. // TODO unwrap let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed"); - - let mut merged_segment = index.new_segment(); // ... we just serialize this index merger in our new segment @@ -252,10 +242,7 @@ impl SegmentUpdater { let mut segment_meta = SegmentMeta::new(merged_segment.id()); segment_meta.set_num_docs(num_docs); - // TODO fix delete cursor - let delete_queue = DeleteQueue::default(); - - let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor()); + let segment_entry = SegmentEntry::new(segment_meta); segment_updater_clone .end_merge(segment_metas.clone(), segment_entry.clone()) .wait() @@ -297,7 +284,7 @@ impl SegmentUpdater { segment_updater.0.index.opstamp(), directory.borrow_mut()).expect("Could not save metas."); for segment_meta in merged_segment_metas { - segment_updater.0.index.delete_segment(segment_meta.segment_id); + segment_updater.0.index.delete_segment(segment_meta.id()); } }) diff --git a/src/lib.rs b/src/lib.rs index 09219195c..acb4ecba1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -352,6 +352,78 @@ mod tests { assert!(!postings.advance()); } } + { + // writing the segment + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { // 0 + let doc = doc!(text_field=>"a b"); + index_writer.add_document(doc).unwrap(); + } + { // 1 + index_writer.delete_term(Term::from_field_text(text_field, "c")); + } + index_writer.rollback().unwrap(); + } + { + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let reader = searcher.segment_reader(0); + assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none()); + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 5); + assert!(!postings.advance()); + } + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 3); + assert!(postings.advance()); + assert_eq!(postings.doc(), 4); + assert!(!postings.advance()); + } + } + { + // writing the segment + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + let doc = doc!(text_field=>"a b"); + index_writer.add_document(doc).unwrap(); + } + { + index_writer.delete_term(Term::from_field_text(text_field, "c")); + } + index_writer.rollback().unwrap(); + { + index_writer.delete_term(Term::from_field_text(text_field, "a")); + } + index_writer.commit().unwrap(); + } + { + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let reader = searcher.segment_reader(0); + assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none()); + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap(); + assert!(!postings.advance()); + } + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "b")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 3); + assert!(postings.advance()); + assert_eq!(postings.doc(), 4); + assert!(!postings.advance()); + } + { + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "c")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 4); + assert!(!postings.advance()); + } + } }