issue/207 Lazily decompressing positions.

This commit is contained in:
Paul Masurel
2017-08-06 16:03:07 +09:00
parent 236fa74767
commit d1f61a50c1
11 changed files with 110 additions and 83 deletions

View File

@@ -40,8 +40,3 @@ size_t uncompress_unsorted(
simdunpack((__m128i *)compressed_data, output, b);
return 1 + b * sizeof(__m128i);
}
size_t compressedbytes(const uint32_t length, const uint8_t num_bits) {
return simdpack_compressedbytes((int)length, (uint32_t)num_bits);
}

View File

@@ -17,7 +17,7 @@ mod pack {
pub use self::compression_pack_simd::*;
}
pub use self::pack::{BlockEncoder, BlockDecoder, compressedbytes};
pub use self::pack::{BlockEncoder, BlockDecoder};
#[cfg( any(not(feature="simdcompression"), target_env="msvc") )]
mod vint {
@@ -31,6 +31,10 @@ mod vint {
pub use self::compression_vint_simd::*;
}
/// Returns the size in bytes of a compressed block, given num_bits.
pub fn compressed_block_size(num_bits: u8) -> usize {
1 + (num_bits as usize) * 16
}
pub trait VIntEncoder {
fn compress_vint_sorted(&mut self, input: &[u32], offset: u32) -> &[u8];
@@ -87,6 +91,7 @@ pub mod tests {
use super::*;
use tests;
use test::Bencher;
use std::iter;
#[test]
fn test_encode_sorted_block() {
@@ -194,6 +199,19 @@ pub mod tests {
b.iter(|| { decoder.uncompress_block_sorted(compressed, 0u32); });
}
#[test]
fn test_all_docs_compression_numbits() {
for num_bits in 0..33 {
let mut data: Vec<u32> = iter::repeat(0u32).take(128).collect();
if num_bits > 0 {
data[0] = 1 << (num_bits - 1);
}
let mut encoder = BlockEncoder::new();
let compressed = encoder.compress_block_unsorted(&data);
assert_eq!(compressed[0] as usize, num_bits);
assert_eq!(compressed.len(), compressed_block_size(compressed[0]));
}
}
const NUM_INTS_BENCH_VINT: usize = 10;

View File

@@ -16,15 +16,9 @@ mod simdcomp {
pub fn compress_unsorted(data: *const u32, output: *mut u8) -> size_t;
pub fn uncompress_unsorted(compressed_data: *const u8, output: *mut u32) -> size_t;
pub fn compressedbytes(length: u32, num_bits: u8) -> size_t;
}
}
pub fn compressedbytes(length: u32, num_bits: u8) -> usize {
unsafe { simdcomp::compressedbytes(length, num_bits) }
}
fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
unsafe { simdcomp::compress_sorted(vals.as_ptr(), output.as_mut_ptr(), offset) }
}
@@ -123,4 +117,5 @@ mod tests {
let compressed = encoder.compress_block_sorted(&data, 0u32);
assert_eq!(compressed.len(), 17);
}
}

View File

@@ -1,6 +1,6 @@
use compression::BlockDecoder;
use compression::NUM_DOCS_PER_BLOCK;
use compression::compressedbytes;
use compression::compressed_block_size;
pub struct CompressedIntStream<'a> {
buffer: &'a [u8],
@@ -52,8 +52,8 @@ impl<'a> CompressedIntStream<'a> {
while skip_len >= NUM_DOCS_PER_BLOCK {
skip_len -= NUM_DOCS_PER_BLOCK;
let num_bits: u8 = self.buffer[0];
let block_len = compressedbytes(128, num_bits);
self.buffer = &self.buffer[1 + block_len..];
let block_len = compressed_block_size(num_bits);
self.buffer = &self.buffer[block_len..];
}
self.buffer = self.block_decoder.uncompress_block_unsorted(self.buffer);
self.inner_offset = skip_len;
@@ -66,8 +66,7 @@ impl<'a> CompressedIntStream<'a> {
pub mod tests {
use super::CompressedIntStream;
use tests;
use compression::compressedbytes;
use compression::compressed_block_size;
use compression::NUM_DOCS_PER_BLOCK;
use compression::BlockEncoder;
@@ -78,7 +77,7 @@ pub mod tests {
for chunk in vals.chunks(NUM_DOCS_PER_BLOCK) {
let compressed_block = encoder.compress_block_unsorted(chunk);
let num_bits = compressed_block[0];
assert_eq!(compressedbytes(128, num_bits) + 1, compressed_block.len());
assert_eq!(compressed_block_size(num_bits), compressed_block.len());
buffer.extend_from_slice(compressed_block);
}
buffer

View File

@@ -197,10 +197,10 @@ impl SegmentReader {
/// For instance, requesting `SegmentPostingsOption::FreqAndPositions` for a
/// `TextIndexingOptions` that does not index position will return a `SegmentPostings`
/// with `DocId`s and frequencies.
pub fn read_postings<'a>(&'a self,
pub fn read_postings(&self,
term: &Term,
option: SegmentPostingsOption)
-> Option<SegmentPostings<'a>> {
-> Option<SegmentPostings> {
let field = term.field();
let field_entry = self.schema.get_field_entry(field);
let term_info = get!(self.get_term_info(term));

View File

@@ -61,6 +61,31 @@ fn extract_fast_field_reader(segment_reader: &SegmentReader,
segment_reader.fast_fields_reader().open_reader(field)
}
struct DeltaComputer {
buffer: Vec<u32>,
}
impl DeltaComputer {
fn new() -> DeltaComputer {
DeltaComputer { buffer: vec![0u32; 512] }
}
fn compute_delta(&mut self, positions: &[u32]) -> &[u32] {
if positions.len() > self.buffer.len() {
self.buffer.resize(positions.len(), 0u32);
}
let mut last_pos = 0u32;
let num_positions = positions.len();
for i in 0..num_positions {
let cur_pos = positions[i];
self.buffer[i] = cur_pos - last_pos;
last_pos = cur_pos;
}
&self.buffer[..positions.len()]
}
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> Result<IndexMerger> {
let mut readers = vec![];
@@ -169,6 +194,7 @@ impl IndexMerger {
fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> {
let mut delta_computer = DeltaComputer::new();
let mut merged_terms = TermMerger::from(&self.readers[..]);
let mut max_doc = 0;
@@ -270,8 +296,9 @@ impl IndexMerger {
old_to_new_doc_id[segment_postings.doc() as usize] {
// we make sure to only write the term iff
// there is at least one document.
let delta_positions: &[u32] = segment_postings.delta_positions();
let positions: &[u32] = segment_postings.positions();
let term_freq = segment_postings.term_freq();
let delta_positions = delta_computer.compute_delta(positions);
serializer
.write_doc(remapped_doc_id, term_freq, delta_positions)?;
}

View File

@@ -17,16 +17,6 @@ pub trait Postings: DocSet {
/// Returns the list of positions of the term, expressed as a list of
/// token ordinals.
fn positions(&self) -> &[u32];
/// Return the list of delta positions.
///
/// Delta positions is simply the difference between
/// two consecutive positions.
/// The first delta position is the first position of the
/// term in the document.
///
/// For instance, if positions are `[7,13,17]`
/// then delta positions `[7, 6, 4]`
fn delta_positions(&self) -> &[u32];
}
impl<TPostings: Postings> Postings for Box<TPostings> {
@@ -39,11 +29,6 @@ impl<TPostings: Postings> Postings for Box<TPostings> {
let unboxed: &TPostings = self.borrow();
unboxed.positions()
}
fn delta_positions(&self) -> &[u32] {
let unboxed: &TPostings = self.borrow();
unboxed.delta_positions()
}
}
impl<'a, TPostings: Postings> Postings for &'a mut TPostings {
@@ -56,9 +41,4 @@ impl<'a, TPostings: Postings> Postings for &'a mut TPostings {
let unref: &TPostings = *self;
unref.positions()
}
fn delta_positions(&self) -> &[u32] {
let unref: &TPostings = *self;
unref.delta_positions()
}
}

View File

@@ -16,8 +16,6 @@ struct PositionComputer<'a> {
// if none, position are already loaded in
// the positions vec.
position_to_skip: Option<usize>,
delta_positions: Vec<u32>,
positions: Vec<u32>,
positions_stream: CompressedIntStream<'a>,
}
@@ -28,7 +26,6 @@ impl<'a> PositionComputer<'a> {
PositionComputer {
position_to_skip: None,
positions: vec!(),
delta_positions: vec!(),
positions_stream: positions_stream,
}
}
@@ -42,24 +39,21 @@ impl<'a> PositionComputer<'a> {
}
pub fn positions(&mut self, term_freq: usize) -> &[u32] {
self.delta_positions(term_freq);
&self.positions[..term_freq]
}
pub fn delta_positions(&mut self, term_freq: usize) -> &[u32] {
if let Some(num_skip) = self.position_to_skip {
self.delta_positions.resize(term_freq, 0u32);
self.positions_stream.skip(num_skip);
self.positions_stream.read(&mut self.delta_positions[..term_freq]);
self.positions.resize(term_freq, 0u32);
self.positions_stream.skip(num_skip);
self.positions_stream.read(&mut self.positions[..term_freq]);
let mut cum = 0u32;
for i in 0..term_freq as usize {
cum += self.delta_positions[i];
cum += self.positions[i];
self.positions[i] = cum;
}
self.position_to_skip = None;
}
&self.delta_positions[..term_freq]
&self.positions[..term_freq]
}
}
@@ -74,7 +68,6 @@ pub struct SegmentPostings<'a> {
block_cursor: BlockSegmentPostings<'a>,
cur: usize,
delete_bitset: DeleteBitSet,
position_computer: Option<UnsafeCell<PositionComputer<'a>>>,
}
@@ -111,6 +104,16 @@ impl<'a> SegmentPostings<'a> {
position_computer: None,
}
}
fn position_add_skip<F: FnOnce()->usize>(&self, num_skips_fn: F) {
if let Some(ref position_computer) = self.position_computer.as_ref() {
let num_skips = num_skips_fn();
unsafe {
(*position_computer.get()).add_skip(num_skips);
}
}
}
}
@@ -119,9 +122,7 @@ impl<'a> DocSet for SegmentPostings<'a> {
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> bool {
let mut pos_to_skip = 0u32;
loop {
pos_to_skip += self.term_freq();
self.cur += 1;
if self.cur >= self.block_cursor.block_len() {
self.cur = 0;
@@ -130,12 +131,8 @@ impl<'a> DocSet for SegmentPostings<'a> {
return false;
}
}
self.position_add_skip(|| { self.term_freq() as usize });
if !self.delete_bitset.is_deleted(self.doc()) {
if let Some(ref mut position_computer) = self.position_computer.as_mut() {
unsafe {
(*position_computer.get()).add_skip(pos_to_skip as usize);
}
}
return true;
}
}
@@ -147,6 +144,10 @@ impl<'a> DocSet for SegmentPostings<'a> {
return SkipResult::End;
}
// in the following, thanks to the call to advance above,
// we know that the position is not loaded and we need
// to skip every doc_freq we cross.
// skip blocks until one that might contain the target
loop {
// check if we need to go to the next block
@@ -155,13 +156,26 @@ impl<'a> DocSet for SegmentPostings<'a> {
(block_docs[self.cur], block_docs[block_docs.len() - 1])
};
if target > last_doc_in_block {
// we add skip for the current term independantly,
// so that position_add_skip will decide if it should
// just set itself to Some(0) or effectively
// add the term freq.
//let num_skips: u32 = ;
self.position_add_skip(|| {
let freqs_skipped = &self.block_cursor.freqs()[self.cur..];
let sum_freq: u32 = freqs_skipped.iter().cloned().sum();
sum_freq as usize
});
if !self.block_cursor.advance() {
return SkipResult::End;
}
self.cur = 0;
} else {
if target < current_doc {
// We've overpassed the target after the first `advance` call
// We've passed the target after the first `advance` call
// or we're at the beginning of a block.
// Either way, we're on the first `DocId` greater than `target`
return SkipResult::OverStep;
@@ -207,6 +221,13 @@ impl<'a> DocSet for SegmentPostings<'a> {
// `doc` is now >= `target`
let doc = block_docs[start];
self.position_add_skip(|| {
let freqs_skipped = &self.block_cursor.freqs()[self.cur..start];
let sum_freqs: u32 = freqs_skipped.iter().sum();
sum_freqs as usize
});
self.cur = start;
if !self.delete_bitset.is_deleted(doc) {
@@ -228,6 +249,7 @@ impl<'a> DocSet for SegmentPostings<'a> {
self.len()
}
/// Return the current document's `DocId`.
#[inline]
fn doc(&self) -> DocId {
let docs = self.block_cursor.docs();
@@ -249,28 +271,19 @@ impl<'a> Postings for SegmentPostings<'a> {
}
fn positions(&self) -> &[u32] {
let term_freq = self.term_freq();
let position_computer_ptr: *mut PositionComputer = self.position_computer
.as_ref()
.expect("Segment reader does not have positions.")
.get();
unsafe {
(&mut *position_computer_ptr).positions(term_freq as usize)
}
}
fn delta_positions(&self) -> &[u32] {
let term_freq = self.term_freq();
self.position_computer
.as_ref()
.map(|position_computer| {
unsafe {
(&mut *position_computer.get()).delta_positions(term_freq as usize)
(&mut *position_computer.get()).positions(term_freq as usize)
}
})
.unwrap_or(&EMPTY_POSITIONS[..])
}
}
/// `BlockSegmentPostings` is a cursor iterating over blocks
@@ -351,16 +364,19 @@ impl<'a> BlockSegmentPostings<'a> {
self.doc_decoder.output_array()
}
/// Return the document at index `idx` of the block.
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
self.doc_decoder.output(idx)
}
/// Return the array of `term freq` in the block.
#[inline]
pub fn freqs(&self) -> &[u32] {
self.freq_decoder.output_array()
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
self.freq_decoder.output(idx)

View File

@@ -17,6 +17,9 @@ pub enum SegmentPostingsOption {
}
impl SegmentPostingsOption {
/// Returns true iff this option includes encoding
/// term frequencies.
pub fn has_freq(&self) -> bool {
match *self {
SegmentPostingsOption::NoFreq => false,
@@ -24,6 +27,8 @@ impl SegmentPostingsOption {
}
}
/// Returns true iff this option include encoding
/// term positions.
pub fn has_positions(&self) -> bool {
match *self {
SegmentPostingsOption::FreqAndPositions => true,

View File

@@ -12,11 +12,8 @@ use DocId;
use core::Segment;
use std::io::{self, Write};
use compression::VIntEncoder;
use common::VInt;
use common::BinarySerializable;
use common::CountingWriter;
use termdict::TermDictionaryBuilder;
use datastruct::{SkipList, SkipListBuilder};
/// `PostingsSerializer` is in charge of serializing

View File

@@ -54,11 +54,6 @@ impl Postings for VecPostings {
fn positions(&self) -> &[u32] {
&EMPTY_ARRAY
}
fn delta_positions(&self) -> &[u32] {
&EMPTY_ARRAY
}
}
#[cfg(test)]