issue/43 DeleteQueue.

This commit is contained in:
Paul Masurel
2017-01-16 09:13:28 +09:00
parent 5a06f45403
commit 183d5221b5
8 changed files with 86 additions and 26 deletions

View File

@@ -24,11 +24,14 @@ rustc-serialize = "0.3"
log = "0.3.6"
combine = "2.2"
tempdir = "0.3"
bincode = "0.5"
libc = {version = "0.2.20", optional=true}
num_cpus = "1.2"
itertools = "0.5.9"
lz4 = "1.20"
bit-set = "0.4.0"
time = "0.1"
uuid = { version = "0.4", features = ["v4", "rustc-serialize"] }
chan = "0.1"

View File

@@ -1,15 +1,9 @@
use schema::Term;
use std::sync::{Arc, RwLock};
use super::operation::DeleteOperation;
const BLOCK_SIZE: usize = 128;
/// A delete operation, tagged with the opstamp at which it was issued.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteOperation {
    /// Stamp attached to this delete operation.
    pub opstamp: u64,
    /// Term carried by this delete operation.
    pub term: Term,
}
/// `DeleteQueue` is implemented as an unrolled linked list.
/// Block implements a block of this unrolled linked list.

View File

@@ -1,5 +1,6 @@
use schema::Schema;
use schema::Document;
use super::operation::{DeleteOperation, AddOperation};
use indexer::SegmentSerializer;
use core::SerializableSegment;
use core::Index;
@@ -40,8 +41,8 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type DocumentSender = chan::Sender<Document>;
type DocumentReceiver = chan::Receiver<Document>;
type DocumentSender = chan::Sender<AddOperation>;
type DocumentReceiver = chan::Receiver<AddOperation>;
@@ -88,9 +89,9 @@ impl !Sync for IndexWriter {}
fn index_documents(heap: &mut Heap,
segment: Segment,
schema: &Schema,
document_iterator: &mut Iterator<Item = Document>,
document_iterator: &mut Iterator<Item=AddOperation>,
segment_update_sender: &mut SegmentUpdateSender,
delete_cursor: DeleteQueueCursor)
delete_cursor: &mut DeleteQueueCursor)
-> Result<()> {
heap.clear();
let segment_id = segment.id();
@@ -161,6 +162,7 @@ impl IndexWriter {
let join_handle: JoinHandle<Result<()>> = try!(thread::Builder::new()
.name(format!("indexing_thread_{}", self.worker_id))
.spawn(move || {
let mut delete_cursor_clone = delete_cursor.clone();
loop {
let segment = index.new_segment();
@@ -176,7 +178,7 @@ impl IndexWriter {
&schema,
&mut document_iterator,
&mut segment_update_sender,
delete_cursor.clone()));
&mut delete_cursor_clone));
} else {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
@@ -474,9 +476,13 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_document(&mut self, doc: Document) -> io::Result<u64> {
pub fn add_document(&mut self, document: Document) -> io::Result<u64> {
let opstamp = self.stamp();
self.document_sender.send(doc);
let add_operation = AddOperation {
opstamp: opstamp,
document: document,
};
self.document_sender.send(add_operation);
Ok(opstamp)
}
}

View File

@@ -10,6 +10,7 @@ mod segment_manager;
pub mod delete_queue;
pub mod segment_updater;
mod directory_lock;
pub mod operation;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;

17
src/indexer/operation.rs Normal file
View File

@@ -0,0 +1,17 @@
use schema::Document;
use schema::Term;
/// A delete operation, tagged with the opstamp at which it was issued.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteOperation {
    /// Stamp attached to this delete operation.
    pub opstamp: u64,
    /// Term carried by this delete operation.
    pub term: Term,
}
/// An add operation, tagged with the opstamp at which it was issued.
#[derive(Debug, PartialEq, Eq)]
pub struct AddOperation {
    /// Stamp attached to this add operation.
    pub opstamp: u64,
    /// Document carried by this add operation.
    pub document: Document,
}

View File

@@ -20,6 +20,8 @@ use indexer::segment_serializer::SegmentSerializer;
use datastruct::stacker::Heap;
use super::delete_queue::DeleteQueueCursor;
use indexer::index_writer::MARGIN_IN_BYTES;
use super::operation::{AddOperation, DeleteOperation};
use bit_set::BitSet;
/// A `SegmentWriter` is in charge of creating a segment index from a
/// set of documents.
@@ -33,7 +35,8 @@ pub struct SegmentWriter<'a> {
segment_serializer: SegmentSerializer,
fast_field_writers: U32FastFieldsWriter,
fieldnorms_writer: U32FastFieldsWriter,
delete_queue_cursor: DeleteQueueCursor,
delete_queue_cursor: &'a mut DeleteQueueCursor,
docstamps: Vec<u64>,
}
@@ -85,7 +88,7 @@ impl<'a> SegmentWriter<'a> {
pub fn for_segment(heap: &'a Heap,
mut segment: Segment,
schema: &Schema,
delete_queue_cursor: DeleteQueueCursor) -> Result<SegmentWriter<'a>> {
delete_queue_cursor: &'a mut DeleteQueueCursor) -> Result<SegmentWriter<'a>> {
let segment_serializer = try!(SegmentSerializer::for_segment(&mut segment));
let mut per_field_postings_writers: Vec<Box<PostingsWriter + 'a>> = Vec::new();
for field_entry in schema.fields() {
@@ -100,6 +103,7 @@ impl<'a> SegmentWriter<'a> {
segment_serializer: segment_serializer,
fast_field_writers: U32FastFieldsWriter::from_schema(schema),
delete_queue_cursor: delete_queue_cursor,
docstamps: Vec::with_capacity(1_000),
})
}
@@ -112,7 +116,8 @@ impl<'a> SegmentWriter<'a> {
for per_field_postings_writer in &mut self.per_field_postings_writers {
per_field_postings_writer.close(self.heap);
}
try!(write(&self.per_field_postings_writers,
try!(write(
&self.per_field_postings_writers,
&self.fast_field_writers,
&self.fieldnorms_writer,
segment_info,
@@ -132,11 +137,32 @@ impl<'a> SegmentWriter<'a> {
self.heap.num_free_bytes() <= MARGIN_IN_BYTES
}
/// Drains the delete queue cursor and returns the delete mask of the segment.
///
/// The mask is allocated with a capacity of `max_doc` bits. Each operation
/// returned by `delete_queue_cursor.peek()` is inspected and then dropped
/// from the cursor with `consume()`, until `peek()` yields `None`.
///
/// NOTE: flagging the matching documents is not implemented yet (see the
/// TODO below), so the returned `BitSet` never has any bit set.
///
/// # Panics
///
/// Panics if the field id of a delete term is not a valid index into
/// `per_field_postings_writers`.
fn compute_delete_mask(&mut self) -> BitSet {
    let delete_docs = BitSet::with_capacity(self.max_doc as usize);
    while let Some(delete_operation) = self.delete_queue_cursor.peek() {
        let delete_term = delete_operation.term;
        let Field(field_id) = delete_term.field();
        // Not used yet: this is the postings writer the TODO below will read
        // from. The lookup is kept so an out-of-range field id still panics.
        let _postings_writer = &self.per_field_postings_writers[field_id as usize];
        // TODO add the associated posting list with the correct cut-off.
        self.delete_queue_cursor.consume();
    }
    delete_docs
}
/// Indexes a new document
///
/// As a user, you should rather use `IndexWriter`'s add_document.
pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
pub fn add_document(&mut self, add_operation: &AddOperation, schema: &Schema) -> io::Result<()> {
let doc_id = self.max_doc;
let doc = &add_operation.document;
self.docstamps.push(add_operation.opstamp);
for (field, field_values) in doc.get_sorted_field_values() {
let field_posting_writer: &mut Box<PostingsWriter> = &mut self.per_field_postings_writers[field.0 as usize];
let field_options = schema.get_field_entry(field);
@@ -171,7 +197,7 @@ impl<'a> SegmentWriter<'a> {
}
}
self.fieldnorms_writer.fill_val_up_to(doc_id);
self.fast_field_writers.add_document(doc);
self.fast_field_writers.add_document(&doc);
let stored_fieldvalues: Vec<&FieldValue> = doc
.field_values()
.iter()
@@ -221,7 +247,7 @@ fn write<'a>(per_field_postings_writers: &[Box<PostingsWriter + 'a>],
segment_info: SegmentInfo,
mut serializer: SegmentSerializer,
heap: &'a Heap,) -> Result<u32> {
for per_field_postings_writer in per_field_postings_writers.iter() {
for per_field_postings_writer in per_field_postings_writers {
try!(per_field_postings_writer.serialize(serializer.get_postings_serializer(), heap));
}
try!(fast_field_writers.serialize(serializer.get_fast_field_serializer()));

View File

@@ -47,7 +47,7 @@ extern crate combine;
extern crate itertools;
extern crate chan;
extern crate crossbeam;
extern crate bit_set;
#[cfg(feature="simdcompression")]
extern crate libc;

View File

@@ -52,6 +52,7 @@ mod tests {
use query::TermQuery;
use schema::Field;
use test::Bencher;
use indexer::operation::AddOperation;
use rand::{XorShiftRng, Rng, SeedableRng};
@@ -83,27 +84,39 @@ mod tests {
let index = Index::create_in_ram(schema.clone());
let segment = index.new_segment();
let delete_queue = DeleteQueue::default();
let delete_cursor = delete_queue.cursor();
let mut delete_cursor = delete_queue.cursor();
let heap = Heap::with_capacity(10_000_000);
{
let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema, delete_cursor).unwrap();
let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema, &mut delete_cursor).unwrap();
{
let mut doc = Document::default();
doc.add_text(text_field, "a b a c a d a a.");
doc.add_text(text_field, "d d d d a"); // checking that position works if the field has two values.
segment_writer.add_document(&doc, &schema).unwrap();
let op = AddOperation {
opstamp: 0u64,
document: doc,
};
segment_writer.add_document(&op, &schema).unwrap();
}
{
let mut doc = Document::default();
doc.add_text(text_field, "b a");
segment_writer.add_document(&doc, &schema).unwrap();
let op = AddOperation {
opstamp: 1u64,
document: doc,
};
segment_writer.add_document(&op, &schema).unwrap();
}
for i in 2..1000 {
let mut doc = Document::default();
let mut text = iter::repeat("e ").take(i).collect::<String>();
text.push_str(" a");
doc.add_text(text_field, &text);
segment_writer.add_document(&doc, &schema).unwrap();
let op = AddOperation {
opstamp: 2u64,
document: doc,
};
segment_writer.add_document(&op, &schema).unwrap();
}
segment_writer.finalize().unwrap();
}