diff --git a/Cargo.toml b/Cargo.toml
index 4d2999409..ae67b3a25 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -42,6 +42,7 @@ futures-cpupool = "0.1"
 error-chain = "0.8"
 owning_ref = "0.3"
 stable_deref_trait = "1.0.0"
+murmurhash64 = "0.3"
 
 [target.'cfg(windows)'.dependencies]
 winapi = "0.2"
diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs
index 591315309..56e19fe2a 100644
--- a/src/datastruct/stacker/hashmap.rs
+++ b/src/datastruct/stacker/hashmap.rs
@@ -1,15 +1,14 @@
 use std::iter;
 use super::heap::{Heap, HeapAllocable, BytesRef};
+use murmurhash64::murmur_hash64a;
 
-/// dbj2 hash function
-fn djb2(key: &[u8]) -> u64 {
-    let mut state: u64 = 5381;
-    for &b in key {
-        state = (state << 5).wrapping_add(state).wrapping_add(b as u64);
-    }
-    state
+const SEED: u64 = 2915580697u64;
+
+fn hash(key: &[u8]) -> u64 {
+    murmur_hash64a(key, SEED)
 }
+
 
 impl Default for BytesRef {
     fn default() -> BytesRef {
         BytesRef {
@@ -99,7 +98,7 @@ impl<'a> HashMap<'a> {
     }
 
     pub fn is_saturated(&self) -> bool {
-        self.table.len() < self.occupied.len() * 5
+        self.table.len() < self.occupied.len() * 3
     }
 
     #[inline(never)]
@@ -137,7 +136,7 @@ impl<'a> HashMap<'a> {
 
     pub fn get_or_create<S: AsRef<[u8]>, V: HeapAllocable>(&mut self, key: S) -> &mut V {
         let key_bytes: &[u8] = key.as_ref();
-        let hash = djb2(key.as_ref());
+        let hash = hash(key.as_ref());
         let masked_hash = self.mask_hash(hash);
         let mut probe = self.probe(hash);
         loop {
@@ -163,7 +162,6 @@ mod tests {
 
     use super::*;
     use super::super::heap::{Heap, HeapAllocable};
-    use super::djb2;
     use test::Bencher;
     use std::collections::hash_map::DefaultHasher;
     use std::hash::Hasher;
@@ -218,20 +216,21 @@ mod tests {
         assert!(iter_values.next().is_none());
     }
 
-    #[bench]
-    fn bench_djb2(bench: &mut Bencher) {
-        let v = String::from("abwer");
-        bench.iter(|| djb2(v.as_bytes()));
-    }
+    // #[bench]
+    // fn bench_djb2(bench: &mut Bencher) {
+    //     let v = String::from("abwer");
+    //     bench.iter(|| djb2(v.as_bytes()));
+    // }
+
+    // #[bench]
+    // fn bench_siphasher(bench: &mut Bencher) {
+    //     let v = String::from("abwer");
+    //     bench.iter(|| {
+    //         let mut h = DefaultHasher::new();
+    //         h.write(v.as_bytes());
+    //         h.finish()
+    //     });
+    // }
 
-    #[bench]
-    fn bench_siphasher(bench: &mut Bencher) {
-        let v = String::from("abwer");
-        bench.iter(|| {
-            let mut h = DefaultHasher::new();
-            h.write(v.as_bytes());
-            h.finish()
-        });
-    }
 }
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index aa577f317..d6d59015a 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -29,7 +29,7 @@ use std::mem;
 use std::mem::swap;
 use std::thread::JoinHandle;
 use super::directory_lock::DirectoryLock;
-use super::operation::AddOperation;
+use super::operation::{AddOperation, AddOperations};
 use super::segment_updater::SegmentUpdater;
 use std::thread;
 
@@ -42,10 +42,10 @@ pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32;
 
 // Add document will block if the number of docs waiting in the queue to be indexed
 // reaches `PIPELINE_MAX_SIZE_IN_DOCS`
-const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
+const PIPELINE_MAX_SIZE_IN_DOCS: usize = 100_000;
 
-type DocumentSender = chan::Sender<AddOperation>;
-type DocumentReceiver = chan::Receiver<AddOperation>;
+type DocumentSender = chan::Sender<AddOperations>;
+type DocumentReceiver = chan::Receiver<AddOperations>;
 
 /// `IndexWriter` is the user entry-point to add document to an index.
 ///
@@ -250,32 +250,34 @@ fn index_documents(heap: &mut Heap,
                    segment: Segment,
                    schema: &Schema,
                    generation: usize,
-                   document_iterator: &mut Iterator<Item = AddOperation>,
+                   document_iterator: &mut Iterator<Item = AddOperations>,
                    segment_updater: &mut SegmentUpdater,
                    mut delete_cursor: DeleteCursor)
                    -> Result<SegmentEntry> {
     heap.clear();
     let segment_id = segment.id();
    let mut segment_writer = SegmentWriter::for_segment(heap, segment.clone(), schema)?;
-    for doc in document_iterator {
-        try!(segment_writer.add_document(&doc, schema));
-        // There is two possible conditions to close the segment.
-        // One is the memory arena dedicated to the segment is
-        // getting full.
-        if segment_writer.is_buffer_full() {
-            info!("Buffer limit reached, flushing segment with maxdoc={}.",
-                  segment_writer.max_doc());
-            break;
-        }
-        // The second is the term dictionary hash table
-        // is reaching saturation.
-        //
-        // Tantivy does not resize its hashtable. When it reaches
-        // capacity, we just stop indexing new document.
-        if segment_writer.is_term_saturated() {
-            info!("Term dic saturated, flushing segment with maxdoc={}.",
-                  segment_writer.max_doc());
-            break;
+    for docs in document_iterator {
+        for doc in docs {
+            try!(segment_writer.add_document(&doc, schema));
+            // There is two possible conditions to close the segment.
+            // One is the memory arena dedicated to the segment is
+            // getting full.
+            if segment_writer.is_buffer_full() {
+                info!("Buffer limit reached, flushing segment with maxdoc={}.",
+                      segment_writer.max_doc());
+                break;
+            }
+            // The second is the term dictionary hash table
+            // is reaching saturation.
+            //
+            // Tantivy does not resize its hashtable. When it reaches
+            // capacity, we just stop indexing new document.
+            if segment_writer.is_term_saturated() {
+                info!("Term dic saturated, flushing segment with maxdoc={}.",
+                      segment_writer.max_doc());
+                break;
+            }
         }
     }
     let num_docs = segment_writer.max_doc();
@@ -375,9 +377,7 @@ impl IndexWriter {
 
         loop {
-            let mut document_iterator =
-                document_receiver_clone.clone().into_iter().peekable();
-
+            let mut document_iterator = document_receiver_clone.clone().into_iter().peekable();
             // the peeking here is to avoid
             // creating a new segment's files
             // if no document are available.
@@ -386,7 +386,7 @@ impl IndexWriter {
             // peeked document now belongs to
             // our local iterator.
             if let Some(operation) = document_iterator.peek() {
-                delete_cursor.skip_to(operation.opstamp);
+                delete_cursor.skip_to(operation.first_opstamp());
             } else {
                 // No more documents.
                 // Happens when there is a commit, or if the `IndexWriter`
@@ -583,10 +583,34 @@ impl IndexWriter {
     pub fn add_document(&mut self, document: Document) -> u64 {
         let opstamp = self.stamper.stamp();
         let add_operation = AddOperation {
-            opstamp: opstamp,
+            opstamp: opstamp,
             document: document,
         };
-        self.document_sender.send(add_operation);
+        self.document_sender.send(AddOperations::from(add_operation));
+        opstamp
+    }
+
+    /// Adds documents.
+    ///
+    /// If the indexing pipeline is full, this call may block.
+    ///
+    /// The opstamp is an increasing `u64` that can
+    /// be used by the client to align commits with its own
+    /// document queue.
+    ///
+    /// Currently it represents the number of documents that
+    /// have been added since the creation of the index.
+    pub fn add_documents(&mut self, documents: Vec<Document>) -> u64 {
+        let mut ops = Vec::with_capacity(documents.len());
+        let mut opstamp = 0u64;
+        for doc in documents {
+            opstamp = self.stamper.stamp();
+            ops.push(AddOperation {
+                opstamp: opstamp,
+                document: doc,
+            });
+        }
+        self.document_sender.send(AddOperations::from(ops));
         opstamp
     }
 }
diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs
index ecdc2d827..6395d98e9 100644
--- a/src/indexer/operation.rs
+++ b/src/indexer/operation.rs
@@ -15,3 +15,52 @@ pub struct AddOperation {
     pub opstamp: u64,
     pub document: Document,
 }
+
+
+pub enum AddOperations {
+    Single(AddOperation),
+    Multiple(Vec<AddOperation>),
+}
+
+impl AddOperations {
+    pub fn first_opstamp(&self) -> u64 {
+        match *self {
+            AddOperations::Single(ref op) => {
+                op.opstamp
+            }
+            AddOperations::Multiple(ref ops) => {
+                ops[0].opstamp
+            }
+        }
+    }
+}
+
+impl From<AddOperation> for AddOperations {
+    fn from(op: AddOperation) -> AddOperations {
+        AddOperations::Single(op)
+    }
+}
+
+impl From<Vec<AddOperation>> for AddOperations {
+    fn from(ops: Vec<AddOperation>) -> AddOperations {
+        AddOperations::Multiple(ops)
+    }
+}
+
+impl IntoIterator for AddOperations {
+
+    type Item = AddOperation;
+
+    type IntoIter = Box<Iterator<Item = AddOperation>>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        match self {
+            AddOperations::Single(op) => {
+                Box::new(Some(op).into_iter())
+            }
+            AddOperations::Multiple(ops) => {
+                Box::new(ops.into_iter())
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index 5d13c8299..ab334c14d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -63,6 +63,7 @@ extern crate futures;
 extern crate futures_cpupool;
 extern crate owning_ref;
 extern crate stable_deref_trait;
+extern crate murmurhash64;
 
 #[cfg(test)]
 extern crate env_logger;