From 96eaa5bc631fbdf76d34dd7ee74ee2e2c0be6adc Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 5 Feb 2019 14:50:16 +0100 Subject: [PATCH] Positions --- CHANGELOG.md | 2 + src/common/composite_file.rs | 2 +- src/common/counting_writer.rs | 18 +++-- src/positions/reader.rs | 123 ++++++++++++++++++++++++---------- src/positions/serializer.rs | 12 ++-- 5 files changed, 106 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa7162ae6..3f92f3598 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ Tantivy 0.9.0 ===================== +*0.9.0 index format is not compatible with the +previous index format.* - Removed most unsafe (@fulmicoton) - Indexer memory footprint improved. (VInt comp, inlining the first block. (@fulmicoton) - Stemming in other language possible (@pentlander) diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index 0b7cdf03c..21538c096 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -39,7 +39,7 @@ impl BinarySerializable for FileAddr { /// A `CompositeWrite` is used to write a `CompositeFile`. pub struct CompositeWrite { write: CountingWriter, - offsets: HashMap, + offsets: HashMap, } impl CompositeWrite { diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs index 5eaec208b..339c60bec 100644 --- a/src/common/counting_writer.rs +++ b/src/common/counting_writer.rs @@ -3,7 +3,7 @@ use std::io::Write; pub struct CountingWriter { underlying: W, - written_bytes: usize, + written_bytes: u64, } impl CountingWriter { @@ -14,11 +14,11 @@ impl CountingWriter { } } - pub fn written_bytes(&self) -> usize { + pub fn written_bytes(&self) -> u64 { self.written_bytes } - pub fn finish(mut self) -> io::Result<(W, usize)> { + pub fn finish(mut self) -> io::Result<(W, u64)> { self.flush()?; Ok((self.underlying, self.written_bytes)) } @@ -27,10 +27,16 @@ impl CountingWriter { impl Write for CountingWriter { fn write(&mut self, buf: &[u8]) -> io::Result { let written_size = self.underlying.write(buf)?; - self.written_bytes += written_size; + self.written_bytes += written_size as u64; Ok(written_size) } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.underlying.write_all(buf)?; + self.written_bytes += buf.len() as u64; + Ok(()) + } + fn flush(&mut self) -> io::Result<()> { self.underlying.flush() } @@ -48,8 +54,8 @@ mod test { let mut counting_writer = CountingWriter::wrap(buffer); let bytes = (0u8..10u8).collect::>(); counting_writer.write_all(&bytes).unwrap(); - let (w, len): (Vec, usize) = counting_writer.finish().unwrap(); - assert_eq!(len, 10); + let (w, len): (Vec, u64) = counting_writer.finish().unwrap(); + assert_eq!(len, 10u64); assert_eq!(w.len(), 10); } } diff --git a/src/positions/reader.rs b/src/positions/reader.rs index 6d2a36036..43c3a4e29 100644 --- a/src/positions/reader.rs +++ b/src/positions/reader.rs @@ -1,5 +1,26 @@ -use super::BIT_PACKER; use bitpacking::{BitPacker, BitPacker4x}; +/// Positions works as a long sequence of compressed block. +/// All terms are chained one after the other. +/// +/// When accessing the position of a term, we get a positions_idx from the `Terminfo`. +/// This means we need to skip to the `nth` positions efficiently. +/// +/// This is done thanks to two levels of skiping that we refer to in the code +/// as `long_skip` and `short_skip`. +/// +/// The `long_skip` makes it possible to skip every 1_024 compression blocks (= 131_072 positions). +/// Skipping offset are simply stored one after as an offset stored over 8 bytes. +/// +/// We find the number of long skips, as `n / long_skip`. +/// +/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bytes +/// (values can go from 0bit to 32 bits) required to decompressed every block. +/// +/// A given block obviously takes `(128 x num_bit_for_the_block / num_bits_in_a_byte)`, +/// so skipping a block without decompressing it is just a matter of advancing that many +/// bytes. + +use super::BIT_PACKER; use common::{BinarySerializable, FixedSize}; use directory::ReadOnlySource; use owned_read::OwnedRead; @@ -8,6 +29,59 @@ use positions::LONG_SKIP_INTERVAL; use positions::LONG_SKIP_IN_BLOCKS; use postings::compression::compressed_block_size; +struct Positions { + skip_source: ReadOnlySource, + position_source: ReadOnlySource, + long_skip_source: ReadOnlySource + +} + +impl Positions { + pub fn new(position_source: ReadOnlySource, skip_source: ReadOnlySource) -> Positions { + let skip_len = skip_source.len(); + let (body, footer) = skip_source.split(skip_len - u32::SIZE_IN_BYTES); + let num_long_skips = u32::deserialize(&mut footer.as_slice()).expect("Index corrupted"); + let body_split = body.len() - u64::SIZE_IN_BYTES * (num_long_skips as usize); + let (skip_source, long_skip_source) = body.split(body_split); + Positions { + skip_source, + long_skip_source, + position_source + } + } + + /// Returns the offset of the block associated to the given `long_skip_id`. + /// + /// One `long_skip_id` means `LONG_SKIP_IN_BLOCKS` blocks. + fn long_skip(&self, long_skip_id: usize) -> u64 { + if long_skip_id == 0 { + return 0; + } + let mut long_skip_blocks: &[u8] = + &self.long_skip_source.as_slice()[(long_skip_id - 1) * 8..][..8]; + u64::deserialize(&mut long_skip_blocks).expect("Index corrupted") + } + + fn reader(&self, offset: u64) -> PositionReader { + let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize; + let small_skip = (offset % LONG_SKIP_INTERVAL) as usize; + let offset_num_bytes: u64 = self.long_skip(long_skip_id); + let mut position_read = OwnedRead::new(self.position_source.clone()); + position_read.advance(offset_num_bytes as usize); + let mut skip_read = OwnedRead::new(self.skip_source.clone()); + skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS); + let mut position_reader = PositionReader { + skip_read, + position_read, + inner_offset: 0, + buffer: Box::new([0u32; 128]), + ahead: None, + }; + position_reader.skip(small_skip); + position_reader + } +} + pub struct PositionReader { skip_read: OwnedRead, position_read: OwnedRead, @@ -39,6 +113,9 @@ fn read_impl( let bitpacker = BitPacker4x::new(); loop { let available_len = COMPRESSION_BLOCK_SIZE - inner_offset; + // We have enough elements in the current block. + // Let's copy the requested elements in the output buffer, + // and return. if output_len <= available_len { output[output_start..].copy_from_slice(&buffer[inner_offset..][..output_len]); return ahead; @@ -61,35 +138,7 @@ impl PositionReader { skip_source: ReadOnlySource, offset: u64, ) -> PositionReader { - let skip_len = skip_source.len(); - let (body, footer) = skip_source.split(skip_len - u32::SIZE_IN_BYTES); - let num_long_skips = u32::deserialize(&mut footer.as_slice()).expect("Index corrupted"); - let body_split = body.len() - u64::SIZE_IN_BYTES * (num_long_skips as usize); - let (skip_body, long_skips) = body.split(body_split); - let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize; - let small_skip = (offset - (long_skip_id as u64) * (LONG_SKIP_INTERVAL as u64)) as usize; - let offset_num_bytes: u64 = { - if long_skip_id > 0 { - let mut long_skip_blocks: &[u8] = - &long_skips.as_slice()[(long_skip_id - 1) * 8..][..8]; - u64::deserialize(&mut long_skip_blocks).expect("Index corrupted") * 16 - } else { - 0 - } - }; - let mut position_read = OwnedRead::new(position_source); - position_read.advance(offset_num_bytes as usize); - let mut skip_read = OwnedRead::new(skip_body); - skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS); - let mut position_reader = PositionReader { - skip_read, - position_read, - inner_offset: 0, - buffer: Box::new([0u32; 128]), - ahead: None, - }; - position_reader.skip(small_skip); - position_reader + Positions::new(position_source, skip_source).reader(offset) } /// Fills a buffer with the next `output.len()` integers. @@ -102,6 +151,7 @@ impl PositionReader { // the block currently available is not the block // for the current position BIT_PACKER.decompress(position_data, self.buffer.as_mut(), num_bits); + self.ahead = Some(0); } let block_len = compressed_block_size(num_bits); self.ahead = Some(read_impl( @@ -109,7 +159,7 @@ impl PositionReader { self.buffer.as_mut(), self.inner_offset, &skip_data[1..], - output, + output )); } @@ -133,14 +183,13 @@ impl PositionReader { } }); - let skip_len = self.skip_read.as_ref()[..num_blocks_to_advance] + let skip_len_in_bits = self.skip_read.as_ref()[..num_blocks_to_advance] .iter() - .cloned() - .map(|num_bit| num_bit as usize) + .map(|num_bits| *num_bits as usize) .sum::() - * (COMPRESSION_BLOCK_SIZE / 8); - + * COMPRESSION_BLOCK_SIZE; + let skip_len_in_bytes = skip_len_in_bits / 8; self.skip_read.advance(num_blocks_to_advance); - self.position_read.advance(skip_len); + self.position_read.advance(skip_len_in_bytes); } } diff --git a/src/positions/serializer.rs b/src/positions/serializer.rs index 68c6885cd..afcacbabf 100644 --- a/src/positions/serializer.rs +++ b/src/positions/serializer.rs @@ -1,29 +1,28 @@ use super::BIT_PACKER; use bitpacking::BitPacker; use common::BinarySerializable; +use common::CountingWriter; use positions::{COMPRESSION_BLOCK_SIZE, LONG_SKIP_INTERVAL}; -use std::io; +use std::io::{self, Write}; pub struct PositionSerializer { - write_stream: W, + write_stream: CountingWriter, write_skiplist: W, block: Vec, buffer: Vec, num_ints: u64, long_skips: Vec, - cumulated_num_bits: u64, } impl PositionSerializer { pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer { PositionSerializer { - write_stream, + write_stream: CountingWriter::wrap(write_stream), write_skiplist, block: Vec::with_capacity(128), buffer: vec![0u8; 128 * 4], num_ints: 0u64, long_skips: Vec::new(), - cumulated_num_bits: 0u64, } } @@ -51,13 +50,12 @@ impl PositionSerializer { fn flush_block(&mut self) -> io::Result<()> { let num_bits = BIT_PACKER.num_bits(&self.block[..]); - self.cumulated_num_bits += u64::from(num_bits); self.write_skiplist.write_all(&[num_bits])?; let written_len = BIT_PACKER.compress(&self.block[..], &mut self.buffer, num_bits); self.write_stream.write_all(&self.buffer[..written_len])?; self.block.clear(); if (self.num_ints % LONG_SKIP_INTERVAL) == 0u64 { - self.long_skips.push(self.cumulated_num_bits); + self.long_skips.push(self.write_stream.written_bytes()); } Ok(()) }