diff --git a/src/common/mod.rs b/src/common/mod.rs index 2942438b4..82d4cbb23 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -10,7 +10,7 @@ pub(crate) use self::bitset::TinySet; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; pub use self::counting_writer::CountingWriter; pub use self::serialize::{BinarySerializable, FixedSize}; -pub use self::vint::VInt; +pub use self::vint::{read_vint_u32, serialize_vint_u32, write_u32_vint, VInt}; pub use byteorder::LittleEndian as Endianness; use std::io; diff --git a/src/common/vint.rs b/src/common/vint.rs index 7b782a946..59356c613 100644 --- a/src/common/vint.rs +++ b/src/common/vint.rs @@ -1,4 +1,5 @@ use super::BinarySerializable; +use byteorder::{ByteOrder, LittleEndian}; use std::io; use std::io::Read; use std::io::Write; @@ -9,6 +10,83 @@ pub struct VInt(pub u64); const STOP_BIT: u8 = 128; +pub fn serialize_vint_u32(val: u32) -> (u64, usize) { + const START_2: u64 = 1 << 7; + const START_3: u64 = 1 << 14; + const START_4: u64 = 1 << 21; + const START_5: u64 = 1 << 28; + + const STOP_1: u64 = START_2 - 1; + const STOP_2: u64 = START_3 - 1; + const STOP_3: u64 = START_4 - 1; + const STOP_4: u64 = START_5 - 1; + + const MASK_1: u64 = 127; + const MASK_2: u64 = MASK_1 << 7; + const MASK_3: u64 = MASK_2 << 7; + const MASK_4: u64 = MASK_3 << 7; + const MASK_5: u64 = MASK_4 << 7; + + let val = u64::from(val); + const STOP_BIT: u64 = 128u64; + match val { + 0...STOP_1 => (val | STOP_BIT, 1), + START_2...STOP_2 => ( + (val & MASK_1) | ((val & MASK_2) << 1) | (STOP_BIT << (8)), + 2, + ), + START_3...STOP_3 => ( + (val & MASK_1) | ((val & MASK_2) << 1) | ((val & MASK_3) << 2) | (STOP_BIT << (8 * 2)), + 3, + ), + START_4...STOP_4 => ( + (val & MASK_1) + | ((val & MASK_2) << 1) + | ((val & MASK_3) << 2) + | ((val & MASK_4) << 3) + | (STOP_BIT << (8 * 3)), + 4, + ), + _ => ( + (val & MASK_1) + | ((val & MASK_2) << 1) + | ((val & MASK_3) << 2) + | ((val & MASK_4) << 3) + | ((val & MASK_5) << 4) + | (STOP_BIT << (8 * 4)), + 5, + ), + } +} + +fn vint_len(data: &[u8]) -> usize { + for i in 0..5.min(data.len()) { + if data[i] >= STOP_BIT { + return i + 1; + } + } + panic!("Corrupted data. Invalid VInt 32"); +} + +pub fn read_vint_u32(data: &mut &[u8]) -> u32 { + let vlen = vint_len(*data); + let mut result = 0u32; + let mut shift = 0u64; + for b in data[..vlen].iter().cloned().map(|b| b & 127u8) { + result |= (b as u32) << shift; + shift += 7; + } + *data = &data[vlen..]; + result +} + +pub fn write_u32_vint(val: u32, writer: &mut W) -> io::Result<()> { + let (val, num_bytes) = serialize_vint_u32(val); + let mut buffer = [0u8; 8]; + LittleEndian::write_u64(&mut buffer, val); + writer.write_all(&buffer[..num_bytes]) +} + impl VInt { pub fn val(&self) -> u64 { self.0 @@ -24,7 +102,7 @@ impl VInt { output.extend(&buffer[0..num_bytes]); } - fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize { + pub fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize { let mut remaining = self.0; for (i, b) in buffer.iter_mut().enumerate() { let next_byte: u8 = (remaining % 128u64) as u8; @@ -64,7 +142,7 @@ impl BinarySerializable for VInt { return Err(io::Error::new( io::ErrorKind::InvalidData, "Reach end of buffer while reading VInt", - )) + )); } } } @@ -74,7 +152,9 @@ impl BinarySerializable for VInt { #[cfg(test)] mod tests { + use super::serialize_vint_u32; use super::VInt; + use byteorder::{ByteOrder, LittleEndian}; use common::BinarySerializable; fn aux_test_vint(val: u64) { @@ -108,4 +188,28 @@ mod tests { } aux_test_vint(10); } + + fn aux_test_serialize_vint_u32(val: u32) { + let mut buffer = [0u8; 10]; + let mut buffer2 = [0u8; 10]; + let len_vint = VInt(val as u64).serialize_into(&mut buffer); + let (vint, len) = serialize_vint_u32(val); + assert_eq!(len, len_vint, "len wrong for val {}", val); + LittleEndian::write_u64(&mut buffer2, vint); + assert_eq!(&buffer[..len], &buffer2[..len], "array wrong for {}", val); + } + + #[test] + fn test_vint_u32() { + aux_test_serialize_vint_u32(0); + aux_test_serialize_vint_u32(1); + aux_test_serialize_vint_u32(5); + for i in 1..3 { + let power_of_128 = 1u32 << (7 * i); + aux_test_serialize_vint_u32(power_of_128 - 1u32); + aux_test_serialize_vint_u32(power_of_128); + aux_test_serialize_vint_u32(power_of_128 + 1u32); + } + aux_test_serialize_vint_u32(u32::max_value()); + } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index e453dc1d5..5f5d4500c 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -52,17 +52,19 @@ type DocumentReceiver = channel::Receiver; /// /// Returns (the heap size in bytes, the hash table size in number of bits) fn initial_table_size(per_thread_memory_budget: usize) -> usize { + assert!(per_thread_memory_budget > 1_000); let table_size_limit: usize = per_thread_memory_budget / 3; - (1..) + if let Some(limit) = (1..) .take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit) .last() - .unwrap_or_else(|| { - panic!( - "Per thread memory is too small: {}", - per_thread_memory_budget - ) - }) - .min(19) // we cap it at 512K + { + limit.min(19) // we cap it at 2^19 = 512K. + } else { + unreachable!( + "Per thread memory is too small: {}", + per_thread_memory_budget + ); + } } /// `IndexWriter` is the user entry-point to add document to an index. @@ -302,7 +304,7 @@ fn index_documents( let last_docstamp: u64 = *(doc_opstamps.last().unwrap()); - let segment_entry: SegmentEntry = if delete_cursor.get().is_some() { + let delete_bitset_opt = if delete_cursor.get().is_some() { let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps); let segment_reader = SegmentReader::open(segment)?; let mut deleted_bitset = BitSet::with_capacity(num_docs as usize); @@ -313,18 +315,17 @@ fn index_documents( &doc_to_opstamps, last_docstamp, )?; - SegmentEntry::new(segment_meta, delete_cursor, { - if may_have_deletes { - Some(deleted_bitset) - } else { - None - } - }) + if may_have_deletes { + Some(deleted_bitset) + } else { + None + } } else { // if there are no delete operation in the queue, no need // to even open the segment. - SegmentEntry::new(segment_meta, delete_cursor, None) + None }; + let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt); Ok(segment_updater.add_segment(generation, segment_entry)) } diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 694ca94b0..c408c15df 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -1,6 +1,8 @@ use super::stacker::{Addr, MemoryArena, TermHashMap}; -use postings::recorder::{NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder}; +use postings::recorder::{ + BufferLender, NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder, +}; use postings::UnorderedTermId; use postings::{FieldSerializer, InvertedIndexSerializer}; use schema::IndexRecordOption; @@ -213,7 +215,7 @@ pub trait PostingsWriter { /// The `SpecializedPostingsWriter` is just here to remove dynamic /// dispatch to the recorder information. -pub struct SpecializedPostingsWriter { +pub(crate) struct SpecializedPostingsWriter { total_num_tokens: u64, _recorder_type: PhantomData, } @@ -245,8 +247,7 @@ impl PostingsWriter for SpecializedPostingsWriter debug_assert!(term.as_slice().len() >= 4); self.total_num_tokens += 1; term_index.mutate_or_create(term, |opt_recorder: Option| { - if opt_recorder.is_some() { - let mut recorder = opt_recorder.unwrap(); + if let Some(mut recorder) = opt_recorder { let current_doc = recorder.current_doc(); if current_doc != doc { recorder.close_doc(heap); @@ -255,7 +256,7 @@ impl PostingsWriter for SpecializedPostingsWriter recorder.record_position(position, heap); recorder } else { - let mut recorder = Rec::new(heap); + let mut recorder = Rec::new(); recorder.new_doc(doc, heap); recorder.record_position(position, heap); recorder @@ -270,10 +271,11 @@ impl PostingsWriter for SpecializedPostingsWriter termdict_heap: &MemoryArena, heap: &MemoryArena, ) -> io::Result<()> { + let mut buffer_lender = BufferLender::default(); for &(term_bytes, addr, _) in term_addrs { let recorder: Rec = termdict_heap.read(addr); serializer.new_term(&term_bytes[4..])?; - recorder.serialize(serializer, heap)?; + recorder.serialize(&mut buffer_lender, serializer, heap)?; serializer.close_term()?; } Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 37a186ef0..915539a74 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -1,10 +1,51 @@ use super::stacker::{ExpUnrolledLinkedList, MemoryArena}; +use common::{read_vint_u32, write_u32_vint}; use postings::FieldSerializer; -use std::{self, io}; +use std::io; use DocId; const EMPTY_ARRAY: [u32; 0] = [0u32; 0]; -const POSITION_END: u32 = std::u32::MAX; +const POSITION_END: u32 = 0; + +#[derive(Default)] +pub(crate) struct BufferLender { + buffer_u8: Vec, + buffer_u32: Vec, +} + +impl BufferLender { + pub fn lend_u8(&mut self) -> &mut Vec { + self.buffer_u8.clear(); + &mut self.buffer_u8 + } + pub fn lend_all(&mut self) -> (&mut Vec, &mut Vec) { + self.buffer_u8.clear(); + self.buffer_u32.clear(); + (&mut self.buffer_u8, &mut self.buffer_u32) + } +} + +pub struct VInt32Reader<'a> { + data: &'a [u8], +} + +impl<'a> VInt32Reader<'a> { + fn new(data: &'a [u8]) -> VInt32Reader<'a> { + VInt32Reader { data } + } +} + +impl<'a> Iterator for VInt32Reader<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + if self.data.is_empty() { + None + } else { + Some(read_vint_u32(&mut self.data)) + } + } +} /// Recorder is in charge of recording relevant information about /// the presence of a term in a document. @@ -15,9 +56,9 @@ const POSITION_END: u32 = std::u32::MAX; /// * the document id /// * the term frequency /// * the term positions -pub trait Recorder: Copy + 'static { +pub(crate) trait Recorder: Copy + 'static { /// - fn new(heap: &mut MemoryArena) -> Self; + fn new() -> Self; /// Returns the current document fn current_doc(&self) -> u32; /// Starts recording information about a new document @@ -29,7 +70,12 @@ pub trait Recorder: Copy + 'static { /// Close the document. It will help record the term frequency. fn close_doc(&mut self, heap: &mut MemoryArena); /// Pushes the postings information to the serializer. - fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()>; + fn serialize( + &self, + buffer_lender: &mut BufferLender, + serializer: &mut FieldSerializer, + heap: &MemoryArena, + ) -> io::Result<()>; } /// Only records the doc ids @@ -40,9 +86,9 @@ pub struct NothingRecorder { } impl Recorder for NothingRecorder { - fn new(heap: &mut MemoryArena) -> Self { + fn new() -> Self { NothingRecorder { - stack: ExpUnrolledLinkedList::new(heap), + stack: ExpUnrolledLinkedList::new(), current_doc: u32::max_value(), } } @@ -53,16 +99,23 @@ impl Recorder for NothingRecorder { fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { self.current_doc = doc; - self.stack.push(doc, heap); + let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); } fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {} fn close_doc(&mut self, _heap: &mut MemoryArena) {} - fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> { - for doc in self.stack.iter(heap) { - serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)?; + fn serialize( + &self, + buffer_lender: &mut BufferLender, + serializer: &mut FieldSerializer, + heap: &MemoryArena, + ) -> io::Result<()> { + let buffer = buffer_lender.lend_u8(); + self.stack.read_to_end(heap, buffer); + for doc in VInt32Reader::new(&buffer[..]) { + serializer.write_doc(doc as u32, 0u32, &EMPTY_ARRAY)?; } Ok(()) } @@ -77,9 +130,9 @@ pub struct TermFrequencyRecorder { } impl Recorder for TermFrequencyRecorder { - fn new(heap: &mut MemoryArena) -> Self { + fn new() -> Self { TermFrequencyRecorder { - stack: ExpUnrolledLinkedList::new(heap), + stack: ExpUnrolledLinkedList::new(), current_doc: u32::max_value(), current_tf: 0u32, } @@ -91,7 +144,7 @@ impl Recorder for TermFrequencyRecorder { fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { self.current_doc = doc; - self.stack.push(doc, heap); + let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); } fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) { @@ -100,24 +153,24 @@ impl Recorder for TermFrequencyRecorder { fn close_doc(&mut self, heap: &mut MemoryArena) { debug_assert!(self.current_tf > 0); - self.stack.push(self.current_tf, heap); + let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap)); self.current_tf = 0; } - fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> { - // the last document has not been closed... - // its term freq is self.current_tf. - let mut doc_iter = self - .stack - .iter(heap) - .chain(Some(self.current_tf).into_iter()); - - while let Some(doc) = doc_iter.next() { - let term_freq = doc_iter - .next() - .expect("The IndexWriter recorded a doc without a term freq."); - serializer.write_doc(doc, term_freq, &EMPTY_ARRAY)?; + fn serialize( + &self, + buffer_lender: &mut BufferLender, + serializer: &mut FieldSerializer, + heap: &MemoryArena, + ) -> io::Result<()> { + let buffer = buffer_lender.lend_u8(); + self.stack.read_to_end(heap, buffer); + let mut u32_it = VInt32Reader::new(&buffer[..]); + while let Some(doc) = u32_it.next() { + let term_freq = u32_it.next().unwrap_or(self.current_tf); + serializer.write_doc(doc as u32, term_freq, &EMPTY_ARRAY)?; } + Ok(()) } } @@ -128,11 +181,10 @@ pub struct TFAndPositionRecorder { stack: ExpUnrolledLinkedList, current_doc: DocId, } - impl Recorder for TFAndPositionRecorder { - fn new(heap: &mut MemoryArena) -> Self { + fn new() -> Self { TFAndPositionRecorder { - stack: ExpUnrolledLinkedList::new(heap), + stack: ExpUnrolledLinkedList::new(), current_doc: u32::max_value(), } } @@ -143,33 +195,88 @@ impl Recorder for TFAndPositionRecorder { fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) { self.current_doc = doc; - self.stack.push(doc, heap); + let _ = write_u32_vint(doc, &mut self.stack.writer(heap)); } fn record_position(&mut self, position: u32, heap: &mut MemoryArena) { - self.stack.push(position, heap); + let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap)); } fn close_doc(&mut self, heap: &mut MemoryArena) { - self.stack.push(POSITION_END, heap); + let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap)); } - fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> { - let mut doc_positions = Vec::with_capacity(100); - let mut positions_iter = self.stack.iter(heap); - while let Some(doc) = positions_iter.next() { - let mut prev_position = 0; - doc_positions.clear(); - for position in &mut positions_iter { - if position == POSITION_END { - break; - } else { - doc_positions.push(position - prev_position); - prev_position = position; + fn serialize( + &self, + buffer_lender: &mut BufferLender, + serializer: &mut FieldSerializer, + heap: &MemoryArena, + ) -> io::Result<()> { + let (buffer_u8, buffer_positions) = buffer_lender.lend_all(); + self.stack.read_to_end(heap, buffer_u8); + let mut u32_it = VInt32Reader::new(&buffer_u8[..]); + while let Some(doc) = u32_it.next() { + let mut prev_position_plus_one = 1u32; + buffer_positions.clear(); + loop { + match u32_it.next() { + Some(POSITION_END) | None => { + break; + } + Some(position_plus_one) => { + let delta_position = position_plus_one - prev_position_plus_one; + buffer_positions.push(delta_position); + prev_position_plus_one = position_plus_one; + } } } - serializer.write_doc(doc, doc_positions.len() as u32, &doc_positions)?; + serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions)?; } Ok(()) } } + +#[cfg(test)] +mod tests { + + use super::write_u32_vint; + use super::BufferLender; + use super::VInt32Reader; + + #[test] + fn test_buffer_lender() { + let mut buffer_lender = BufferLender::default(); + { + let buf = buffer_lender.lend_u8(); + assert!(buf.is_empty()); + buf.push(1u8); + } + { + let buf = buffer_lender.lend_u8(); + assert!(buf.is_empty()); + buf.push(1u8); + } + { + let (_, buf) = buffer_lender.lend_all(); + assert!(buf.is_empty()); + buf.push(1u32); + } + { + let (_, buf) = buffer_lender.lend_all(); + assert!(buf.is_empty()); + buf.push(1u32); + } + } + + #[test] + fn test_vint_u32() { + let mut buffer = vec![]; + let vals = [0, 1, 324_234_234, u32::max_value()]; + for &i in &vals { + assert!(write_u32_vint(i, &mut buffer).is_ok()); + } + assert_eq!(buffer.len(), 1 + 1 + 5 + 5); + let res: Vec = VInt32Reader::new(&buffer[..]).collect(); + assert_eq!(&res[..], &vals[..]); + } +} diff --git a/src/postings/stacker/expull.rs b/src/postings/stacker/expull.rs index 759325a16..58fa6e1ba 100644 --- a/src/postings/stacker/expull.rs +++ b/src/postings/stacker/expull.rs @@ -1,28 +1,37 @@ use super::{Addr, MemoryArena}; -use common::is_power_of_2; +use postings::stacker::memory_arena::load; +use postings::stacker::memory_arena::store; +use std::io; use std::mem; const MAX_BLOCK_LEN: u32 = 1u32 << 15; +const FIRST_BLOCK: usize = 16; +const INLINED_BLOCK_LEN: usize = FIRST_BLOCK + mem::size_of::(); -const FIRST_BLOCK: u32 = 4u32; +enum CapacityResult { + Available(u32), + NeedAlloc(u32), +} -#[inline] -pub fn jump_needed(len: u32) -> Option { +fn len_to_capacity(len: u32) -> CapacityResult { match len { - 0...3 => None, - 4...MAX_BLOCK_LEN => { - if is_power_of_2(len as usize) { - Some(len as usize) + 0...15 => CapacityResult::Available(FIRST_BLOCK as u32 - len), + 16...MAX_BLOCK_LEN => { + let cap = 1 << (32u32 - (len - 1u32).leading_zeros()); + let available = cap - len; + if available == 0 { + CapacityResult::NeedAlloc(len) } else { - None + CapacityResult::Available(available) } } n => { - if n % MAX_BLOCK_LEN == 0 { - Some(MAX_BLOCK_LEN as usize) + let available = n % MAX_BLOCK_LEN; + if available == 0 { + CapacityResult::NeedAlloc(MAX_BLOCK_LEN) } else { - None + CapacityResult::Available(MAX_BLOCK_LEN - available) } } } @@ -52,70 +61,119 @@ pub fn jump_needed(len: u32) -> Option { #[derive(Debug, Clone, Copy)] pub struct ExpUnrolledLinkedList { len: u32, - head: Addr, tail: Addr, + inlined_data: [u8; INLINED_BLOCK_LEN as usize], +} + +pub struct ExpUnrolledLinkedListWriter<'a> { + eull: &'a mut ExpUnrolledLinkedList, + heap: &'a mut MemoryArena, +} + +fn ensure_capacity<'a>( + eull: &'a mut ExpUnrolledLinkedList, + heap: &'a mut MemoryArena, +) -> &'a mut [u8] { + if eull.len <= FIRST_BLOCK as u32 { + // We are still hitting the inline block. + if eull.len < FIRST_BLOCK as u32 { + return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK]; + } + // We need to allocate a new block! + let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::()); + store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr); + eull.tail = new_block_addr; + return heap.slice_mut(eull.tail, FIRST_BLOCK); + } + let len = match len_to_capacity(eull.len) { + CapacityResult::NeedAlloc(new_block_len) => { + let new_block_addr: Addr = + heap.allocate_space(new_block_len as usize + mem::size_of::()); + heap.write_at(eull.tail, new_block_addr); + eull.tail = new_block_addr; + new_block_len + } + CapacityResult::Available(available) => available, + }; + heap.slice_mut(eull.tail, len as usize) +} + +impl<'a> ExpUnrolledLinkedListWriter<'a> { + pub fn extend_from_slice(&mut self, mut buf: &[u8]) { + if buf.is_empty() { + // we need to cut early, because `ensure_capacity` + // allocates if there is no capacity at all right now. + return; + } + while !buf.is_empty() { + let add_len: usize; + { + let output_buf = ensure_capacity(self.eull, self.heap); + add_len = buf.len().min(output_buf.len()); + output_buf[..add_len].copy_from_slice(&buf[..add_len]); + } + self.eull.len += add_len as u32; + self.eull.tail = self.eull.tail.offset(add_len as u32); + buf = &buf[add_len..]; + } + } +} + +impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { + // There is no use case to only write the capacity. + // This is not IO after all, so we write the whole + // buffer even if the contract of `.write` is looser. + self.extend_from_slice(buf); + Ok(buf.len()) + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.extend_from_slice(buf); + Ok(()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } } impl ExpUnrolledLinkedList { - pub fn new(heap: &mut MemoryArena) -> ExpUnrolledLinkedList { - let addr = heap.allocate_space((FIRST_BLOCK as usize) * mem::size_of::()); + pub fn new() -> ExpUnrolledLinkedList { ExpUnrolledLinkedList { len: 0u32, - head: addr, - tail: addr, + tail: Addr::null_pointer(), + inlined_data: [0u8; INLINED_BLOCK_LEN as usize], } } - pub fn iter<'a>(&self, heap: &'a MemoryArena) -> ExpUnrolledLinkedListIterator<'a> { - ExpUnrolledLinkedListIterator { - heap, - addr: self.head, - len: self.len, - consumed: 0, - } + #[inline(always)] + pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> { + ExpUnrolledLinkedListWriter { eull: self, heap } } - /// Appends a new element to the current stack. - /// - /// If the current block end is reached, a new block is allocated. - pub fn push(&mut self, val: u32, heap: &mut MemoryArena) { - self.len += 1; - if let Some(new_block_len) = jump_needed(self.len) { - // We need to allocate another block. - // We also allocate an extra `u32` to store the pointer - // to the future next block. - let new_block_size: usize = (new_block_len + 1) * mem::size_of::(); - let new_block_addr: Addr = heap.allocate_space(new_block_size); - heap.write_at(self.tail, new_block_addr); - self.tail = new_block_addr; + pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec) { + let len = self.len as usize; + if len <= FIRST_BLOCK { + output.extend_from_slice(&self.inlined_data[..len]); + return; } - heap.write_at(self.tail, val); - self.tail = self.tail.offset(mem::size_of::() as u32); - } -} - -pub struct ExpUnrolledLinkedListIterator<'a> { - heap: &'a MemoryArena, - addr: Addr, - len: u32, - consumed: u32, -} - -impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> { - type Item = u32; - - fn next(&mut self) -> Option { - if self.consumed == self.len { - None - } else { - self.consumed += 1; - let addr: Addr = if jump_needed(self.consumed).is_some() { - self.heap.read(self.addr) - } else { - self.addr - }; - self.addr = addr.offset(mem::size_of::() as u32); - Some(self.heap.read(addr)) + output.extend_from_slice(&self.inlined_data[..FIRST_BLOCK]); + let mut cur = FIRST_BLOCK; + let mut addr = load(&self.inlined_data[FIRST_BLOCK..]); + loop { + let cap = match len_to_capacity(cur as u32) { + CapacityResult::Available(capacity) => capacity, + CapacityResult::NeedAlloc(capacity) => capacity, + } as usize; + let data = heap.slice(addr, cap); + if cur + cap >= len { + output.extend_from_slice(&data[..(len - cur)]); + return; + } + output.extend_from_slice(data); + cur += cap; + addr = heap.read(addr.offset(cap as u32)); } } } @@ -124,39 +182,126 @@ impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> { mod tests { use super::super::MemoryArena; - use super::jump_needed; + use super::len_to_capacity; use super::*; + use byteorder::{ByteOrder, LittleEndian, WriteBytesExt}; + #[test] #[test] fn test_stack() { let mut heap = MemoryArena::new(); - let mut stack = ExpUnrolledLinkedList::new(&mut heap); - stack.push(1u32, &mut heap); - stack.push(2u32, &mut heap); - stack.push(4u32, &mut heap); - stack.push(8u32, &mut heap); + let mut stack = ExpUnrolledLinkedList::new(); + stack.writer(&mut heap).extend_from_slice(&[1u8]); + stack.writer(&mut heap).extend_from_slice(&[2u8]); + stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]); + stack.writer(&mut heap).extend_from_slice(&[5u8]); { - let mut it = stack.iter(&heap); - assert_eq!(it.next().unwrap(), 1u32); - assert_eq!(it.next().unwrap(), 2u32); - assert_eq!(it.next().unwrap(), 4u32); - assert_eq!(it.next().unwrap(), 8u32); - assert!(it.next().is_none()); + let mut buffer = Vec::new(); + stack.read_to_end(&heap, &mut buffer); + assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]); } } #[test] - fn test_jump_if_needed() { - let mut block_len = 4u32; - let mut i = 0; - while i < 10_000_000 { - assert!(jump_needed(i + block_len - 1).is_none()); - assert!(jump_needed(i + block_len + 1).is_none()); - assert!(jump_needed(i + block_len).is_some()); - let new_block_len = jump_needed(i + block_len).unwrap(); - i += block_len; - block_len = new_block_len as u32; + fn test_stack_long() { + let mut heap = MemoryArena::new(); + let mut stack = ExpUnrolledLinkedList::new(); + let source: Vec = (0..100).collect(); + for &el in &source { + assert!(stack + .writer(&mut heap) + .write_u32::(el) + .is_ok()); } + let mut buffer = Vec::new(); + stack.read_to_end(&heap, &mut buffer); + let mut result = vec![]; + let mut remaining = &buffer[..]; + while !remaining.is_empty() { + result.push(LittleEndian::read_u32(&remaining[..4])); + remaining = &remaining[4..]; + } + assert_eq!(&result[..], &source[..]); + } + + #[test] + fn test_stack_interlaced() { + let mut heap = MemoryArena::new(); + let mut stack = ExpUnrolledLinkedList::new(); + let mut stack2 = ExpUnrolledLinkedList::new(); + + let mut vec1: Vec = vec![]; + let mut vec2: Vec = vec![]; + + for i in 0..9 { + assert!(stack.writer(&mut heap).write_u32::(i).is_ok()); + assert!(vec1.write_u32::(i).is_ok()); + if i % 2 == 0 { + assert!(stack2 + .writer(&mut heap) + .write_u32::(i) + .is_ok()); + assert!(vec2.write_u32::(i).is_ok()); + } + } + let mut res1 = vec![]; + let mut res2 = vec![]; + stack.read_to_end(&heap, &mut res1); + stack2.read_to_end(&heap, &mut res2); + assert_eq!(&vec1[..], &res1[..]); + assert_eq!(&vec2[..], &res2[..]); + } + + #[test] + fn test_jump_if_needed() { + let mut available = 16u32; + for i in 0..10_000_000 { + match len_to_capacity(i) { + CapacityResult::NeedAlloc(cap) => { + assert_eq!(available, 0, "Failed len={}: Expected 0 got {}", i, cap); + available = cap; + } + CapacityResult::Available(cap) => { + assert_eq!( + available, cap, + "Failed len={}: Expected {} Got {}", + i, available, cap + ); + } + } + available -= 1; + } + } + + #[test] + fn test_jump_if_needed_progression() { + let mut v = vec![]; + for i in 0.. { + if v.len() >= 10 { + break; + } + match len_to_capacity(i) { + CapacityResult::NeedAlloc(cap) => { + v.push((i, cap)); + } + _ => {} + } + } + assert_eq!( + &v[..], + &[ + (16, 16), + (32, 32), + (64, 64), + (128, 128), + (256, 256), + (512, 512), + (1024, 1024), + (2048, 2048), + (4096, 4096), + (8192, 8192) + ] + ); } } @@ -164,6 +309,7 @@ mod tests { mod bench { use super::super::MemoryArena; use super::ExpUnrolledLinkedList; + use byteorder::{NativeEndian, WriteBytesExt}; use test::Bencher; const NUM_STACK: usize = 10_000; @@ -191,13 +337,13 @@ mod bench { let mut heap = MemoryArena::new(); let mut stacks = Vec::with_capacity(100); for _ in 0..NUM_STACK { - let mut stack = ExpUnrolledLinkedList::new(&mut heap); + let mut stack = ExpUnrolledLinkedList::new(); stacks.push(stack); } for s in 0..NUM_STACK { for i in 0u32..STACK_SIZE { let t = s * 392017 % NUM_STACK; - stacks[t].push(i, &mut heap); + let _ = stacks[t].writer(&mut heap).write_u32::(i); } } }); diff --git a/src/postings/stacker/memory_arena.rs b/src/postings/stacker/memory_arena.rs index 874103739..816492d28 100644 --- a/src/postings/stacker/memory_arena.rs +++ b/src/postings/stacker/memory_arena.rs @@ -69,12 +69,17 @@ impl Addr { } } - pub fn store(dest: &mut [u8], val: Item) { assert_eq!(dest.len(), std::mem::size_of::()); - unsafe { ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val); } + unsafe { + ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val); + } } +pub fn load(data: &[u8]) -> Item { + assert_eq!(data.len(), std::mem::size_of::()); + unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) } +} /// The `MemoryArena` pub struct MemoryArena { @@ -116,8 +121,7 @@ impl MemoryArena { /// /// If the address is erroneous pub fn read(&self, addr: Addr) -> Item { - let data = self.slice(addr, mem::size_of::()); - unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) } + load(self.slice(addr, mem::size_of::())) } pub fn slice(&self, addr: Addr, len: usize) -> &[u8] { @@ -128,6 +132,7 @@ impl MemoryArena { self.pages[addr.page_id()].slice_from(addr.page_local_addr()) } + #[inline(always)] pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] { self.pages[addr.page_id()].slice_mut(addr.page_local_addr(), len) } diff --git a/src/postings/stacker/term_hashmap.rs b/src/postings/stacker/term_hashmap.rs index c838d1dd9..072f8d77c 100644 --- a/src/postings/stacker/term_hashmap.rs +++ b/src/postings/stacker/term_hashmap.rs @@ -2,12 +2,12 @@ extern crate murmurhash32; use self::murmurhash32::murmurhash2; -use byteorder::{ByteOrder, NativeEndian}; use super::{Addr, MemoryArena}; +use byteorder::{ByteOrder, NativeEndian}; +use postings::stacker::memory_arena::store; use std::iter; use std::mem; use std::slice; -use postings::stacker::memory_arena::store; pub type BucketId = usize; @@ -90,8 +90,7 @@ impl<'a> Iterator for Iter<'a> { fn next(&mut self) -> Option { self.inner.next().cloned().map(move |bucket: usize| { let kv = self.hashmap.table[bucket]; - let (key, offset): (&'a [u8], Addr) = - self.hashmap.get_key_value(kv.key_value_addr); + let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr); (key, offset, bucket as BucketId) }) } @@ -122,12 +121,22 @@ impl TermHashMap { self.table.len() < self.occupied.len() * 3 } + #[inline(always)] fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) { let data = self.heap.slice_from(addr); let key_bytes_len = NativeEndian::read_u16(data) as usize; let key_bytes: &[u8] = &data[2..][..key_bytes_len]; - let val_addr: Addr = addr.offset(2u32 + key_bytes_len as u32); - (key_bytes, val_addr) + (key_bytes, addr.offset(2u32 + key_bytes_len as u32)) + } + + #[inline(always)] + fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option { + let (stored_key, value_addr) = self.get_key_value(addr); + if stored_key == target_key { + Some(value_addr) + } else { + None + } } pub fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) { @@ -192,7 +201,8 @@ impl TermHashMap { let kv: KeyValue = self.table[bucket]; if kv.is_empty() { let val = updater(None); - let num_bytes = std::mem::size_of::() + key_bytes.len() + std::mem::size_of::(); + let num_bytes = + std::mem::size_of::() + key_bytes.len() + std::mem::size_of::(); let key_addr = self.heap.allocate_space(num_bytes); { let data = self.heap.slice_mut(key_addr, num_bytes); @@ -204,12 +214,9 @@ impl TermHashMap { self.set_bucket(hash, key_addr, bucket); return bucket as BucketId; } else if kv.hash == hash { - let (key_matches, val_addr) = { - let (stored_key, val_addr): (&[u8], Addr) = - self.get_key_value(kv.key_value_addr); - (stored_key == key_bytes, val_addr) - }; - if key_matches { + if let Some(val_addr) = + self.get_value_addr_if_key_match(key_bytes, kv.key_value_addr) + { let v = self.heap.read(val_addr); let new_v = updater(Some(v)); self.heap.write_at(val_addr, new_v); @@ -220,24 +227,6 @@ impl TermHashMap { } } -#[cfg(all(test, feature = "unstable"))] -mod bench { - use super::murmurhash2::murmurhash2; - use test::Bencher; - - #[bench] - fn bench_murmurhash2(b: &mut Bencher) { - let keys: [&'static str; 3] = ["wer qwe qwe qwe ", "werbq weqweqwe2 ", "weraq weqweqwe3 "]; - b.iter(|| { - let mut s = 0; - for &key in &keys { - s ^= murmurhash2(key.as_bytes()); - } - s - }); - } -} - #[cfg(test)] mod tests { diff --git a/src/schema/facet.rs b/src/schema/facet.rs index c44b73571..c6efe5ae2 100644 --- a/src/schema/facet.rs +++ b/src/schema/facet.rs @@ -133,7 +133,7 @@ impl<'a, T: ?Sized + AsRef> From<&'a T> for Facet { } let path: &str = path_asref.as_ref(); assert!(!path.is_empty()); - assert!(path.starts_with("/")); + assert!(path.starts_with('/')); let mut facet_encoded = String::new(); let mut state = State::Idle; let path_bytes = path.as_bytes(); diff --git a/src/termdict/term_info_store.rs b/src/termdict/term_info_store.rs index 157f6adcb..caac6fef7 100644 --- a/src/termdict/term_info_store.rs +++ b/src/termdict/term_info_store.rs @@ -1,4 +1,4 @@ -use byteorder::{LittleEndian, ByteOrder}; +use byteorder::{ByteOrder, LittleEndian}; use common::bitpacker::BitPacker; use common::compute_num_bits; use common::Endianness; @@ -20,8 +20,6 @@ struct TermInfoBlockMeta { positions_idx_nbits: u8, } - - impl BinarySerializable for TermInfoBlockMeta { fn serialize(&self, write: &mut W) -> io::Result<()> { self.offset.serialize(write)?; @@ -89,20 +87,21 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 { assert!(num_bits <= 56); let addr_byte = addr_bits / 8; let bit_shift = (addr_bits % 8) as u64; - let val_unshifted_unmasked: u64; - if data.len() >= addr_byte + 8 { - val_unshifted_unmasked = LittleEndian::read_u64(&data[addr_byte..][..8]); + let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 { + LittleEndian::read_u64(&data[addr_byte..][..8]) } else { + // the buffer is not large enough. + // Let's copy the few remaining bytes to a 8 byte buffer + // padded with 0s. let mut buf = [0u8; 8]; let data_to_copy = &data[addr_byte..]; let nbytes = data_to_copy.len(); buf[..nbytes].copy_from_slice(data_to_copy); - val_unshifted_unmasked = LittleEndian::read_u64(&buf); - } + LittleEndian::read_u64(&buf) + }; let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift; let mask = (1u64 << u64::from(num_bits)) - 1; val_shifted_unmasked & mask - } impl TermInfoStore {