diff --git a/Cargo.toml b/Cargo.toml index bf59afcc2..2c2ec995c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,14 @@ rustc-serialize = "0.3" log = "0.3.6" combine = "2.2" tempdir = "0.3" + + bincode = "0.5" libc = {version = "0.2.20", optional=true} num_cpus = "1.2" itertools = "0.5.9" lz4 = "1.20" +bit-set = "0.4.0" time = "0.1" uuid = { version = "0.4", features = ["v4", "rustc-serialize"] } chan = "0.1" diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index 75d9635fb..23b8e162c 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -1,15 +1,9 @@ use schema::Term; use std::sync::{Arc, RwLock}; +use super::operation::DeleteOperation; const BLOCK_SIZE: usize = 128; -/// Timestamped Delete operation. -#[derive(Clone, Eq, PartialEq, Debug)] -pub struct DeleteOperation { - pub opstamp: u64, - pub term: Term, -} - /// DeleteQueue are implemented as an unrolled linked list. /// Block implements a block of this unrolled linked list. diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 2e756e8f0..bda2ac391 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -1,5 +1,6 @@ use schema::Schema; use schema::Document; +use super::operation::{DeleteOperation, AddOperation}; use indexer::SegmentSerializer; use core::SerializableSegment; use core::Index; @@ -40,8 +41,8 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; -type DocumentSender = chan::Sender; -type DocumentReceiver = chan::Receiver; +type DocumentSender = chan::Sender; +type DocumentReceiver = chan::Receiver; @@ -88,9 +89,9 @@ impl !Sync for IndexWriter {} fn index_documents(heap: &mut Heap, segment: Segment, schema: &Schema, - document_iterator: &mut Iterator, + document_iterator: &mut Iterator, segment_update_sender: &mut SegmentUpdateSender, - delete_cursor: DeleteQueueCursor) + delete_cursor: &mut DeleteQueueCursor) -> Result<()> { heap.clear(); let segment_id = segment.id(); @@ -161,6 +162,7 @@ impl IndexWriter { let join_handle: JoinHandle> = try!(thread::Builder::new() .name(format!("indexing_thread_{}", self.worker_id)) .spawn(move || { + let mut delete_cursor_clone = delete_cursor.clone(); loop { let segment = index.new_segment(); @@ -176,7 +178,7 @@ impl IndexWriter { &schema, &mut document_iterator, &mut segment_update_sender, - delete_cursor.clone())); + &mut delete_cursor_clone)); } else { // No more documents. // Happens when there is a commit, or if the `IndexWriter` @@ -474,9 +476,13 @@ impl IndexWriter { /// /// Currently it represents the number of documents that /// have been added since the creation of the index. - pub fn add_document(&mut self, doc: Document) -> io::Result { + pub fn add_document(&mut self, document: Document) -> io::Result { let opstamp = self.stamp(); - self.document_sender.send(doc); + let add_operation = AddOperation { + opstamp: opstamp, + document: document, + }; + self.document_sender.send(add_operation); Ok(opstamp) } } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index cb8c78963..9bcfae1c5 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -10,6 +10,7 @@ mod segment_manager; pub mod delete_queue; pub mod segment_updater; mod directory_lock; +pub mod operation; pub use self::segment_serializer::SegmentSerializer; pub use self::segment_writer::SegmentWriter; diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs new file mode 100644 index 000000000..ecdc2d827 --- /dev/null +++ b/src/indexer/operation.rs @@ -0,0 +1,17 @@ +use schema::Document; +use schema::Term; + + +/// Timestamped Delete operation. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct DeleteOperation { + pub opstamp: u64, + pub term: Term, +} + +/// Timestamped Add operation. +#[derive(Eq, PartialEq, Debug)] +pub struct AddOperation { + pub opstamp: u64, + pub document: Document, +} diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index c0682312a..c4ba34602 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -20,6 +20,8 @@ use indexer::segment_serializer::SegmentSerializer; use datastruct::stacker::Heap; use super::delete_queue::DeleteQueueCursor; use indexer::index_writer::MARGIN_IN_BYTES; +use super::operation::{AddOperation, DeleteOperation}; +use bit_set::BitSet; /// A `SegmentWriter` is in charge of creating segment index from a /// documents. @@ -33,7 +35,8 @@ pub struct SegmentWriter<'a> { segment_serializer: SegmentSerializer, fast_field_writers: U32FastFieldsWriter, fieldnorms_writer: U32FastFieldsWriter, - delete_queue_cursor: DeleteQueueCursor, + delete_queue_cursor: &'a mut DeleteQueueCursor, + docstamps: Vec, } @@ -85,7 +88,7 @@ impl<'a> SegmentWriter<'a> { pub fn for_segment(heap: &'a Heap, mut segment: Segment, schema: &Schema, - delete_queue_cursor: DeleteQueueCursor) -> Result> { + delete_queue_cursor: &'a mut DeleteQueueCursor) -> Result> { let segment_serializer = try!(SegmentSerializer::for_segment(&mut segment)); let mut per_field_postings_writers: Vec> = Vec::new(); for field_entry in schema.fields() { @@ -100,6 +103,7 @@ impl<'a> SegmentWriter<'a> { segment_serializer: segment_serializer, fast_field_writers: U32FastFieldsWriter::from_schema(schema), delete_queue_cursor: delete_queue_cursor, + docstamps: Vec::with_capacity(1_000), }) } @@ -112,7 +116,8 @@ impl<'a> SegmentWriter<'a> { for per_field_postings_writer in &mut self.per_field_postings_writers { per_field_postings_writer.close(self.heap); } - try!(write(&self.per_field_postings_writers, + try!(write( + &self.per_field_postings_writers, &self.fast_field_writers, &self.fieldnorms_writer, segment_info, @@ -132,11 +137,32 @@ impl<'a> SegmentWriter<'a> { self.heap.num_free_bytes() <= MARGIN_IN_BYTES } + fn compute_delete_mask(&mut self) -> BitSet { + let delete_docs = BitSet::with_capacity(self.max_doc as usize); + loop { + if let Some(delete_operation) = self.delete_queue_cursor.peek() { + let delete_term = delete_operation.term; + let Field(field_id) = delete_term.field(); + let postings_writer = &self.per_field_postings_writers[field_id as usize]; + // TODO add the associated posting list with the correct cut-off. + self.delete_queue_cursor.consume(); + } + else { + break; + } + + } + delete_docs + } + + /// Indexes a new document /// /// As a user, you should rather use `IndexWriter`'s add_document. - pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { + pub fn add_document(&mut self, add_operation: &AddOperation, schema: &Schema) -> io::Result<()> { let doc_id = self.max_doc; + let doc = &add_operation.document; + self.docstamps.push(add_operation.opstamp); for (field, field_values) in doc.get_sorted_field_values() { let field_posting_writer: &mut Box = &mut self.per_field_postings_writers[field.0 as usize]; let field_options = schema.get_field_entry(field); @@ -171,7 +197,7 @@ impl<'a> SegmentWriter<'a> { } } self.fieldnorms_writer.fill_val_up_to(doc_id); - self.fast_field_writers.add_document(doc); + self.fast_field_writers.add_document(&doc); let stored_fieldvalues: Vec<&FieldValue> = doc .field_values() .iter() @@ -221,7 +247,7 @@ fn write<'a>(per_field_postings_writers: &[Box], segment_info: SegmentInfo, mut serializer: SegmentSerializer, heap: &'a Heap,) -> Result { - for per_field_postings_writer in per_field_postings_writers.iter() { + for per_field_postings_writer in per_field_postings_writers { try!(per_field_postings_writer.serialize(serializer.get_postings_serializer(), heap)); } try!(fast_field_writers.serialize(serializer.get_fast_field_serializer())); diff --git a/src/lib.rs b/src/lib.rs index e82447472..3315e776d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ extern crate combine; extern crate itertools; extern crate chan; extern crate crossbeam; - +extern crate bit_set; #[cfg(feature="simdcompression")] extern crate libc; diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 8760b4e71..f146c132b 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -52,6 +52,7 @@ mod tests { use query::TermQuery; use schema::Field; use test::Bencher; + use indexer::operation::AddOperation; use rand::{XorShiftRng, Rng, SeedableRng}; @@ -83,27 +84,39 @@ mod tests { let index = Index::create_in_ram(schema.clone()); let segment = index.new_segment(); let delete_queue = DeleteQueue::default(); - let delete_cursor = delete_queue.cursor(); + let mut delete_cursor = delete_queue.cursor(); let heap = Heap::with_capacity(10_000_000); { - let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema, delete_cursor).unwrap(); + let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema, &mut delete_cursor).unwrap(); { let mut doc = Document::default(); doc.add_text(text_field, "a b a c a d a a."); doc.add_text(text_field, "d d d d a"); // checking that position works if the field has two values. - segment_writer.add_document(&doc, &schema).unwrap(); + let op = AddOperation { + opstamp: 0u64, + document: doc, + }; + segment_writer.add_document(&op, &schema).unwrap(); } { let mut doc = Document::default(); doc.add_text(text_field, "b a"); - segment_writer.add_document(&doc, &schema).unwrap(); + let op = AddOperation { + opstamp: 1u64, + document: doc, + }; + segment_writer.add_document(&op, &schema).unwrap(); } for i in 2..1000 { let mut doc = Document::default(); let mut text = iter::repeat("e ").take(i).collect::(); text.push_str(" a"); doc.add_text(text_field, &text); - segment_writer.add_document(&doc, &schema).unwrap(); + let op = AddOperation { + opstamp: 2u64, + document: doc, + }; + segment_writer.add_document(&op, &schema).unwrap(); } segment_writer.finalize().unwrap(); }