diff --git a/src/core/index.rs b/src/core/index.rs index 6b62763bd..85005c6e9 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -39,7 +39,7 @@ pub struct Index { directory: Box, schema: Schema, searcher_pool: Arc>, - docstamp: u64, + opstamp: u64, } @@ -116,13 +116,13 @@ impl Index { /// Creates a new index given a directory and an `IndexMeta`. fn create_from_metas(directory: Box, metas: IndexMeta) -> Result { let schema = metas.schema.clone(); - let docstamp = metas.opstamp; + let opstamp = metas.opstamp; // TODO log somethings is uncommitted is not empty. let index = Index { directory: directory, schema: schema, searcher_pool: Arc::new(Pool::new()), - docstamp: docstamp, + opstamp: opstamp, }; try!(index.load_searchers()); Ok(index) @@ -141,12 +141,12 @@ impl Index { Index::create_from_metas(directory.box_clone(), metas) } - /// Returns the index docstamp. + /// Returns the index opstamp. /// - /// The docstamp is the number of documents that have been added + /// The opstamp is the number of documents that have been added /// from the beginning of time, and until the moment of the last commit. - pub fn docstamp(&self) -> u64 { - self.docstamp + pub fn opstamp(&self) -> u64 { + self.opstamp } /// Creates a multithreaded writer. @@ -291,7 +291,7 @@ impl Clone for Index { directory: self.directory.box_clone(), schema: self.schema.clone(), searcher_pool: self.searcher_pool.clone(), - docstamp: self.docstamp, + opstamp: self.opstamp, } } } diff --git a/src/core/segment.rs b/src/core/segment.rs index 4e719c0e3..57e87f7e8 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -25,7 +25,6 @@ impl fmt::Debug for Segment { } } - /// Creates a new segment given an `Index` and a `SegmentId` /// /// The function is here to make it private outside `tantivy`. @@ -38,18 +37,20 @@ pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64) } impl Segment { - - + /// Returns our index's schema. pub fn schema(&self,) -> Schema { self.index.schema() } + pub fn commit_opstamp(&self) -> u64 { + self.commit_opstamp + } + /// Returns the segment's id. pub fn id(&self,) -> SegmentId { self.segment_id } - /// Returns the relative path of a component of our segment. /// diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index 62610af3f..57994ddf9 100644 --- a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -23,7 +23,9 @@ impl SegmentComponent { SegmentComponent::STORE => ".store".to_string(), SegmentComponent::FASTFIELDS => ".fast".to_string(), SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(), - SegmentComponent::DELETE(opstamp) => format!("{}.del", opstamp) + SegmentComponent::DELETE(opstamp) => { + format!(".{}.del", opstamp) + } } } } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index d8518f87f..c1e44e754 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -3,9 +3,13 @@ use core::Segment; use core::SegmentId; use core::SegmentComponent; use schema::Term; +use bit_set::BitSet; +use common::HasLen; +use fastfield::delete::DeleteBitSet; use store::StoreReader; use schema::Document; use directory::ReadOnlySource; +use directory::error::FileError; use DocId; use std::io; use std::str; @@ -44,6 +48,7 @@ pub struct SegmentReader { store_reader: StoreReader, fast_fields_reader: U32FastFieldsReader, fieldnorms_reader: U32FastFieldsReader, + delete_bitset: DeleteBitSet, positions_data: ReadOnlySource, schema: Schema, } @@ -63,9 +68,13 @@ impl SegmentReader { /// Today, `tantivy` does not handle deletes so max doc and /// num_docs are the same. pub fn num_docs(&self) -> DocId { - self.segment_info.max_doc + self.segment_info.max_doc - self.num_deleted_docs() } + pub fn num_deleted_docs(&self) -> DocId { + self.delete_bitset.len() as DocId + } + /// Accessor to a segment's fast field reader given a field. pub fn get_fast_field_reader(&self, field: Field) -> io::Result { let field_entry = self.schema.get_field_entry(field); @@ -137,6 +146,15 @@ impl SegmentReader { .open_read(SegmentComponent::POSITIONS) .unwrap_or_else(|_| ReadOnlySource::empty()); + // TODO 0u64 + let delete_data_res = segment.open_read(SegmentComponent::DELETE(segment.commit_opstamp())); + let delete_bitset; + if let Err(FileError::FileDoesNotExist(_)) = delete_data_res { + delete_bitset = DeleteBitSet::empty(); + } + else { + delete_bitset = DeleteBitSet::open(delete_data_res?); + } let schema = segment.schema(); Ok(SegmentReader { segment_info: segment_info, @@ -146,6 +164,7 @@ impl SegmentReader { store_reader: store_reader, fast_fields_reader: fast_fields_reader, fieldnorms_reader: fieldnorms_reader, + delete_bitset: delete_bitset, positions_data: positions_data, schema: schema, }) @@ -214,9 +233,10 @@ impl SegmentReader { FreqHandler::new_without_freq() } }; - Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, freq_handler)) + Some(SegmentPostings::from_data(term_info.doc_freq, postings_data, &self.delete_bitset, freq_handler)) } - + + /// Returns the posting list associated with a term. pub fn read_postings_all_info(&self, term: &Term) -> Option { let field_entry = self.schema.get_field_entry(term.field()); diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs index e700bc3e7..03f3b13b4 100644 --- a/src/fastfield/delete.rs +++ b/src/fastfield/delete.rs @@ -4,6 +4,7 @@ use std::io::Write; use std::io; use directory::ReadOnlySource; use DocId; +use common::HasLen; pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io::Result<()> { let max_doc = delete_bitset.capacity(); @@ -28,23 +29,54 @@ pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io: writer.flush() } -pub struct DeleteBitSet(ReadOnlySource); +#[derive(Clone)] +pub struct DeleteBitSet { + data: ReadOnlySource, + len: usize, +} impl DeleteBitSet { pub fn open(data: ReadOnlySource) -> DeleteBitSet { - DeleteBitSet(data) + let num_deleted: usize = data + .as_slice() + .iter() + .map(|b| b.count_ones() as usize) + .sum(); + DeleteBitSet { + data: data, + len: num_deleted, + } + } + + pub fn empty() -> DeleteBitSet { + DeleteBitSet { + data: ReadOnlySource::empty(), + len: 0, + } } pub fn is_deleted(&self, doc: DocId) -> bool { - let byte_offset = doc / 8u32; - let b: u8 = (*self.0)[byte_offset as usize]; - let shift = (doc & 7u32) as u8; - b & (1u8 << shift) != 0 + if self.len == 0 { + false + } + else { + let byte_offset = doc / 8u32; + let b: u8 = (*self.data)[byte_offset as usize]; + let shift = (doc & 7u32) as u8; + b & (1u8 << shift) != 0 + } } + } +impl HasLen for DeleteBitSet { + + fn len(&self) -> usize { + self.len + } +} #[cfg(test)] mod tests { @@ -67,6 +99,7 @@ mod tests { for doc in 0..n { assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId)); } + assert_eq!(delete_bitset.len(), bitset.len()); } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 84e375acf..f33ba5773 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -75,8 +75,8 @@ pub struct IndexWriter { delete_queue: DeleteQueue, - uncommitted_docstamp: u64, - committed_docstamp: u64, + uncommitted_opstamp: u64, + committed_opstamp: u64, } // IndexWriter cannot be sent to another thread. @@ -211,7 +211,6 @@ impl IndexWriter { // No more documents. // Happens when there is a commit, or if the `IndexWriter` // was dropped. - opstamp = 0u64; return Ok(()) } @@ -282,8 +281,8 @@ impl IndexWriter { delete_queue: delete_queue, - committed_docstamp: index.docstamp(), - uncommitted_docstamp: index.docstamp(), + committed_opstamp: index.opstamp(), + uncommitted_opstamp: index.opstamp(), generation: 0, @@ -338,7 +337,7 @@ impl IndexWriter { /// After calling rollback, the index is in the same /// state as it was after the last commit. /// - /// The docstamp at the last commit is returned. + /// The opstamp at the last commit is returned. pub fn rollback(&mut self) -> Result { // by updating the generation in the segment updater, @@ -380,9 +379,9 @@ impl IndexWriter { Error::ErrorInThread("Error while waiting for rollback.".to_string()) )?; - // reset the docstamp - self.uncommitted_docstamp = self.committed_docstamp; - Ok(self.committed_docstamp) + // reset the opstamp + self.uncommitted_opstamp = self.committed_opstamp; + Ok(self.committed_opstamp) } @@ -397,7 +396,7 @@ impl IndexWriter { /// long as the hard disk is spared), it will be possible /// to resume indexing from this point. /// - /// Commit returns the `docstamp` of the last document + /// Commit returns the `opstamp` of the last document /// that made it in the commit. /// pub fn commit(&mut self) -> Result { @@ -406,9 +405,6 @@ impl IndexWriter { // and recreate a new one channels. self.recreate_document_channel(); - // Docstamp of the last document in this commit. - self.committed_docstamp = self.uncommitted_docstamp; - let mut former_workers_join_handle = Vec::new(); swap(&mut former_workers_join_handle, &mut self.workers_join_handle); @@ -430,13 +426,15 @@ impl IndexWriter { // This will move uncommitted segments to the state of // committed segments. - let future = self.segment_updater.commit(self.committed_docstamp); + + self.committed_opstamp = self.stamp(); + let future = self.segment_updater.commit(self.committed_opstamp); // wait for the segment update thread to have processed the info // TODO remove unwrap future.wait().unwrap(); - Ok(self.committed_docstamp) + Ok(self.committed_opstamp) } @@ -446,8 +444,8 @@ impl IndexWriter { } fn stamp(&mut self) -> u64 { - let opstamp = self.uncommitted_docstamp; - self.uncommitted_docstamp += 1u64; + let opstamp = self.uncommitted_opstamp; + self.uncommitted_opstamp += 1u64; opstamp } @@ -455,7 +453,7 @@ impl IndexWriter { /// /// If the indexing pipeline is full, this call may block. /// - /// The docstamp is an increasing `u64` that can + /// The opstamp is an increasing `u64` that can /// be used by the client to align commits with its own /// document queue. /// diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0f51d0652..15bc174f5 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -53,7 +53,7 @@ impl IndexMerger { let mut max_doc = 0; for segment in segments { let reader = try!(SegmentReader::open(segment.clone())); - max_doc += reader.max_doc(); + max_doc += reader.num_docs(); readers.push(reader); } Ok(IndexMerger { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index d3e274d69..bb79b2721 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -57,11 +57,11 @@ fn create_metas(segment_manager: &SegmentManager, schema: Schema, opstamp: u64) /// /// This method is not part of tantivy's public API pub fn save_new_metas(schema: Schema, - docstamp: u64, + opstamp: u64, directory: &mut Directory) -> Result<()> { let segment_manager = SegmentManager::default(); - save_metas(&segment_manager, schema, docstamp, directory) + save_metas(&segment_manager, schema, opstamp, directory) } @@ -77,10 +77,10 @@ pub fn save_new_metas(schema: Schema, /// This method is not part of tantivy's public API pub fn save_metas(segment_manager: &SegmentManager, schema: Schema, - docstamp: u64, + opstamp: u64, directory: &mut Directory) -> Result<()> { - let metas = create_metas(segment_manager, schema, docstamp); + let metas = create_metas(segment_manager, schema, opstamp); let mut w = Vec::new(); try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas))); Ok(directory @@ -178,7 +178,7 @@ impl SegmentUpdater { save_metas( &segment_updater.0.segment_manager, segment_updater.0.index.schema(), - segment_updater.0.index.docstamp(), + opstamp, directory.borrow_mut()).expect("Could not save metas."); segment_updater.consider_merge_options(); }) @@ -284,7 +284,7 @@ impl SegmentUpdater { save_metas( &segment_updater.0.segment_manager, segment_updater.0.index.schema(), - segment_updater.0.index.docstamp(), + segment_updater.0.index.opstamp(), directory.borrow_mut()).expect("Could not save metas."); for segment_meta in merged_segment_metas { segment_updater.0.index.delete_segment(segment_meta.segment_id); diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 204eb9a37..d64bd77bd 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -22,8 +22,40 @@ use indexer::index_writer::MARGIN_IN_BYTES; use super::operation::AddOperation; use bit_set::BitSet; use indexer::document_receiver::DocumentReceiver; +use core::SegmentReader; +use postings::SegmentPostingsOption; +use postings::DocSet; + +fn update_deleted_bitset( + segment_reader: &SegmentReader, + bitset: &mut BitSet, + delete_cursor: &mut DeleteQueueCursor, + limit_opstamp_opt: Option) -> bool { + let mut has_changed = false; + let limit_opstamp = limit_opstamp_opt.unwrap_or(u64::max_value()); + loop { + if let Some(delete_op) = delete_cursor.peek() { + if delete_op.opstamp > limit_opstamp { + break; + } + if let Some(mut docset) = segment_reader.read_postings(&delete_op.term, SegmentPostingsOption::NoFreq) { + while docset.advance() { + has_changed = true; + let deleted_doc = docset.doc(); + bitset.insert(deleted_doc as usize); + } + } + } + else { + break; + } + delete_cursor.consume(); + } + has_changed +} + struct DocumentDeleter<'a> { limit_doc_id: DocId, deleted_docs: &'a mut BitSet, @@ -180,6 +212,7 @@ impl<'a> SegmentWriter<'a> { .expect("Last doc opstamp called on an empty segment writer")) } + /// TODO compute the bitset using the segment reader directly. pub fn compute_deleted_bitset(&self, delete_queue_cursor: &mut DeleteQueueCursor) -> Option { if let Some(first_opstamp) = self.doc_opstamps.first() { if !delete_queue_cursor.skip_to(*first_opstamp) { diff --git a/src/lib.rs b/src/lib.rs index 96d65e639..b2b3b36d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -290,6 +290,65 @@ mod tests { } } + + #[test] + fn test_delete_postings() { + let mut schema_builder = SchemaBuilder::default(); + let text_field = schema_builder.add_text_field("text", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + { + // writing the segment + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); + { + let doc = doc!(text_field=>"a b"); + index_writer.add_document(doc).unwrap(); + } + { + let doc = doc!(text_field=>" a c"); + index_writer.add_document(doc).unwrap(); + } + { + let doc = doc!(text_field=>" b c"); + index_writer.add_document(doc).unwrap(); + } + { + let doc = doc!(text_field=>" b d"); + index_writer.add_document(doc).unwrap(); + } + { + index_writer.delete_term(Term::from_field_text(text_field, "c")); + } + { + index_writer.delete_term(Term::from_field_text(text_field, "a")); + } + { + let doc = doc!(text_field=>" b c"); + index_writer.add_document(doc).unwrap(); + } + { + let doc = doc!(text_field=>" a"); + index_writer.add_document(doc).unwrap(); + } + index_writer.commit().unwrap(); + } + { + index.load_searchers().unwrap(); + let searcher = index.searcher(); + let reader = searcher.segment_reader(0); + assert!(reader.read_postings_all_info(&Term::from_field_text(text_field, "abcd")).is_none()); + let mut postings = reader.read_postings_all_info(&Term::from_field_text(text_field, "a")).unwrap(); + assert!(postings.advance()); + assert_eq!(postings.doc(), 2); + assert!(postings.advance()); + assert_eq!(postings.doc(), 3); + assert!(postings.advance()); + assert_eq!(postings.doc(), 5); + assert!(!postings.advance()); + } + } + + #[test] fn test_termfreq() { let mut schema_builder = SchemaBuilder::default(); diff --git a/src/postings/docset.rs b/src/postings/docset.rs index 9dda32559..6698c5a2b 100644 --- a/src/postings/docset.rs +++ b/src/postings/docset.rs @@ -55,6 +55,9 @@ pub trait DocSet { /// Returns the current document fn doc(&self) -> DocId; + /// TODO can impl trait for trait? + + /// Advances the cursor to the next document /// None is returned if the iterator has `DocSet` /// has already been entirely consumed. @@ -67,6 +70,7 @@ pub trait DocSet { } } + impl DocSet for Box { fn advance(&mut self) -> bool { let unboxed: &mut TDocSet = self.borrow_mut(); diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index fb313e76e..a886df372 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -2,6 +2,7 @@ use compression::{NUM_DOCS_PER_BLOCK, BlockDecoder, VIntDecoder}; use DocId; use postings::{Postings, FreqHandler, DocSet, HasLen}; use std::num::Wrapping; +use fastfield::delete::DeleteBitSet; const EMPTY_DATA: [u8; 0] = [0u8; 0]; @@ -18,6 +19,7 @@ pub struct SegmentPostings<'a> { freq_handler: FreqHandler, remaining_data: &'a [u8], cur: Wrapping, + delete_bitset: DeleteBitSet, } impl<'a> SegmentPostings<'a> { @@ -41,7 +43,10 @@ impl<'a> SegmentPostings<'a> { /// * `data` - data array. The complete data is not necessarily used. /// * `freq_handler` - the freq handler is in charge of decoding /// frequencies and/or positions - pub fn from_data(len: u32, data: &'a [u8], freq_handler: FreqHandler) -> SegmentPostings<'a> { + pub fn from_data(len: u32, + data: &'a [u8], + delete_bitset: &'a DeleteBitSet, + freq_handler: FreqHandler) -> SegmentPostings<'a> { SegmentPostings { len: len as usize, doc_offset: 0, @@ -49,6 +54,7 @@ impl<'a> SegmentPostings<'a> { freq_handler: freq_handler, remaining_data: data, cur: Wrapping(usize::max_value()), + delete_bitset: delete_bitset.clone(), } } @@ -60,6 +66,7 @@ impl<'a> SegmentPostings<'a> { block_decoder: BlockDecoder::new(), freq_handler: FreqHandler::new_without_freq(), remaining_data: &EMPTY_DATA, + delete_bitset: DeleteBitSet::empty(), cur: Wrapping(usize::max_value()), } } @@ -77,14 +84,18 @@ impl<'a> DocSet for SegmentPostings<'a> { // next needs to be called a first time to point to the correct element. #[inline] fn advance(&mut self) -> bool { - self.cur += Wrapping(1); - if self.cur.0 >= self.len { - return false; + loop { + self.cur += Wrapping(1); + if self.cur.0 >= self.len { + return false; + } + if self.index_within_block() == 0 { + self.load_next_block(); + } + if !self.delete_bitset.is_deleted(self.doc()) { + return true; + } } - if self.index_within_block() == 0 { - self.load_next_block(); - } - true } #[inline]