mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-24 20:20:44 +00:00
Making it possible to read positions twice
This commit is contained in:
@@ -81,7 +81,7 @@ impl BlockEncoder {
|
||||
|
||||
|
||||
pub struct BlockDecoder {
|
||||
pub output: [u32; COMPRESSED_BLOCK_MAX_SIZE],
|
||||
pub output: [u32; COMPRESSION_BLOCK_SIZE + 1],
|
||||
pub output_len: usize,
|
||||
}
|
||||
|
||||
@@ -91,8 +91,10 @@ impl BlockDecoder {
|
||||
}
|
||||
|
||||
pub fn with_val(val: u32) -> BlockDecoder {
|
||||
let mut output = [val; COMPRESSION_BLOCK_SIZE + 1];
|
||||
output[COMPRESSION_BLOCK_SIZE] = 0u32;
|
||||
BlockDecoder {
|
||||
output: [val; COMPRESSED_BLOCK_MAX_SIZE],
|
||||
output,
|
||||
output_len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,12 @@ use directory::{ReadOnlySource, SourceRead};
|
||||
/// decompressing blocks that are not required.
|
||||
pub struct CompressedIntStream {
|
||||
buffer: SourceRead,
|
||||
|
||||
block_decoder: BlockDecoder,
|
||||
cached_addr: usize, // address of the currently decoded block
|
||||
cached_next_addr: usize, // address following the currently decoded block
|
||||
|
||||
addr: usize, // address of the block associated to the current position
|
||||
inner_offset: usize,
|
||||
}
|
||||
|
||||
@@ -21,34 +26,47 @@ impl CompressedIntStream {
|
||||
CompressedIntStream {
|
||||
buffer: SourceRead::from(source),
|
||||
block_decoder: BlockDecoder::new(),
|
||||
inner_offset: COMPRESSION_BLOCK_SIZE,
|
||||
cached_addr: usize::max_value(),
|
||||
cached_next_addr: usize::max_value(),
|
||||
|
||||
addr: 0,
|
||||
inner_offset: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Fills a buffer with the next `output.len()` integers,
|
||||
/// and advance the stream by that many els.
|
||||
/// Loads the block at the given address and return the address of the
|
||||
/// following block
|
||||
pub fn read_block(&mut self, addr: usize) -> usize {
|
||||
if self.cached_addr == addr {
|
||||
// we are already on this block.
|
||||
// no need to read.
|
||||
self.cached_next_addr
|
||||
} else {
|
||||
let next_addr = addr + self.block_decoder.uncompress_block_unsorted(self.buffer.slice_from(addr));
|
||||
self.cached_addr = addr;
|
||||
self.cached_next_addr = next_addr;
|
||||
next_addr
|
||||
}
|
||||
}
|
||||
|
||||
/// Fills a buffer with the next `output.len()` integers.
|
||||
/// This does not consume / advance the stream.
|
||||
pub fn read(&mut self, output: &mut [u32]) {
|
||||
let mut cursor = self.addr;
|
||||
let mut inner_offset = self.inner_offset;
|
||||
let mut num_els: usize = output.len();
|
||||
let mut start: usize = 0;
|
||||
let mut start = 0;
|
||||
loop {
|
||||
let available = COMPRESSION_BLOCK_SIZE - self.inner_offset;
|
||||
if num_els >= available {
|
||||
if available > 0 {
|
||||
let uncompressed_block =
|
||||
&self.block_decoder.output_array()[self.inner_offset..];
|
||||
output[start..][..available].clone_from_slice(uncompressed_block);
|
||||
}
|
||||
num_els -= available;
|
||||
start += available;
|
||||
let num_consumed_bytes = self.block_decoder
|
||||
.uncompress_block_unsorted(self.buffer.as_ref());
|
||||
self.buffer.advance(num_consumed_bytes);
|
||||
self.inner_offset = 0;
|
||||
cursor = self.read_block(cursor);
|
||||
let block = &self.block_decoder.output_array()[inner_offset..];
|
||||
let block_len = block.len();
|
||||
if num_els >= block_len {
|
||||
output[start..start + block_len].clone_from_slice(&block);
|
||||
start += block_len;
|
||||
num_els -= block_len;
|
||||
inner_offset = 0;
|
||||
} else {
|
||||
let uncompressed_block = &self.block_decoder.output_array()
|
||||
[self.inner_offset..self.inner_offset + num_els];
|
||||
output[start..][..num_els].clone_from_slice(uncompressed_block);
|
||||
self.inner_offset += num_els;
|
||||
output[start..].clone_from_slice(&block[..num_els]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -61,22 +79,19 @@ impl CompressedIntStream {
|
||||
///
|
||||
/// May panic if the end of the stream is reached.
|
||||
pub fn skip(&mut self, mut skip_len: usize) {
|
||||
let available = COMPRESSION_BLOCK_SIZE - self.inner_offset;
|
||||
if available >= skip_len {
|
||||
self.inner_offset += skip_len;
|
||||
} else {
|
||||
skip_len -= available;
|
||||
// entirely skip decompressing some blocks.
|
||||
while skip_len >= COMPRESSION_BLOCK_SIZE {
|
||||
skip_len -= COMPRESSION_BLOCK_SIZE;
|
||||
let num_bits: u8 = self.buffer.as_ref()[0];
|
||||
loop {
|
||||
let available = COMPRESSION_BLOCK_SIZE - self.inner_offset;
|
||||
if available >= skip_len {
|
||||
self.inner_offset += skip_len;
|
||||
break;
|
||||
} else {
|
||||
skip_len -= available;
|
||||
// entirely skip decompressing some blocks.
|
||||
let num_bits: u8 = self.buffer.get(self.addr);
|
||||
let block_len = compressed_block_size(num_bits);
|
||||
self.buffer.advance(block_len);
|
||||
self.addr += block_len;
|
||||
self.inner_offset = 0;
|
||||
}
|
||||
let num_consumed_bytes = self.block_decoder
|
||||
.uncompress_block_unsorted(self.buffer.as_ref());
|
||||
self.buffer.advance(num_consumed_bytes);
|
||||
self.inner_offset = skip_len;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,13 +130,24 @@ pub mod tests {
|
||||
stream.read(&mut block[0..2]);
|
||||
assert_eq!(block[0], 0);
|
||||
assert_eq!(block[1], 1);
|
||||
|
||||
// reading does not consume the stream
|
||||
stream.read(&mut block[0..2]);
|
||||
assert_eq!(block[0], 0);
|
||||
assert_eq!(block[1], 1);
|
||||
stream.skip(2);
|
||||
|
||||
stream.skip(5);
|
||||
stream.read(&mut block[0..3]);
|
||||
stream.skip(3);
|
||||
|
||||
assert_eq!(block[0], 7);
|
||||
assert_eq!(block[1], 8);
|
||||
assert_eq!(block[2], 9);
|
||||
stream.skip(500);
|
||||
stream.read(&mut block[0..3]);
|
||||
stream.skip(3);
|
||||
|
||||
assert_eq!(block[0], 510);
|
||||
assert_eq!(block[1], 511);
|
||||
assert_eq!(block[2], 512);
|
||||
|
||||
@@ -122,6 +122,16 @@ impl SourceRead {
|
||||
pub fn advance(&mut self, len: usize) {
|
||||
self.cursor = &self.cursor[len..];
|
||||
}
|
||||
|
||||
pub fn slice_from(&self, start: usize) -> &[u8] {
|
||||
&self.cursor[start..]
|
||||
|
||||
}
|
||||
|
||||
pub fn get(&self, idx: usize) -> u8 {
|
||||
self.cursor[idx]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for SourceRead {
|
||||
|
||||
@@ -181,7 +181,6 @@ impl<TPostings: Postings> Scorer for PhraseScorer<TPostings> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use tests;
|
||||
use test::Bencher;
|
||||
use super::{intersection_count, intersection};
|
||||
|
||||
@@ -198,7 +197,7 @@ mod tests {
|
||||
#[bench]
|
||||
fn bench_intersection_count_short(b: &mut Bencher) {
|
||||
b.iter(|| {
|
||||
let mut left = [1, 5, 10, 12];
|
||||
let left = [1, 5, 10, 12];
|
||||
let right = [5, 7];
|
||||
intersection_count(&left, &right);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user