switching to the stacker datastructure

Paul Masurel
2016-09-05 10:27:14 +09:00
parent 45bb1c5ab8
commit 24d2e3f6c1
16 changed files with 714 additions and 362 deletions

View File

@@ -52,7 +52,11 @@ fn run(index_path: &Path) -> tantivy::Result<()> {
// There can be only one writer at a time.
// The writer will use more than one thread
// to make use of your multicore CPU.
let mut index_writer = try!(index.writer());
//
// Here we use a buffer of 1MB. In the
// real world, you want to give your indexer much more RAM,
// to maximize your throughput (300MB, for instance).
let mut index_writer = try!(index.writer(1_000_000));
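As a sketch of the new call sites (assuming an existing `tantivy::Index` named `index`; the 300MB figure simply mirrors the comment above):

// One indexing thread per core, with a 300MB indexing heap:
let mut index_writer = try!(index.writer(300_000_000));
// Or pin the number of indexing threads explicitly:
let mut index_writer = try!(index.writer_with_num_threads(4, 300_000_000));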

View File

@@ -108,15 +108,15 @@ impl Index {
/// Creates a multithreaded writer.
/// Each writer produces an independent segment.
pub fn writer_with_num_threads(&self, num_threads: usize) -> Result<IndexWriter> {
IndexWriter::open(self, num_threads)
pub fn writer_with_num_threads(&self, num_threads: usize, heap_size_in_bytes: usize) -> Result<IndexWriter> {
IndexWriter::open(self, num_threads, heap_size_in_bytes)
}
/// Creates a multithreaded writer.
/// It simply calls `writer_with_num_threads` with the number of cores as `num_threads`.
pub fn writer(&self,) -> Result<IndexWriter> {
self.writer_with_num_threads(num_cpus::get())
pub fn writer(&self, heap_size_in_bytes: usize) -> Result<IndexWriter> {
self.writer_with_num_threads(num_cpus::get(), heap_size_in_bytes)
}
pub fn schema(&self,) -> Schema {

View File

@@ -1,5 +1,6 @@
mod fstmap;
mod skip;
pub mod stacker;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;

View File

@@ -0,0 +1,213 @@
use std::iter;
use std::marker::PhantomData;
use super::heap::{Heap, BytesRef};
fn djb2(key: &[u8]) -> u64 {
let mut state: u64 = 5381;
for &b in key {
state = (state << 5).wrapping_add(state).wrapping_add(b as u64);
}
state
}
impl Default for BytesRef {
fn default() -> BytesRef {
BytesRef {
start: 0u32,
stop: 0u32,
}
}
}
#[derive(Copy, Clone, Default)]
struct KeyValue {
key: BytesRef,
value_addr: u32,
}
impl KeyValue {
fn is_empty(&self,) -> bool {
self.key.stop == 0u32
}
}
pub struct HashMap<'a, V> where V: From<u32> {
table: Box<[KeyValue]>,
heap: &'a Heap,
_phantom: PhantomData<V>,
mask: usize,
occupied: Vec<usize>,
}
pub enum Entry {
Vacant(usize),
Occupied(u32),
}
impl<'a, V> HashMap<'a, V> where V: From<u32> {
pub fn new(num_bucket_power_of_2: usize, heap: &'a Heap) -> HashMap<'a, V> {
let table_size = 1 << num_bucket_power_of_2;
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
.take(table_size)
.collect();
HashMap {
table: table.into_boxed_slice(),
heap: heap,
_phantom: PhantomData,
mask: table_size - 1,
occupied: Vec::with_capacity(table_size / 2),
}
}
#[inline(always)]
fn bucket(&self, key: &[u8]) -> usize {
let hash: u64 = djb2(key);
(hash as usize) & self.mask
}
fn get_key(&self, bytes_ref: BytesRef) -> &[u8] {
self.heap.get_slice(bytes_ref)
}
pub fn set_bucket(&mut self, key_bytes: &[u8], bucket: usize, addr: u32) -> u32 {
self.occupied.push(bucket);
self.table[bucket] = KeyValue {
key: self.heap.allocate_and_set(key_bytes),
value_addr: addr,
};
addr
}
pub fn iter<'b: 'a>(&'b self,) -> impl Iterator<Item=(&'a [u8], (u32, &'a V))> + 'b {
let heap: &'a Heap = &self.heap;
let table: &'b [KeyValue] = &self.table;
self.occupied
.iter()
.cloned()
.map(move |bucket: usize| {
let kv = table[bucket];
let addr = kv.value_addr;
let v: &V = heap.get_mut_ref::<V>(addr);
(heap.get_slice(kv.key), (addr, v))
})
// .map(move |addr: u32| (heap.get_mut_ref::<V>(addr)) )
}
pub fn values_mut<'b: 'a>(&'b self,) -> impl Iterator<Item=&'a mut V> + 'b {
let heap: &'a Heap = &self.heap;
let table: &'b [KeyValue] = &self.table;
self.occupied
.iter()
.cloned()
.map(move |bucket: usize| table[bucket].value_addr)
.map(move |addr: u32| heap.get_mut_ref::<V>(addr))
}
pub fn get_or_create<S: AsRef<[u8]>>(&mut self, key: S) -> &mut V {
let entry = self.lookup(key.as_ref());
match entry {
Entry::Occupied(addr) => {
self.heap.get_mut_ref(addr)
}
Entry::Vacant(bucket) => {
let (addr, val): (u32, &mut V) = self.heap.new();
self.set_bucket(key.as_ref(), bucket, addr);
val
}
}
}
pub fn lookup<S: AsRef<[u8]>>(&self, key: S) -> Entry {
let key_bytes: &[u8] = key.as_ref();
let mut bucket = self.bucket(key_bytes);
loop {
let kv: KeyValue = self.table[bucket];
if kv.is_empty() {
return Entry::Vacant(bucket);
}
if self.get_key(kv.key) == key_bytes {
return Entry::Occupied(kv.value_addr);
}
bucket = (bucket + 1) & self.mask;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::heap::Heap;
use super::djb2;
use test::Bencher;
use std::hash::SipHasher;
use std::hash::Hasher;
struct TestValue {
val: u32,
_addr: u32,
}
impl From<u32> for TestValue {
fn from(addr: u32) -> TestValue {
TestValue {
val: 0u32,
_addr: addr,
}
}
}
#[test]
fn test_hash_map() {
let heap = Heap::with_capacity(2_000_000);
let mut hash_map: HashMap<TestValue> = HashMap::new(18, &heap);
{
{
let v: &mut TestValue = hash_map.get_or_create("abc");
assert_eq!(v.val, 0u32);
v.val = 3u32;
}
}
{
let v: &mut TestValue = hash_map.get_or_create("abcd");
assert_eq!(v.val, 0u32);
v.val = 4u32;
}
{
let v: &mut TestValue = hash_map.get_or_create("abc");
assert_eq!(v.val, 3u32);
}
{
let v: &mut TestValue = hash_map.get_or_create("abcd");
assert_eq!(v.val, 4u32);
}
let mut iter_values = hash_map.values_mut();
assert_eq!(iter_values.next().unwrap().val, 3u32);
assert_eq!(iter_values.next().unwrap().val, 4u32);
assert!(iter_values.next().is_none());
}
#[bench]
fn bench_djb2(bench: &mut Bencher) {
let v = String::from("abwer");
bench.iter(|| {
djb2(v.as_bytes())
});
}
#[bench]
fn bench_siphasher(bench: &mut Bencher) {
let v = String::from("abwer");
bench.iter(|| {
let mut h = SipHasher::new();
h.write(v.as_bytes());
h.finish()
});
}
}
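The lookup loop above is plain open addressing with linear probing over a power-of-two table. Here is a self-contained sketch of the same probing logic, using a `Vec<Option<(Vec<u8>, u32)>>` in place of the heap-backed table (the names are illustrative, not part of this commit):

fn djb2(key: &[u8]) -> u64 {
    let mut state: u64 = 5381;
    for &b in key {
        state = (state << 5).wrapping_add(state).wrapping_add(b as u64);
    }
    state
}

// Start at (hash & mask) and walk forward one bucket at a time until we
// find either the key (occupied) or an empty slot (vacant).
fn probe(table: &[Option<(Vec<u8>, u32)>], key: &[u8]) -> Result<u32, usize> {
    let mask = table.len() - 1; // table.len() must be a power of two
    let mut bucket = (djb2(key) as usize) & mask;
    loop {
        match table[bucket] {
            None => return Err(bucket),                     // vacant slot
            Some((ref k, addr)) if k[..] == *key => return Ok(addr),
            _ => bucket = (bucket + 1) & mask,              // collision: probe on
        }
    }
}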

View File

@@ -0,0 +1,139 @@
use std::cell::UnsafeCell;
use std::mem;
use std::ptr;
use std::iter;
#[derive(Copy, Clone)]
pub struct BytesRef {
pub start: u32,
pub stop: u32,
}
struct InnerHeap {
buffer: Vec<u8>,
used: u32,
}
pub struct Heap {
inner: UnsafeCell<InnerHeap>,
}
impl Heap {
pub fn with_capacity(num_bytes: usize) -> Heap {
Heap {
inner: UnsafeCell::new(
InnerHeap::with_capacity(num_bytes)
),
}
}
fn inner(&self,) -> &mut InnerHeap {
unsafe { &mut *self.inner.get() }
}
pub fn clear(&self) {
self.inner().clear();
}
pub fn len(&self,) -> u32 {
self.inner().len()
}
pub fn free(&self,) -> u32 {
self.inner().free()
}
pub fn allocate(&self, num_bytes: usize) -> u32 {
self.inner().allocate(num_bytes)
}
pub fn new<V: From<u32>>(&self,) -> (u32, &mut V) {
let addr = self.inner().allocate(mem::size_of::<V>());
let v: V = V::from(addr);
self.inner().set(addr, &v);
(addr, self.inner().get_mut_ref(addr))
}
pub fn allocate_and_set(&self, data: &[u8]) -> BytesRef {
self.inner().allocate_and_set(data)
}
pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] {
self.inner().get_slice(bytes_ref)
}
pub fn set<Item>(&self, addr: u32, val: &Item) {
self.inner().set(addr, val);
}
pub fn get_mut_ref<Item>(&self, addr: u32) -> &mut Item {
self.inner().get_mut_ref(addr)
}
}
impl InnerHeap {
pub fn with_capacity(num_bytes: usize) -> InnerHeap {
InnerHeap {
buffer: iter::repeat(0u8).take(num_bytes).collect(),
used: 0u32,
}
}
pub fn clear(&mut self) {
self.used = 0u32;
}
pub fn len(&self,) -> u32 {
self.used
}
pub fn free(&self,) -> u32 {
(self.buffer.len() as u32) - self.used
}
pub fn allocate(&mut self, num_bytes: usize) -> u32 {
let addr = self.used;
self.used += num_bytes as u32;
let len_buffer = self.buffer.len();
if self.used > len_buffer as u32 {
self.buffer.resize(self.used as usize * 2, 0u8);
}
addr
}
pub fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef {
let start = self.allocate(data.len()) as usize;
let stop = start + data.len();
self.buffer[start..stop].clone_from_slice(data);
BytesRef {
start: start as u32,
stop: stop as u32,
}
}
pub fn get_mut(&mut self, addr: u32) -> *mut u8 {
let addr_usize = addr as isize;
unsafe { self.buffer.as_mut_ptr().offset(addr_usize) }
}
pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] {
&self.buffer[bytes_ref.start as usize .. bytes_ref.stop as usize]
}
pub fn get_mut_ref<Item>(&mut self, addr: u32) -> &mut Item {
let v_ptr_u8 = self.get_mut(addr) as *mut u8;
let v_ptr = v_ptr_u8 as *mut Item;
unsafe { &mut *v_ptr }
}
pub fn set<Item>(&mut self, addr: u32, val: &Item) {
let v_ptr: *const Item = val as *const Item;
let v_ptr_u8: *const u8 = v_ptr as *const u8;
unsafe {
let dest_ptr: *mut u8 = self.get_mut(addr);
ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::<Item>());
}
}
}
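The `Heap` is a bump allocator: `allocate` only advances a cursor into one contiguous buffer, and `clear` frees everything at once by resetting that cursor. Stripped of the `UnsafeCell` plumbing, the essence is (a sketch, not the committed code):

struct BumpArena {
    buffer: Vec<u8>,
    used: u32,
}

impl BumpArena {
    // Allocation is a pointer bump: hand out the current offset, then
    // advance it. Individual values are never freed; the whole arena is
    // reclaimed in O(1) by `clear`.
    fn allocate(&mut self, num_bytes: usize) -> u32 {
        let addr = self.used;
        self.used += num_bytes as u32;
        if self.used as usize > self.buffer.len() {
            self.buffer.resize(self.used as usize * 2, 0u8);
        }
        addr
    }

    fn clear(&mut self) {
        self.used = 0;
    }
}

Values allocated this way only know their `u32` address within the arena, which is why `Heap::new` requires `V: From<u32>`: the freshly built value is told where it lives, as `Stack::from(addr)` uses below.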

View File

@@ -0,0 +1,7 @@
mod hashmap;
mod heap;
mod stack;
pub use self::heap::Heap;
pub use self::stack::Stack;
pub use self::hashmap::HashMap;

View File

@@ -0,0 +1,182 @@
use std::mem;
use super::heap::Heap;
#[inline]
pub fn is_power_of_2(val: u32) -> bool {
val & (val - 1) == 0
}
#[inline]
pub fn jump_needed(val: u32) -> bool {
val > 3 && is_power_of_2(val)
}
#[derive(Debug)]
pub struct Stack {
len: u32,
end: u32,
val0: u32,
val1: u32,
val2: u32,
next: u32, // inline of the first block
}
impl Stack {
pub fn iterate<'a>(&self, addr: u32, heap: &'a Heap) -> StackIterator<'a> {
StackIterator {
heap: heap,
addr: addr + 2u32 * (mem::size_of::<u32>() as u32),
len: self.len,
consumed: 0,
}
}
pub fn push(&mut self, val: u32, heap: &Heap) {
self.len += 1;
if jump_needed(self.len) {
// we need to allocate another block.
// As we want to grow blocks exponentially,
// the next block has a size of (the length so far),
// and we need one extra u32 to store the pointer
// to the next block.
let new_block_size: usize = (self.len as usize + 1) * mem::size_of::<u32>();
let new_block_addr: u32 = heap.allocate(new_block_size);
heap.set(self.end, &new_block_addr);
self.end = new_block_addr;
}
heap.set(self.end, &val);
self.end += mem::size_of::<u32>() as u32;
}
}
impl From<u32> for Stack {
fn from(addr: u32) -> Stack {
let last_addr = addr + mem::size_of::<u32>() as u32 * 2u32;
Stack {
len: 0u32,
end: last_addr,
val0: 0u32,
val1: 0u32,
val2: 0u32,
next: 0u32,
}
}
}
impl Default for Stack {
fn default() -> Stack {
Stack {
len: 0u32,
end: 0u32,
val0: 0u32,
val1: 0u32,
val2: 0u32,
next: 0u32,
}
}
}
pub struct StackIterator<'a> {
heap: &'a Heap,
addr: u32,
len: u32,
consumed: u32,
}
impl<'a> Iterator for StackIterator<'a> {
type Item = u32;
fn next(&mut self,) -> Option<u32> {
if self.consumed == self.len {
None
}
else {
let addr: u32;
self.consumed += 1;
if jump_needed(self.consumed) {
addr = *self.heap.get_mut_ref(self.addr);
}
else {
addr = self.addr;
}
self.addr = addr + mem::size_of::<u32>() as u32;
Some(*self.heap.get_mut_ref(addr))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::heap::Heap;
use test::Bencher;
const NUM_STACK: usize = 10_000;
const STACK_SIZE: u32 = 1000;
#[test]
fn test_stack() {
let heap = Heap::with_capacity(1_000_000);
let (addr, stack) = heap.new::<Stack>();
stack.push(1u32, &heap);
stack.push(2u32, &heap);
stack.push(4u32, &heap);
stack.push(8u32, &heap);
{
let mut it = stack.iterate(addr, &heap);
assert_eq!(it.next().unwrap(), 1u32);
assert_eq!(it.next().unwrap(), 2u32);
assert_eq!(it.next().unwrap(), 4u32);
assert_eq!(it.next().unwrap(), 8u32);
assert!(it.next().is_none());
}
}
#[bench]
fn bench_push_vec(bench: &mut Bencher) {
bench.iter(|| {
let mut vecs = Vec::with_capacity(100);
for _ in 0..NUM_STACK {
vecs.push(Vec::new());
}
for s in 0..NUM_STACK {
for i in 0u32..STACK_SIZE {
let t = s * 392017 % NUM_STACK;
vecs[t].push(i);
}
}
});
}
#[bench]
fn bench_push_stack(bench: &mut Bencher) {
let heap = Heap::with_capacity(64_000_000);
bench.iter(|| {
let mut stacks = Vec::with_capacity(100);
for _ in 0..NUM_STACK {
let (_, stack) = heap.new::<Stack>();
stacks.push(stack);
}
for s in 0..NUM_STACK {
for i in 0u32..STACK_SIZE {
let t = s * 392017 % NUM_STACK;
stacks[t].push(i, &heap);
}
}
heap.clear();
});
}
}
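`jump_needed` fires each time the stack length reaches a power of two greater than 3, so block sizes double as the stack grows: the first three values sit inline in the `Stack` struct itself (`val0`..`val2`, with `next` acting as the pointer slot of that inline block), and fresh blocks are chained behind it. A small, self-contained check of which pushes allocate (the predicates are copied from the diff above):

fn is_power_of_2(val: u32) -> bool {
    val & (val - 1) == 0
}

fn jump_needed(val: u32) -> bool {
    val > 3 && is_power_of_2(val)
}

fn main() {
    // Pushes 4, 8, 16 and 32 each trigger a new block allocation;
    // every other push is a plain write into the current block.
    let jumps: Vec<u32> = (1..33).filter(|&len| jump_needed(len)).collect();
    assert_eq!(jumps, vec![4, 8, 16, 32]);
}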

View File

@@ -12,14 +12,15 @@ use std::thread;
use std::collections::HashSet;
use indexer::merger::IndexMerger;
use core::SegmentId;
use datastruct::stacker::Heap;
use std::mem::swap;
use postings::BlockStore;
use chan;
use Result;
use Error;
pub struct IndexWriter {
heap_size_in_bytes: usize,
index: Index,
workers_join_handle: Vec<JoinHandle<()>>,
segment_ready_sender: chan::Sender<Result<(SegmentId, usize)>>,
@@ -33,12 +34,12 @@ pub struct IndexWriter {
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
fn index_documents(block_store: &mut BlockStore,
fn index_documents(heap: &mut Heap,
segment: Segment,
schema: &Schema,
document_iterator: &mut Iterator<Item=Document>) -> Result<usize> {
block_store.clear();
let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema));
heap.clear();
let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema));
for doc in document_iterator {
try!(segment_writer.add_document(&doc, &schema));
if segment_writer.is_buffer_full() {
@@ -64,8 +65,9 @@ impl IndexWriter {
let schema = self.index.schema();
let segment_ready_sender_clone = self.segment_ready_sender.clone();
let document_receiver_clone = self.document_receiver.clone();
let heap_size_in_bytes = self.heap_size_in_bytes;
let join_handle: JoinHandle<()> = thread::spawn(move || {
let mut block_store = BlockStore::allocate(1_500_000);
let mut heap = Heap::with_capacity(heap_size_in_bytes);
loop {
let segment = index.new_segment();
let segment_id = segment.id();
@@ -77,7 +79,7 @@ impl IndexWriter {
// creating a new segment's files
// if no documents are available.
if document_iterator.peek().is_some() {
let index_result = index_documents(&mut block_store, segment, &schema, &mut document_iterator)
let index_result = index_documents(&mut heap, segment, &schema, &mut document_iterator)
.map(|num_docs| (segment_id, num_docs));
segment_ready_sender_clone.send(index_result);
}
@@ -94,10 +96,13 @@ impl IndexWriter {
///
/// num_threads sets the number of indexing workers that
/// should work at the same time.
pub fn open(index: &Index, num_threads: usize) -> Result<IndexWriter> {
pub fn open(index: &Index,
num_threads: usize,
heap_size_in_bytes: usize) -> Result<IndexWriter> {
let (document_sender, document_receiver): (chan::Sender<Document>, chan::Receiver<Document>) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (segment_ready_sender, segment_ready_receiver): (chan::Sender<Result<(SegmentId, usize)>>, chan::Receiver<Result<(SegmentId, usize)>>) = chan::async();
let mut index_writer = IndexWriter {
heap_size_in_bytes: heap_size_in_bytes,
index: index.clone(),
segment_ready_receiver: segment_ready_receiver,
segment_ready_sender: segment_ready_sender,
@@ -280,7 +285,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(8).unwrap();
let mut index_writer = index.writer_with_num_threads(3, 30_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a");

View File

@@ -294,7 +294,7 @@ mod tests {
{
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");
@@ -318,7 +318,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");
@@ -336,7 +336,7 @@ mod tests {
}
{
let segments = index.segments().unwrap();
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
index_writer.merge(&segments).unwrap();
}
{

View File

@@ -17,12 +17,12 @@ use schema::TextIndexingOptions;
use postings::SpecializedPostingsWriter;
use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
use indexer::segment_serializer::SegmentSerializer;
use postings::BlockStore;
use datastruct::stacker::Heap;
pub struct SegmentWriter<'a> {
block_store: &'a mut BlockStore,
heap: &'a Heap,
max_doc: DocId,
per_field_postings_writers: Vec<Box<PostingsWriter>>,
per_field_postings_writers: Vec<Box<PostingsWriter + 'a>>,
segment_serializer: SegmentSerializer,
fast_field_writers: U32FastFieldsWriter,
fieldnorms_writer: U32FastFieldsWriter,
@@ -38,23 +38,23 @@ fn create_fieldnorms_writer(schema: &Schema) -> U32FastFieldsWriter {
U32FastFieldsWriter::new(u32_fields)
}
fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
fn posting_from_field_entry<'a>(field_entry: &FieldEntry, heap: &'a Heap) -> Box<PostingsWriter + 'a> {
match field_entry.field_type() {
&FieldType::Str(ref text_options) => {
match text_options.get_indexing_options() {
TextIndexingOptions::TokenizedWithFreq => {
SpecializedPostingsWriter::<TermFrequencyRecorder>::new_boxed()
SpecializedPostingsWriter::<TermFrequencyRecorder>::new_boxed(heap)
}
TextIndexingOptions::TokenizedWithFreqAndPosition => {
SpecializedPostingsWriter::<TFAndPositionRecorder>::new_boxed()
SpecializedPostingsWriter::<TFAndPositionRecorder>::new_boxed(heap)
}
_ => {
SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
SpecializedPostingsWriter::<NothingRecorder>::new_boxed(heap)
}
}
}
&FieldType::U32(_) => {
SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
SpecializedPostingsWriter::<NothingRecorder>::new_boxed(heap)
}
}
}
@@ -63,16 +63,15 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
impl<'a> SegmentWriter<'a> {
pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result<SegmentWriter<'a>> {
pub fn for_segment(heap: &'a Heap, mut segment: Segment, schema: &Schema) -> Result<SegmentWriter<'a>> {
let segment_serializer = try!(SegmentSerializer::for_segment(&mut segment));
let per_field_postings_writers = schema.fields()
.iter()
.map(|field_entry| {
posting_from_field_entry(field_entry)
})
.collect();
let mut per_field_postings_writers: Vec<Box<PostingsWriter + 'a>> = Vec::new();
for field_entry in schema.fields() {
let postings_writer: Box<PostingsWriter + 'a> = posting_from_field_entry(field_entry, heap);
per_field_postings_writers.push(postings_writer);
}
Ok(SegmentWriter {
block_store: block_store,
heap: heap,
max_doc: 0,
per_field_postings_writers: per_field_postings_writers,
fieldnorms_writer: create_fieldnorms_writer(schema),
@@ -91,18 +90,18 @@ impl<'a> SegmentWriter<'a> {
pub fn finalize(mut self,) -> Result<()> {
let segment_info = self.segment_info();
for per_field_postings_writer in self.per_field_postings_writers.iter_mut() {
per_field_postings_writer.close(&mut self.block_store);
per_field_postings_writer.close(self.heap);
}
write(&mut self.block_store,
&self.per_field_postings_writers,
write(&self.per_field_postings_writers,
&self.fast_field_writers,
&self.fieldnorms_writer,
segment_info,
self.segment_serializer)
self.segment_serializer,
self.heap)
}
pub fn is_buffer_full(&self,) -> bool {
self.block_store.num_free_blocks() < 100_000
self.heap.free() <= 1_000_000
}
pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
@@ -112,17 +111,17 @@ impl<'a> SegmentWriter<'a> {
let field_options = schema.get_field_entry(field);
match *field_options.field_type() {
FieldType::Str(ref text_options) => {
let mut num_tokens = 0u32;
let mut num_tokens = 0;
if text_options.get_indexing_options().is_tokenized() {
num_tokens = field_posting_writer.index_text(&mut self.block_store, doc_id, field, &field_values);
num_tokens = field_posting_writer.index_text(doc_id, field, &field_values, self.heap);
}
else {
for field_value in field_values {
let term = Term::from_field_text(field, field_value.value().text());
field_posting_writer.suscribe(&mut self.block_store, doc_id, 0, &term);
field_posting_writer.suscribe(doc_id, 0, &term, self.heap);
num_tokens += 1u32;
}
}
}
self.fieldnorms_writer
.get_field_writer(field)
.map(|field_norms_writer| {
@@ -133,14 +132,13 @@ impl<'a> SegmentWriter<'a> {
if u32_options.is_indexed() {
for field_value in field_values {
let term = Term::from_field_u32(field_value.field(), field_value.value().u32_value());
field_posting_writer.suscribe(&mut self.block_store, doc_id, 0, &term);
field_posting_writer.suscribe(doc_id, 0, &term, self.heap);
}
}
}
}
}
self.fieldnorms_writer.fill_val_up_to(doc_id);
self.fast_field_writers.add_document(&doc);
@@ -168,14 +166,14 @@ impl<'a> SegmentWriter<'a> {
}
fn write(block_store: &BlockStore,
per_field_postings_writers: &Vec<Box<PostingsWriter>>,
fn write<'a>(per_field_postings_writers: &Vec<Box<PostingsWriter + 'a>>,
fast_field_writers: &U32FastFieldsWriter,
fieldnorms_writer: &U32FastFieldsWriter,
segment_info: SegmentInfo,
mut serializer: SegmentSerializer) -> Result<()> {
mut serializer: SegmentSerializer,
heap: &'a Heap,) -> Result<()> {
for per_field_postings_writer in per_field_postings_writers.iter() {
try!(per_field_postings_writer.serialize(block_store, serializer.get_postings_serializer()));
try!(per_field_postings_writer.serialize(serializer.get_postings_serializer(), heap));
}
try!(fast_field_writers.serialize(serializer.get_fast_field_serializer()));
try!(fieldnorms_writer.serialize(serializer.get_fieldnorms_serializer()));
@@ -186,11 +184,11 @@ fn write(block_store: &BlockStore,
impl<'a> SerializableSegment for SegmentWriter<'a> {
fn write(&self, serializer: SegmentSerializer) -> Result<()> {
write(&self.block_store,
&self.per_field_postings_writers,
write(&self.per_field_postings_writers,
&self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_info(),
serializer)
serializer,
self.heap)
}
}
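With the `BlockStore` gone, the flush policy is expressed in bytes: a segment is cut as soon as less than 1MB remains in the shared heap. The loop in `index_documents` effectively runs (simplified sketch, not the committed code):

// Keep adding documents until the arena runs low, then stop so the
// segment can be serialized and the heap cleared for the next one.
for doc in document_iterator {
    try!(segment_writer.add_document(&doc, &schema));
    if segment_writer.is_buffer_full() { // i.e. heap.free() <= 1_000_000
        break;
    }
}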

View File

@@ -5,9 +5,11 @@ Tantivy is a search engine library.
*/
#![feature(binary_heap_extras)]
#![feature(conservative_impl_trait)]
#![cfg_attr(test, feature(test))]
#![cfg_attr(test, feature(step_by))]
#![doc(test(attr(allow(unused_variables), deny(warnings))))]
#![feature(conservative_impl_trait)]
#[macro_use]
@@ -33,7 +35,6 @@ extern crate itertools;
extern crate chan;
extern crate crossbeam;
#[cfg(test)] extern crate test;
#[cfg(test)] extern crate rand;
@@ -138,7 +139,7 @@ mod tests {
let index = Index::create_from_tempdir(schema).unwrap();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");
@@ -165,14 +166,14 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut doc = Document::new();
doc.add_text(text_field, "a b c");
index_writer.add_document(doc).unwrap();
index_writer.commit().unwrap();
}
{
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a");
@@ -186,7 +187,7 @@ mod tests {
index_writer.commit().unwrap();
}
{
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap();
let mut doc = Document::new();
doc.add_text(text_field, "c");
index_writer.add_document(doc).unwrap();
@@ -212,7 +213,7 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a b c");
@@ -248,7 +249,7 @@ mod tests {
let index = Index::create_in_ram(schema);
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af af af bc bc");
@@ -276,7 +277,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af af af b");
@@ -345,7 +346,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");

View File

@@ -1,193 +0,0 @@
pub const BLOCK_SIZE: u32 = 64u32;
struct Block {
data: [u32; BLOCK_SIZE as usize],
next: u32,
}
impl Block {
fn new() -> Block {
Block {
data: [0u32; BLOCK_SIZE as usize],
next: u32::max_value(),
}
}
}
#[derive(Copy, Clone)]
struct ListInfo {
first: u32,
last: u32,
len: u32,
}
pub struct BlockStore {
lists: Vec<ListInfo>,
blocks: Vec<Block>,
free_block_id: usize,
}
impl BlockStore {
pub fn allocate(num_blocks: usize) -> BlockStore {
BlockStore {
lists: Vec::with_capacity(num_blocks),
blocks: (0 .. num_blocks).map(|_| Block::new()).collect(),
free_block_id: 0,
}
}
pub fn num_free_blocks(&self) -> usize {
self.blocks.len() - self.free_block_id
}
pub fn new_list(&mut self) -> u32 {
let res = self.lists.len() as u32;
let new_block_id = self.new_block().unwrap();
self.lists.push(ListInfo {
first: new_block_id,
last: new_block_id,
len: 0,
});
res
}
pub fn clear(&mut self,) {
self.free_block_id = 0;
}
fn new_block(&mut self,) -> Option<u32> {
let block_id = self.free_block_id;
self.free_block_id += 1;
if block_id >= self.blocks.len() {
None
}
else {
self.blocks[block_id].next = u32::max_value();
Some(block_id as u32)
}
}
fn get_list_info(&mut self, list_id: u32) -> &mut ListInfo {
&mut self.lists[list_id as usize]
}
fn block_id_to_append(&mut self, list_id: u32) -> u32 {
let list_info: ListInfo = self.lists[list_id as usize];
if list_info.len != 0 && list_info.len % BLOCK_SIZE == 0 {
// we need to allocate a fresh block.
let new_block_id: u32 = { self.new_block().expect("Failed to allocate block") };
let last_block_id: usize;
{
// update the list info.
let list_info: &mut ListInfo = self.get_list_info(list_id);
last_block_id = list_info.last as usize;
list_info.last = new_block_id;
}
self.blocks[last_block_id].next = new_block_id;
new_block_id
}
else {
list_info.last
}
}
pub fn push(&mut self, list_id: u32, val: u32) {
let block_id: u32 = self.block_id_to_append(list_id);
let list_len: u32;
{
let list_info: &mut ListInfo = self.get_list_info(list_id);
list_len = list_info.len;
list_info.len += 1u32;
}
self.blocks[block_id as usize].data[(list_len % BLOCK_SIZE) as usize] = val;
}
pub fn iter_list(&self, list_id: u32) -> BlockIterator {
let list_info = &self.lists[list_id as usize];
BlockIterator {
current_block: &self.blocks[list_info.first as usize],
blocks: &self.blocks,
cursor: 0,
len: list_info.len as usize,
}
}
}
pub struct BlockIterator<'a> {
current_block: &'a Block,
blocks: &'a [Block],
cursor: usize,
len: usize,
}
impl<'a> Iterator for BlockIterator<'a> {
type Item = u32;
fn next(&mut self) -> Option<u32> {
if self.cursor == self.len {
None
}
else {
if self.cursor % (BLOCK_SIZE as usize) == 0 {
if self.cursor != 0 {
if self.current_block.next != u32::max_value() {
self.current_block = &self.blocks[self.current_block.next as usize];
}
else {
panic!("Block linked list ended prematurely.");
}
}
}
let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)];
self.cursor += 1;
Some(res)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_block_store() {
let mut block_store = BlockStore::allocate(50_000);
let list_2 = block_store.new_list();
let list_3 = block_store.new_list();
let list_4 = block_store.new_list();
let list_5 = block_store.new_list();
for i in 0 .. 2_000 {
block_store.push(list_2, i * 2);
block_store.push(list_3, i * 3);
}
for i in 0 .. 10 {
block_store.push(list_4, i * 4);
block_store.push(list_5, i * 5);
}
let mut list2_iter = block_store.iter_list(list_2);
let mut list3_iter = block_store.iter_list(list_3);
let mut list4_iter = block_store.iter_list(list_4);
let mut list5_iter = block_store.iter_list(list_5);
for i in 0 .. 2_000 {
assert_eq!(list2_iter.next().unwrap(), i * 2);
assert_eq!(list3_iter.next().unwrap(), i * 3);
}
assert!(list2_iter.next().is_none());
assert!(list3_iter.next().is_none());
for i in 0 .. 10 {
assert_eq!(list4_iter.next().unwrap(), i * 4);
assert_eq!(list5_iter.next().unwrap(), i * 5);
}
assert!(list4_iter.next().is_none());
assert!(list5_iter.next().is_none());
}
}

View File

@@ -12,7 +12,6 @@ mod freq_handler;
mod docset;
mod scored_docset;
mod segment_postings_option;
mod block_store;
pub use self::docset::{SkipResult, DocSet};
pub use self::offset_postings::OffsetPostings;
@@ -31,7 +30,6 @@ pub use self::freq_handler::FreqHandler;
pub use self::scored_docset::ScoredDocSet;
pub use self::postings::HasLen;
pub use self::segment_postings_option::SegmentPostingsOption;
pub use self::block_store::BlockStore;
#[cfg(test)]
mod tests {
@@ -43,6 +41,7 @@ mod tests {
use core::SegmentReader;
use core::Index;
use std::iter;
use datastruct::stacker::Heap;
#[test]
@@ -72,9 +71,9 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let segment = index.new_segment();
let mut block_store = BlockStore::allocate(50_000);
let heap = Heap::with_capacity(10_000_000);
{
let mut segment_writer = SegmentWriter::for_segment(&mut block_store, segment.clone(), &schema).unwrap();
let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a b a c a d a a.");

View File

@@ -1,24 +1,23 @@
use DocId;
use std::collections::HashMap;
use schema::Term;
use schema::FieldValue;
use postings::PostingsSerializer;
use std::io;
use postings::Recorder;
use postings::block_store::BlockStore;
use analyzer::SimpleTokenizer;
use schema::Field;
use analyzer::StreamingIterator;
use datastruct::stacker::{HashMap, Heap};
pub trait PostingsWriter {
fn close(&mut self, block_store: &mut BlockStore);
fn close(&mut self, heap: &Heap);
fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, pos: u32, term: &Term);
fn suscribe(&mut self, doc: DocId, pos: u32, term: &Term, heap: &Heap);
fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()>;
fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>;
fn index_text<'a>(&mut self, block_store: &mut BlockStore, doc_id: DocId, field: Field, field_values: &Vec<&'a FieldValue>) -> u32 {
fn index_text<'a>(&mut self, doc_id: DocId, field: Field, field_values: &Vec<&'a FieldValue>, heap: &Heap) -> u32 {
let mut pos = 0u32;
let mut num_tokens: u32 = 0u32;
let mut term = Term::allocate(field, 100);
@@ -30,7 +29,7 @@ pub trait PostingsWriter {
match tokens.next() {
Some(token) => {
term.set_text(token);
self.suscribe(block_store, doc_id, pos, &term);
self.suscribe(doc_id, pos, &term, heap);
pos += 1u32;
num_tokens += 1u32;
},
@@ -45,68 +44,58 @@ pub trait PostingsWriter {
}
}
pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
term_index: HashMap<Term, Rec>,
pub struct SpecializedPostingsWriter<'a, Rec: Recorder + 'static> {
term_index: HashMap<'a, Rec>,
}
#[inline(always)]
fn get_or_create_recorder<'a, Rec: Recorder>(term: &Term, term_index: &'a mut HashMap<Term, Rec>, block_store: &mut BlockStore) -> &'a mut Rec {
if term_index.contains_key(term) {
term_index.get_mut(term).expect("The term should be here as we just checked it")
}
else {
term_index
.entry(term.clone())
.or_insert_with(|| Rec::new(block_store))
}
}
impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
impl<'a, Rec: Recorder + 'static> SpecializedPostingsWriter<'a, Rec> {
pub fn new() -> SpecializedPostingsWriter<Rec> {
pub fn new(heap: &'a Heap) -> SpecializedPostingsWriter<'a, Rec> {
SpecializedPostingsWriter {
term_index: HashMap::new(),
term_index: HashMap::new(25, heap), // TODO compute the size of the table as a % of the heap
}
}
pub fn new_boxed() -> Box<PostingsWriter> {
Box::new(Self::new())
}
pub fn new_boxed(heap: &'a Heap) -> Box<PostingsWriter + 'a> {
let res = SpecializedPostingsWriter::<Rec>::new(heap);
Box::new(res)
}
}
impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec> {
impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'a, Rec> {
fn close(&mut self, block_store: &mut BlockStore) {
fn close(&mut self, heap: &Heap) {
for recorder in self.term_index.values_mut() {
recorder.close_doc(block_store);
recorder.close_doc(heap);
}
}
#[inline(always)]
fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, position: u32, term: &Term) {
let mut recorder = get_or_create_recorder(term, &mut self.term_index, block_store);
fn suscribe(&mut self, doc: DocId, position: u32, term: &Term, heap: &Heap) {
let mut recorder = self.term_index.get_or_create(term);
let current_doc = recorder.current_doc();
if current_doc != doc {
if current_doc != u32::max_value() {
recorder.close_doc(block_store);
recorder.close_doc(heap);
}
recorder.new_doc(block_store, doc);
recorder.new_doc(doc, heap);
}
recorder.record_position(block_store, position);
recorder.record_position(position, heap);
}
fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()> {
let mut term_offsets: Vec<(&Term, &Rec)> = self.term_index
fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> {
let mut term_offsets: Vec<(&[u8], (u32, &Rec))> = self.term_index
.iter()
.map(|(k,v)| (k, v))
.collect();
term_offsets.sort_by_key(|&(k, _v)| k);
for (term, recorder) in term_offsets {
try!(serializer.new_term(term, recorder.doc_freq()));
try!(recorder.serialize(serializer, block_store));
let mut term = Term::allocate(Field(0), 100);
for (term_bytes, (addr, recorder)) in term_offsets {
// TODO remove copy
term.set_content(term_bytes);
try!(serializer.new_term(&term, recorder.doc_freq()));
try!(recorder.serialize(addr, serializer, heap));
try!(serializer.close_term());
}
Ok(())
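Note that the stacker `HashMap` hands back raw key bytes, so `serialize` now sorts `&[u8]` slices rather than `Term`s. Since a term is serialized as its field id followed by its text (see `Term::allocate` below), lexicographic byte order groups terms by field first and by value second, exactly the order the `PostingsSerializer` expects. A tiny illustration (the byte values are made up):

let mut terms: Vec<&[u8]> = vec![b"\x00bb", b"\x01aa", b"\x00aa"];
terms.sort();
// Field 0 terms come first, in value order, then field 1.
assert_eq!(terms, vec![&b"\x00aa"[..], &b"\x00bb"[..], &b"\x01aa"[..]]);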

View File

@@ -1,62 +1,59 @@
use postings::block_store::BlockStore;
use DocId;
use std::io;
use postings::PostingsSerializer;
use datastruct::stacker::{Stack, Heap};
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
const POSITION_END: u32 = 4294967295;
pub trait Recorder {
pub trait Recorder: From<u32> {
fn current_doc(&self,) -> u32;
fn new(block_store: &mut BlockStore) -> Self;
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId);
fn record_position(&mut self, block_store: &mut BlockStore, position: u32);
fn close_doc(&mut self, block_store: &mut BlockStore);
fn new_doc(&mut self, doc: DocId, heap: &Heap);
fn record_position(&mut self, position: u32, heap: &Heap);
fn close_doc(&mut self, heap: &Heap);
fn doc_freq(&self,) -> u32;
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()>;
fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>;
}
pub struct NothingRecorder {
list_id: u32,
stack: Stack,
current_doc: DocId,
doc_freq: u32,
}
impl From<u32> for NothingRecorder {
fn from(addr: u32) -> NothingRecorder {
NothingRecorder {
stack: Stack::from(addr),
current_doc: u32::max_value(),
doc_freq: 0u32,
}
}
}
impl Recorder for NothingRecorder {
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn new(block_store: &mut BlockStore) -> Self {
NothingRecorder {
list_id: block_store.new_list(),
current_doc: u32::max_value(),
doc_freq: 0u32,
}
}
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
fn new_doc(&mut self, doc: DocId, heap: &Heap) {
self.current_doc = doc;
block_store.push(self.list_id, doc);
self.stack.push(doc, heap);
self.doc_freq += 1;
}
fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) {
}
fn close_doc(&mut self, _block_store: &mut BlockStore) {
}
fn record_position(&mut self, _position: u32, _heap: &Heap) {}
fn close_doc(&mut self, _heap: &Heap) {}
fn doc_freq(&self,) -> u32 {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
let doc_id_iter = block_store.iter_list(self.list_id);
for doc in doc_id_iter {
fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> {
for doc in self.stack.iterate(self_addr, heap) {
try!(serializer.write_doc(doc, 0u32, &EMPTY_ARRAY));
}
Ok(())
@@ -66,40 +63,42 @@ impl Recorder for NothingRecorder {
pub struct TermFrequencyRecorder {
list_id: u32,
stack: Stack,
current_doc: DocId,
current_tf: u32,
doc_freq: u32,
}
impl Recorder for TermFrequencyRecorder {
fn new(block_store: &mut BlockStore) -> Self {
impl From<u32> for TermFrequencyRecorder {
fn from(addr: u32) -> TermFrequencyRecorder {
TermFrequencyRecorder {
list_id: block_store.new_list(),
stack: Stack::from(addr),
current_doc: u32::max_value(),
current_tf: 0u32,
doc_freq: 0u32,
}
doc_freq: 0u32
}
}
}
impl Recorder for TermFrequencyRecorder {
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
fn new_doc(&mut self, doc: DocId, heap: &Heap) {
self.doc_freq += 1u32;
self.current_doc = doc;
block_store.push(self.list_id, doc);
self.stack.push(doc, heap);
}
fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) {
fn record_position(&mut self, _position: u32, _heap: &Heap) {
self.current_tf += 1;
}
fn close_doc(&mut self, block_store: &mut BlockStore) {
fn close_doc(&mut self, heap: &Heap) {
assert!(self.current_tf > 0);
block_store.push(self.list_id, self.current_tf);
self.stack.push(self.current_tf, heap);
self.current_tf = 0;
}
@@ -107,8 +106,8 @@ impl Recorder for TermFrequencyRecorder {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
let mut doc_iter = block_store.iter_list(self.list_id);
fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> {
let mut doc_iter = self.stack.iterate(self_addr, heap);
loop {
if let Some(doc) = doc_iter.next() {
if let Some(term_freq) = doc_iter.next() {
@@ -125,48 +124,49 @@ impl Recorder for TermFrequencyRecorder {
pub struct TFAndPositionRecorder {
list_id: u32,
stack: Stack,
current_doc: DocId,
doc_freq: u32,
}
impl Recorder for TFAndPositionRecorder {
fn new(block_store: &mut BlockStore) -> Self {
impl From<u32> for TFAndPositionRecorder {
fn from(addr: u32) -> TFAndPositionRecorder {
TFAndPositionRecorder {
list_id: block_store.new_list(),
stack: Stack::from(addr),
current_doc: u32::max_value(),
doc_freq: 0u32,
}
}
}
impl Recorder for TFAndPositionRecorder {
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
fn new_doc(&mut self, doc: DocId, heap: &Heap) {
self.doc_freq += 1;
self.current_doc = doc;
block_store.push(self.list_id, doc);
self.stack.push(doc, heap);
}
fn record_position(&mut self, position: u32, heap: &Heap) {
self.stack.push(position, heap);
}
fn record_position(&mut self, block_store: &mut BlockStore, position: u32) {
block_store.push(self.list_id, position);
}
fn close_doc(&mut self, block_store: &mut BlockStore) {
block_store.push(self.list_id, POSITION_END);
fn close_doc(&mut self, heap: &Heap) {
self.stack.push(POSITION_END, heap);
}
fn doc_freq(&self,) -> u32 {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> {
let mut doc_positions = Vec::with_capacity(100);
let mut positions_iter = block_store.iter_list(self.list_id);
let mut positions_iter = self.stack.iterate(self_addr, heap);
loop {
if let Some(doc) = positions_iter.next() {
let mut prev_position = 0;
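Each recorder flattens its postings into a single `Stack` of `u32`s, so the layouts are pure conventions: `NothingRecorder` stores `[doc, doc, ...]`, `TermFrequencyRecorder` stores `[doc, tf, doc, tf, ...]`, and `TFAndPositionRecorder` stores `[doc, pos, pos, ..., POSITION_END, doc, ...]`. A sketch of decoding that last layout (hypothetical helper, same sentinel as above):

const POSITION_END: u32 = 4294967295;

// Split a [doc, pos, pos, ..., POSITION_END, ...] stream into
// (doc, positions) pairs.
fn decode(stream: &[u32]) -> Vec<(u32, Vec<u32>)> {
    let mut postings = Vec::new();
    let mut it = stream.iter().cloned();
    while let Some(doc) = it.next() {
        let mut positions = Vec::new();
        while let Some(pos) = it.next() {
            if pos == POSITION_END {
                break;
            }
            positions.push(pos);
        }
        postings.push((doc, positions));
    }
    postings
}

// decode(&[7, 0, 3, POSITION_END, 9, 1, POSITION_END])
//     == vec![(7, vec![0, 3]), (9, vec![1])]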

View File

@@ -3,17 +3,24 @@ use std::fmt;
use common::BinarySerializable;
use super::Field;
#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
pub struct Term(Vec<u8>);
impl Term {
pub fn allocate(field: Field, num_bytes: usize) -> Term {
let mut term = Term(Vec::with_capacity(num_bytes));
field.serialize(&mut term.0);
field.serialize(&mut term.0).expect("Serializing term in a Vec should never fail");
term
}
pub fn set_content(&mut self, content: &[u8]) {
self.0.resize(content.len(), 0u8);
(&mut self.0[..]).clone_from_slice(content);
}
fn field_id(&self,) -> u8 {
self.0[0]