another hash

This commit is contained in:
Paul Masurel
2017-05-30 15:36:48 +09:00
parent 568d149db8
commit 4a805733db
5 changed files with 128 additions and 54 deletions

View File

@@ -42,6 +42,7 @@ futures-cpupool = "0.1"
error-chain = "0.8"
owning_ref = "0.3"
stable_deref_trait = "1.0.0"
murmurhash64 = "0.3"
[target.'cfg(windows)'.dependencies]
winapi = "0.2"

View File

@@ -1,15 +1,14 @@
use std::iter;
use super::heap::{Heap, HeapAllocable, BytesRef};
use murmurhash64::murmur_hash64a;
/// dbj2 hash function
fn djb2(key: &[u8]) -> u64 {
let mut state: u64 = 5381;
for &b in key {
state = (state << 5).wrapping_add(state).wrapping_add(b as u64);
}
state
const SEED: u64 = 2915580697u64;
fn hash(key: &[u8]) -> u64 {
murmur_hash64a(key, SEED)
}
impl Default for BytesRef {
fn default() -> BytesRef {
BytesRef {
@@ -99,7 +98,7 @@ impl<'a> HashMap<'a> {
}
pub fn is_saturated(&self) -> bool {
self.table.len() < self.occupied.len() * 5
self.table.len() < self.occupied.len() * 3
}
#[inline(never)]
@@ -137,7 +136,7 @@ impl<'a> HashMap<'a> {
pub fn get_or_create<S: AsRef<[u8]>, V: HeapAllocable>(&mut self, key: S) -> &mut V {
let key_bytes: &[u8] = key.as_ref();
let hash = djb2(key.as_ref());
let hash = hash(key.as_ref());
let masked_hash = self.mask_hash(hash);
let mut probe = self.probe(hash);
loop {
@@ -163,7 +162,6 @@ mod tests {
use super::*;
use super::super::heap::{Heap, HeapAllocable};
use super::djb2;
use test::Bencher;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
@@ -218,20 +216,21 @@ mod tests {
assert!(iter_values.next().is_none());
}
#[bench]
fn bench_djb2(bench: &mut Bencher) {
let v = String::from("abwer");
bench.iter(|| djb2(v.as_bytes()));
}
// #[bench]
// fn bench_djb2(bench: &mut Bencher) {
// let v = String::from("abwer");
// bench.iter(|| djb2(v.as_bytes()));
// }
// #[bench]
// fn bench_siphasher(bench: &mut Bencher) {
// let v = String::from("abwer");
// bench.iter(|| {
// let mut h = DefaultHasher::new();
// h.write(v.as_bytes());
// h.finish()
// });
// }
#[bench]
fn bench_siphasher(bench: &mut Bencher) {
let v = String::from("abwer");
bench.iter(|| {
let mut h = DefaultHasher::new();
h.write(v.as_bytes());
h.finish()
});
}
}

View File

@@ -29,7 +29,7 @@ use std::mem;
use std::mem::swap;
use std::thread::JoinHandle;
use super::directory_lock::DirectoryLock;
use super::operation::AddOperation;
use super::operation::{AddOperation, AddOperations};
use super::segment_updater::SegmentUpdater;
use std::thread;
@@ -42,10 +42,10 @@ pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32;
// Add document will block if the number of docs waiting in the queue to be indexed
// reaches `PIPELINE_MAX_SIZE_IN_DOCS`
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 100_000;
type DocumentSender = chan::Sender<AddOperation>;
type DocumentReceiver = chan::Receiver<AddOperation>;
type DocumentSender = chan::Sender<AddOperations>;
type DocumentReceiver = chan::Receiver<AddOperations>;
/// `IndexWriter` is the user entry-point to add document to an index.
///
@@ -250,32 +250,34 @@ fn index_documents(heap: &mut Heap,
segment: Segment,
schema: &Schema,
generation: usize,
document_iterator: &mut Iterator<Item = AddOperation>,
document_iterator: &mut Iterator<Item=AddOperations>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor)
-> Result<bool> {
heap.clear();
let segment_id = segment.id();
let mut segment_writer = SegmentWriter::for_segment(heap, segment.clone(), schema)?;
for doc in document_iterator {
try!(segment_writer.add_document(&doc, schema));
// There is two possible conditions to close the segment.
// One is the memory arena dedicated to the segment is
// getting full.
if segment_writer.is_buffer_full() {
info!("Buffer limit reached, flushing segment with maxdoc={}.",
segment_writer.max_doc());
break;
}
// The second is the term dictionary hash table
// is reaching saturation.
//
// Tantivy does not resize its hashtable. When it reaches
// capacity, we just stop indexing new document.
if segment_writer.is_term_saturated() {
info!("Term dic saturated, flushing segment with maxdoc={}.",
segment_writer.max_doc());
break;
for docs in document_iterator {
for doc in docs {
try!(segment_writer.add_document(&doc, schema));
// There is two possible conditions to close the segment.
// One is the memory arena dedicated to the segment is
// getting full.
if segment_writer.is_buffer_full() {
info!("Buffer limit reached, flushing segment with maxdoc={}.",
segment_writer.max_doc());
break;
}
// The second is the term dictionary hash table
// is reaching saturation.
//
// Tantivy does not resize its hashtable. When it reaches
// capacity, we just stop indexing new document.
if segment_writer.is_term_saturated() {
info!("Term dic saturated, flushing segment with maxdoc={}.",
segment_writer.max_doc());
break;
}
}
}
let num_docs = segment_writer.max_doc();
@@ -375,9 +377,7 @@ impl IndexWriter {
loop {
let mut document_iterator =
document_receiver_clone.clone().into_iter().peekable();
let mut document_iterator = document_receiver_clone.clone().into_iter().peekable();
// the peeking here is to avoid
// creating a new segment's files
// if no document are available.
@@ -386,7 +386,7 @@ impl IndexWriter {
// peeked document now belongs to
// our local iterator.
if let Some(operation) = document_iterator.peek() {
delete_cursor.skip_to(operation.opstamp);
delete_cursor.skip_to(operation.first_opstamp());
} else {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
@@ -583,10 +583,34 @@ impl IndexWriter {
pub fn add_document(&mut self, document: Document) -> u64 {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation {
opstamp: opstamp,
opstamp: opstamp,
document: document,
};
self.document_sender.send(add_operation);
self.document_sender.send(AddOperations::from(add_operation));
opstamp
}
/// Adds documents.
///
/// If the indexing pipeline is full, this call may block.
///
/// The opstamp is an increasing `u64` that can
/// be used by the client to align commits with its own
/// document queue.
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_documents(&mut self, documents: Vec<Document>) -> u64 {
let mut ops = Vec::with_capacity(documents.len());
let mut opstamp = 0u64;
for doc in documents {
opstamp = self.stamper.stamp();
ops.push(AddOperation {
opstamp: opstamp,
document: doc,
});
}
self.document_sender.send(AddOperations::from(ops));
opstamp
}
}

View File

@@ -15,3 +15,52 @@ pub struct AddOperation {
pub opstamp: u64,
pub document: Document,
}
pub enum AddOperations {
Single(AddOperation),
Multiple(Vec<AddOperation>),
}
impl AddOperations {
pub fn first_opstamp(&self) -> u64 {
match *self {
AddOperations::Single(ref op) => {
op.opstamp
}
AddOperations::Multiple(ref ops) => {
ops[0].opstamp
}
}
}
}
impl From<AddOperation> for AddOperations {
fn from(op: AddOperation) -> AddOperations {
AddOperations::Single(op)
}
}
impl From<Vec<AddOperation>> for AddOperations {
fn from(ops: Vec<AddOperation>) -> AddOperations {
AddOperations::Multiple(ops)
}
}
impl IntoIterator for AddOperations {
type Item = AddOperation;
type IntoIter = Box<Iterator<Item=AddOperation>>;
fn into_iter(self) -> Self::IntoIter {
match self {
AddOperations::Single(op) => {
Box::new(Some(op).into_iter())
}
AddOperations::Multiple(ops) => {
Box::new(ops.into_iter())
}
}
}
}

View File

@@ -63,6 +63,7 @@ extern crate futures;
extern crate futures_cpupool;
extern crate owning_ref;
extern crate stable_deref_trait;
extern crate murmurhash64;
#[cfg(test)]
extern crate env_logger;