Issue/333 (#335)

* Add skip information for posting list (skip to doc ids) 
* Separate num bits from data for positions (skip n positions)
* Address in the position using a n-position offset
* Added a long skip structure to allow efficient opening of the position for a given term.
This commit is contained in:
Paul Masurel
2018-07-31 10:51:53 +09:00
committed by GitHub
parent 55928d756a
commit e8707c02c0
21 changed files with 1121 additions and 460 deletions

View File

@@ -64,7 +64,7 @@ impl<W: Write> CompositeWrite<W> {
&mut self.write
}
/// Close the composite file.
/// Close the composite file
///
/// An index of the different field offsets
/// will be written as a footer.
@@ -112,7 +112,6 @@ impl CompositeFile {
let end = data.len();
let footer_len_data = data.slice_from(end - 4);
let footer_len = u32::deserialize(&mut footer_len_data.as_slice())? as usize;
let footer_start = end - 4 - footer_len;
let footer_data = data.slice(footer_start, footer_start + footer_len);
let mut footer_buffer = footer_data.as_slice();

View File

@@ -7,7 +7,11 @@ use std::io::Write;
#[derive(Debug, Eq, PartialEq)]
pub struct VInt(pub u64);
const STOP_BIT: u8 = 128;
impl VInt {
pub fn val(&self) -> u64 {
self.0
}
@@ -15,24 +19,35 @@ impl VInt {
pub fn deserialize_u64<R: Read>(reader: &mut R) -> io::Result<u64> {
VInt::deserialize(reader).map(|vint| vint.0)
}
/// Appends the variable-length encoding of this value to `output`.
pub fn serialize_into_vec(&self, output: &mut Vec<u8>) {
    // A u64 vint never needs more than 10 bytes (ceil(64 / 7)).
    let mut scratch = [0u8; 10];
    let written = self.serialize_into(&mut scratch);
    output.extend_from_slice(&scratch[..written]);
}
/// Writes the vint representation of `self.0` into `buffer` and
/// returns the number of bytes used (1..=10).
///
/// Each byte carries 7 bits of payload, least-significant group
/// first; the stop bit flags the final byte.
fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
    let mut remaining = self.0;
    let mut num_bytes = 0usize;
    while num_bytes < buffer.len() {
        let payload: u8 = (remaining & 127u64) as u8;
        remaining >>= 7;
        num_bytes += 1;
        if remaining == 0u64 {
            // last byte: set the stop bit
            buffer[num_bytes - 1] = payload | STOP_BIT;
            return num_bytes;
        }
        buffer[num_bytes - 1] = payload;
    }
    // a u64 always fits in 10 groups of 7 bits
    unreachable!();
}
}
impl BinarySerializable for VInt {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
let mut remaining = self.0;
let mut buffer = [0u8; 10];
let mut i = 0;
loop {
let next_byte: u8 = (remaining % 128u64) as u8;
remaining /= 128u64;
if remaining == 0u64 {
buffer[i] = next_byte | 128u8;
return writer.write_all(&buffer[0..i + 1]);
} else {
buffer[i] = next_byte;
}
i += 1;
}
let num_bytes = self.serialize_into(&mut buffer);
writer.write_all(&buffer[0..num_bytes])
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
@@ -42,20 +57,59 @@ impl BinarySerializable for VInt {
loop {
match bytes.next() {
Some(Ok(b)) => {
result += u64::from(b % 128u8) << shift;
if b & 128u8 != 0u8 {
break;
result |= u64::from(b % 128u8) << shift;
if b >= STOP_BIT {
return Ok(VInt(result));
}
shift += 7;
}
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Reach end of buffer",
"Reach end of buffer while reading VInt",
))
}
}
}
Ok(VInt(result))
}
}
#[cfg(test)]
mod tests {
    use super::VInt;
    use common::BinarySerializable;

    /// Round-trips `val` through serialize_into / deserialize and
    /// checks that the encoding uses the minimal number of bytes.
    fn aux_test_vint(val: u64) {
        let mut buffer = [14u8; 10];
        let num_bytes = VInt(val).serialize_into(&mut buffer);
        assert!(num_bytes > 0);
        // bytes past the encoding must be left untouched
        for &b in &buffer[num_bytes..] {
            assert_eq!(b, 14u8);
        }
        // num_bytes is the minimal number of 7-bit groups for val
        if num_bytes < 10 {
            assert!(1u64 << (7 * num_bytes) > val);
        }
        if num_bytes > 1 {
            assert!(1u64 << (7 * (num_bytes - 1)) <= val);
        }
        let decoded = VInt::deserialize(&mut &buffer[..]).unwrap();
        assert_eq!(decoded.0, val);
    }

    #[test]
    fn test_vint() {
        aux_test_vint(0);
        aux_test_vint(1);
        aux_test_vint(5);
        aux_test_vint(10);
        aux_test_vint(u64::max_value());
        // exercise every 7-bit boundary
        for i in 1..9 {
            let boundary = 1u64 << (7 * i);
            aux_test_vint(boundary - 1u64);
            aux_test_vint(boundary);
            aux_test_vint(boundary + 1u64);
        }
    }
}

View File

@@ -1,160 +0,0 @@
use compression::compressed_block_size;
use compression::BlockDecoder;
use compression::COMPRESSION_BLOCK_SIZE;
use directory::ReadOnlySource;
use owned_read::OwnedRead;
/// Reads a stream of compressed ints.
///
/// Tantivy uses `CompressedIntStream` to read
/// the position file.
/// The `.skip(...)` makes it possible to avoid
/// decompressing blocks that are not required.
pub struct CompressedIntStream {
buffer: OwnedRead, // underlying compressed byte stream
block_decoder: BlockDecoder, // scratch decoder holding the last decompressed block
cached_addr: usize, // address of the currently decoded block
cached_next_addr: usize, // address following the currently decoded block
addr: usize, // address of the block associated to the current position
inner_offset: usize, // offset (in ints) of the cursor within the current block
}
impl CompressedIntStream {
/// Opens a compressed int stream.
pub(crate) fn wrap(source: ReadOnlySource) -> CompressedIntStream {
CompressedIntStream {
buffer: OwnedRead::new(source),
block_decoder: BlockDecoder::new(),
// usize::max_value() acts as a sentinel: no block cached yet.
cached_addr: usize::max_value(),
cached_next_addr: usize::max_value(),
addr: 0,
inner_offset: 0,
}
}
/// Loads the block at the given address and return the address of the
/// following block
pub fn read_block(&mut self, addr: usize) -> usize {
if self.cached_addr == addr {
// we are already on this block.
// no need to read.
self.cached_next_addr
} else {
// decode the block at `addr`; the decoder returns the number of
// compressed bytes consumed, which gives us the next block address
let next_addr = addr + self.block_decoder
.uncompress_block_unsorted(self.buffer.slice_from(addr));
self.cached_addr = addr;
self.cached_next_addr = next_addr;
next_addr
}
}
/// Fills a buffer with the next `output.len()` integers.
/// This does not consume / advance the stream.
pub fn read(&mut self, output: &mut [u32]) {
// local copies: reading must leave the stream cursor untouched
let mut cursor = self.addr;
let mut inner_offset = self.inner_offset;
let mut num_els: usize = output.len();
let mut start = 0;
loop {
cursor = self.read_block(cursor);
let block = &self.block_decoder.output_array()[inner_offset..];
let block_len = block.len();
if num_els >= block_len {
// consume the rest of this block and continue with the next one
output[start..start + block_len].clone_from_slice(&block);
start += block_len;
num_els -= block_len;
inner_offset = 0;
} else {
// the request ends within this block
output[start..].clone_from_slice(&block[..num_els]);
break;
}
}
}
/// Skip the next `skip_len` integer.
///
/// If a full block is skipped, calling
/// `.skip(...)` will avoid decompressing it.
///
/// May panic if the end of the stream is reached.
pub fn skip(&mut self, mut skip_len: usize) {
loop {
let available = COMPRESSION_BLOCK_SIZE - self.inner_offset;
if available >= skip_len {
self.inner_offset += skip_len;
break;
} else {
skip_len -= available;
// entirely skip decompressing some blocks.
// The byte at `addr` stores the block's bit width, from which
// the compressed block length can be computed without decoding.
let num_bits: u8 = self.buffer.get(self.addr);
let block_len = compressed_block_size(num_bits);
self.addr += block_len;
self.inner_offset = 0;
}
}
}
}
#[cfg(test)]
pub mod tests {
    use super::CompressedIntStream;
    use compression::compressed_block_size;
    use compression::BlockEncoder;
    use compression::COMPRESSION_BLOCK_SIZE;
    use directory::ReadOnlySource;

    /// Builds a source containing the integers 0..1152, compressed
    /// block by block.
    fn create_stream_buffer() -> ReadOnlySource {
        let mut encoder = BlockEncoder::new();
        let mut bytes: Vec<u8> = Vec::new();
        let values: Vec<u32> = (0u32..1152u32).collect();
        for block in values.chunks(COMPRESSION_BLOCK_SIZE) {
            let compressed = encoder.compress_block_unsorted(block);
            // first byte is the bit width; check the length invariant
            assert_eq!(compressed.len(), compressed_block_size(compressed[0]));
            bytes.extend_from_slice(compressed);
        }
        if cfg!(simd) {
            // SIMD decompression may read a few bytes past the last block
            bytes.extend_from_slice(&[0u8; 7]);
        }
        ReadOnlySource::from(bytes)
    }

    #[test]
    fn test_compressed_int_stream() {
        let mut stream = CompressedIntStream::wrap(create_stream_buffer());
        let mut scratch: [u32; COMPRESSION_BLOCK_SIZE] = [0u32; COMPRESSION_BLOCK_SIZE];
        stream.read(&mut scratch[0..2]);
        assert_eq!(scratch[0], 0);
        assert_eq!(scratch[1], 1);
        // reading does not consume the stream
        stream.read(&mut scratch[0..2]);
        assert_eq!(scratch[0], 0);
        assert_eq!(scratch[1], 1);
        stream.skip(2);
        stream.skip(5);
        stream.read(&mut scratch[0..3]);
        stream.skip(3);
        assert_eq!(scratch[0], 7);
        assert_eq!(scratch[1], 8);
        assert_eq!(scratch[2], 9);
        stream.skip(500);
        stream.read(&mut scratch[0..3]);
        stream.skip(3);
        assert_eq!(scratch[0], 510);
        assert_eq!(scratch[1], 511);
        assert_eq!(scratch[2], 512);
        stream.skip(511);
        stream.read(&mut scratch[..1]);
        assert_eq!(scratch[0], 1024);
    }
}

View File

@@ -1,7 +1,5 @@
use common::BinarySerializable;
use compression::CompressedIntStream;
use directory::ReadOnlySource;
use postings::FreqReadingOption;
use postings::TermInfo;
use postings::{BlockSegmentPostings, SegmentPostings};
use schema::FieldType;
@@ -9,6 +7,7 @@ use schema::IndexRecordOption;
use schema::Term;
use termdict::TermDictionary;
use owned_read::OwnedRead;
use positions::PositionReader;
/// The inverted index reader is in charge of accessing
/// the inverted index associated to a specific field.
@@ -27,6 +26,7 @@ pub struct InvertedIndexReader {
termdict: TermDictionary,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
positions_idx_source: ReadOnlySource,
record_option: IndexRecordOption,
total_num_tokens: u64,
}
@@ -36,6 +36,7 @@ impl InvertedIndexReader {
termdict: TermDictionary,
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
positions_idx_source: ReadOnlySource,
record_option: IndexRecordOption,
) -> InvertedIndexReader {
let total_num_tokens_data = postings_source.slice(0, 8);
@@ -45,6 +46,7 @@ impl InvertedIndexReader {
termdict,
postings_source: postings_source.slice_from(8),
positions_source,
positions_idx_source,
record_option,
total_num_tokens,
}
@@ -60,6 +62,7 @@ impl InvertedIndexReader {
termdict: TermDictionary::empty(field_type),
postings_source: ReadOnlySource::empty(),
positions_source: ReadOnlySource::empty(),
positions_idx_source: ReadOnlySource::empty(),
record_option,
total_num_tokens: 0u64,
}
@@ -94,7 +97,7 @@ impl InvertedIndexReader {
let end_source = self.postings_source.len();
let postings_slice = self.postings_source.slice(offset, end_source);
let postings_reader = OwnedRead::new(postings_slice);
block_postings.reset(term_info.doc_freq as usize, postings_reader);
block_postings.reset(term_info.doc_freq, postings_reader);
}
/// Returns a block postings given a `term_info`.
@@ -108,15 +111,11 @@ impl InvertedIndexReader {
) -> BlockSegmentPostings {
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_source.slice_from(offset);
let freq_reading_option = match (self.record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
BlockSegmentPostings::from_data(
term_info.doc_freq as usize,
term_info.doc_freq,
OwnedRead::new(postings_data),
freq_reading_option,
self.record_option,
requested_option,
)
}
@@ -132,11 +131,10 @@ impl InvertedIndexReader {
let block_postings = self.read_block_postings_from_terminfo(term_info, option);
let position_stream = {
if option.has_positions() {
let position_offset = term_info.positions_offset;
let positions_source = self.positions_source.slice_from(position_offset as usize);
let mut stream = CompressedIntStream::wrap(positions_source);
stream.skip(term_info.positions_inner_offset as usize);
Some(stream)
let position_reader = self.positions_source.clone();
let skip_reader = self.positions_idx_source.clone();
let position_reader = PositionReader::new(position_reader, skip_reader, term_info.positions_idx);
Some(position_reader)
} else {
None
}

View File

@@ -10,6 +10,8 @@ pub enum SegmentComponent {
POSTINGS,
/// Positions of terms in each document.
POSITIONS,
/// Index to seek within the position file
POSITIONSSKIP,
/// Column-oriented random-access storage of fields.
FASTFIELDS,
/// Stores the sum of the length (in terms) of each field for each document.
@@ -29,9 +31,10 @@ pub enum SegmentComponent {
impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 7] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
SegmentComponent::POSTINGS,
SegmentComponent::POSITIONS,
SegmentComponent::POSITIONSSKIP,
SegmentComponent::FASTFIELDS,
SegmentComponent::FIELDNORMS,
SegmentComponent::TERMS,

View File

@@ -110,8 +110,9 @@ impl SegmentMeta {
pub fn relative_path(&self, component: SegmentComponent) -> PathBuf {
let mut path = self.id().uuid_string();
path.push_str(&*match component {
SegmentComponent::POSITIONS => ".pos".to_string(),
SegmentComponent::POSTINGS => ".idx".to_string(),
SegmentComponent::POSITIONS => ".pos".to_string(),
SegmentComponent::POSITIONSSKIP => ".posidx".to_string(),
SegmentComponent::TERMS => ".term".to_string(),
SegmentComponent::STORE => ".store".to_string(),
SegmentComponent::FASTFIELDS => ".fast".to_string(),

View File

@@ -49,6 +49,7 @@ pub struct SegmentReader {
termdict_composite: CompositeFile,
postings_composite: CompositeFile,
positions_composite: CompositeFile,
positions_idx_composite: CompositeFile,
fast_fields_composite: CompositeFile,
fieldnorms_composite: CompositeFile,
@@ -235,6 +236,14 @@ impl SegmentReader {
}
};
let positions_idx_composite = {
if let Ok(source) = segment.open_read(SegmentComponent::POSITIONSSKIP) {
CompositeFile::open(&source)?
} else {
CompositeFile::empty()
}
};
let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
@@ -260,6 +269,7 @@ impl SegmentReader {
store_reader,
delete_bitset_opt,
positions_composite,
positions_idx_composite,
schema,
})
}
@@ -309,10 +319,15 @@ impl SegmentReader {
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");
let positions_idx_source = self.positions_idx_composite
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");
let inv_idx_reader = Arc::new(InvertedIndexReader::new(
TermDictionary::from_source(termdict_source),
postings_source,
positions_source,
positions_idx_source,
record_option,
));
@@ -447,7 +462,9 @@ mod test {
index.load_searchers().unwrap();
let searcher = index.searcher();
let docs: Vec<DocId> = searcher.segment_reader(0).doc_ids_alive().collect();
let docs: Vec<DocId> = searcher.segment_reader(0)
.doc_ids_alive()
.collect();
assert_eq!(vec![0u32, 2u32], docs);
}
}

View File

@@ -187,7 +187,6 @@ extern crate owned_read;
pub type Result<T> = std::result::Result<T, Error>;
mod common;
mod compression;
mod core;
mod indexer;
@@ -200,6 +199,7 @@ pub mod directory;
pub mod fastfield;
pub mod fieldnorm;
pub mod postings;
pub(crate) mod positions;
pub mod query;
pub mod schema;
pub mod store;

148
src/positions/mod.rs Normal file
View File

@@ -0,0 +1,148 @@
/// Positions are stored in three parts and over two files.
/// The `SegmentComponent::POSITIONS` file contains all of the bitpacked positions delta,
/// The `SegmentComponent::POSITIONS` file contains all of the bitpacked positions delta,
/// for all terms of a given field, one term after the other.
///
/// If the last block is incomplete, it is simply padded with zeros.
/// It cannot be read alone, as it actually does not contain the number of bits used for
/// each block.
/// Each block is serialized one after the other.
/// If the last block is incomplete, it is simply padded with zeros.
///
///
/// The `SegmentComponent::POSITIONSSKIP` file contains the number of bits used in each block in `u8`
/// stream.
///
/// This makes it possible to rapidly skip over `n positions`.
///
/// For every block #n where n = k * `LONG_SKIP_INTERVAL` blocks (k>=1), we also store
/// in this file the sum of number of bits used for all of the previous block (blocks `[0, n[`).
/// That is useful to start reading the positions for a given term: The TermInfo contains
/// an address in the positions stream, expressed in "number of positions".
/// The long skip structure makes it possible to skip rapidly to a checkpoint close to this
/// value, and then skip normally.
///
mod reader;
mod serializer;
pub use self::reader::PositionReader;
pub use self::serializer::PositionSerializer;
use bitpacking::{BitPacker4x, BitPacker};
const COMPRESSION_BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
const LONG_SKIP_IN_BLOCKS: usize = 1_024;
const LONG_SKIP_INTERVAL: u64 = (LONG_SKIP_IN_BLOCKS * COMPRESSION_BLOCK_SIZE) as u64;
lazy_static! {
static ref BIT_PACKER: BitPacker4x = BitPacker4x::new();
}
#[cfg(test)]
pub mod tests {
use std::iter;
use super::{PositionSerializer, PositionReader};
use directory::ReadOnlySource;
use positions::COMPRESSION_BLOCK_SIZE;
/// Serializes `vals` one position at a time and returns the
/// resulting (position stream, skip index) pair of sources.
fn create_stream_buffer(vals: &[u32]) -> (ReadOnlySource, ReadOnlySource) {
let mut skip_buffer = vec![];
let mut stream_buffer = vec![];
{
let mut serializer = PositionSerializer::new(&mut stream_buffer, &mut skip_buffer);
for (i, &val) in vals.iter().enumerate() {
// positions_idx reports the number of positions written so far
assert_eq!(serializer.positions_idx(), i as u64);
serializer.write_all(&[val]).unwrap();
}
serializer.close().unwrap();
}
(ReadOnlySource::from(stream_buffer), ReadOnlySource::from(skip_buffer))
}
#[test]
fn test_position_read() {
let v: Vec<u32> = (0..1000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
// 1000 ints -> 8 blocks: 8 num-bits bytes + 4-byte footer (0 long skips)
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);
let mut position_reader = PositionReader::new(stream, skip, 0u64);
// `read` does not advance the stream, so every read starts at position 0
for &n in &[1, 10, 127, 128, 130, 312] {
let mut v = vec![0u32; n];
position_reader.read(&mut v[..n]);
for i in 0..n {
assert_eq!(v[i], i as u32);
}
}
}
#[test]
fn test_position_skip() {
let v: Vec<u32> = (0..1_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);
let mut position_reader = PositionReader::new(stream, skip, 0u64);
position_reader.skip(10);
// after the skip, every (non-advancing) read starts at position 10
for &n in &[10, 127, COMPRESSION_BLOCK_SIZE, 130, 312] {
let mut v = vec![0u32; n];
position_reader.read(&mut v[..n]);
for i in 0..n {
assert_eq!(v[i], 10u32 + i as u32);
}
}
}
#[test]
fn test_position_read_after_skip() {
let v: Vec<u32> = (0..1_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 12);
assert_eq!(stream.len(), 1168);
let mut position_reader = PositionReader::new(stream,skip, 0u64);
let mut buf = [0u32; 7];
let mut c = 0;
for _ in 0..100 {
// two reads at the same cursor, then advance by 4 + 3 = 7
position_reader.read(&mut buf);
position_reader.read(&mut buf);
position_reader.skip(4);
position_reader.skip(3);
for &el in &buf {
assert_eq!(c, el);
c += 1;
}
}
}
#[test]
fn test_position_long_skip_const() {
// offset 128 * 1024 positions == exactly one long-skip interval,
// exercising the long-skip fast path in PositionReader::new
const CONST_VAL: u32 = 9u32;
let v: Vec<u32> = iter::repeat(CONST_VAL).take(2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 1_000_000);
let mut position_reader = PositionReader::new(stream,skip, 128 * 1024);
let mut buf = [0u32; 1];
position_reader.read(&mut buf);
assert_eq!(buf[0], CONST_VAL);
}
#[test]
fn test_position_long_skip_2() {
let v: Vec<u32> = (0..2_000_000).collect();
let (stream, skip) = create_stream_buffer(&v[..]);
assert_eq!(skip.len(), 15_749);
assert_eq!(stream.len(), 4_987_872);
// offsets straddling the long-skip boundary in both directions
for &offset in &[10, 128 * 1024, 128 * 1024 - 1, 128 * 1024 + 7, 128 * 10 * 1024 + 10] {
let mut position_reader = PositionReader::new(stream.clone(),skip.clone(), offset);
let mut buf = [0u32; 1];
position_reader.read(&mut buf);
assert_eq!(buf[0], offset as u32);
}
}
}

146
src/positions/reader.rs Normal file
View File

@@ -0,0 +1,146 @@
use bitpacking::{BitPacker4x, BitPacker};
use owned_read::OwnedRead;
use common::{BinarySerializable, FixedSize};
use postings::compression::compressed_block_size;
use directory::ReadOnlySource;
use positions::COMPRESSION_BLOCK_SIZE;
use positions::LONG_SKIP_IN_BLOCKS;
use positions::LONG_SKIP_INTERVAL;
use super::BIT_PACKER;
pub struct PositionReader {
skip_read: OwnedRead, // per-block num-bits bytes; cursor at the current block's entry
position_read: OwnedRead, // compressed position stream; cursor at the current block
inner_offset: usize, // offset (in positions) within the current block
buffer: Box<[u32; 128]>, // last decompressed block
ahead: Option<usize>, // if None, no block is loaded.
// if Some(num_blocks), the block currently loaded is num_blocks ahead
// of the block of the next int to read.
}
// `ahead` represents the offset of the block currently loaded
// compared to the cursor of the actual stream.
//
// By contract, when this function is called, the current block has to be
// decompressed.
//
// If the requested number of els ends exactly at a given block, the next
// block is not decompressed.
//
// Arguments:
// - `position`: compressed bytes starting right AFTER the current block
// - `buffer`: decompressed ints of the current block; reused in place
//   as further blocks are decompressed
// - `inner_offset`: cursor position (in ints) within the current block
// - `num_bits`: per-block bit widths, starting at the block AFTER the
//   current one
// - `output`: destination slice; filled entirely
// Returns the new `ahead` value for the block left in `buffer`.
fn read_impl(
mut position: &[u8],
buffer: &mut [u32; 128],
mut inner_offset: usize,
num_bits: &[u8],
output: &mut [u32]) -> usize {
let mut output_start = 0;
let mut output_len = output.len();
let mut ahead = 0;
loop {
let available_len = 128 - inner_offset;
if output_len <= available_len {
// the remainder of the request fits in the currently loaded block
output[output_start..].copy_from_slice(&buffer[inner_offset..][..output_len]);
return ahead;
} else {
// drain the buffer, then decompress the next block into it
output[output_start..][..available_len].copy_from_slice(&buffer[inner_offset..]);
output_len -= available_len;
output_start += available_len;
inner_offset = 0;
let num_bits = num_bits[ahead];
BitPacker4x::new()
.decompress(position, &mut buffer[..], num_bits);
let block_len = compressed_block_size(num_bits);
position = &position[block_len..];
ahead += 1;
}
}
}
impl PositionReader {
/// Opens a position reader, positioned `offset` positions (not bytes)
/// into the position stream.
///
/// The skip source layout is: one num-bits byte per block, then a
/// footer made of one `u64` "long skip" entry every
/// `LONG_SKIP_IN_BLOCKS` blocks (the cumulated bit width of all
/// previous blocks), then the number of such entries as a `u32`.
pub fn new(position_source: ReadOnlySource,
skip_source: ReadOnlySource,
offset: u64) -> PositionReader {
let skip_len = skip_source.len();
let (body, footer) = skip_source.split(skip_len - u32::SIZE_IN_BYTES);
let num_long_skips = u32::deserialize(&mut footer.as_slice()).expect("Index corrupted");
let body_split = body.len() - u64::SIZE_IN_BYTES * (num_long_skips as usize);
let (skip_body, long_skips) = body.split(body_split);
// Decompose the offset into a long skip (a multiple of
// LONG_SKIP_INTERVAL) plus a small remainder handled by `.skip(...)`.
let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize;
let small_skip = (offset - (long_skip_id as u64) * (LONG_SKIP_INTERVAL as u64)) as usize;
let offset_num_bytes: u64 = {
if long_skip_id > 0 {
let mut long_skip_blocks: &[u8] = &long_skips.as_slice()[(long_skip_id - 1) * 8..][..8];
// each bit of width corresponds to COMPRESSION_BLOCK_SIZE / 8 = 16 bytes
u64::deserialize(&mut long_skip_blocks).expect("Index corrupted") * 16
} else {
0
}
};
let mut position_read = OwnedRead::new(position_source);
position_read.advance(offset_num_bytes as usize);
let mut skip_read = OwnedRead::new(skip_body);
skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS);
let mut position_reader = PositionReader {
skip_read,
position_read,
inner_offset: 0,
buffer: Box::new([0u32; 128]),
ahead: None
};
position_reader.skip(small_skip);
position_reader
}
/// Fills a buffer with the next `output.len()` integers.
/// This does not consume / advance the stream.
pub fn read(&mut self, output: &mut [u32]) {
let skip_data = self.skip_read.as_ref();
let position_data = self.position_read.as_ref();
// the first skip byte is the bit width of the current block
let num_bits = self.skip_read.get(0);
if self.ahead != Some(0) {
// the block currently available is not the block
// for the current position: decompress the current one.
BIT_PACKER.decompress(position_data, self.buffer.as_mut(), num_bits);
}
let block_len = compressed_block_size(num_bits);
self.ahead = Some(read_impl(
&position_data[block_len..],
self.buffer.as_mut(),
self.inner_offset,
&skip_data[1..],
output));
}
/// Skip the next `skip_len` integer.
///
/// If a full block is skipped, calling
/// `.skip(...)` will avoid decompressing it.
///
/// May panic if the end of the stream is reached.
pub fn skip(&mut self, skip_len: usize) {
let skip_len_plus_inner_offset = skip_len + self.inner_offset;
let num_blocks_to_advance = skip_len_plus_inner_offset / COMPRESSION_BLOCK_SIZE;
self.inner_offset = skip_len_plus_inner_offset % COMPRESSION_BLOCK_SIZE;
// Keep the decompressed buffer only if it is still at, or ahead of,
// the new cursor position.
// BUGFIX: the loaded block ends up `num_blocks - num_blocks_to_advance`
// blocks ahead of the new cursor. The previous expression
// (`num_blocks_to_advance - num_blocks_to_advance`) always yielded 0,
// which could wrongly mark a block further ahead as the current one
// and make `read` return stale data.
self.ahead = self.ahead
.and_then(|num_blocks| {
if num_blocks >= num_blocks_to_advance {
Some(num_blocks - num_blocks_to_advance)
} else {
None
}
});
// Sum the bit widths of the skipped blocks to advance the byte
// cursor without decompressing them.
let skip_len_bytes = self.skip_read
.as_ref()[..num_blocks_to_advance]
.iter()
.cloned()
.map(|num_bit| num_bit as usize)
.sum::<usize>() * (COMPRESSION_BLOCK_SIZE / 8);
self.skip_read.advance(num_blocks_to_advance);
self.position_read.advance(skip_len_bytes);
}
}

View File

@@ -0,0 +1,79 @@
use std::io;
use bitpacking::BitPacker;
use positions::{COMPRESSION_BLOCK_SIZE, LONG_SKIP_INTERVAL};
use common::BinarySerializable;
use super::BIT_PACKER;
pub struct PositionSerializer<W: io::Write> {
write_stream: W, // destination of the bitpacked position blocks
write_skiplist: W, // destination of the skip index (one num-bits byte per block)
block: Vec<u32>, // positions buffered for the current, not-yet-flushed block
buffer: Vec<u8>, // scratch space receiving the bitpacker output
num_ints: u64, // total number of positions written so far
long_skips: Vec<u64>, // cumulated num-bits checkpoints, one per LONG_SKIP_INTERVAL positions
cumulated_num_bits: u64, // sum of the num_bits of all flushed blocks
}
impl<W: io::Write> PositionSerializer<W> {
/// Creates a serializer writing the bitpacked positions to
/// `write_stream` and the skip index to `write_skiplist`.
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
PositionSerializer {
write_stream,
write_skiplist,
block: Vec::with_capacity(128),
buffer: vec![0u8; 128 * 4],
num_ints: 0u64,
long_skips: Vec::new(),
cumulated_num_bits: 0u64
}
}
/// Returns the number of positions written so far.
pub fn positions_idx(&self) -> u64 {
self.num_ints
}
// Number of positions still missing to complete the current block.
fn remaining_block_len(&self) -> usize {
COMPRESSION_BLOCK_SIZE - self.block.len()
}
/// Appends positions, flushing a compressed block every time
/// `COMPRESSION_BLOCK_SIZE` values have been accumulated.
pub fn write_all(&mut self, mut vals: &[u32]) -> io::Result<()> {
while !vals.is_empty() {
let remaining_block_len = self.remaining_block_len();
let num_to_write = remaining_block_len.min(vals.len());
self.block.extend(&vals[..num_to_write]);
self.num_ints += num_to_write as u64;
vals = &vals[num_to_write..];
if self.remaining_block_len() == 0 {
self.flush_block()?;
}
}
Ok(())
}
// Bitpacks and writes the current (full) block, recording its
// num-bits byte in the skip list.
fn flush_block(&mut self) -> io::Result<()> {
let num_bits = BIT_PACKER.num_bits(&self.block[..]);
self.cumulated_num_bits += num_bits as u64;
// BUGFIX: `write` may legally perform a partial write (and its result
// was being discarded); `write_all` guarantees the byte is written.
self.write_skiplist.write_all(&[num_bits])?;
let written_len = BIT_PACKER.compress(&self.block[..], &mut self.buffer, num_bits);
self.write_stream.write_all(&self.buffer[..written_len])?;
self.block.clear();
if (self.num_ints % LONG_SKIP_INTERVAL) == 0u64 {
// checkpoint: cumulated bit width of all blocks so far, used by
// the reader to seek close to a given position without scanning
self.long_skips.push(self.cumulated_num_bits);
}
Ok(())
}
/// Pads and flushes the last, possibly incomplete block, then writes
/// the footer: every `u64` long-skip checkpoint followed by their
/// count as a `u32`.
pub fn close(mut self) -> io::Result<()> {
if !self.block.is_empty() {
// incomplete last block: pad with zeros
self.block.resize(COMPRESSION_BLOCK_SIZE, 0u32);
self.flush_block()?;
}
for &long_skip in &self.long_skips {
long_skip.serialize(&mut self.write_skiplist)?;
}
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
self.write_skiplist.flush()?;
self.write_stream.flush()?;
Ok(())
}
}

View File

@@ -1,18 +1,14 @@
#![allow(dead_code)]
mod stream;
pub const COMPRESSION_BLOCK_SIZE: usize = 128;
const COMPRESSED_BLOCK_MAX_SIZE: usize = COMPRESSION_BLOCK_SIZE * 4 + 1;
pub use self::stream::CompressedIntStream;
use bitpacking::{BitPacker, BitPacker4x};
use common::FixedSize;
pub const COMPRESSION_BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
const COMPRESSED_BLOCK_MAX_SIZE: usize = COMPRESSION_BLOCK_SIZE * u32::SIZE_IN_BYTES;
mod vint;
/// Returns the size in bytes of a compressed block, given `num_bits`.
pub fn compressed_block_size(num_bits: u8) -> usize {
1 + (num_bits as usize) * COMPRESSION_BLOCK_SIZE / 8
(num_bits as usize) * COMPRESSION_BLOCK_SIZE / 8
}
pub struct BlockEncoder {
@@ -30,21 +26,18 @@ impl BlockEncoder {
}
}
pub fn compress_block_sorted(&mut self, block: &[u32], offset: u32) -> &[u8] {
pub fn compress_block_sorted(&mut self, block: &[u32], offset: u32) -> (u8, &[u8]) {
let num_bits = self.bitpacker.num_bits_sorted(offset, block);
self.output[0] = num_bits;
let written_size =
1 + self.bitpacker
.compress_sorted(offset, block, &mut self.output[1..], num_bits);
&self.output[..written_size]
let written_size = self.bitpacker
.compress_sorted(offset, block, &mut self.output[..], num_bits);
(num_bits, &self.output[..written_size])
}
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] {
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> (u8, &[u8]) {
let num_bits = self.bitpacker.num_bits(block);
self.output[0] = num_bits;
let written_size = 1 + self.bitpacker
.compress(block, &mut self.output[1..], num_bits);
&self.output[..written_size]
let written_size = self.bitpacker
.compress(block, &mut self.output[..], num_bits);
(num_bits, &self.output[..written_size])
}
}
@@ -69,22 +62,19 @@ impl BlockDecoder {
}
}
pub fn uncompress_block_sorted(&mut self, compressed_data: &[u8], offset: u32) -> usize {
let num_bits = compressed_data[0];
pub fn uncompress_block_sorted(&mut self, compressed_data: &[u8], offset: u32, num_bits: u8) -> usize {
self.output_len = COMPRESSION_BLOCK_SIZE;
1 + self.bitpacker.decompress_sorted(
self.bitpacker.decompress_sorted(
offset,
&compressed_data[1..],
&compressed_data,
&mut self.output,
num_bits,
)
}
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize {
let num_bits = compressed_data[0];
pub fn uncompress_block_unsorted(&mut self, compressed_data: &[u8], num_bits: u8) -> usize {
self.output_len = COMPRESSION_BLOCK_SIZE;
1 + self.bitpacker
.decompress(&compressed_data[1..], &mut self.output, num_bits)
self.bitpacker.decompress(&compressed_data, &mut self.output, num_bits)
}
#[inline]
@@ -98,11 +88,10 @@ impl BlockDecoder {
}
}
mod vint;
pub trait VIntEncoder {
/// Compresses an array of `u32` integers,
/// using [delta-encoding](https://en.wikipedia.org/wiki/Delta_encoding)
/// using [delta-encoding](https://en.wikipedia.org/wiki/Delta_encoding)
/// and variable bytes encoding.
///
/// The method takes an array of ints to compress, and returns
@@ -185,10 +174,10 @@ pub mod tests {
fn test_encode_sorted_block() {
let vals: Vec<u32> = (0u32..128u32).map(|i| i * 7).collect();
let mut encoder = BlockEncoder::new();
let compressed_data = encoder.compress_block_sorted(&vals, 0);
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 0);
let mut decoder = BlockDecoder::new();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0);
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0, num_bits);
assert_eq!(consumed_num_bytes, compressed_data.len());
}
for i in 0..128 {
@@ -200,10 +189,10 @@ pub mod tests {
fn test_encode_sorted_block_with_offset() {
let vals: Vec<u32> = (0u32..128u32).map(|i| 11 + i * 7).collect();
let mut encoder = BlockEncoder::new();
let compressed_data = encoder.compress_block_sorted(&vals, 10);
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10);
let mut decoder = BlockDecoder::new();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10);
let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10, num_bits);
assert_eq!(consumed_num_bytes, compressed_data.len());
}
for i in 0..128 {
@@ -217,12 +206,12 @@ pub mod tests {
let n = 128;
let vals: Vec<u32> = (0..n).map(|i| 11u32 + (i as u32) * 7u32).collect();
let mut encoder = BlockEncoder::new();
let compressed_data = encoder.compress_block_sorted(&vals, 10);
let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10);
compressed.extend_from_slice(compressed_data);
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
{
let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10);
let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10, num_bits);
assert_eq!(consumed_num_bytes, compressed.len() - 1);
assert_eq!(compressed[consumed_num_bytes], 173u8);
}
@@ -237,12 +226,12 @@ pub mod tests {
let n = 128;
let vals: Vec<u32> = (0..n).map(|i| 11u32 + (i as u32) * 7u32 % 12).collect();
let mut encoder = BlockEncoder::new();
let compressed_data = encoder.compress_block_unsorted(&vals);
let (num_bits, compressed_data) = encoder.compress_block_unsorted(&vals);
compressed.extend_from_slice(compressed_data);
compressed.push(173u8);
let mut decoder = BlockDecoder::new();
{
let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed);
let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed, num_bits);
assert_eq!(consumed_num_bytes + 1, compressed.len());
assert_eq!(compressed[consumed_num_bytes], 173u8);
}
@@ -305,7 +294,7 @@ mod bench {
fn bench_uncompress(b: &mut Bencher) {
let mut encoder = BlockEncoder::new();
let data = generate_array(COMPRESSION_BLOCK_SIZE, 0.1);
let compressed = encoder.compress_block_sorted(&data, 0u32);
let (_, compressed) = encoder.compress_block_sorted(&data, 0u32);
let mut decoder = BlockDecoder::new();
b.iter(|| {
decoder.uncompress_block_sorted(compressed, 0u32);

View File

@@ -1,5 +1,5 @@
#[inline(always)]
pub(crate) fn compress_sorted<'a>(
pub fn compress_sorted<'a>(
input: &[u32],
output: &'a mut [u8],
mut offset: u32,
@@ -46,7 +46,7 @@ pub(crate) fn compress_unsorted<'a>(input: &[u32], output: &'a mut [u8]) -> &'a
}
#[inline(always)]
pub(crate) fn uncompress_sorted<'a>(
pub fn uncompress_sorted<'a>(
compressed_data: &'a [u8],
output: &mut [u32],
offset: u32,

View File

@@ -11,14 +11,18 @@ mod postings_writer;
mod recorder;
mod segment_postings;
mod serializer;
pub(crate) mod compression;
mod stacker;
mod term_info;
mod skip;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub use self::postings::Postings;
pub use self::term_info::TermInfo;
pub(crate) use self::skip::SkipReader;
use self::compression::{COMPRESSION_BLOCK_SIZE};
pub use self::segment_postings::{BlockSegmentPostings, SegmentPostings};
@@ -26,9 +30,12 @@ pub(crate) use self::stacker::compute_table_size;
pub use common::HasLen;
pub(crate) const USE_SKIP_INFO_LIMIT: u32 = COMPRESSION_BLOCK_SIZE as u32;
pub(crate) type UnorderedTermId = u64;
#[allow(enum_variant_names)]
#[derive(Debug, PartialEq, Clone, Copy, Eq)]
pub(crate) enum FreqReadingOption {
NoFreq,
SkipFreq,
@@ -64,7 +71,8 @@ pub mod tests {
let mut segment = index.new_segment();
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap();
{
let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4).unwrap();
let mut field_serializer = posting_serializer
.new_field(text_field, 120 * 4).unwrap();
field_serializer.new_term("abc".as_bytes()).unwrap();
for doc_id in 0u32..120u32 {
let delta_positions = vec![1, 2, 3, 2];
@@ -327,7 +335,6 @@ pub mod tests {
assert!(index_writer.commit().is_ok());
}
index.load_searchers().unwrap();
index
};
let searcher = index.searcher();

View File

@@ -1,15 +1,22 @@
use compression::{BlockDecoder, CompressedIntStream, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use DocId;
use common::BitSet;
use common::HasLen;
use compression::compressed_block_size;
use postings::compression::compressed_block_size;
use docset::{DocSet, SkipResult};
use fst::Streamer;
use postings::serializer::PostingsSerializer;
use postings::FreqReadingOption;
use postings::Postings;
use owned_read::OwnedRead;
use common::{VInt, BinarySerializable};
use postings::USE_SKIP_INFO_LIMIT;
use postings::SkipReader;
use schema::IndexRecordOption;
use positions::PositionReader;
use std::cmp::Ordering;
const EMPTY_ARR: [u8; 0] = [];
struct PositionComputer {
// store the amount of position int
@@ -18,14 +25,14 @@ struct PositionComputer {
// if none, position are already loaded in
// the positions vec.
position_to_skip: usize,
positions_stream: CompressedIntStream,
position_reader: PositionReader,
}
impl PositionComputer {
pub fn new(positions_stream: CompressedIntStream) -> PositionComputer {
pub fn new(position_reader: PositionReader) -> PositionComputer {
PositionComputer {
position_to_skip: 0,
positions_stream,
position_reader,
}
}
@@ -35,9 +42,9 @@ impl PositionComputer {
// Positions can only be read once.
pub fn positions_with_offset(&mut self, offset: u32, output: &mut [u32]) {
self.positions_stream.skip(self.position_to_skip);
self.position_reader.skip(self.position_to_skip);
self.position_to_skip = 0;
self.positions_stream.read(output);
self.position_reader.read(output);
let mut cum = offset;
for output_mut in output.iter_mut() {
cum += *output_mut;
@@ -79,18 +86,19 @@ impl SegmentPostings {
pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
let mut buffer = Vec::new();
{
let mut postings_serializer = PostingsSerializer::new(&mut buffer, false);
let mut postings_serializer = PostingsSerializer::new(&mut buffer, false, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32).unwrap();
postings_serializer.write_doc(doc, 1u32);
}
postings_serializer
.close_term()
.close_term(docs.len() as u32)
.expect("In memory Serialization should never fail.");
}
let block_segment_postings = BlockSegmentPostings::from_data(
docs.len(),
docs.len() as u32,
OwnedRead::new(buffer),
FreqReadingOption::NoFreq,
IndexRecordOption::Basic,
IndexRecordOption::Basic
);
SegmentPostings::from_block_postings(block_segment_postings, None)
}
@@ -103,9 +111,9 @@ impl SegmentPostings {
/// * `data` - data array. The complete data is not necessarily used.
/// * `freq_handler` - the freq handler is in charge of decoding
/// frequencies and/or positions
pub fn from_block_postings(
pub(crate) fn from_block_postings(
segment_block_postings: BlockSegmentPostings,
positions_stream_opt: Option<CompressedIntStream>,
positions_stream_opt: Option<PositionReader>,
) -> SegmentPostings {
SegmentPostings {
block_cursor: segment_block_postings,
@@ -115,7 +123,8 @@ impl SegmentPostings {
}
}
fn exponential_search(target: u32, mut start: usize, arr: &[u32]) -> (usize, usize) {
fn exponential_search(target: u32, arr: &[u32]) -> (usize, usize) {
let mut start = 0;
let end = arr.len();
debug_assert!(target >= arr[start]);
debug_assert!(target <= arr[end - 1]);
@@ -133,77 +142,92 @@ fn exponential_search(target: u32, mut start: usize, arr: &[u32]) -> (usize, usi
}
}
/// Search the first index containing an element greater or equal to the target.
///
/// # Assumption
///
/// The array is assumed non empty.
/// The target is assumed greater or equal to the first element.
/// The target is assumed smaller or equal to the last element.
fn search_within_block(block_docs: &[u32], target: u32) -> usize {
let (start, end) = exponential_search(target, block_docs);
start.wrapping_add(block_docs[start..end].binary_search(&target).unwrap_or_else(|e| e))
}
impl DocSet for SegmentPostings {
fn skip_next(&mut self, target: DocId) -> SkipResult {
if !self.advance() {
return SkipResult::End;
}
if self.doc() == target {
return SkipResult::Reached;
match self.doc().cmp(&target) {
Ordering::Equal => {
return SkipResult::Reached;
}
Ordering::Greater => {
return SkipResult::OverStep;
}
_ => {
// ...
}
}
// in the following, thanks to the call to advance above,
// In the following, thanks to the call to advance above,
// we know that the position is not loaded and we need
// to skip every doc_freq we cross.
// skip blocks until one that might contain the target
loop {
// check if we need to go to the next block
let (current_doc, last_doc_in_block) = {
let block_docs = self.block_cursor.docs();
(block_docs[self.cur], block_docs[block_docs.len() - 1])
};
if target > last_doc_in_block {
// we add skip for the current term independantly,
// so that position_add_skip will decide if it should
// just set itself to Some(0) or effectively
// add the term freq.
if self.position_computer.is_some() {
let freqs_skipped = &self.block_cursor.freqs()[self.cur..];
let sum_freq: u32 = freqs_skipped.iter().sum();
self.position_computer
.as_mut()
.unwrap()
.add_skip(sum_freq as usize);
// check if we need to go to the next block
let need_positions = self.position_computer.is_some();
let mut sum_freqs_skipped: u32 = 0;
if !self.block_cursor
.docs()
.last()
.map(|doc| *doc >= target)
.unwrap_or(false) // there should always be at least a document in the block
// since advance returned.
{
// we are not in the right block.
//
// First compute all of the freqs skipped from the current block.
if need_positions {
sum_freqs_skipped = self.block_cursor
.freqs()[self.cur..]
.iter()
.sum();
match self.block_cursor.skip_to(target) {
BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => {
sum_freqs_skipped += block_skip_freqs;
}
BlockSegmentPostingsSkipResult::Terminated => {
return SkipResult::End;
}
}
if !self.block_cursor.advance() {
} else {
// no positions needed. no need to sum freqs.
if self.block_cursor.skip_to(target) == BlockSegmentPostingsSkipResult::Terminated {
return SkipResult::End;
}
self.cur = 0;
} else {
if target < current_doc {
// We've passed the target after the first `advance` call
// or we're at the beginning of a block.
// Either way, we're on the first `DocId` greater than `target`
return SkipResult::OverStep;
}
break;
}
self.cur = 0;
}
// we're in the right block now, start with an exponential search
let block_docs = self.block_cursor.docs();
let (mut start, end) = exponential_search(target, self.cur, block_docs);
start += block_docs[start..end]
.binary_search(&target)
.unwrap_or_else(|e| e);
// `doc` is now the first element >= `target`
let doc = block_docs[start];
debug_assert!(doc >= target);
if self.position_computer.is_some() {
let freqs_skipped = &self.block_cursor.freqs()[self.cur..start];
let sum_freqs: u32 = freqs_skipped.iter().sum();
debug_assert!(target >= self.doc());
let new_cur = self.cur.wrapping_add(search_within_block(&block_docs[self.cur..], target));
if need_positions {
sum_freqs_skipped += self.block_cursor.freqs()[self.cur..new_cur].iter().sum::<u32>();
self.position_computer
.as_mut()
.unwrap()
.add_skip(sum_freqs as usize);
.add_skip(sum_freqs_skipped as usize);
}
self.cur = new_cur;
self.cur = start;
// `doc` is now the first element >= `target`
let doc = block_docs[new_cur];
debug_assert!(doc >= target);
if doc == target {
return SkipResult::Reached;
} else {
@@ -299,28 +323,61 @@ pub struct BlockSegmentPostings {
doc_freq: usize,
doc_offset: DocId,
num_bitpacked_blocks: usize,
num_vint_docs: usize,
remaining_data: OwnedRead,
skip_reader: SkipReader,
}
fn split_into_skips_and_postings(doc_freq: u32, mut data: OwnedRead) -> (Option<OwnedRead>, OwnedRead) {
if doc_freq >= USE_SKIP_INFO_LIMIT {
let skip_len = VInt::deserialize(&mut data).expect("Data corrupted").0 as usize;
let mut postings_data = data.clone();
postings_data.advance(skip_len);
data.clip(skip_len);
(Some(data), postings_data)
} else {
(None, data)
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum BlockSegmentPostingsSkipResult {
Terminated,
Success(u32) //< number of term freqs to skip
}
impl BlockSegmentPostings {
pub(crate) fn from_data(
doc_freq: usize,
doc_freq: u32,
data: OwnedRead,
freq_reading_option: FreqReadingOption,
record_option: IndexRecordOption,
requested_option: IndexRecordOption
) -> BlockSegmentPostings {
let num_bitpacked_blocks: usize = (doc_freq as usize) / COMPRESSION_BLOCK_SIZE;
let num_vint_docs = (doc_freq as usize) - COMPRESSION_BLOCK_SIZE * num_bitpacked_blocks;
let freq_reading_option = match (record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data);
let skip_reader =
match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, record_option),
None => SkipReader::new(OwnedRead::new(&EMPTY_ARR[..]), record_option)
};
let doc_freq = doc_freq as usize;
let num_vint_docs = doc_freq % COMPRESSION_BLOCK_SIZE;
BlockSegmentPostings {
num_bitpacked_blocks,
num_vint_docs,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option,
remaining_data: data,
doc_offset: 0,
doc_freq,
remaining_data: postings_data,
skip_reader,
}
}
@@ -334,14 +391,18 @@ impl BlockSegmentPostings {
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: usize, postings_data: OwnedRead) {
let num_binpacked_blocks: usize = doc_freq / COMPRESSION_BLOCK_SIZE;
let num_vint_docs = doc_freq & (COMPRESSION_BLOCK_SIZE - 1);
self.num_bitpacked_blocks = num_binpacked_blocks;
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedRead) {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
let num_vint_docs = (doc_freq as usize) & (COMPRESSION_BLOCK_SIZE - 1);
self.num_vint_docs = num_vint_docs;
self.remaining_data = postings_data;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data);
} else {
self.skip_reader.reset(OwnedRead::new(&EMPTY_ARR[..]))
}
self.doc_offset = 0;
self.doc_freq = doc_freq;
self.doc_freq = doc_freq as usize;
}
/// Returns the document frequency associated to this block postings.
@@ -389,29 +450,116 @@ impl BlockSegmentPostings {
self.doc_decoder.output_len
}
/// position on a block that may contains `doc_id`.
/// Always advance the current block.
///
/// Returns true if a block that has an element greater or equal to the target is found.
/// Returning true does not guarantee that the smallest element of the block is smaller
/// than the target. It only guarantees that the last element is greater or equal.
///
/// Returns false iff all of the document remaining are smaller than
/// `doc_id`. In that case, all of these document are consumed.
///
pub fn skip_to(&mut self,
target_doc: DocId) -> BlockSegmentPostingsSkipResult {
let mut skip_freqs = 0u32;
while self.skip_reader.advance() {
if self.skip_reader.doc() >= target_doc {
// the last document of the current block is larger
// than the target.
//
// We found our block!
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder
.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref(),
tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
self.doc_offset = self.skip_reader.doc();
return BlockSegmentPostingsSkipResult::Success(skip_freqs);
} else {
skip_freqs += self.skip_reader.tf_sum();
let advance_len = self.skip_reader.total_block_len();
self.doc_offset = self.skip_reader.doc();
self.remaining_data.advance(advance_len);
}
}
// we are now on the last, incomplete, variable encoded block.
if self.num_vint_docs > 0 {
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
self.num_vint_docs,
);
self.remaining_data.advance(num_compressed_bytes);
match self.freq_reading_option {
FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {}
FreqReadingOption::ReadFreq => {
self.freq_decoder
.uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs);
}
}
self.num_vint_docs = 0;
return self.docs()
.last()
.map(|last_doc| {
if *last_doc >= target_doc {
BlockSegmentPostingsSkipResult::Success(skip_freqs)
} else {
BlockSegmentPostingsSkipResult::Terminated
}
})
.unwrap_or(BlockSegmentPostingsSkipResult::Terminated);
}
BlockSegmentPostingsSkipResult::Terminated
}
/// Advance to the next block.
///
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.num_bitpacked_blocks > 0 {
if self.skip_reader.advance() {
let num_bits = self.skip_reader.doc_num_bits();
let num_consumed_bytes = self.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
.uncompress_block_sorted(
self.remaining_data.as_ref(),
self.doc_offset,
num_bits);
self.remaining_data.advance(num_consumed_bytes);
let tf_num_bits = self.skip_reader.tf_num_bits();
match self.freq_reading_option {
FreqReadingOption::NoFreq => {}
FreqReadingOption::SkipFreq => {
let num_bytes_to_skip = compressed_block_size(self.remaining_data.as_ref()[0]);
let num_bytes_to_skip = compressed_block_size(tf_num_bits);
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref());
.uncompress_block_unsorted(self.remaining_data.as_ref(),
tf_num_bits);
self.remaining_data.advance(num_consumed_bytes);
}
}
// it will be used as the next offset.
self.doc_offset = self.doc_decoder.output(COMPRESSION_BLOCK_SIZE - 1);
self.num_bitpacked_blocks -= 1;
true
} else if self.num_vint_docs > 0 {
let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted(
@@ -437,16 +585,18 @@ impl BlockSegmentPostings {
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
num_bitpacked_blocks: 0,
num_vint_docs: 0,
doc_decoder: BlockDecoder::new(),
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
remaining_data: OwnedRead::new(vec![]),
doc_offset: 0,
doc_freq: 0,
remaining_data: OwnedRead::new(vec![]),
skip_reader: SkipReader::new(OwnedRead::new(vec![]), IndexRecordOption::Basic),
}
}
}
@@ -476,6 +626,9 @@ mod tests {
use schema::SchemaBuilder;
use schema::Term;
use schema::INT_INDEXED;
use super::BlockSegmentPostingsSkipResult;
use DocId;
use super::search_within_block;
#[test]
fn test_empty_segment_postings() {
@@ -492,26 +645,46 @@ mod tests {
assert_eq!(postings.doc_freq(), 0);
}
fn search_within_block_trivial_but_slow(block: &[u32], target: u32) -> usize {
block
.iter()
.cloned()
.enumerate()
.filter(|&(_, ref val)| *val >= target)
.next()
.unwrap().0
}
fn util_test_search_within_block(block: &[u32], target: u32) {
assert_eq!(search_within_block(block, target), search_within_block_trivial_but_slow(block, target));
}
fn util_test_search_within_block_all(block: &[u32]) {
use std::collections::HashSet;
let mut targets = HashSet::new();
for (i, val) in block.iter().cloned().enumerate() {
if i > 0 {
targets.insert(val - 1);
}
targets.insert(val);
}
for target in targets {
util_test_search_within_block(block, target);
}
}
#[test]
fn test_search_within_block() {
for len in 1u32..128u32 {
let v: Vec<u32> = (0..len).map(|i| i*2).collect();
util_test_search_within_block_all(&v[..]);
}
}
#[test]
fn test_block_segment_postings() {
let mut schema_builder = SchemaBuilder::default();
let int_field = schema_builder.add_u64_field("id", INT_INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
for _ in 0..100_000 {
let doc = doc!(int_field=>0u64);
index_writer.add_document(doc);
}
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap();
let mut block_segments =
inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic);
let mut block_segments = build_block_postings((0..100_000).collect::<Vec<u32>>());
let mut offset: u32 = 0u32;
// checking that the block before calling advance is empty
assert!(block_segments.docs().is_empty());
@@ -525,6 +698,59 @@ mod tests {
}
}
fn build_block_postings(docs: Vec<DocId>) -> BlockSegmentPostings {
let mut schema_builder = SchemaBuilder::default();
let int_field = schema_builder.add_u64_field("id", INT_INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
let mut last_doc = 0u32;
for doc in docs {
for _ in last_doc..doc {
index_writer.add_document(doc!(int_field=>1u64));
}
index_writer.add_document(doc!(int_field=>0u64));
last_doc = doc + 1;
}
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let searcher = index.searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field);
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
}
#[test]
fn test_block_segment_postings_skip() {
for i in 0..4 {
let mut block_postings = build_block_postings(vec![3]);
assert_eq!(block_postings.skip_to(i), BlockSegmentPostingsSkipResult::Success(0u32));
assert_eq!(block_postings.skip_to(i), BlockSegmentPostingsSkipResult::Terminated);
}
let mut block_postings = build_block_postings(vec![3]);
assert_eq!(block_postings.skip_to(4u32), BlockSegmentPostingsSkipResult::Terminated);
}
#[test]
fn test_block_segment_postings_skip2() {
let mut docs = vec![0];
for i in 0..1300 {
docs.push((i * i / 100) + i);
}
let mut block_postings = build_block_postings(docs.clone());
for i in vec![0, 424, 10000] {
assert_eq!(block_postings.skip_to(i), BlockSegmentPostingsSkipResult::Success(0u32));
let docs = block_postings.docs();
assert!(docs[0] <= i);
assert!(docs.last().cloned().unwrap_or(0u32) >= i);
}
assert_eq!(block_postings.skip_to(100_000), BlockSegmentPostingsSkipResult::Terminated);
assert_eq!(block_postings.skip_to(101_000), BlockSegmentPostingsSkipResult::Terminated);
}
#[test]
fn test_reset_block_segment_postings() {
let mut schema_builder = SchemaBuilder::default();

View File

@@ -1,8 +1,7 @@
use super::TermInfo;
use common::BinarySerializable;
use common::{VInt, BinarySerializable};
use common::{CompositeWrite, CountingWriter};
use compression::VIntEncoder;
use compression::{BlockEncoder, COMPRESSION_BLOCK_SIZE};
use postings::compression::{VIntEncoder, BlockEncoder, COMPRESSION_BLOCK_SIZE};
use core::Segment;
use directory::WritePtr;
use schema::Schema;
@@ -11,6 +10,9 @@ use std::io::{self, Write};
use termdict::{TermDictionaryBuilder, TermOrdinal};
use DocId;
use Result;
use postings::USE_SKIP_INFO_LIMIT;
use postings::skip::SkipSerializer;
use positions::PositionSerializer;
/// `PostingsSerializer` is in charge of serializing
/// postings on disk, in the
@@ -47,6 +49,7 @@ pub struct InvertedIndexSerializer {
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
positionsidx_write: CompositeWrite<WritePtr>,
schema: Schema,
}
@@ -56,23 +59,26 @@ impl InvertedIndexSerializer {
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
positionsidx_write: CompositeWrite<WritePtr>,
schema: Schema,
) -> Result<InvertedIndexSerializer> {
Ok(InvertedIndexSerializer {
terms_write,
postings_write,
positions_write,
positionsidx_write,
schema,
})
}
/// Open a new `PostingsSerializer` for the given segment
pub fn open(segment: &mut Segment) -> Result<InvertedIndexSerializer> {
use SegmentComponent::{POSITIONS, POSTINGS, TERMS};
use SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
InvertedIndexSerializer::new(
CompositeWrite::wrap(segment.open_write(TERMS)?),
CompositeWrite::wrap(segment.open_write(POSTINGS)?),
CompositeWrite::wrap(segment.open_write(POSITIONS)?),
CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
segment.schema(),
)
}
@@ -91,11 +97,14 @@ impl InvertedIndexSerializer {
let postings_write = self.postings_write.for_field(field);
total_num_tokens.serialize(postings_write)?;
let positions_write = self.positions_write.for_field(field);
let positionsidx_write = self.positionsidx_write.for_field(field);
let field_type: FieldType = (*field_entry.field_type()).clone();
FieldSerializer::new(
field_entry.field_type().clone(),
field_type,
term_dictionary_write,
postings_write,
positions_write,
positionsidx_write
)
}
@@ -104,6 +113,7 @@ impl InvertedIndexSerializer {
self.terms_write.close()?;
self.postings_write.close()?;
self.positions_write.close()?;
self.positionsidx_write.close()?;
Ok(())
}
}
@@ -125,6 +135,7 @@ impl<'a> FieldSerializer<'a> {
term_dictionary_write: &'a mut CountingWriter<WritePtr>,
postings_write: &'a mut CountingWriter<WritePtr>,
positions_write: &'a mut CountingWriter<WritePtr>,
positionsidx_write: &'a mut CountingWriter<WritePtr>
) -> io::Result<FieldSerializer<'a>> {
let (term_freq_enabled, position_enabled): (bool, bool) = match field_type {
FieldType::Str(ref text_options) => {
@@ -142,9 +153,9 @@ impl<'a> FieldSerializer<'a> {
};
let term_dictionary_builder =
TermDictionaryBuilder::new(term_dictionary_write, field_type)?;
let postings_serializer = PostingsSerializer::new(postings_write, term_freq_enabled);
let postings_serializer = PostingsSerializer::new(postings_write, term_freq_enabled, position_enabled);
let positions_serializer_opt = if position_enabled {
Some(PositionSerializer::new(positions_write))
Some(PositionSerializer::new(positions_write, positionsidx_write))
} else {
None
};
@@ -160,15 +171,14 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let (filepos, offset) = self.positions_serializer_opt
let positions_idx = self.positions_serializer_opt
.as_ref()
.map(|positions_serializer| positions_serializer.addr())
.unwrap_or((0u64, 0u8));
.map(|positions_serializer| positions_serializer.positions_idx())
.unwrap_or(0u64);
TermInfo {
doc_freq: 0,
postings_offset: self.postings_serializer.addr(),
positions_offset: filepos,
positions_inner_offset: offset,
positions_idx
}
}
@@ -206,9 +216,9 @@ impl<'a> FieldSerializer<'a> {
position_deltas: &[u32],
) -> io::Result<()> {
self.current_term_info.doc_freq += 1;
self.postings_serializer.write_doc(doc_id, term_freq)?;
self.postings_serializer.write_doc(doc_id, term_freq);
if let Some(ref mut positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.write(position_deltas)?;
positions_serializer.write_all(position_deltas)?;
}
Ok(())
}
@@ -221,7 +231,8 @@ impl<'a> FieldSerializer<'a> {
if self.term_open {
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer.close_term()?;
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.term_open = false;
}
Ok(())
@@ -288,55 +299,74 @@ impl Block {
}
pub struct PostingsSerializer<W: Write> {
postings_write: CountingWriter<W>,
output_write: CountingWriter<W>,
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
block: Box<Block>,
postings_write: Vec<u8>,
skip_write: SkipSerializer,
termfreq_enabled: bool,
termfreq_sum_enabled: bool,
}
impl<W: Write> PostingsSerializer<W> {
pub fn new(write: W, termfreq_enabled: bool) -> PostingsSerializer<W> {
pub fn new(write: W, termfreq_enabled: bool, termfreq_sum_enabled: bool) -> PostingsSerializer<W> {
PostingsSerializer {
postings_write: CountingWriter::wrap(write),
output_write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
termfreq_enabled,
termfreq_sum_enabled,
}
}
fn write_block(&mut self) -> io::Result<()> {
fn write_block(&mut self) {
{
// encode the doc ids
let block_encoded: &[u8] = self.block_encoder
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_sorted(&self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.postings_write.write_all(block_encoded)?;
self.skip_write.write_doc(self.last_doc_id_encoded, num_bits);
// last el block 0, offset block 1,
self.postings_write.extend(block_encoded);
}
if self.termfreq_enabled {
// encode the term_freqs
let block_encoded: &[u8] =
let (num_bits, block_encoded): (u8, &[u8]) =
self.block_encoder.compress_block_unsorted(&self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
self.postings_write.extend(block_encoded);
self.skip_write.write_term_freq(num_bits);
if self.termfreq_sum_enabled {
let sum_freq = self.block.term_freqs().iter().cloned().sum();
self.skip_write.write_total_term_freq(sum_freq);
}
}
self.block.clear();
Ok(())
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block()?;
self.write_block();
}
Ok(())
}
pub fn close_term(&mut self) -> io::Result<()> {
fn close(mut self) -> io::Result<()> {
self.postings_write.flush()
}
pub fn close_term(&mut self, doc_freq: u32) -> io::Result<()> {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
@@ -357,15 +387,22 @@ impl<W: Write> PostingsSerializer<W> {
}
self.block.clear();
}
if doc_freq >= USE_SKIP_INFO_LIMIT {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(&mut self.output_write)?;
self.output_write.write_all(skip_data)?;
self.output_write.write_all(&self.postings_write[..])?;
} else {
self.output_write.write_all(&self.postings_write[..])?;
}
self.skip_write.clear();
self.postings_write.clear();
Ok(())
}
fn close(mut self) -> io::Result<()> {
self.postings_write.flush()
}
fn addr(&self) -> u64 {
self.postings_write.written_bytes() as u64
self.output_write.written_bytes() as u64
}
fn clear(&mut self) {
@@ -373,50 +410,3 @@ impl<W: Write> PostingsSerializer<W> {
self.last_doc_id_encoded = 0;
}
}
struct PositionSerializer<W: Write> {
buffer: Vec<u32>,
write: CountingWriter<W>, // See if we can offset the original counting writer.
block_encoder: BlockEncoder,
}
impl<W: Write> PositionSerializer<W> {
fn new(write: W) -> PositionSerializer<W> {
PositionSerializer {
buffer: Vec::with_capacity(COMPRESSION_BLOCK_SIZE),
write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
}
}
fn addr(&self) -> (u64, u8) {
(self.write.written_bytes() as u64, self.buffer.len() as u8)
}
fn write_block(&mut self) -> io::Result<()> {
assert_eq!(self.buffer.len(), COMPRESSION_BLOCK_SIZE);
let block_compressed: &[u8] = self.block_encoder.compress_block_unsorted(&self.buffer);
self.write.write_all(block_compressed)?;
self.buffer.clear();
Ok(())
}
fn write(&mut self, mut vals: &[u32]) -> io::Result<()> {
let mut buffer_len = self.buffer.len();
while vals.len() + buffer_len >= COMPRESSION_BLOCK_SIZE {
let len_to_completion = COMPRESSION_BLOCK_SIZE - buffer_len;
self.buffer.extend_from_slice(&vals[..len_to_completion]);
self.write_block()?;
vals = &vals[len_to_completion..];
buffer_len = self.buffer.len();
}
self.buffer.extend_from_slice(vals);
Ok(())
}
fn close(mut self) -> io::Result<()> {
self.buffer.resize(COMPRESSION_BLOCK_SIZE, 0u32);
self.write_block()?;
self.write.flush()
}
}

174
src/postings/skip.rs Normal file
View File

@@ -0,0 +1,174 @@
use DocId;
use common::BinarySerializable;
use owned_read::OwnedRead;
use postings::compression::COMPRESSION_BLOCK_SIZE;
use schema::IndexRecordOption;
pub struct SkipSerializer {
buffer: Vec<u8>,
prev_doc: DocId,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer {
buffer: Vec::new(),
prev_doc: 0u32,
}
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
assert!(last_doc > self.prev_doc, "write_doc(...) called with non-increasing doc ids. \
Did you forget to call clear maybe?");
let delta_doc = last_doc - self.prev_doc;
self.prev_doc = last_doc;
delta_doc.serialize(&mut self.buffer).unwrap();
self.buffer.push(doc_num_bits);
}
pub fn write_term_freq(&mut self, tf_num_bits: u8) {
self.buffer.push(tf_num_bits);
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
tf_sum.serialize(&mut self.buffer).expect("Should never fail");
}
pub fn data(&self) -> &[u8] {
&self.buffer[..]
}
pub fn clear(&mut self) {
self.prev_doc = 0u32;
self.buffer.clear();
}
}
pub(crate) struct SkipReader {
doc: DocId,
owned_read: OwnedRead,
doc_num_bits: u8,
tf_num_bits: u8,
tf_sum: u32,
skip_info: IndexRecordOption,
}
impl SkipReader {
pub fn new(data: OwnedRead, skip_info: IndexRecordOption) -> SkipReader {
SkipReader {
doc: 0u32,
owned_read: data,
skip_info,
doc_num_bits: 0u8,
tf_num_bits: 0u8,
tf_sum: 0u32,
}
}
pub fn reset(&mut self, data: OwnedRead) {
self.doc = 0u32;
self.owned_read = data;
self.doc_num_bits = 0u8;
self.tf_num_bits = 0u8;
self.tf_sum = 0u32;
}
pub fn total_block_len(&self) -> usize {
(self.doc_num_bits + self.tf_num_bits) as usize * COMPRESSION_BLOCK_SIZE / 8
}
pub fn doc(&self) -> DocId {
self.doc
}
pub fn doc_num_bits(&self) -> u8 {
self.doc_num_bits
}
/// Number of bits used to encode term frequencies
///
/// 0 if term frequencies are not enabled.
pub fn tf_num_bits(&self) -> u8 {
self.tf_num_bits
}
pub fn tf_sum(&self) -> u32 {
self.tf_sum
}
pub fn advance(&mut self) -> bool {
if self.owned_read.as_ref().is_empty() {
false
} else {
let doc_delta = u32::deserialize(&mut self.owned_read).expect("Skip data corrupted");
self.doc += doc_delta as DocId;
self.doc_num_bits = self.owned_read.get(0);
match self.skip_info {
IndexRecordOption::Basic => {
self.owned_read.advance(1);
}
IndexRecordOption::WithFreqs=> {
self.tf_num_bits = self.owned_read.get(1);
self.owned_read.advance(2);
}
IndexRecordOption::WithFreqsAndPositions => {
self.tf_num_bits = self.owned_read.get(1);
self.owned_read.advance(2);
self.tf_sum = u32::deserialize(&mut self.owned_read)
.expect("Failed reading tf_sum");
}
}
true
}
}
}
#[cfg(test)]
mod tests {
    use super::IndexRecordOption;
    use super::{SkipReader, SkipSerializer};
    use owned_read::OwnedRead;

    #[test]
    fn test_skip_with_freq() {
        // Serialize two skip entries carrying doc + term-freq bit widths.
        let mut serializer = SkipSerializer::new();
        serializer.write_doc(1u32, 2u8);
        serializer.write_term_freq(3u8);
        serializer.write_doc(5u32, 5u8);
        serializer.write_term_freq(2u8);
        let skip_data = serializer.data().to_owned();

        let mut reader =
            SkipReader::new(OwnedRead::new(skip_data), IndexRecordOption::WithFreqs);
        // First entry.
        assert!(reader.advance());
        assert_eq!(reader.doc(), 1u32);
        assert_eq!(reader.doc_num_bits(), 2u8);
        assert_eq!(reader.tf_num_bits(), 3u8);
        // Second entry.
        assert!(reader.advance());
        assert_eq!(reader.doc(), 5u32);
        assert_eq!(reader.doc_num_bits(), 5u8);
        assert_eq!(reader.tf_num_bits(), 2u8);
        // Data exhausted.
        assert!(!reader.advance());
    }

    #[test]
    fn test_skip_no_freq() {
        // Same round trip without term frequencies.
        let mut serializer = SkipSerializer::new();
        serializer.write_doc(1u32, 2u8);
        serializer.write_doc(5u32, 5u8);
        let skip_data = serializer.data().to_owned();

        let mut reader = SkipReader::new(OwnedRead::new(skip_data), IndexRecordOption::Basic);
        assert!(reader.advance());
        assert_eq!(reader.doc(), 1u32);
        assert_eq!(reader.doc_num_bits(), 2u8);
        assert!(reader.advance());
        assert_eq!(reader.doc(), 5u32);
        assert_eq!(reader.doc_num_bits(), 5u8);
        assert!(!reader.advance());
    }
}

View File

@@ -10,9 +10,7 @@ pub struct TermInfo {
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Start offset of the first block within the position (`.pos`) file.
pub positions_offset: u64,
/// Start offset within this position block.
pub positions_inner_offset: u8,
pub positions_idx: u64,
}
impl FixedSize for TermInfo {
@@ -20,27 +18,25 @@ impl FixedSize for TermInfo {
/// This is large, but in practise, `TermInfo` are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo` are delta encoded and bitpacked.
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES + u8::SIZE_IN_BYTES;
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}
impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_offset.serialize(writer)?;
self.positions_inner_offset.serialize(writer)
self.positions_idx.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?;
let postings_offset = u64::deserialize(reader)?;
let positions_offset = u64::deserialize(reader)?;
let positions_inner_offset = u8::deserialize(reader)?;
let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo {
doc_freq,
postings_offset,
positions_offset,
positions_inner_offset,
positions_idx,
})
}
}

View File

@@ -33,7 +33,7 @@ impl<T: BinarySerializable> LayerBuilder<T> {
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
self.len += 1;
let offset = self.written_size() as u64;
VInt(key).serialize(&mut self.buffer)?;
VInt(key).serialize_into_vec(&mut self.buffer);
value.serialize(&mut self.buffer)?;
let emit_skip_info = (self.period_mask & self.len) == 0;
if emit_skip_info {

View File

@@ -44,9 +44,8 @@ mod tests {
fn make_term_info(val: u64) -> TermInfo {
TermInfo {
doc_freq: val as u32,
positions_offset: val * 2u64,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
positions_inner_offset: 5u8,
}
}

View File

@@ -18,7 +18,7 @@ struct TermInfoBlockMeta {
ref_term_info: TermInfo,
doc_freq_nbits: u8,
postings_offset_nbits: u8,
positions_offset_nbits: u8,
positions_idx_nbits: u8,
}
impl BinarySerializable for TermInfoBlockMeta {
@@ -28,7 +28,7 @@ impl BinarySerializable for TermInfoBlockMeta {
write.write_all(&[
self.doc_freq_nbits,
self.postings_offset_nbits,
self.positions_offset_nbits,
self.positions_idx_nbits,
])?;
Ok(())
}
@@ -43,7 +43,7 @@ impl BinarySerializable for TermInfoBlockMeta {
ref_term_info,
doc_freq_nbits: buffer[0],
postings_offset_nbits: buffer[1],
positions_offset_nbits: buffer[2],
positions_idx_nbits: buffer[2],
})
}
}
@@ -55,10 +55,11 @@ impl FixedSize for TermInfoBlockMeta {
impl TermInfoBlockMeta {
fn num_bits(&self) -> u8 {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_offset_nbits + 7
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
}
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
@@ -68,16 +69,13 @@ impl TermInfoBlockMeta {
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let positions_offset = extract_bits(data, cursor, self.positions_offset_nbits);
cursor += self.positions_offset_nbits as usize;
let positions_inner_offset = extract_bits(data, cursor, 7) as u8;
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
self.positions_idx_nbits as usize;
TermInfo {
doc_freq,
postings_offset: postings_offset + self.ref_term_info.postings_offset,
positions_offset: positions_offset + self.ref_term_info.positions_offset,
positions_inner_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx,
}
}
}
@@ -92,7 +90,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
assert!(num_bits <= 56);
let addr_byte = addr_bits / 8;
let bit_shift = (addr_bits % 8) as u64;
assert!(data.len() >= addr_byte + 8);
assert!(data.len() >= addr_byte + 7);
let val_unshifted_unmasked: u64 = unsafe {
// ok thanks to the 7 byte padding on `.close`
let addr = data.as_ptr().offset(addr_byte as isize) as *const u64;
@@ -164,11 +162,10 @@ fn bitpack_serialize<W: Write>(
write,
)?;
bit_packer.write(
term_info.positions_offset,
term_info_block_meta.positions_offset_nbits,
term_info.positions_idx,
term_info_block_meta.positions_idx_nbits,
write,
)?;
bit_packer.write(u64::from(term_info.positions_inner_offset), 7, write)?;
Ok(())
}
@@ -190,28 +187,28 @@ impl TermInfoStoreWriter {
let ref_term_info = self.term_infos[0].clone();
for term_info in &mut self.term_infos[1..] {
term_info.postings_offset -= ref_term_info.postings_offset;
term_info.positions_offset -= ref_term_info.positions_offset;
term_info.positions_idx -= ref_term_info.positions_idx;
}
let mut max_doc_freq: u32 = 0u32;
let mut max_postings_offset: u64 = 0u64;
let mut max_positions_offset: u64 = 0u64;
let mut max_positions_idx: u64 = 0u64;
for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_offset = cmp::max(max_positions_offset, term_info.positions_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
}
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
let max_postings_offset_nbits = compute_num_bits(max_postings_offset);
let max_positions_offset_nbits = compute_num_bits(max_positions_offset);
let max_positions_idx_nbits = compute_num_bits(max_positions_idx);
let term_info_block_meta = TermInfoBlockMeta {
offset: self.buffer_term_infos.len() as u64,
ref_term_info,
doc_freq_nbits: max_doc_freq_nbits,
postings_offset_nbits: max_postings_offset_nbits,
positions_offset_nbits: max_positions_offset_nbits,
positions_idx_nbits: max_positions_idx_nbits,
};
term_info_block_meta.serialize(&mut self.buffer_block_metas)?;
@@ -296,12 +293,11 @@ mod tests {
ref_term_info: TermInfo {
doc_freq: 512,
postings_offset: 51,
positions_offset: 3584,
positions_inner_offset: 0,
positions_idx: 3584,
},
doc_freq_nbits: 10,
postings_offset_nbits: 5,
positions_offset_nbits: 11,
positions_idx_nbits: 11,
};
let mut buffer: Vec<u8> = Vec::new();
term_info_block_meta.serialize(&mut buffer).unwrap();
@@ -318,8 +314,7 @@ mod tests {
let term_info = TermInfo {
doc_freq: i as u32,
postings_offset: (i / 10) as u64,
positions_offset: (i * 7) as u64,
positions_inner_offset: (i % 128) as u8,
positions_idx: (i * 7) as u64,
};
store_writer.write_term_info(&term_info).unwrap();
term_infos.push(term_info);