From 24d2e3f6c1ac6b63a759160dee62ded2cf0bd60f Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 5 Sep 2016 10:27:14 +0900 Subject: [PATCH] switching for the stacker datastructure --- examples/simple_search.rs | 6 +- src/core/index.rs | 8 +- src/datastruct/mod.rs | 1 + src/datastruct/stacker/hashmap.rs | 213 ++++++++++++++++++++++++++++++ src/datastruct/stacker/heap.rs | 139 +++++++++++++++++++ src/datastruct/stacker/mod.rs | 7 + src/datastruct/stacker/stack.rs | 182 +++++++++++++++++++++++++ src/indexer/index_writer.rs | 21 +-- src/indexer/merger.rs | 6 +- src/indexer/segment_writer.rs | 66 +++++---- src/lib.rs | 19 +-- src/postings/block_store.rs | 193 --------------------------- src/postings/mod.rs | 7 +- src/postings/postings_writer.rs | 75 +++++------ src/postings/recorder.rs | 122 ++++++++--------- src/schema/term.rs | 11 +- 16 files changed, 714 insertions(+), 362 deletions(-) create mode 100644 src/datastruct/stacker/hashmap.rs create mode 100644 src/datastruct/stacker/heap.rs create mode 100644 src/datastruct/stacker/mod.rs create mode 100644 src/datastruct/stacker/stack.rs delete mode 100644 src/postings/block_store.rs diff --git a/examples/simple_search.rs b/examples/simple_search.rs index 21eb77dd8..04e41d674 100644 --- a/examples/simple_search.rs +++ b/examples/simple_search.rs @@ -52,7 +52,11 @@ fn run(index_path: &Path) -> tantivy::Result<()> { // There can be only one writer at one time. // The writer will use more than one thread // to use your multicore CPU. - let mut index_writer = try!(index.writer()); + // + // Here we used a buffer of 1MB. In the + // real world, you want to use much more RAM on your indexer, + // to maximize your throughput. (300MB for instance) + let mut index_writer = try!(index.writer(1_000_000)); diff --git a/src/core/index.rs b/src/core/index.rs index beadc4e2e..cdd25c608 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -108,15 +108,15 @@ impl Index { /// Creates a multithreaded writer. 
/// Each writer produces an independant segment. - pub fn writer_with_num_threads(&self, num_threads: usize) -> Result { - IndexWriter::open(self, num_threads) + pub fn writer_with_num_threads(&self, num_threads: usize, heap_size_in_bytes: usize) -> Result { + IndexWriter::open(self, num_threads, heap_size_in_bytes) } /// Creates a multithreaded writer /// It just calls `writer_with_num_threads` with the number of core as `num_threads` - pub fn writer(&self,) -> Result { - self.writer_with_num_threads(num_cpus::get()) + pub fn writer(&self, heap_size_in_bytes: usize) -> Result { + self.writer_with_num_threads(num_cpus::get(), heap_size_in_bytes) } pub fn schema(&self,) -> Schema { diff --git a/src/datastruct/mod.rs b/src/datastruct/mod.rs index ba4bdbea8..85489973b 100644 --- a/src/datastruct/mod.rs +++ b/src/datastruct/mod.rs @@ -1,5 +1,6 @@ mod fstmap; mod skip; +pub mod stacker; pub use self::fstmap::FstMapBuilder; pub use self::fstmap::FstMap; diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs new file mode 100644 index 000000000..b07374a20 --- /dev/null +++ b/src/datastruct/stacker/hashmap.rs @@ -0,0 +1,213 @@ +use std::iter; +use std::marker::PhantomData; +use super::heap::{Heap, BytesRef}; + +fn djb2(key: &[u8]) -> u64 { + let mut state: u64 = 5381; + for &b in key { + state = (state << 5).wrapping_add(state).wrapping_add(b as u64); + } + state +} + +impl Default for BytesRef { + fn default() -> BytesRef { + BytesRef { + start: 0u32, + stop: 0u32, + } + } +} + +#[derive(Copy, Clone, Default)] +struct KeyValue { + key: BytesRef, + value_addr: u32, +} + +impl KeyValue { + fn is_empty(&self,) -> bool { + self.key.stop == 0u32 + } +} + +pub struct HashMap<'a, V> where V: From { + table: Box<[KeyValue]>, + heap: &'a Heap, + _phantom: PhantomData, + mask: usize, + occupied: Vec, +} + +pub enum Entry { + Vacant(usize), + Occupied(u32), +} + + +impl<'a, V> HashMap<'a, V> where V: From { + + pub fn new(num_bucket_power_of_2: usize, 
heap: &'a Heap) -> HashMap<'a, V> { + let table_size = 1 << num_bucket_power_of_2; + let table: Vec = iter::repeat(KeyValue::default()) + .take(table_size) + .collect(); + HashMap { + table: table.into_boxed_slice(), + heap: heap, + _phantom: PhantomData, + mask: table_size - 1, + occupied: Vec::with_capacity(table_size / 2), + } + } + + #[inline(always)] + fn bucket(&self, key: &[u8]) -> usize { + let hash: u64 = djb2(key); + (hash as usize) & self.mask + } + + fn get_key(&self, bytes_ref: BytesRef) -> &[u8] { + self.heap.get_slice(bytes_ref) + } + + pub fn set_bucket(&mut self, key_bytes: &[u8], bucket: usize, addr: u32) -> u32 { + self.occupied.push(bucket); + self.table[bucket] = KeyValue { + key: self.heap.allocate_and_set(key_bytes), + value_addr: addr, + }; + addr + } + + pub fn iter<'b: 'a>(&'b self,) -> impl Iterator + 'b { + let heap: &'a Heap = &self.heap; + let table: &'b [KeyValue] = &self.table; + self.occupied + .iter() + .cloned() + .map(move |bucket: usize| { + let kv = table[bucket]; + let addr = kv.value_addr; + let v: &V = heap.get_mut_ref::(addr); + (heap.get_slice(kv.key), (addr, v)) + }) + // .map(move |addr: u32| (heap.get_mut_ref::(addr)) ) + } + + pub fn values_mut<'b: 'a>(&'b self,) -> impl Iterator + 'b { + let heap: &'a Heap = &self.heap; + let table: &'b [KeyValue] = &self.table; + self.occupied + .iter() + .cloned() + .map(move |bucket: usize| table[bucket].value_addr) + .map(move |addr: u32| heap.get_mut_ref::(addr)) + } + + pub fn get_or_create>(&mut self, key: S) -> &mut V { + let entry = self.lookup(key.as_ref()); + match entry { + Entry::Occupied(addr) => { + self.heap.get_mut_ref(addr) + } + Entry::Vacant(bucket) => { + let (addr, val): (u32, &mut V) = self.heap.new(); + self.set_bucket(key.as_ref(), bucket, addr); + val + } + } + } + + pub fn lookup>(&self, key: S) -> Entry { + let key_bytes: &[u8] = key.as_ref(); + let mut bucket = self.bucket(key_bytes); + loop { + let kv: KeyValue = self.table[bucket]; + if kv.is_empty() { + 
return Entry::Vacant(bucket); + } + if self.get_key(kv.key) == key_bytes { + return Entry::Occupied(kv.value_addr); + } + bucket = (bucket + 1) & self.mask; + } + } +} + + +#[cfg(test)] +mod tests { + + use super::*; + use super::super::heap::Heap; + use super::djb2; + use test::Bencher; + use std::hash::SipHasher; + use std::hash::Hasher; + + struct TestValue { + val: u32, + _addr: u32, + } + + impl From for TestValue { + fn from(addr: u32) -> TestValue { + TestValue { + val: 0u32, + _addr: addr, + } + } + } + + #[test] + fn test_hash_map() { + let heap = Heap::with_capacity(2_000_000); + let mut hash_map: HashMap = HashMap::new(18, &heap); + { + { + let v: &mut TestValue = hash_map.get_or_create("abc"); + assert_eq!(v.val, 0u32); + v.val = 3u32; + + } + } + { + let v: &mut TestValue = hash_map.get_or_create("abcd"); + assert_eq!(v.val, 0u32); + v.val = 4u32; + } + { + let v: &mut TestValue = hash_map.get_or_create("abc"); + assert_eq!(v.val, 3u32); + } + { + let v: &mut TestValue = hash_map.get_or_create("abcd"); + assert_eq!(v.val, 4u32); + } + let mut iter_values = hash_map.values_mut(); + assert_eq!(iter_values.next().unwrap().val, 3u32); + assert_eq!(iter_values.next().unwrap().val, 4u32); + assert!(!iter_values.next().is_some()); + } + + #[bench] + fn bench_djb2(bench: &mut Bencher) { + let v = String::from("abwer"); + bench.iter(|| { + djb2(v.as_bytes()) + }); + } + + #[bench] + fn bench_siphasher(bench: &mut Bencher) { + let v = String::from("abwer"); + bench.iter(|| { + let mut h = SipHasher::new(); + h.write(v.as_bytes()); + h.finish() + }); + } + +} + diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs new file mode 100644 index 000000000..d276740c3 --- /dev/null +++ b/src/datastruct/stacker/heap.rs @@ -0,0 +1,139 @@ +use std::cell::UnsafeCell; +use std::mem; +use std::ptr; +use std::iter; + +#[derive(Copy, Clone)] +pub struct BytesRef { + pub start: u32, + pub stop: u32, +} + +struct InnerHeap { + buffer: Vec, + used: u32, +} + 
+pub struct Heap { + inner: UnsafeCell, +} + +impl Heap { + pub fn with_capacity(num_bytes: usize) -> Heap { + Heap { + inner: UnsafeCell::new( + InnerHeap::with_capacity(num_bytes) + ), + } + } + + fn inner(&self,) -> &mut InnerHeap { + unsafe { &mut *self.inner.get() } + } + + pub fn clear(&self) { + self.inner().clear(); + } + + pub fn len(&self,) -> u32 { + self.inner().len() + } + + pub fn free(&self,) -> u32 { + self.inner().free() + } + + pub fn allocate(&self, num_bytes: usize) -> u32 { + self.inner().allocate(num_bytes) + } + + pub fn new>(&self,) -> (u32, &mut V) { + let addr = self.inner().allocate(mem::size_of::()); + let v: V = V::from(addr); + self.inner().set(addr, &v); + (addr, self.inner().get_mut_ref(addr)) + } + + pub fn allocate_and_set(&self, data: &[u8]) -> BytesRef { + self.inner().allocate_and_set(data) + } + + pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { + self.inner().get_slice(bytes_ref) + } + + pub fn set(&self, addr: u32, val: &Item) { + self.inner().set(addr, val); + } + + pub fn get_mut_ref(&self, addr: u32) -> &mut Item { + self.inner().get_mut_ref(addr) + } +} + + +impl InnerHeap { + + pub fn with_capacity(num_bytes: usize) -> InnerHeap { + InnerHeap { + buffer: iter::repeat(0u8).take(num_bytes).collect(), + used: 0u32, + } + } + + pub fn clear(&mut self) { + self.used = 0u32; + } + + pub fn len(&self,) -> u32 { + self.used + } + + pub fn free(&self,) -> u32 { + (self.buffer.len() as u32) - self.used + } + + pub fn allocate(&mut self, num_bytes: usize) -> u32 { + let addr = self.used; + self.used += num_bytes as u32; + let len_buffer = self.buffer.len(); + if self.used > len_buffer as u32 { + self.buffer.resize(self.used as usize * 2, 0u8); + } + addr + } + + pub fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef { + let start = self.allocate(data.len()) as usize; + let stop = start + data.len(); + &mut self.buffer[start..stop].clone_from_slice(data); + BytesRef { + start: start as u32, + stop: stop as u32, + } + } + 
+ pub fn get_mut(&mut self, addr: u32) -> *mut u8 { + let addr_usize = addr as isize; + unsafe { self.buffer.as_mut_ptr().offset(addr_usize) } + } + + pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { + &self.buffer[bytes_ref.start as usize .. bytes_ref.stop as usize] + } + + pub fn get_mut_ref(&mut self, addr: u32) -> &mut Item { + let v_ptr_u8 = self.get_mut(addr) as *mut u8; + let v_ptr = v_ptr_u8 as *mut Item; + unsafe { &mut *v_ptr } + } + + pub fn set(&mut self, addr: u32, val: &Item) { + let v_ptr: *const Item = val as *const Item; + let v_ptr_u8: *const u8 = v_ptr as *const u8; + unsafe { + let dest_ptr: *mut u8 = self.get_mut(addr); + ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::()); + } + } +} \ No newline at end of file diff --git a/src/datastruct/stacker/mod.rs b/src/datastruct/stacker/mod.rs new file mode 100644 index 000000000..ca40a3d8a --- /dev/null +++ b/src/datastruct/stacker/mod.rs @@ -0,0 +1,7 @@ +mod hashmap; +mod heap; +mod stack; + +pub use self::heap::Heap; +pub use self::stack::Stack; +pub use self::hashmap::HashMap; diff --git a/src/datastruct/stacker/stack.rs b/src/datastruct/stacker/stack.rs new file mode 100644 index 000000000..040551d7b --- /dev/null +++ b/src/datastruct/stacker/stack.rs @@ -0,0 +1,182 @@ +use std::mem; +use super::heap::Heap; + + +#[inline] +pub fn is_power_of_2(val: u32) -> bool { + val & (val - 1) == 0 +} + +#[inline] +pub fn jump_needed(val: u32) -> bool { + val > 3 && is_power_of_2(val) +} + + +#[derive(Debug)] +pub struct Stack { + len: u32, + end: u32, + val0: u32, + val1: u32, + val2: u32, + next: u32, // inline of the first block +} + +impl Stack { + + pub fn iterate<'a>(&self, addr: u32, heap: &'a Heap) -> StackIterator<'a> { + StackIterator { + heap: heap, + addr: addr + 2u32 * (mem::size_of::() as u32), + len: self.len, + consumed: 0, + } + } + + pub fn push(&mut self, val: u32, heap: &Heap) { + self.len += 1; + if jump_needed(self.len) { + // we need to allocate another block. + // ... 
As we want to grow block exponentially + // the next block has a size of (length so far), + // and we need to add 1u32 to store the pointer + // to the next element. + let new_block_size: usize = (self.len as usize + 1) * mem::size_of::(); + let new_block_addr: u32 = heap.allocate(new_block_size); + heap.set(self.end, &new_block_addr); + self.end = new_block_addr; + } + heap.set(self.end, &val); + self.end += mem::size_of::() as u32; + } +} + + +impl From for Stack { + fn from(addr: u32) -> Stack { + let last_addr = addr + mem::size_of::() as u32 * 2u32; + Stack { + len: 0u32, + end: last_addr, + val0: 0u32, + val1: 0u32, + val2: 0u32, + next: 0u32, + } + } +} + + + +impl Default for Stack { + fn default() -> Stack { + Stack { + len: 0u32, + end: 0u32, + val0: 0u32, + val1: 0u32, + val2: 0u32, + next: 0u32, + } + } +} + + +pub struct StackIterator<'a> { + heap: &'a Heap, + addr: u32, + len: u32, + consumed: u32, +} + +impl<'a> Iterator for StackIterator<'a> { + type Item = u32; + + fn next(&mut self,) -> Option { + if self.consumed == self.len { + None + } + else { + let addr: u32; + self.consumed += 1; + if jump_needed(self.consumed) { + addr = *self.heap.get_mut_ref(self.addr); + } + else { + addr = self.addr; + } + self.addr = addr + mem::size_of::() as u32; + Some(*self.heap.get_mut_ref(addr)) + } + + } +} + + + + + +#[cfg(test)] +mod tests { + + use super::*; + use super::super::heap::Heap; + use test::Bencher; + + const NUM_STACK: usize = 10_000; + const STACK_SIZE: u32 = 1000; + + #[test] + fn test_stack() { + let heap = Heap::with_capacity(1_000_000); + let (addr, stack) = heap.new::(); + stack.push(1u32, &heap); + stack.push(2u32, &heap); + stack.push(4u32, &heap); + stack.push(8u32, &heap); + { + let mut it = stack.iterate(addr, &heap); + assert_eq!(it.next().unwrap(), 1u32); + assert_eq!(it.next().unwrap(), 2u32); + assert_eq!(it.next().unwrap(), 4u32); + assert_eq!(it.next().unwrap(), 8u32); + assert!(it.next().is_none()); + } + } + + #[bench] + fn 
bench_push_vec(bench: &mut Bencher) { + bench.iter(|| { + let mut vecs = Vec::with_capacity(100); + for _ in 0..NUM_STACK { + vecs.push(Vec::new()); + } + for s in 0..NUM_STACK { + for i in 0u32..STACK_SIZE { + let t = s * 392017 % NUM_STACK; + vecs[t].push(i); + } + } + }); + } + + #[bench] + fn bench_push_stack(bench: &mut Bencher) { + let heap = Heap::with_capacity(64_000_000); + bench.iter(|| { + let mut stacks = Vec::with_capacity(100); + for _ in 0..NUM_STACK { + let (_, stack) = heap.new::(); + stacks.push(stack); + } + for s in 0..NUM_STACK { + for i in 0u32..STACK_SIZE { + let t = s * 392017 % NUM_STACK; + stacks[t].push(i, &heap); + } + } + heap.clear(); + }); + } +} \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index eb3614e0f..41ee7b83e 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -12,14 +12,15 @@ use std::thread; use std::collections::HashSet; use indexer::merger::IndexMerger; use core::SegmentId; +use datastruct::stacker::Heap; use std::mem::swap; -use postings::BlockStore; use chan; use Result; use Error; pub struct IndexWriter { + heap_size_in_bytes: usize, index: Index, workers_join_handle: Vec>, segment_ready_sender: chan::Sender>, @@ -33,12 +34,12 @@ pub struct IndexWriter { const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; -fn index_documents(block_store: &mut BlockStore, +fn index_documents(heap: &mut Heap, segment: Segment, schema: &Schema, document_iterator: &mut Iterator) -> Result { - block_store.clear(); - let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema)); + heap.clear(); + let mut segment_writer = try!(SegmentWriter::for_segment(heap, segment, &schema)); for doc in document_iterator { try!(segment_writer.add_document(&doc, &schema)); if segment_writer.is_buffer_full() { @@ -64,8 +65,9 @@ impl IndexWriter { let schema = self.index.schema(); let segment_ready_sender_clone = self.segment_ready_sender.clone(); let 
document_receiver_clone = self.document_receiver.clone(); + let heap_size_in_bytes = self.heap_size_in_bytes; let join_handle: JoinHandle<()> = thread::spawn(move || { - let mut block_store = BlockStore::allocate(1_500_000); + let mut heap = Heap::with_capacity(heap_size_in_bytes); loop { let segment = index.new_segment(); let segment_id = segment.id(); @@ -77,7 +79,7 @@ impl IndexWriter { // creating a new segment's files // if no document are available. if document_iterator.peek().is_some() { - let index_result = index_documents(&mut block_store, segment, &schema, &mut document_iterator) + let index_result = index_documents(&mut heap, segment, &schema, &mut document_iterator) .map(|num_docs| (segment_id, num_docs)); segment_ready_sender_clone.send(index_result); } @@ -94,10 +96,13 @@ impl IndexWriter { /// /// num_threads tells the number of indexing worker that /// should work at the same time. - pub fn open(index: &Index, num_threads: usize) -> Result { + pub fn open(index: &Index, + num_threads: usize, + heap_size_in_bytes: usize) -> Result { let (document_sender, document_receiver): (chan::Sender, chan::Receiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); let (segment_ready_sender, segment_ready_receiver): (chan::Sender>, chan::Receiver>) = chan::async(); let mut index_writer = IndexWriter { + heap_size_in_bytes: heap_size_in_bytes, index: index.clone(), segment_ready_receiver: segment_ready_receiver, segment_ready_sender: segment_ready_sender, @@ -280,7 +285,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(8).unwrap(); + let mut index_writer = index.writer_with_num_threads(3, 30_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a"); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 9f9238898..46e8667f1 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -294,7 +294,7 @@ mod tests { { { // writing the segment - let mut index_writer = 
index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); @@ -318,7 +318,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); @@ -336,7 +336,7 @@ mod tests { } { let segments = index.segments().unwrap(); - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); index_writer.merge(&segments).unwrap(); } { diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 011bc5837..ad7e11f44 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -17,12 +17,12 @@ use schema::TextIndexingOptions; use postings::SpecializedPostingsWriter; use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; use indexer::segment_serializer::SegmentSerializer; -use postings::BlockStore; +use datastruct::stacker::Heap; pub struct SegmentWriter<'a> { - block_store: &'a mut BlockStore, + heap: &'a Heap, max_doc: DocId, - per_field_postings_writers: Vec>, + per_field_postings_writers: Vec>, segment_serializer: SegmentSerializer, fast_field_writers: U32FastFieldsWriter, fieldnorms_writer: U32FastFieldsWriter, @@ -38,23 +38,23 @@ fn create_fieldnorms_writer(schema: &Schema) -> U32FastFieldsWriter { U32FastFieldsWriter::new(u32_fields) } -fn posting_from_field_entry(field_entry: &FieldEntry) -> Box { +fn posting_from_field_entry<'a>(field_entry: &FieldEntry, heap: &'a Heap) -> Box { match field_entry.field_type() { &FieldType::Str(ref text_options) => { match text_options.get_indexing_options() { TextIndexingOptions::TokenizedWithFreq => { - SpecializedPostingsWriter::::new_boxed() + 
SpecializedPostingsWriter::::new_boxed(heap) } TextIndexingOptions::TokenizedWithFreqAndPosition => { - SpecializedPostingsWriter::::new_boxed() + SpecializedPostingsWriter::::new_boxed(heap) } _ => { - SpecializedPostingsWriter::::new_boxed() + SpecializedPostingsWriter::::new_boxed(heap) } } } &FieldType::U32(_) => { - SpecializedPostingsWriter::::new_boxed() + SpecializedPostingsWriter::::new_boxed(heap) } } } @@ -63,16 +63,15 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box { impl<'a> SegmentWriter<'a> { - pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result> { + pub fn for_segment(heap: &'a Heap, mut segment: Segment, schema: &Schema) -> Result> { let segment_serializer = try!(SegmentSerializer::for_segment(&mut segment)); - let per_field_postings_writers = schema.fields() - .iter() - .map(|field_entry| { - posting_from_field_entry(field_entry) - }) - .collect(); + let mut per_field_postings_writers: Vec> = Vec::new(); + for field_entry in schema.fields() { + let postings_writer: Box = posting_from_field_entry(field_entry, heap); + per_field_postings_writers.push(postings_writer); + } Ok(SegmentWriter { - block_store: block_store, + heap: heap, max_doc: 0, per_field_postings_writers: per_field_postings_writers, fieldnorms_writer: create_fieldnorms_writer(schema), @@ -91,18 +90,18 @@ impl<'a> SegmentWriter<'a> { pub fn finalize(mut self,) -> Result<()> { let segment_info = self.segment_info(); for per_field_postings_writer in self.per_field_postings_writers.iter_mut() { - per_field_postings_writer.close(&mut self.block_store); + per_field_postings_writer.close(self.heap); } - write(&mut self.block_store, - &self.per_field_postings_writers, + write(&self.per_field_postings_writers, &self.fast_field_writers, &self.fieldnorms_writer, segment_info, - self.segment_serializer) + self.segment_serializer, + self.heap) } pub fn is_buffer_full(&self,) -> bool { - self.block_store.num_free_blocks() < 100_000 + 
self.heap.free() <= 1_000_000 } pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { @@ -112,17 +111,17 @@ impl<'a> SegmentWriter<'a> { let field_options = schema.get_field_entry(field); match *field_options.field_type() { FieldType::Str(ref text_options) => { - let mut num_tokens = 0u32; + let mut num_tokens = 0; if text_options.get_indexing_options().is_tokenized() { - num_tokens = field_posting_writer.index_text(&mut self.block_store, doc_id, field, &field_values); + num_tokens = field_posting_writer.index_text(doc_id, field, &field_values, self.heap); } else { for field_value in field_values { let term = Term::from_field_text(field, field_value.value().text()); - field_posting_writer.suscribe(&mut self.block_store, doc_id, 0, &term); + field_posting_writer.suscribe(doc_id, 0, &term, self.heap); num_tokens += 1u32; } - } + } self.fieldnorms_writer .get_field_writer(field) .map(|field_norms_writer| { @@ -133,14 +132,13 @@ impl<'a> SegmentWriter<'a> { if u32_options.is_indexed() { for field_value in field_values { let term = Term::from_field_u32(field_value.field(), field_value.value().u32_value()); - field_posting_writer.suscribe(&mut self.block_store, doc_id, 0, &term); + field_posting_writer.suscribe(doc_id, 0, &term, self.heap); } } } } } - self.fieldnorms_writer.fill_val_up_to(doc_id); self.fast_field_writers.add_document(&doc); @@ -168,14 +166,14 @@ impl<'a> SegmentWriter<'a> { } -fn write(block_store: &BlockStore, - per_field_postings_writers: &Vec>, +fn write<'a>(per_field_postings_writers: &Vec>, fast_field_writers: &U32FastFieldsWriter, fieldnorms_writer: &U32FastFieldsWriter, segment_info: SegmentInfo, - mut serializer: SegmentSerializer) -> Result<()> { + mut serializer: SegmentSerializer, + heap: &'a Heap,) -> Result<()> { for per_field_postings_writer in per_field_postings_writers.iter() { - try!(per_field_postings_writer.serialize(block_store, serializer.get_postings_serializer())); + 
try!(per_field_postings_writer.serialize(serializer.get_postings_serializer(), heap)); } try!(fast_field_writers.serialize(serializer.get_fast_field_serializer())); try!(fieldnorms_writer.serialize(serializer.get_fieldnorms_serializer())); @@ -186,11 +184,11 @@ fn write(block_store: &BlockStore, impl<'a> SerializableSegment for SegmentWriter<'a> { fn write(&self, serializer: SegmentSerializer) -> Result<()> { - write(&self.block_store, - &self.per_field_postings_writers, + write(&self.per_field_postings_writers, &self.fast_field_writers, &self.fieldnorms_writer, self.segment_info(), - serializer) + serializer, + self.heap) } } diff --git a/src/lib.rs b/src/lib.rs index e1f1d3ea0..59ac8fe6d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,11 @@ Tantivy is a search engine library. */ #![feature(binary_heap_extras)] +#![feature(conservative_impl_trait)] #![cfg_attr(test, feature(test))] #![cfg_attr(test, feature(step_by))] #![doc(test(attr(allow(unused_variables), deny(warnings))))] +#![feature(conservative_impl_trait)] #[macro_use] @@ -33,7 +35,6 @@ extern crate itertools; extern crate chan; extern crate crossbeam; - #[cfg(test)] extern crate test; #[cfg(test)] extern crate rand; @@ -138,7 +139,7 @@ mod tests { let index = Index::create_from_tempdir(schema).unwrap(); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); @@ -165,14 +166,14 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); let mut doc = Document::new(); doc.add_text(text_field, "a b c"); index_writer.add_document(doc).unwrap(); index_writer.commit().unwrap(); } { - let mut index_writer = 
index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a"); @@ -186,7 +187,7 @@ mod tests { index_writer.commit().unwrap(); } { - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap(); let mut doc = Document::new(); doc.add_text(text_field, "c"); index_writer.add_document(doc).unwrap(); @@ -212,7 +213,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b c"); @@ -248,7 +249,7 @@ mod tests { let index = Index::create_in_ram(schema); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af bc bc"); @@ -276,7 +277,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af b"); @@ -345,7 +346,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 100_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); diff --git a/src/postings/block_store.rs b/src/postings/block_store.rs deleted file mode 100644 index 0e3dee480..000000000 --- a/src/postings/block_store.rs +++ /dev/null @@ -1,193 +0,0 @@ -pub const BLOCK_SIZE: u32 = 64u32; - -struct Block { - data: [u32; 
BLOCK_SIZE as usize], - next: u32, -} - -impl Block { - fn new() -> Block { - Block { - data: [0u32; BLOCK_SIZE as usize], - next: u32::max_value(), - } - } -} - -#[derive(Copy, Clone)] -struct ListInfo { - first: u32, - last: u32, - len: u32, -} - -pub struct BlockStore { - lists: Vec, - blocks: Vec, - free_block_id: usize, -} - -impl BlockStore { - pub fn allocate(num_blocks: usize) -> BlockStore { - BlockStore { - lists: Vec::with_capacity(num_blocks), - blocks: (0 .. num_blocks).map(|_| Block::new()).collect(), - free_block_id: 0, - } - } - - pub fn num_free_blocks(&self) -> usize { - self.blocks.len() - self.free_block_id - } - - pub fn new_list(&mut self) -> u32 { - let res = self.lists.len() as u32; - let new_block_id = self.new_block().unwrap(); - self.lists.push(ListInfo { - first: new_block_id, - last: new_block_id, - len: 0, - }); - res - } - - pub fn clear(&mut self,) { - self.free_block_id = 0; - } - - fn new_block(&mut self,) -> Option { - let block_id = self.free_block_id; - self.free_block_id += 1; - if block_id >= self.blocks.len() { - None - } - else { - self.blocks[block_id].next = u32::max_value(); - Some(block_id as u32) - } - } - - fn get_list_info(&mut self, list_id: u32) -> &mut ListInfo { - &mut self.lists[list_id as usize] - } - - - fn block_id_to_append(&mut self, list_id: u32) -> u32 { - let list_info: ListInfo = self.lists[list_id as usize]; - if list_info.len != 0 && list_info.len % BLOCK_SIZE == 0 { - // we need to add a fresh new block. - let new_block_id: u32 = { self.new_block().expect("Failed to allocate block") }; - let last_block_id: usize; - { - // update the list info. 
- let list_info: &mut ListInfo = self.get_list_info(list_id); - last_block_id = list_info.last as usize; - list_info.last = new_block_id; - } - self.blocks[last_block_id].next = new_block_id; - new_block_id - } - else { - list_info.last - } - } - - pub fn push(&mut self, list_id: u32, val: u32) { - let block_id: u32 = self.block_id_to_append(list_id); - let list_len: u32; - { - let list_info: &mut ListInfo = self.get_list_info(list_id); - list_len = list_info.len; - list_info.len += 1u32; - } - self.blocks[block_id as usize].data[(list_len % BLOCK_SIZE) as usize] = val; - } - - pub fn iter_list(&self, list_id: u32) -> BlockIterator { - let list_info = &self.lists[list_id as usize]; - BlockIterator { - current_block: &self.blocks[list_info.first as usize], - blocks: &self.blocks, - cursor: 0, - len: list_info.len as usize, - } - } -} - - -pub struct BlockIterator<'a> { - current_block: &'a Block, - blocks: &'a [Block], - cursor: usize, - len: usize, -} - - -impl<'a> Iterator for BlockIterator<'a> { - - type Item = u32; - - fn next(&mut self) -> Option { - if self.cursor == self.len { - None - } - else { - if self.cursor % (BLOCK_SIZE as usize) == 0 { - if self.cursor != 0 { - if self.current_block.next != u32::max_value() { - self.current_block = &self.blocks[self.current_block.next as usize]; - } - else { - panic!("Block linked list ended prematurely."); - } - } - } - let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)]; - self.cursor += 1; - Some(res) - } - - } -} - - -#[cfg(test)] -mod tests { - - use super::*; - - #[test] - pub fn test_block_store() { - let mut block_store = BlockStore::allocate(50_000); - let list_2 = block_store.new_list(); - let list_3 = block_store.new_list(); - let list_4 = block_store.new_list(); - let list_5 = block_store.new_list(); - for i in 0 .. 2_000 { - block_store.push(list_2, i * 2); - block_store.push(list_3, i * 3); - } - for i in 0 .. 
10 { - block_store.push(list_4, i * 4); - block_store.push(list_5, i * 5); - } - - let mut list2_iter = block_store.iter_list(list_2); - let mut list3_iter = block_store.iter_list(list_3); - let mut list4_iter = block_store.iter_list(list_4); - let mut list5_iter = block_store.iter_list(list_5); - for i in 0 .. 2_000 { - assert_eq!(list2_iter.next().unwrap(), i * 2); - assert_eq!(list3_iter.next().unwrap(), i * 3); - - } - assert!(list2_iter.next().is_none()); - assert!(list3_iter.next().is_none()); - for i in 0 .. 10 { - assert_eq!(list4_iter.next().unwrap(), i * 4); - assert_eq!(list5_iter.next().unwrap(), i * 5); - } - assert!(list4_iter.next().is_none()); - assert!(list5_iter.next().is_none()); - } -} diff --git a/src/postings/mod.rs b/src/postings/mod.rs index f84800a09..8dd90457f 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -12,7 +12,6 @@ mod freq_handler; mod docset; mod scored_docset; mod segment_postings_option; -mod block_store; pub use self::docset::{SkipResult, DocSet}; pub use self::offset_postings::OffsetPostings; @@ -31,7 +30,6 @@ pub use self::freq_handler::FreqHandler; pub use self::scored_docset::ScoredDocSet; pub use self::postings::HasLen; pub use self::segment_postings_option::SegmentPostingsOption; -pub use self::block_store::BlockStore; #[cfg(test)] mod tests { @@ -43,6 +41,7 @@ mod tests { use core::SegmentReader; use core::Index; use std::iter; + use datastruct::stacker::Heap; #[test] @@ -72,9 +71,9 @@ mod tests { let schema = schema_builder.build(); let index = Index::create_in_ram(schema.clone()); let segment = index.new_segment(); - let mut block_store = BlockStore::allocate(50_000); + let heap = Heap::with_capacity(10_000_000); { - let mut segment_writer = SegmentWriter::for_segment(&mut block_store, segment.clone(), &schema).unwrap(); + let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b a c a d a a."); diff --git 
a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index bf8fb6e4a..c15ce689e 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -1,24 +1,23 @@ use DocId; -use std::collections::HashMap; use schema::Term; use schema::FieldValue; use postings::PostingsSerializer; use std::io; use postings::Recorder; -use postings::block_store::BlockStore; use analyzer::SimpleTokenizer; use schema::Field; use analyzer::StreamingIterator; +use datastruct::stacker::{HashMap, Heap}; pub trait PostingsWriter { - fn close(&mut self, block_store: &mut BlockStore); + fn close(&mut self, heap: &Heap); - fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, pos: u32, term: &Term); + fn suscribe(&mut self, doc: DocId, pos: u32, term: &Term, heap: &Heap); - fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()>; + fn serialize(&self, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>; - fn index_text<'a>(&mut self, block_store: &mut BlockStore, doc_id: DocId, field: Field, field_values: &Vec<&'a FieldValue>) -> u32 { + fn index_text<'a>(&mut self, doc_id: DocId, field: Field, field_values: &Vec<&'a FieldValue>, heap: &Heap) -> u32 { let mut pos = 0u32; let mut num_tokens: u32 = 0u32; let mut term = Term::allocate(field, 100); @@ -30,7 +29,7 @@ pub trait PostingsWriter { match tokens.next() { Some(token) => { term.set_text(token); - self.suscribe(block_store, doc_id, pos, &term); + self.suscribe(doc_id, pos, &term, heap); pos += 1u32; num_tokens += 1u32; }, @@ -45,68 +44,58 @@ pub trait PostingsWriter { } } -pub struct SpecializedPostingsWriter { - term_index: HashMap, +pub struct SpecializedPostingsWriter<'a, Rec: Recorder + 'static> { + term_index: HashMap<'a, Rec>, } -#[inline(always)] -fn get_or_create_recorder<'a, Rec: Recorder>(term: &Term, term_index: &'a mut HashMap, block_store: &mut BlockStore) -> &'a mut Rec { - if term_index.contains_key(term) { - 
term_index.get_mut(term).expect("The term should be here as we just checked it") - } - else { - term_index - .entry(term.clone()) - .or_insert_with(|| Rec::new(block_store)) - } - - -} -impl SpecializedPostingsWriter { +impl<'a, Rec: Recorder + 'static> SpecializedPostingsWriter<'a, Rec> { - pub fn new() -> SpecializedPostingsWriter { + pub fn new(heap: &'a Heap) -> SpecializedPostingsWriter<'a, Rec> { SpecializedPostingsWriter { - term_index: HashMap::new(), + term_index: HashMap::new(25, heap), // TODO compute the size of the table as a % of the heap } } - pub fn new_boxed() -> Box { - Box::new(Self::new()) - } - + pub fn new_boxed(heap: &'a Heap) -> Box { + let res = SpecializedPostingsWriter::::new(heap); + Box::new(res) + } + } -impl PostingsWriter for SpecializedPostingsWriter { +impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'a, Rec> { - fn close(&mut self, block_store: &mut BlockStore) { + fn close(&mut self, heap: &Heap) { for recorder in self.term_index.values_mut() { - recorder.close_doc(block_store); + recorder.close_doc(heap); } } #[inline(always)] - fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, position: u32, term: &Term) { - let mut recorder = get_or_create_recorder(term, &mut self.term_index, block_store); + fn suscribe(&mut self, doc: DocId, position: u32, term: &Term, heap: &Heap) { + let mut recorder = self.term_index.get_or_create(term); let current_doc = recorder.current_doc(); if current_doc != doc { if current_doc != u32::max_value() { - recorder.close_doc(block_store); + recorder.close_doc(heap); } - recorder.new_doc(block_store, doc); + recorder.new_doc(doc, heap); } - recorder.record_position(block_store, position); + recorder.record_position(position, heap); } - fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()> { - let mut term_offsets: Vec<(&Term, &Rec)> = self.term_index + fn serialize(&self, serializer: &mut PostingsSerializer, heap: 
&Heap) -> io::Result<()> { + let mut term_offsets: Vec<(&[u8], (u32, &Rec))> = self.term_index .iter() - .map(|(k,v)| (k, v)) .collect(); term_offsets.sort_by_key(|&(k, _v)| k); - for (term, recorder) in term_offsets { - try!(serializer.new_term(term, recorder.doc_freq())); - try!(recorder.serialize(serializer, block_store)); + let mut term = Term::allocate(Field(0), 100); + for (term_bytes, (addr, recorder)) in term_offsets { + // TODO remove copy + term.set_content(term_bytes); + try!(serializer.new_term(&term, recorder.doc_freq())); + try!(recorder.serialize(addr, serializer, heap)); try!(serializer.close_term()); } Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index e25adc566..8218941d3 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -1,62 +1,59 @@ -use postings::block_store::BlockStore; use DocId; use std::io; use postings::PostingsSerializer; - +use datastruct::stacker::{Stack, Heap}; const EMPTY_ARRAY: [u32; 0] = [0u32; 0]; const POSITION_END: u32 = 4294967295; -pub trait Recorder { +pub trait Recorder: From { fn current_doc(&self,) -> u32; - fn new(block_store: &mut BlockStore) -> Self; - fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId); - fn record_position(&mut self, block_store: &mut BlockStore, position: u32); - fn close_doc(&mut self, block_store: &mut BlockStore); + fn new_doc(&mut self, doc: DocId, heap: &Heap); + fn record_position(&mut self, position: u32, heap: &Heap); + fn close_doc(&mut self, heap: &Heap); fn doc_freq(&self,) -> u32; - - fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()>; + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()>; } pub struct NothingRecorder { - list_id: u32, + stack: Stack, current_doc: DocId, doc_freq: u32, } +impl From for NothingRecorder { + fn from(addr: u32) -> NothingRecorder { + NothingRecorder { + stack: Stack::from(addr), + current_doc: 
u32::max_value(), + doc_freq: 0u32, + } + } +} + impl Recorder for NothingRecorder { fn current_doc(&self,) -> DocId { self.current_doc } - fn new(block_store: &mut BlockStore) -> Self { - NothingRecorder { - list_id: block_store.new_list(), - current_doc: u32::max_value(), - doc_freq: 0u32, - } - } - fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) { + fn new_doc(&mut self, doc: DocId, heap: &Heap) { self.current_doc = doc; - block_store.push(self.list_id, doc); + self.stack.push(doc, heap); self.doc_freq += 1; } - - fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) { - } - - fn close_doc(&mut self, _block_store: &mut BlockStore) { - } - + + fn record_position(&mut self, _position: u32, _heap: &Heap) {} + + fn close_doc(&mut self, _heap: &Heap) {} + fn doc_freq(&self,) -> u32 { self.doc_freq } - fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> { - let doc_id_iter = block_store.iter_list(self.list_id); - for doc in doc_id_iter { + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> { + for doc in self.stack.iterate(self_addr, heap) { try!(serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)); } Ok(()) @@ -66,40 +63,42 @@ impl Recorder for NothingRecorder { pub struct TermFrequencyRecorder { - list_id: u32, + stack: Stack, current_doc: DocId, current_tf: u32, doc_freq: u32, } -impl Recorder for TermFrequencyRecorder { - - fn new(block_store: &mut BlockStore) -> Self { +impl From for TermFrequencyRecorder { + fn from(addr: u32) -> TermFrequencyRecorder { TermFrequencyRecorder { - list_id: block_store.new_list(), + stack: Stack::from(addr), current_doc: u32::max_value(), current_tf: 0u32, - doc_freq: 0u32, - } + doc_freq: 0u32 + } } +} + +impl Recorder for TermFrequencyRecorder { fn current_doc(&self,) -> DocId { self.current_doc } - - fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) { + + fn new_doc(&mut self, doc: 
DocId, heap: &Heap) { self.doc_freq += 1u32; self.current_doc = doc; - block_store.push(self.list_id, doc); + self.stack.push(doc, heap); } - fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) { + fn record_position(&mut self, _position: u32, _heap: &Heap) { self.current_tf += 1; } - fn close_doc(&mut self, block_store: &mut BlockStore) { + fn close_doc(&mut self, heap: &Heap) { assert!(self.current_tf > 0); - block_store.push(self.list_id, self.current_tf); + self.stack.push(self.current_tf, heap); self.current_tf = 0; } @@ -107,8 +106,8 @@ impl Recorder for TermFrequencyRecorder { self.doc_freq } - fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> { - let mut doc_iter = block_store.iter_list(self.list_id); + fn serialize(&self, self_addr:u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> { + let mut doc_iter = self.stack.iterate(self_addr, heap); loop { if let Some(doc) = doc_iter.next() { if let Some(term_freq) = doc_iter.next() { @@ -125,48 +124,49 @@ impl Recorder for TermFrequencyRecorder { pub struct TFAndPositionRecorder { - list_id: u32, + stack: Stack, current_doc: DocId, doc_freq: u32, } - -impl Recorder for TFAndPositionRecorder { - - fn new(block_store: &mut BlockStore) -> Self { +impl From for TFAndPositionRecorder { + fn from(addr: u32) -> TFAndPositionRecorder { TFAndPositionRecorder { - list_id: block_store.new_list(), + stack: Stack::from(addr), current_doc: u32::max_value(), doc_freq: 0u32, } } +} + +impl Recorder for TFAndPositionRecorder { + fn current_doc(&self,) -> DocId { self.current_doc } - fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) { + fn new_doc(&mut self, doc: DocId, heap: &Heap) { self.doc_freq += 1; self.current_doc = doc; - block_store.push(self.list_id, doc); + self.stack.push(doc, heap); + } + + fn record_position(&mut self, position: u32, heap: &Heap) { + self.stack.push(position, heap); } - fn 
record_position(&mut self, block_store: &mut BlockStore, position: u32) { - block_store.push(self.list_id, position); - } - - fn close_doc(&mut self, block_store: &mut BlockStore) { - block_store.push(self.list_id, POSITION_END); + fn close_doc(&mut self, heap: &Heap) { + self.stack.push(POSITION_END, heap); } fn doc_freq(&self,) -> u32 { self.doc_freq } - - fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> { + fn serialize(&self, self_addr: u32, serializer: &mut PostingsSerializer, heap: &Heap) -> io::Result<()> { let mut doc_positions = Vec::with_capacity(100); - let mut positions_iter = block_store.iter_list(self.list_id); + let mut positions_iter = self.stack.iterate(self_addr, heap); loop { if let Some(doc) = positions_iter.next() { let mut prev_position = 0; diff --git a/src/schema/term.rs b/src/schema/term.rs index d9ec2cd00..08d5b78c5 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -3,17 +3,24 @@ use std::fmt; use common::BinarySerializable; use super::Field; + + + #[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)] pub struct Term(Vec); impl Term { - pub fn allocate(field: Field, num_bytes: usize) -> Term { let mut term = Term(Vec::with_capacity(num_bytes)); - field.serialize(&mut term.0); + field.serialize(&mut term.0).expect("Serializing term in a Vec should never fail"); term } + + pub fn set_content(&mut self, content: &[u8]) { + self.0.resize(content.len(), 0u8); + (&mut self.0[..]).clone_from_slice(content); + } fn field_id(&self,) -> u8 { self.0[0]