From 7275ebdf3cdec9e3b81e0fc32d8e9818e26d0b77 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 25 May 2020 09:51:23 +0900 Subject: [PATCH] Skiprefactoring skipabsolute (#831) Simplification of the way we handle positions. --- src/core/inverted_index_reader.rs | 6 +- src/positions/mod.rs | 89 +++- src/positions/reader.rs | 168 +++---- src/postings/block_segment_postings.rs | 439 +++++++++++++++++ src/postings/compression/mod.rs | 35 +- src/postings/mod.rs | 12 +- src/postings/segment_postings.rs | 629 ++----------------------- src/postings/serializer.rs | 3 +- src/postings/skip.rs | 256 +++++++--- src/query/automaton_weight.rs | 5 +- src/query/range_query.rs | 5 +- 11 files changed, 847 insertions(+), 800 deletions(-) create mode 100644 src/postings/block_segment_postings.rs diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 31d234644..f68f0a117 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -7,7 +7,6 @@ use crate::schema::FieldType; use crate::schema::IndexRecordOption; use crate::schema::Term; use crate::termdict::TermDictionary; -use owned_read::OwnedRead; /// The inverted index reader is in charge of accessing /// the inverted index associated to a specific field. @@ -97,8 +96,7 @@ impl InvertedIndexReader { let offset = term_info.postings_offset as usize; let end_source = self.postings_source.len(); let postings_slice = self.postings_source.slice(offset, end_source); - let postings_reader = OwnedRead::new(postings_slice); - block_postings.reset(term_info.doc_freq, postings_reader); + block_postings.reset(term_info.doc_freq, postings_slice); } /// Returns a block postings given a `Term`. @@ -127,7 +125,7 @@ impl InvertedIndexReader { let postings_data = self.postings_source.slice_from(offset); BlockSegmentPostings::from_data( term_info.doc_freq, - OwnedRead::new(postings_data), + postings_data, self.record_option, requested_option, ) diff --git a/src/positions/mod.rs b/src/positions/mod.rs index 7ac62d075..46abdc3b0 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -37,9 +37,9 @@ const LONG_SKIP_INTERVAL: u64 = (LONG_SKIP_IN_BLOCKS * COMPRESSION_BLOCK_SIZE) a #[cfg(test)] pub mod tests { - use super::{PositionReader, PositionSerializer}; + use super::PositionSerializer; use crate::directory::ReadOnlySource; - use crate::positions::COMPRESSION_BLOCK_SIZE; + use crate::positions::reader::PositionReader; use std::iter; fn create_stream_buffer(vals: &[u32]) -> (ReadOnlySource, ReadOnlySource) { @@ -68,7 +68,7 @@ pub mod tests { let mut position_reader = PositionReader::new(stream, skip, 0u64); for &n in &[1, 10, 127, 128, 130, 312] { let mut v = vec![0u32; n]; - position_reader.read(&mut v[..n]); + position_reader.read(0, &mut v[..]); for i in 0..n { assert_eq!(v[i], i as u32); } @@ -76,19 +76,19 @@ pub mod tests { } #[test] - fn test_position_skip() { - let v: Vec = (0..1_000).collect(); + fn test_position_read_with_offset() { + let v: Vec = (0..1000).collect(); let (stream, skip) = create_stream_buffer(&v[..]); assert_eq!(skip.len(), 12); assert_eq!(stream.len(), 1168); - let mut position_reader = PositionReader::new(stream, skip, 0u64); - position_reader.skip(10); - for &n in &[10, 127, COMPRESSION_BLOCK_SIZE, 130, 312] { - let mut v = vec![0u32; n]; - position_reader.read(&mut v[..n]); - for i in 0..n { - assert_eq!(v[i], 10u32 + i as u32); + for &offset in &[1u64, 10u64, 127u64, 128u64, 130u64, 312u64] { + for &len in &[1, 10, 130, 500] { + let mut v = vec![0u32; len]; + position_reader.read(offset, &mut v[..]); + for i in 0..len { + assert_eq!(v[i], i as u32 + offset as u32); + } } } } @@ -103,11 +103,12 @@ pub mod tests { let mut position_reader = PositionReader::new(stream, skip, 0u64); let mut buf = [0u32; 7]; let mut c = 0; + + let mut offset = 0; for _ in 0..100 { - position_reader.read(&mut buf); - position_reader.read(&mut buf); - position_reader.skip(4); - position_reader.skip(3); + position_reader.read(offset, &mut buf); + position_reader.read(offset, &mut buf); + offset += 7; for &el in &buf { assert_eq!(c, el); c += 1; @@ -115,6 +116,58 @@ pub mod tests { } } + #[test] + fn test_position_reread_anchor_different_than_block() { + let v: Vec = (0..2_000_000).collect(); + let (stream, skip) = create_stream_buffer(&v[..]); + assert_eq!(skip.len(), 15_749); + assert_eq!(stream.len(), 4_987_872); + let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 0); + let mut buf = [0u32; 256]; + position_reader.read(128, &mut buf); + for i in 0..256 { + assert_eq!(buf[i], (128 + i) as u32); + } + position_reader.read(128, &mut buf); + for i in 0..256 { + assert_eq!(buf[i], (128 + i) as u32); + } + } + + #[test] + #[should_panic(expected = "offset arguments should be increasing.")] + fn test_position_panic_if_called_previous_anchor() { + let v: Vec = (0..2_000_000).collect(); + let (stream, skip) = create_stream_buffer(&v[..]); + assert_eq!(skip.len(), 15_749); + assert_eq!(stream.len(), 4_987_872); + let mut buf = [0u32; 1]; + let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 200_000); + position_reader.read(230, &mut buf); + position_reader.read(9, &mut buf); + } + + #[test] + fn test_positions_bug() { + let mut v: Vec = vec![]; + for i in 1..200 { + for j in 0..i { + v.push(j); + } + } + let (stream, skip) = create_stream_buffer(&v[..]); + let mut buf = Vec::new(); + let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), 0); + let mut offset = 0; + for i in 1..24 { + buf.resize(i, 0); + position_reader.read(offset, &mut buf[..]); + offset += i as u64; + let r: Vec = (0..i).map(|el| el as u32).collect(); + assert_eq!(buf, &r[..]); + } + } + #[test] fn test_position_long_skip_const() { const CONST_VAL: u32 = 9u32; @@ -124,7 +177,7 @@ pub mod tests { assert_eq!(stream.len(), 1_000_000); let mut position_reader = PositionReader::new(stream, skip, 128 * 1024); let mut buf = [0u32; 1]; - position_reader.read(&mut buf); + position_reader.read(0, &mut buf); assert_eq!(buf[0], CONST_VAL); } @@ -143,7 +196,7 @@ pub mod tests { ] { let mut position_reader = PositionReader::new(stream.clone(), skip.clone(), offset); let mut buf = [0u32; 1]; - position_reader.read(&mut buf); + position_reader.read(0, &mut buf); assert_eq!(buf[0], offset as u32); } } diff --git a/src/positions/reader.rs b/src/positions/reader.rs index 6737f4bbc..9a710ae3a 100644 --- a/src/positions/reader.rs +++ b/src/positions/reader.rs @@ -3,7 +3,6 @@ use crate::directory::ReadOnlySource; use crate::positions::COMPRESSION_BLOCK_SIZE; use crate::positions::LONG_SKIP_INTERVAL; use crate::positions::LONG_SKIP_IN_BLOCKS; -use crate::postings::compression::compressed_block_size; /// Positions works as a long sequence of compressed block. /// All terms are chained one after the other. /// @@ -62,22 +61,20 @@ impl Positions { fn reader(&self, offset: u64) -> PositionReader { let long_skip_id = (offset / LONG_SKIP_INTERVAL) as usize; - let small_skip = (offset % LONG_SKIP_INTERVAL) as usize; let offset_num_bytes: u64 = self.long_skip(long_skip_id); let mut position_read = OwnedRead::new(self.position_source.clone()); position_read.advance(offset_num_bytes as usize); let mut skip_read = OwnedRead::new(self.skip_source.clone()); skip_read.advance(long_skip_id * LONG_SKIP_IN_BLOCKS); - let mut position_reader = PositionReader { + PositionReader { bit_packer: self.bit_packer, skip_read, position_read, - inner_offset: 0, buffer: Box::new([0u32; 128]), - ahead: None, - }; - position_reader.skip(small_skip); - position_reader + block_offset: std::i64::MAX as u64, + anchor_offset: (long_skip_id as u64) * LONG_SKIP_INTERVAL, + abs_offset: offset, + } } } @@ -85,51 +82,12 @@ pub struct PositionReader { skip_read: OwnedRead, position_read: OwnedRead, bit_packer: BitPacker4x, - inner_offset: usize, - buffer: Box<[u32; 128]>, - ahead: Option, // if None, no block is loaded. - // if Some(num_blocks), the block currently loaded is num_blocks ahead - // of the block of the next int to read. -} + buffer: Box<[u32; COMPRESSION_BLOCK_SIZE]>, -// `ahead` represents the offset of the block currently loaded -// compared to the cursor of the actual stream. -// -// By contract, when this function is called, the current block has to be -// decompressed. -// -// If the requested number of els ends exactly at a given block, the next -// block is not decompressed. -fn read_impl( - bit_packer: BitPacker4x, - mut position: &[u8], - buffer: &mut [u32; 128], - mut inner_offset: usize, - num_bits: &[u8], - output: &mut [u32], -) -> usize { - let mut output_start = 0; - let mut output_len = output.len(); - let mut ahead = 0; - loop { - let available_len = COMPRESSION_BLOCK_SIZE - inner_offset; - // We have enough elements in the current block. - // Let's copy the requested elements in the output buffer, - // and return. - if output_len <= available_len { - output[output_start..].copy_from_slice(&buffer[inner_offset..][..output_len]); - return ahead; - } - output[output_start..][..available_len].copy_from_slice(&buffer[inner_offset..]); - output_len -= available_len; - output_start += available_len; - inner_offset = 0; - let num_bits = num_bits[ahead]; - bit_packer.decompress(position, &mut buffer[..], num_bits); - let block_len = compressed_block_size(num_bits); - position = &position[block_len..]; - ahead += 1; - } + block_offset: u64, + anchor_offset: u64, + + abs_offset: u64, } impl PositionReader { @@ -141,57 +99,65 @@ impl PositionReader { Positions::new(position_source, skip_source).reader(offset) } - /// Fills a buffer with the next `output.len()` integers. - /// This does not consume / advance the stream. - pub fn read(&mut self, output: &mut [u32]) { - let skip_data = self.skip_read.as_ref(); - let position_data = self.position_read.as_ref(); - let num_bits = self.skip_read.get(0); - if self.ahead != Some(0) { - // the block currently available is not the block - // for the current position + fn advance_num_blocks(&mut self, num_blocks: usize) { + let num_bits: usize = self.skip_read.as_ref()[..num_blocks] + .iter() + .cloned() + .map(|num_bits| num_bits as usize) + .sum(); + let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8; + self.skip_read.advance(num_blocks as usize); + self.position_read.advance(num_bytes_to_skip); + } + + /// Fills a buffer with the positions `[offset..offset+output.len())` integers. + /// + /// `offset` is required to have a value >= to the offsets given in previous calls + /// for the given `PositionReaderAbsolute` instance. + pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) { + offset += self.abs_offset; + assert!( + offset >= self.anchor_offset, + "offset arguments should be increasing." + ); + let delta_to_block_offset = offset as i64 - self.block_offset as i64; + if delta_to_block_offset < 0 || delta_to_block_offset >= 128 { + // The first position is not within the first block. + // We need to decompress the first block. + let delta_to_anchor_offset = offset - self.anchor_offset; + let num_blocks_to_skip = + (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize; + self.advance_num_blocks(num_blocks_to_skip); + self.anchor_offset = offset - (offset % COMPRESSION_BLOCK_SIZE as u64); + self.block_offset = self.anchor_offset; + let num_bits = self.skip_read.get(0); + self.bit_packer + .decompress(self.position_read.as_ref(), self.buffer.as_mut(), num_bits); + } else { + let num_blocks_to_skip = + ((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize; + self.advance_num_blocks(num_blocks_to_skip); + self.anchor_offset = self.block_offset; + } + + let mut num_bits = self.skip_read.get(0); + let mut position_data = self.position_read.as_ref(); + + for i in 1.. { + let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE; + let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block; + if remaining_in_block >= output.len() { + output.copy_from_slice(&self.buffer[offset_in_block..][..output.len()]); + break; + } + output[..remaining_in_block].copy_from_slice(&self.buffer[offset_in_block..]); + output = &mut output[remaining_in_block..]; + offset += remaining_in_block as u64; + position_data = &position_data[(num_bits as usize * COMPRESSION_BLOCK_SIZE / 8)..]; + num_bits = self.skip_read.get(i); self.bit_packer .decompress(position_data, self.buffer.as_mut(), num_bits); - self.ahead = Some(0); + self.block_offset += COMPRESSION_BLOCK_SIZE as u64; } - let block_len = compressed_block_size(num_bits); - self.ahead = Some(read_impl( - self.bit_packer, - &position_data[block_len..], - self.buffer.as_mut(), - self.inner_offset, - &skip_data[1..], - output, - )); - } - - /// Skip the next `skip_len` integer. - /// - /// If a full block is skipped, calling - /// `.skip(...)` will avoid decompressing it. - /// - /// May panic if the end of the stream is reached. - pub fn skip(&mut self, skip_len: usize) { - let skip_len_plus_inner_offset = skip_len + self.inner_offset; - - let num_blocks_to_advance = skip_len_plus_inner_offset / COMPRESSION_BLOCK_SIZE; - self.inner_offset = skip_len_plus_inner_offset % COMPRESSION_BLOCK_SIZE; - - self.ahead = self.ahead.and_then(|num_blocks| { - if num_blocks >= num_blocks_to_advance { - Some(num_blocks - num_blocks_to_advance) - } else { - None - } - }); - - let skip_len_in_bits = self.skip_read.as_ref()[..num_blocks_to_advance] - .iter() - .map(|num_bits| *num_bits as usize) - .sum::() - * COMPRESSION_BLOCK_SIZE; - let skip_len_in_bytes = skip_len_in_bits / 8; - self.skip_read.advance(num_blocks_to_advance); - self.position_read.advance(skip_len_in_bytes); } } diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs new file mode 100644 index 000000000..2d35dfadd --- /dev/null +++ b/src/postings/block_segment_postings.rs @@ -0,0 +1,439 @@ +use crate::common::{BinarySerializable, VInt}; +use crate::directory::ReadOnlySource; +use crate::postings::compression::{ + AlignedBuffer, BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE, +}; +use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; +use crate::schema::IndexRecordOption; +use crate::{DocId, TERMINATED}; + +/// `BlockSegmentPostings` is a cursor iterating over blocks +/// of documents. +/// +/// # Warning +/// +/// While it is useful for some very specific high-performance +/// use cases, you should prefer using `SegmentPostings` for most usage. +pub struct BlockSegmentPostings { + pub(crate) doc_decoder: BlockDecoder, + freq_decoder: BlockDecoder, + freq_reading_option: FreqReadingOption, + + doc_freq: usize, + + data: ReadOnlySource, + skip_reader: SkipReader, +} + +fn decode_bitpacked_block( + doc_decoder: &mut BlockDecoder, + freq_decoder_opt: Option<&mut BlockDecoder>, + data: &[u8], + doc_offset: DocId, + doc_num_bits: u8, + tf_num_bits: u8, +) { + let num_consumed_bytes = doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits); + if let Some(freq_decoder) = freq_decoder_opt { + freq_decoder.uncompress_block_unsorted(&data[num_consumed_bytes..], tf_num_bits); + } +} + +fn decode_vint_block( + doc_decoder: &mut BlockDecoder, + freq_decoder_opt: Option<&mut BlockDecoder>, + data: &[u8], + doc_offset: DocId, + num_vint_docs: usize, +) { + doc_decoder.clear(); + let num_consumed_bytes = doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs); + if let Some(freq_decoder) = freq_decoder_opt { + freq_decoder.uncompress_vint_unsorted(&data[num_consumed_bytes..], num_vint_docs); + } +} + +fn split_into_skips_and_postings( + doc_freq: u32, + data: ReadOnlySource, +) -> (Option, ReadOnlySource) { + if doc_freq < COMPRESSION_BLOCK_SIZE as u32 { + return (None, data); + } + let mut data_byte_arr = data.as_slice(); + let skip_len = VInt::deserialize(&mut data_byte_arr) + .expect("Data corrupted") + .0 as usize; + let vint_len = data.len() - data_byte_arr.len(); + let (skip_data, postings_data) = data.slice_from(vint_len).split(skip_len); + (Some(skip_data), postings_data) +} + +impl BlockSegmentPostings { + pub(crate) fn from_data( + doc_freq: u32, + data: ReadOnlySource, + record_option: IndexRecordOption, + requested_option: IndexRecordOption, + ) -> BlockSegmentPostings { + let freq_reading_option = match (record_option, requested_option) { + (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, + (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, + (_, _) => FreqReadingOption::ReadFreq, + }; + + let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data); + let skip_reader = match skip_data_opt { + Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option), + None => SkipReader::new(ReadOnlySource::empty(), doc_freq, record_option), + }; + + let doc_freq = doc_freq as usize; + let mut block_segment_postings = BlockSegmentPostings { + doc_decoder: BlockDecoder::with_val(TERMINATED), + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option, + doc_freq, + data: postings_data, + skip_reader, + }; + block_segment_postings.advance(); + block_segment_postings + } + + // Resets the block segment postings on another position + // in the postings file. + // + // This is useful for enumerating through a list of terms, + // and consuming the associated posting lists while avoiding + // reallocating a `BlockSegmentPostings`. + // + // # Warning + // + // This does not reset the positions list. + pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: ReadOnlySource) { + let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data); + self.data = ReadOnlySource::new(postings_data); + if let Some(skip_data) = skip_data_opt { + self.skip_reader.reset(skip_data, doc_freq); + } else { + self.skip_reader.reset(ReadOnlySource::empty(), doc_freq); + } + self.doc_freq = doc_freq as usize; + } + + /// Returns the document frequency associated to this block postings. + /// + /// This `doc_freq` is simply the sum of the length of all of the blocks + /// length, and it does not take in account deleted documents. + pub fn doc_freq(&self) -> usize { + self.doc_freq + } + + /// Returns the array of docs in the current block. + /// + /// Before the first call to `.advance()`, the block + /// returned by `.docs()` is empty. + #[inline] + pub fn docs(&self) -> &[DocId] { + self.doc_decoder.output_array() + } + + pub(crate) fn docs_aligned(&self) -> &AlignedBuffer { + self.doc_decoder.output_aligned() + } + + /// Return the document at index `idx` of the block. + #[inline] + pub fn doc(&self, idx: usize) -> u32 { + self.doc_decoder.output(idx) + } + + /// Return the array of `term freq` in the block. + #[inline] + pub fn freqs(&self) -> &[u32] { + self.freq_decoder.output_array() + } + + /// Return the frequency at index `idx` of the block. + #[inline] + pub fn freq(&self, idx: usize) -> u32 { + self.freq_decoder.output(idx) + } + + /// Returns the length of the current block. + /// + /// All blocks have a length of `NUM_DOCS_PER_BLOCK`, + /// except the last block that may have a length + /// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1` + #[inline] + pub fn block_len(&self) -> usize { + self.doc_decoder.output_len + } + + pub(crate) fn position_offset(&self) -> u64 { + self.skip_reader.position_offset() + } + + /// Position on a block that may contains `target_doc`. + /// + /// If the current block last element is greater or equal to `target_doc`, return true. + /// + /// Returns true if a block that has an element greater or equal to the target is found. + /// Returning true does not guarantee that the smallest element of the block is smaller + /// than the target. It only guarantees that the last element is greater or equal. + /// + /// Returns false iff all of the document remaining are smaller than + /// `doc_id`. In that case, all of these document are consumed. + pub fn seek(&mut self, target_doc: DocId) -> bool { + self.skip_reader.seek(target_doc); + self.read_block(); + + // The last block last doc may actually stop before the target. + self.docs() + .last() + .map(|last_doc| *last_doc >= target_doc) + .unwrap_or(false) + } + + fn read_block(&mut self) { + let offset = self.skip_reader.byte_offset(); + match self.skip_reader.block_info() { + BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + .. + } => { + decode_bitpacked_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + &self.data.as_slice()[offset..], + self.skip_reader.last_doc_in_previous_block, + doc_num_bits, + tf_num_bits, + ); + } + BlockInfo::VInt(num_vint_docs) => { + decode_vint_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + &self.data.as_slice()[offset..], + self.skip_reader.last_doc_in_previous_block, + num_vint_docs as usize, + ); + } + } + } + + /// Advance to the next block. + /// + /// Returns false iff there was no remaining blocks. + pub fn advance(&mut self) -> bool { + if !self.skip_reader.advance() { + return false; + } + self.read_block(); + true + } + + /// Returns an empty segment postings object + pub fn empty() -> BlockSegmentPostings { + BlockSegmentPostings { + doc_decoder: BlockDecoder::with_val(TERMINATED), + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option: FreqReadingOption::NoFreq, + doc_freq: 0, + data: ReadOnlySource::new(vec![]), + skip_reader: SkipReader::new(ReadOnlySource::new(vec![]), 0, IndexRecordOption::Basic), + } + } +} + +#[cfg(test)] +mod tests { + use super::BlockSegmentPostings; + use crate::common::HasLen; + use crate::core::Index; + use crate::docset::{DocSet, TERMINATED}; + use crate::postings::postings::Postings; + use crate::postings::SegmentPostings; + use crate::schema::IndexRecordOption; + use crate::schema::Schema; + use crate::schema::Term; + use crate::schema::INDEXED; + use crate::DocId; + + #[test] + fn test_empty_segment_postings() { + let mut postings = SegmentPostings::empty(); + assert_eq!(postings.advance(), TERMINATED); + assert_eq!(postings.advance(), TERMINATED); + assert_eq!(postings.len(), 0); + } + + #[test] + fn test_empty_postings_doc_returns_terminated() { + let mut postings = SegmentPostings::empty(); + assert_eq!(postings.doc(), TERMINATED); + assert_eq!(postings.advance(), TERMINATED); + } + + #[test] + fn test_empty_postings_doc_term_freq_returns_0() { + let postings = SegmentPostings::empty(); + assert_eq!(postings.term_freq(), 1); + } + + #[test] + fn test_empty_block_segment_postings() { + let mut postings = BlockSegmentPostings::empty(); + assert!(!postings.advance()); + assert_eq!(postings.doc_freq(), 0); + } + + #[test] + fn test_block_segment_postings() { + let mut block_segments = build_block_postings(&(0..100_000).collect::>()); + let mut offset: u32 = 0u32; + // checking that the `doc_freq` is correct + assert_eq!(block_segments.doc_freq(), 100_000); + loop { + let block = block_segments.docs(); + for (i, doc) in block.iter().cloned().enumerate() { + assert_eq!(offset + (i as u32), doc); + } + offset += block.len() as u32; + if block_segments.advance() { + break; + } + } + } + + #[test] + fn test_skip_right_at_new_block() { + let mut doc_ids = (0..128).collect::>(); + // 128 is missing + doc_ids.push(129); + doc_ids.push(130); + { + let block_segments = build_block_postings(&doc_ids); + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.seek(128), 129); + assert_eq!(docset.doc(), 129); + assert_eq!(docset.advance(), 130); + assert_eq!(docset.doc(), 130); + assert_eq!(docset.advance(), TERMINATED); + } + { + let block_segments = build_block_postings(&doc_ids); + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.seek(129), 129); + assert_eq!(docset.doc(), 129); + assert_eq!(docset.advance(), 130); + assert_eq!(docset.doc(), 130); + assert_eq!(docset.advance(), TERMINATED); + } + { + let block_segments = build_block_postings(&doc_ids); + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.doc(), 0); + assert_eq!(docset.seek(131), TERMINATED); + assert_eq!(docset.doc(), TERMINATED); + } + } + + fn build_block_postings(docs: &[DocId]) -> BlockSegmentPostings { + let mut schema_builder = Schema::builder(); + let int_field = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + let mut last_doc = 0u32; + for &doc in docs { + for _ in last_doc..doc { + index_writer.add_document(doc!(int_field=>1u64)); + } + index_writer.add_document(doc!(int_field=>0u64)); + last_doc = doc + 1; + } + index_writer.commit().unwrap(); + let searcher = index.reader().unwrap().searcher(); + let segment_reader = searcher.segment_reader(0); + let inverted_index = segment_reader.inverted_index(int_field); + let term = Term::from_field_u64(int_field, 0u64); + let term_info = inverted_index.get_term_info(&term).unwrap(); + inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic) + } + + #[test] + fn test_block_segment_postings_skip() { + for i in 0..4 { + let mut block_postings = build_block_postings(&[3]); + assert_eq!(block_postings.seek(i), true); + assert_eq!(block_postings.seek(i), true); + } + let mut block_postings = build_block_postings(&[3]); + assert_eq!(block_postings.seek(4u32), false); + } + + #[test] + fn test_block_segment_postings_skip2() { + let mut docs = vec![0]; + for i in 0..1300 { + docs.push((i * i / 100) + i); + } + let mut block_postings = build_block_postings(&docs[..]); + for i in vec![0, 424, 10000] { + assert!(block_postings.seek(i)); + let docs = block_postings.docs(); + assert!(docs[0] <= i); + assert!(docs.last().cloned().unwrap_or(0u32) >= i); + } + assert!(!block_postings.seek(100_000)); + assert!(!block_postings.seek(101_000)); + } + + #[test] + fn test_reset_block_segment_postings() { + let mut schema_builder = Schema::builder(); + let int_field = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + // create two postings list, one containg even number, + // the other containing odd numbers. + for i in 0..6 { + let doc = doc!(int_field=> (i % 2) as u64); + index_writer.add_document(doc); + } + index_writer.commit().unwrap(); + let searcher = index.reader().unwrap().searcher(); + let segment_reader = searcher.segment_reader(0); + + let mut block_segments; + { + let term = Term::from_field_u64(int_field, 0u64); + let inverted_index = segment_reader.inverted_index(int_field); + let term_info = inverted_index.get_term_info(&term).unwrap(); + block_segments = inverted_index + .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic); + } + assert_eq!(block_segments.docs(), &[0, 2, 4]); + { + let term = Term::from_field_u64(int_field, 1u64); + let inverted_index = segment_reader.inverted_index(int_field); + let term_info = inverted_index.get_term_info(&term).unwrap(); + inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments); + } + assert!(block_segments.advance()); + assert_eq!(block_segments.docs(), &[1, 3, 5]); + } +} diff --git a/src/postings/compression/mod.rs b/src/postings/compression/mod.rs index 00648d3f0..cb3dc2bde 100644 --- a/src/postings/compression/mod.rs +++ b/src/postings/compression/mod.rs @@ -18,6 +18,12 @@ pub struct BlockEncoder { pub output_len: usize, } +impl Default for BlockEncoder { + fn default() -> Self { + BlockEncoder::new() + } +} + impl BlockEncoder { pub fn new() -> BlockEncoder { BlockEncoder { @@ -55,11 +61,13 @@ pub struct BlockDecoder { pub output_len: usize, } -impl BlockDecoder { - pub fn new() -> BlockDecoder { +impl Default for BlockDecoder { + fn default() -> Self { BlockDecoder::with_val(0u32) } +} +impl BlockDecoder { pub fn with_val(val: u32) -> BlockDecoder { BlockDecoder { bitpacker: BitPacker4x::new(), @@ -175,7 +183,7 @@ impl VIntDecoder for BlockDecoder { vint::uncompress_sorted(compressed_data, &mut self.output.0[..num_els], offset) } - fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) -> usize { + fn uncompress_vint_unsorted(&mut self, compressed_data: &[u8], num_els: usize) -> usize { self.output_len = num_els; vint::uncompress_unsorted(compressed_data, &mut self.output.0[..num_els]) } @@ -191,7 +199,7 @@ pub mod tests { let vals: Vec = (0u32..128u32).map(|i| i * 7).collect(); let mut encoder = BlockEncoder::new(); let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 0); - let mut decoder = BlockDecoder::new(); + let mut decoder = BlockDecoder::default(); { let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 0, num_bits); assert_eq!(consumed_num_bytes, compressed_data.len()); @@ -204,9 +212,9 @@ pub mod tests { #[test] fn test_encode_sorted_block_with_offset() { let vals: Vec = (0u32..128u32).map(|i| 11 + i * 7).collect(); - let mut encoder = BlockEncoder::new(); + let mut encoder = BlockEncoder::default(); let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10); - let mut decoder = BlockDecoder::new(); + let mut decoder = BlockDecoder::default(); { let consumed_num_bytes = decoder.uncompress_block_sorted(compressed_data, 10, num_bits); assert_eq!(consumed_num_bytes, compressed_data.len()); @@ -221,11 +229,11 @@ pub mod tests { let mut compressed: Vec = Vec::new(); let n = 128; let vals: Vec = (0..n).map(|i| 11u32 + (i as u32) * 7u32).collect(); - let mut encoder = BlockEncoder::new(); + let mut encoder = BlockEncoder::default(); let (num_bits, compressed_data) = encoder.compress_block_sorted(&vals, 10); compressed.extend_from_slice(compressed_data); compressed.push(173u8); - let mut decoder = BlockDecoder::new(); + let mut decoder = BlockDecoder::default(); { let consumed_num_bytes = decoder.uncompress_block_sorted(&compressed, 10, num_bits); assert_eq!(consumed_num_bytes, compressed.len() - 1); @@ -241,11 +249,11 @@ pub mod tests { let mut compressed: Vec = Vec::new(); let n = 128; let vals: Vec = (0..n).map(|i| 11u32 + (i as u32) * 7u32 % 12).collect(); - let mut encoder = BlockEncoder::new(); + let mut encoder = BlockEncoder::default(); let (num_bits, compressed_data) = encoder.compress_block_unsorted(&vals); compressed.extend_from_slice(compressed_data); compressed.push(173u8); - let mut decoder = BlockDecoder::new(); + let mut decoder = BlockDecoder::default(); { let consumed_num_bytes = decoder.uncompress_block_unsorted(&compressed, num_bits); assert_eq!(consumed_num_bytes + 1, compressed.len()); @@ -256,6 +264,11 @@ pub mod tests { } } + #[test] + fn test_block_decoder_initialization() { + let block = BlockDecoder::with_val(TERMINATED); + assert_eq!(block.output(0), TERMINATED); + } #[test] fn test_encode_vint() { { @@ -265,7 +278,7 @@ pub mod tests { for offset in &[0u32, 1u32, 2u32] { let encoded_data = encoder.compress_vint_sorted(&input, *offset); assert!(encoded_data.len() <= expected_length); - let mut decoder = BlockDecoder::new(); + let mut decoder = BlockDecoder::default(); let consumed_num_bytes = decoder.uncompress_vint_sorted(&encoded_data, *offset, input.len()); assert_eq!(consumed_num_bytes, encoded_data.len()); diff --git a/src/postings/mod.rs b/src/postings/mod.rs index a01cadfc7..20ca73d44 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -3,11 +3,8 @@ Postings module (also called inverted index) */ mod block_search; +mod block_segment_postings; pub(crate) mod compression; -/// Postings module -/// -/// Postings, also called inverted lists, is the key datastructure -/// to full-text search. mod postings; mod postings_writer; mod recorder; @@ -22,18 +19,17 @@ pub(crate) use self::block_search::BlockSearcher; pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; -use self::compression::COMPRESSION_BLOCK_SIZE; pub use self::postings::Postings; -pub(crate) use self::skip::SkipReader; +pub(crate) use self::skip::{BlockInfo, SkipReader}; pub use self::term_info::TermInfo; -pub use self::segment_postings::{BlockSegmentPostings, SegmentPostings}; +pub use self::block_segment_postings::BlockSegmentPostings; +pub use self::segment_postings::SegmentPostings; pub(crate) use self::stacker::compute_table_size; pub use crate::common::HasLen; -pub(crate) const USE_SKIP_INFO_LIMIT: u32 = COMPRESSION_BLOCK_SIZE as u32; pub(crate) type UnorderedTermId = u64; #[cfg_attr(feature = "cargo-clippy", allow(clippy::enum_variant_names))] diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index b1c1ed75c..e04b8f607 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -1,54 +1,19 @@ use crate::common::HasLen; -use crate::common::{BinarySerializable, VInt}; + use crate::docset::{DocSet, TERMINATED}; use crate::positions::PositionReader; -use crate::postings::compression::{compressed_block_size, AlignedBuffer}; -use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; + +use crate::postings::compression::COMPRESSION_BLOCK_SIZE; use crate::postings::serializer::PostingsSerializer; use crate::postings::BlockSearcher; -use crate::postings::FreqReadingOption; + use crate::postings::Postings; -use crate::postings::SkipReader; -use crate::postings::USE_SKIP_INFO_LIMIT; + use crate::schema::IndexRecordOption; use crate::DocId; -use owned_read::OwnedRead; -use tantivy_fst::Streamer; -struct PositionComputer { - // store the amount of position int - // before reading positions. - // - // if none, position are already loaded in - // the positions vec. - position_to_skip: usize, - position_reader: PositionReader, -} - -impl PositionComputer { - pub fn new(position_reader: PositionReader) -> PositionComputer { - PositionComputer { - position_to_skip: 0, - position_reader, - } - } - - pub fn add_skip(&mut self, num_skip: usize) { - self.position_to_skip += num_skip; - } - - // Positions can only be read once. - pub fn positions_with_offset(&mut self, offset: u32, output: &mut [u32]) { - self.position_reader.skip(self.position_to_skip); - self.position_to_skip = 0; - self.position_reader.read(output); - let mut cum = offset; - for output_mut in output.iter_mut() { - cum += *output_mut; - *output_mut = cum; - } - } -} +use crate::directory::ReadOnlySource; +use crate::postings::BlockSegmentPostings; /// `SegmentPostings` represents the inverted list or postings associated to /// a term in a `Segment`. @@ -58,22 +23,19 @@ impl PositionComputer { pub struct SegmentPostings { block_cursor: BlockSegmentPostings, cur: usize, - position_computer: Option, + position_reader: Option, block_searcher: BlockSearcher, } impl SegmentPostings { /// Returns an empty segment postings object pub fn empty() -> Self { - let empty_block_cursor = BlockSegmentPostings::empty(); - let mut segment_postings = SegmentPostings { - block_cursor: empty_block_cursor, - cur: COMPRESSION_BLOCK_SIZE, - position_computer: None, + SegmentPostings { + block_cursor: BlockSegmentPostings::empty(), + cur: 0, + position_reader: None, block_searcher: BlockSearcher::default(), - }; - segment_postings.advance(); - segment_postings + } } /// Creates a segment postings object with the given documents @@ -97,15 +59,13 @@ impl SegmentPostings { } let block_segment_postings = BlockSegmentPostings::from_data( docs.len() as u32, - OwnedRead::new(buffer), + ReadOnlySource::from(buffer), IndexRecordOption::Basic, IndexRecordOption::Basic, ); SegmentPostings::from_block_postings(block_segment_postings, None) } -} -impl SegmentPostings { /// Reads a Segment postings from an &[u8] /// /// * `len` - number of document in the posting lists. @@ -114,16 +74,14 @@ impl SegmentPostings { /// frequencies and/or positions pub(crate) fn from_block_postings( segment_block_postings: BlockSegmentPostings, - positions_stream_opt: Option, + position_reader: Option, ) -> SegmentPostings { - let mut postings = SegmentPostings { + SegmentPostings { block_cursor: segment_block_postings, - cur: COMPRESSION_BLOCK_SIZE, // cursor within the block - position_computer: positions_stream_opt.map(PositionComputer::new), + cur: 0, // cursor within the block + position_reader, block_searcher: BlockSearcher::default(), - }; - postings.advance(); - postings + } } } @@ -132,12 +90,6 @@ impl DocSet for SegmentPostings { // next needs to be called a first time to point to the correct element. #[inline] fn advance(&mut self) -> DocId { - if self.position_computer.is_some() && self.cur < COMPRESSION_BLOCK_SIZE { - let term_freq = self.term_freq() as usize; - if let Some(position_computer) = self.position_computer.as_mut() { - position_computer.add_skip(term_freq); - } - } self.cur += 1; if self.cur >= self.block_cursor.block_len() { if self.block_cursor.advance() { @@ -151,17 +103,13 @@ impl DocSet for SegmentPostings { } fn seek(&mut self, target: DocId) -> DocId { - if self.doc() >= target { - return self.doc(); + let doc = self.doc(); + if doc >= target { + return doc; } - // In the following, thanks to the call to advance above, - // we know that the position is not loaded and we need - // to skip every doc_freq we cross. - // skip blocks until one that might contain the target // check if we need to go to the next block - let mut sum_freqs_skipped: u32 = 0; if self .block_cursor .docs() @@ -170,23 +118,7 @@ impl DocSet for SegmentPostings { .unwrap_or(true) { // We are not in the right block. - if self.position_computer.is_some() { - // First compute all of the freqs skipped from the current block. - sum_freqs_skipped = self.block_cursor.freqs()[self.cur..].iter().sum::(); - match self.block_cursor.skip_to(target) { - BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => { - sum_freqs_skipped += block_skip_freqs; - } - BlockSegmentPostingsSkipResult::Terminated => { - self.block_cursor.doc_decoder.clear(); - self.cur = 0; - return TERMINATED; - } - } - } else if self.block_cursor.skip_to(target) - == BlockSegmentPostingsSkipResult::Terminated - { - // no positions needed. no need to sum freqs. + if !self.block_cursor.seek(target) { self.block_cursor.doc_decoder.clear(); self.cur = 0; return TERMINATED; @@ -200,10 +132,6 @@ impl DocSet for SegmentPostings { let output = self.block_cursor.docs_aligned(); let new_cur = self.block_searcher.search_in_block(&output, cur, target); - if let Some(position_computer) = self.position_computer.as_mut() { - sum_freqs_skipped += self.block_cursor.freqs()[cur..new_cur].iter().sum::(); - position_computer.add_skip(sum_freqs_skipped as usize); - } self.cur = new_cur; // `doc` is now the first element >= `target` @@ -225,23 +153,6 @@ impl DocSet for SegmentPostings { fn size_hint(&self) -> u32 { self.len() as u32 } - - /* - fn append_to_bitset(&mut self, bitset: &mut BitSet) { - // finish the current block - if self.advance() { - for &doc in &self.block_cursor.docs()[self.cur..] { - bitset.insert(doc); - } - // ... iterate through the remaining blocks. - while self.block_cursor.advance() { - for &doc in self.block_cursor.docs() { - bitset.insert(doc); - } - } - } - } - */ } impl HasLen for SegmentPostings { @@ -275,338 +186,33 @@ impl Postings for SegmentPostings { fn positions_with_offset(&mut self, offset: u32, output: &mut Vec) { let term_freq = self.term_freq() as usize; - if let Some(position_comp) = self.position_computer.as_mut() { + if let Some(position_reader) = self.position_reader.as_mut() { + let read_offset = self.block_cursor.position_offset() + + (self.block_cursor.freqs()[..self.cur] + .iter() + .cloned() + .sum::() as u64); output.resize(term_freq, 0u32); - position_comp.positions_with_offset(offset, &mut output[..]); + position_reader.read(read_offset, &mut output[..]); + let mut cum = offset; + for output_mut in output.iter_mut() { + cum += *output_mut; + *output_mut = cum; + } } else { output.clear(); } } } -/// `BlockSegmentPostings` is a cursor iterating over blocks -/// of documents. -/// -/// # Warning -/// -/// While it is useful for some very specific high-performance -/// use cases, you should prefer using `SegmentPostings` for most usage. -pub struct BlockSegmentPostings { - doc_decoder: BlockDecoder, - freq_decoder: BlockDecoder, - freq_reading_option: FreqReadingOption, - - doc_freq: usize, - doc_offset: DocId, - - num_vint_docs: usize, - - remaining_data: OwnedRead, - skip_reader: SkipReader, -} - -fn split_into_skips_and_postings( - doc_freq: u32, - mut data: OwnedRead, -) -> (Option, OwnedRead) { - if doc_freq < USE_SKIP_INFO_LIMIT { - return (None, data); - } - let skip_len = VInt::deserialize(&mut data).expect("Data corrupted").0 as usize; - let mut postings_data = data.clone(); - postings_data.advance(skip_len); - data.clip(skip_len); - (Some(data), postings_data) -} - -#[derive(Debug, Eq, PartialEq)] -pub enum BlockSegmentPostingsSkipResult { - Terminated, - Success(u32), //< number of term freqs to skip -} - -impl BlockSegmentPostings { - pub(crate) fn from_data( - doc_freq: u32, - data: OwnedRead, - record_option: IndexRecordOption, - requested_option: IndexRecordOption, - ) -> BlockSegmentPostings { - let freq_reading_option = match (record_option, requested_option) { - (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, - (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, - (_, _) => FreqReadingOption::ReadFreq, - }; - - let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, data); - let skip_reader = match skip_data_opt { - Some(skip_data) => SkipReader::new(skip_data, record_option), - None => SkipReader::new(OwnedRead::new(&[][..]), record_option), - }; - let doc_freq = doc_freq as usize; - let num_vint_docs = doc_freq % COMPRESSION_BLOCK_SIZE; - BlockSegmentPostings { - num_vint_docs, - doc_decoder: BlockDecoder::new(), - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option, - doc_offset: 0, - doc_freq, - remaining_data: postings_data, - skip_reader, - } - } - - // Resets the block segment postings on another position - // in the postings file. - // - // This is useful for enumerating through a list of terms, - // and consuming the associated posting lists while avoiding - // reallocating a `BlockSegmentPostings`. - // - // # Warning - // - // This does not reset the positions list. - pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedRead) { - let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data); - let num_vint_docs = (doc_freq as usize) & (COMPRESSION_BLOCK_SIZE - 1); - self.num_vint_docs = num_vint_docs; - self.remaining_data = postings_data; - if let Some(skip_data) = skip_data_opt { - self.skip_reader.reset(skip_data); - } else { - self.skip_reader.reset(OwnedRead::new(&[][..])) - } - self.doc_offset = 0; - self.doc_freq = doc_freq as usize; - } - - /// Returns the document frequency associated to this block postings. - /// - /// This `doc_freq` is simply the sum of the length of all of the blocks - /// length, and it does not take in account deleted documents. - pub fn doc_freq(&self) -> usize { - self.doc_freq - } - - /// Returns the array of docs in the current block. - /// - /// Before the first call to `.advance()`, the block - /// returned by `.docs()` is empty. - #[inline] - pub fn docs(&self) -> &[DocId] { - self.doc_decoder.output_array() - } - - pub(crate) fn docs_aligned(&self) -> &AlignedBuffer { - self.doc_decoder.output_aligned() - } - - /// Return the document at index `idx` of the block. - #[inline] - pub fn doc(&self, idx: usize) -> u32 { - self.doc_decoder.output(idx) - } - - /// Return the array of `term freq` in the block. - #[inline] - pub fn freqs(&self) -> &[u32] { - self.freq_decoder.output_array() - } - - /// Return the frequency at index `idx` of the block. - #[inline] - pub fn freq(&self, idx: usize) -> u32 { - self.freq_decoder.output(idx) - } - - /// Returns the length of the current block. - /// - /// All blocks have a length of `NUM_DOCS_PER_BLOCK`, - /// except the last block that may have a length - /// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1` - #[inline] - fn block_len(&self) -> usize { - self.doc_decoder.output_len - } - - /// position on a block that may contains `doc_id`. - /// Always advance the current block. - /// - /// Returns true if a block that has an element greater or equal to the target is found. - /// Returning true does not guarantee that the smallest element of the block is smaller - /// than the target. It only guarantees that the last element is greater or equal. - /// - /// Returns false iff all of the document remaining are smaller than - /// `doc_id`. In that case, all of these document are consumed. - /// - pub fn skip_to(&mut self, target_doc: DocId) -> BlockSegmentPostingsSkipResult { - let mut skip_freqs = 0u32; - while self.skip_reader.advance() { - if self.skip_reader.doc() >= target_doc { - // the last document of the current block is larger - // than the target. - // - // We found our block! - let num_bits = self.skip_reader.doc_num_bits(); - let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted( - self.remaining_data.as_ref(), - self.doc_offset, - num_bits, - ); - self.remaining_data.advance(num_consumed_bytes); - let tf_num_bits = self.skip_reader.tf_num_bits(); - match self.freq_reading_option { - FreqReadingOption::NoFreq => {} - FreqReadingOption::SkipFreq => { - let num_bytes_to_skip = compressed_block_size(tf_num_bits); - self.remaining_data.advance(num_bytes_to_skip); - } - FreqReadingOption::ReadFreq => { - let num_consumed_bytes = self - .freq_decoder - .uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits); - self.remaining_data.advance(num_consumed_bytes); - } - } - self.doc_offset = self.skip_reader.doc(); - return BlockSegmentPostingsSkipResult::Success(skip_freqs); - } else { - skip_freqs += self.skip_reader.tf_sum(); - let advance_len = self.skip_reader.total_block_len(); - self.doc_offset = self.skip_reader.doc(); - self.remaining_data.advance(advance_len); - } - } - - self.doc_decoder.clear(); - - if self.num_vint_docs == 0 { - return BlockSegmentPostingsSkipResult::Terminated; - } - // we are now on the last, incomplete, variable encoded block. - let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted( - self.remaining_data.as_ref(), - self.doc_offset, - self.num_vint_docs, - ); - self.remaining_data.advance(num_compressed_bytes); - match self.freq_reading_option { - FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {} - FreqReadingOption::ReadFreq => { - self.freq_decoder - .uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs); - } - } - self.num_vint_docs = 0; - self.docs() - .last() - .map(|last_doc| { - if *last_doc >= target_doc { - BlockSegmentPostingsSkipResult::Success(skip_freqs) - } else { - BlockSegmentPostingsSkipResult::Terminated - } - }) - .unwrap_or(BlockSegmentPostingsSkipResult::Terminated) - } - - /// Advance to the next block. - /// - /// Returns false iff there was no remaining blocks. - pub fn advance(&mut self) -> bool { - if self.skip_reader.advance() { - let num_bits = self.skip_reader.doc_num_bits(); - let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted( - self.remaining_data.as_ref(), - self.doc_offset, - num_bits, - ); - self.remaining_data.advance(num_consumed_bytes); - let tf_num_bits = self.skip_reader.tf_num_bits(); - match self.freq_reading_option { - FreqReadingOption::NoFreq => {} - FreqReadingOption::SkipFreq => { - let num_bytes_to_skip = compressed_block_size(tf_num_bits); - self.remaining_data.advance(num_bytes_to_skip); - } - FreqReadingOption::ReadFreq => { - let num_consumed_bytes = self - .freq_decoder - .uncompress_block_unsorted(self.remaining_data.as_ref(), tf_num_bits); - self.remaining_data.advance(num_consumed_bytes); - } - } - // it will be used as the next offset. - self.doc_offset = self.doc_decoder.output(COMPRESSION_BLOCK_SIZE - 1); - return true; - } - self.doc_decoder.clear(); - if self.num_vint_docs == 0 { - return false; - } - let num_compressed_bytes = self.doc_decoder.uncompress_vint_sorted( - self.remaining_data.as_ref(), - self.doc_offset, - self.num_vint_docs, - ); - self.remaining_data.advance(num_compressed_bytes); - match self.freq_reading_option { - FreqReadingOption::NoFreq | FreqReadingOption::SkipFreq => {} - FreqReadingOption::ReadFreq => { - self.freq_decoder - .uncompress_vint_unsorted(self.remaining_data.as_ref(), self.num_vint_docs); - } - } - self.num_vint_docs = 0; - true - } - - /// Returns an empty segment postings object - pub fn empty() -> BlockSegmentPostings { - BlockSegmentPostings { - num_vint_docs: 0, - - doc_decoder: BlockDecoder::new(), - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option: FreqReadingOption::NoFreq, - - doc_offset: 0, - doc_freq: 0, - - remaining_data: OwnedRead::new(vec![]), - skip_reader: SkipReader::new(OwnedRead::new(vec![]), IndexRecordOption::Basic), - } - } -} - -impl<'b> Streamer<'b> for BlockSegmentPostings { - type Item = &'b [DocId]; - - fn next(&'b mut self) -> Option<&'b [DocId]> { - if self.advance() { - Some(self.docs()) - } else { - None - } - } -} - #[cfg(test)] mod tests { - use super::BlockSegmentPostings; - use super::BlockSegmentPostingsSkipResult; + use super::SegmentPostings; use crate::common::HasLen; - use crate::core::Index; + use crate::docset::{DocSet, TERMINATED}; use crate::postings::postings::Postings; - use crate::schema::IndexRecordOption; - use crate::schema::Schema; - use crate::schema::Term; - use crate::schema::INDEXED; - use crate::DocId; - use tantivy_fst::Streamer; #[test] fn test_empty_segment_postings() { @@ -628,165 +234,4 @@ mod tests { let postings = SegmentPostings::empty(); assert_eq!(postings.term_freq(), 1); } - - #[test] - fn test_empty_block_segment_postings() { - let mut postings = BlockSegmentPostings::empty(); - assert!(!postings.advance()); - assert_eq!(postings.doc_freq(), 0); - } - - #[test] - fn test_block_segment_postings() { - let mut block_segments = build_block_postings(&(0..100_000).collect::>()); - let mut offset: u32 = 0u32; - // checking that the block before calling advance is empty - assert!(block_segments.docs().is_empty()); - // checking that the `doc_freq` is correct - assert_eq!(block_segments.doc_freq(), 100_000); - while let Some(block) = block_segments.next() { - for (i, doc) in block.iter().cloned().enumerate() { - assert_eq!(offset + (i as u32), doc); - } - offset += block.len() as u32; - } - } - - #[test] - fn test_skip_right_at_new_block() { - let mut doc_ids = (0..128).collect::>(); - doc_ids.push(129); - doc_ids.push(130); - { - let block_segments = build_block_postings(&doc_ids); - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.seek(128), 129); - assert_eq!(docset.doc(), 129); - assert_eq!(docset.advance(), 130); - assert_eq!(docset.doc(), 130); - assert_eq!(docset.advance(), TERMINATED); - } - { - let block_segments = build_block_postings(&doc_ids); - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.seek(129), 129); - assert_eq!(docset.doc(), 129); - assert_eq!(docset.advance(), 130); - assert_eq!(docset.doc(), 130); - assert_eq!(docset.advance(), TERMINATED); - } - { - let block_segments = build_block_postings(&doc_ids); - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.doc(), 0); - assert_eq!(docset.seek(131), TERMINATED); - assert_eq!(docset.doc(), TERMINATED); - } - } - - fn build_block_postings(docs: &[DocId]) -> BlockSegmentPostings { - let mut schema_builder = Schema::builder(); - let int_field = schema_builder.add_u64_field("id", INDEXED); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - let mut last_doc = 0u32; - for &doc in docs { - for _ in last_doc..doc { - index_writer.add_document(doc!(int_field=>1u64)); - } - index_writer.add_document(doc!(int_field=>0u64)); - last_doc = doc + 1; - } - index_writer.commit().unwrap(); - let searcher = index.reader().unwrap().searcher(); - let segment_reader = searcher.segment_reader(0); - let inverted_index = segment_reader.inverted_index(int_field); - let term = Term::from_field_u64(int_field, 0u64); - let term_info = inverted_index.get_term_info(&term).unwrap(); - inverted_index.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic) - } - - #[test] - fn test_block_segment_postings_skip() { - for i in 0..4 { - let mut block_postings = build_block_postings(&[3]); - assert_eq!( - block_postings.skip_to(i), - BlockSegmentPostingsSkipResult::Success(0u32) - ); - assert_eq!( - block_postings.skip_to(i), - BlockSegmentPostingsSkipResult::Terminated - ); - } - let mut block_postings = build_block_postings(&[3]); - assert_eq!( - block_postings.skip_to(4u32), - BlockSegmentPostingsSkipResult::Terminated - ); - } - - #[test] - fn test_block_segment_postings_skip2() { - let mut docs = vec![0]; - for i in 0..1300 { - docs.push((i * i / 100) + i); - } - let mut block_postings = build_block_postings(&docs[..]); - for i in vec![0, 424, 10000] { - assert_eq!( - block_postings.skip_to(i), - BlockSegmentPostingsSkipResult::Success(0u32) - ); - let docs = block_postings.docs(); - assert!(docs[0] <= i); - assert!(docs.last().cloned().unwrap_or(0u32) >= i); - } - assert_eq!( - block_postings.skip_to(100_000), - BlockSegmentPostingsSkipResult::Terminated - ); - assert_eq!( - block_postings.skip_to(101_000), - BlockSegmentPostingsSkipResult::Terminated - ); - } - - #[test] - fn test_reset_block_segment_postings() { - let mut schema_builder = Schema::builder(); - let int_field = schema_builder.add_u64_field("id", INDEXED); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - // create two postings list, one containg even number, - // the other containing odd numbers. - for i in 0..6 { - let doc = doc!(int_field=> (i % 2) as u64); - index_writer.add_document(doc); - } - index_writer.commit().unwrap(); - let searcher = index.reader().unwrap().searcher(); - let segment_reader = searcher.segment_reader(0); - - let mut block_segments; - { - let term = Term::from_field_u64(int_field, 0u64); - let inverted_index = segment_reader.inverted_index(int_field); - let term_info = inverted_index.get_term_info(&term).unwrap(); - block_segments = inverted_index - .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic); - } - assert!(block_segments.advance()); - assert_eq!(block_segments.docs(), &[0, 2, 4]); - { - let term = Term::from_field_u64(int_field, 1u64); - let inverted_index = segment_reader.inverted_index(int_field); - let term_info = inverted_index.get_term_info(&term).unwrap(); - inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments); - } - assert!(block_segments.advance()); - assert_eq!(block_segments.docs(), &[1, 3, 5]); - } } diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index ac619b52b..c1245eb1a 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -6,7 +6,6 @@ use crate::directory::WritePtr; use crate::positions::PositionSerializer; use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE}; use crate::postings::skip::SkipSerializer; -use crate::postings::USE_SKIP_INFO_LIMIT; use crate::schema::Schema; use crate::schema::{Field, FieldEntry, FieldType}; use crate::termdict::{TermDictionaryBuilder, TermOrdinal}; @@ -391,7 +390,7 @@ impl PostingsSerializer { } self.block.clear(); } - if doc_freq >= USE_SKIP_INFO_LIMIT { + if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { let skip_data = self.skip_write.data(); VInt(skip_data.len() as u64).serialize(&mut self.output_write)?; self.output_write.write_all(skip_data)?; diff --git a/src/postings/skip.rs b/src/postings/skip.rs index 165664847..ad133a3f1 100644 --- a/src/postings/skip.rs +++ b/src/postings/skip.rs @@ -1,7 +1,8 @@ use crate::common::BinarySerializable; -use crate::postings::compression::COMPRESSION_BLOCK_SIZE; +use crate::directory::ReadOnlySource; +use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE}; use crate::schema::IndexRecordOption; -use crate::DocId; +use crate::{DocId, TERMINATED}; use owned_read::OwnedRead; pub struct SkipSerializer { @@ -50,80 +51,143 @@ impl SkipSerializer { } pub(crate) struct SkipReader { - doc: DocId, + last_doc_in_block: DocId, + pub(crate) last_doc_in_previous_block: DocId, owned_read: OwnedRead, - doc_num_bits: u8, - tf_num_bits: u8, - tf_sum: u32, skip_info: IndexRecordOption, + byte_offset: usize, + remaining_docs: u32, // number of docs remaining, including the + // documents in the current block. + block_info: BlockInfo, + + position_offset: u64, +} + +#[derive(Clone, Eq, PartialEq, Copy, Debug)] +pub(crate) enum BlockInfo { + BitPacked { + doc_num_bits: u8, + tf_num_bits: u8, + tf_sum: u32, + }, + VInt(u32), +} + +impl Default for BlockInfo { + fn default() -> Self { + BlockInfo::VInt(0) + } } impl SkipReader { - pub fn new(data: OwnedRead, skip_info: IndexRecordOption) -> SkipReader { + pub fn new(data: ReadOnlySource, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader { SkipReader { - doc: 0u32, - owned_read: data, + last_doc_in_block: 0u32, + last_doc_in_previous_block: 0u32, + owned_read: OwnedRead::new(data), skip_info, - doc_num_bits: 0u8, - tf_num_bits: 0u8, - tf_sum: 0u32, + block_info: BlockInfo::default(), + byte_offset: 0, + remaining_docs: doc_freq, + position_offset: 0u64, } } - pub fn reset(&mut self, data: OwnedRead) { - self.doc = 0u32; - self.owned_read = data; - self.doc_num_bits = 0u8; - self.tf_num_bits = 0u8; - self.tf_sum = 0u32; - } - - pub fn total_block_len(&self) -> usize { - (self.doc_num_bits + self.tf_num_bits) as usize * COMPRESSION_BLOCK_SIZE / 8 + pub fn reset(&mut self, data: ReadOnlySource, doc_freq: u32) { + self.last_doc_in_block = 0u32; + self.last_doc_in_previous_block = 0u32; + self.owned_read = OwnedRead::new(data); + self.block_info = BlockInfo::default(); + self.byte_offset = 0; + self.remaining_docs = doc_freq; } pub fn doc(&self) -> DocId { - self.doc + self.last_doc_in_block } - pub fn doc_num_bits(&self) -> u8 { - self.doc_num_bits + pub fn position_offset(&self) -> u64 { + self.position_offset } - /// Number of bits used to encode term frequencies + pub fn byte_offset(&self) -> usize { + self.byte_offset + } + + fn read_block_info(&mut self) { + let doc_delta = u32::deserialize(&mut self.owned_read).expect("Skip data corrupted"); + self.last_doc_in_block += doc_delta as DocId; + let doc_num_bits = self.owned_read.get(0); + match self.skip_info { + IndexRecordOption::Basic => { + self.owned_read.advance(1); + self.block_info = BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits: 0, + tf_sum: 0, + }; + } + IndexRecordOption::WithFreqs => { + let tf_num_bits = self.owned_read.get(1); + self.block_info = BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + tf_sum: 0, + }; + self.owned_read.advance(2); + } + IndexRecordOption::WithFreqsAndPositions => { + let tf_num_bits = self.owned_read.get(1); + self.owned_read.advance(2); + let tf_sum = u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum"); + self.block_info = BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + tf_sum, + }; + } + } + } + + pub fn block_info(&self) -> BlockInfo { + self.block_info + } + + /// Advance the skip reader to the block that may contain the target. /// - /// 0 if term frequencies are not enabled. - pub fn tf_num_bits(&self) -> u8 { - self.tf_num_bits - } - - pub fn tf_sum(&self) -> u32 { - self.tf_sum + /// If the target is larger than all documents, the skip_reader + /// then advance to the last Variable In block. + /// + /// Returns true if the last block is reached. + pub fn seek(&mut self, target: DocId) { + while self.doc() < target { + self.advance(); + } } pub fn advance(&mut self) -> bool { - if self.owned_read.as_ref().is_empty() { - false - } else { - let doc_delta = u32::deserialize(&mut self.owned_read).expect("Skip data corrupted"); - self.doc += doc_delta as DocId; - self.doc_num_bits = self.owned_read.get(0); - match self.skip_info { - IndexRecordOption::Basic => { - self.owned_read.advance(1); - } - IndexRecordOption::WithFreqs => { - self.tf_num_bits = self.owned_read.get(1); - self.owned_read.advance(2); - } - IndexRecordOption::WithFreqsAndPositions => { - self.tf_num_bits = self.owned_read.get(1); - self.owned_read.advance(2); - self.tf_sum = - u32::deserialize(&mut self.owned_read).expect("Failed reading tf_sum"); - } + match self.block_info { + BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + tf_sum, + } => { + self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32; + self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits); + self.position_offset += tf_sum as u64; } + BlockInfo::VInt(num_vint_docs) => { + self.remaining_docs -= num_vint_docs; + } + } + self.last_doc_in_previous_block = self.last_doc_in_block; + if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 { + self.read_block_info(); true + } else { + self.last_doc_in_block = TERMINATED; + self.block_info = BlockInfo::VInt(self.remaining_docs); + self.remaining_docs > 0 } } } @@ -131,9 +195,11 @@ impl SkipReader { #[cfg(test)] mod tests { + use super::BlockInfo; use super::IndexRecordOption; use super::{SkipReader, SkipSerializer}; - use owned_read::OwnedRead; + use crate::directory::ReadOnlySource; + use crate::postings::compression::COMPRESSION_BLOCK_SIZE; #[test] fn test_skip_with_freq() { @@ -145,15 +211,34 @@ mod tests { skip_serializer.write_term_freq(2u8); skip_serializer.data().to_owned() }; - let mut skip_reader = SkipReader::new(OwnedRead::new(buf), IndexRecordOption::WithFreqs); + let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; + let mut skip_reader = SkipReader::new( + ReadOnlySource::new(buf), + doc_freq, + IndexRecordOption::WithFreqs, + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 1u32); - assert_eq!(skip_reader.doc_num_bits(), 2u8); - assert_eq!(skip_reader.tf_num_bits(), 3u8); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 2u8, + tf_num_bits: 3u8, + tf_sum: 0 + } + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 5u32); - assert_eq!(skip_reader.doc_num_bits(), 5u8); - assert_eq!(skip_reader.tf_num_bits(), 2u8); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 5u8, + tf_num_bits: 2u8, + tf_sum: 0 + } + ); + assert!(skip_reader.advance()); + assert_eq!(skip_reader.block_info(), BlockInfo::VInt(3u32)); assert!(!skip_reader.advance()); } @@ -165,13 +250,60 @@ mod tests { skip_serializer.write_doc(5u32, 5u8); skip_serializer.data().to_owned() }; - let mut skip_reader = SkipReader::new(OwnedRead::new(buf), IndexRecordOption::Basic); + let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; + let mut skip_reader = SkipReader::new( + ReadOnlySource::from(buf), + doc_freq, + IndexRecordOption::Basic, + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 1u32); - assert_eq!(skip_reader.doc_num_bits(), 2u8); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 2u8, + tf_num_bits: 0, + tf_sum: 0u32 + } + ); assert!(skip_reader.advance()); assert_eq!(skip_reader.doc(), 5u32); - assert_eq!(skip_reader.doc_num_bits(), 5u8); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 5u8, + tf_num_bits: 0, + tf_sum: 0u32 + } + ); + assert!(skip_reader.advance()); + assert_eq!(skip_reader.block_info(), BlockInfo::VInt(3u32)); + assert!(!skip_reader.advance()); + } + + #[test] + fn test_skip_multiple_of_block_size() { + let buf = { + let mut skip_serializer = SkipSerializer::new(); + skip_serializer.write_doc(1u32, 2u8); + skip_serializer.data().to_owned() + }; + let doc_freq = COMPRESSION_BLOCK_SIZE as u32; + let mut skip_reader = SkipReader::new( + ReadOnlySource::from(buf), + doc_freq, + IndexRecordOption::Basic, + ); + assert!(skip_reader.advance()); + assert_eq!(skip_reader.doc(), 1u32); + assert_eq!( + skip_reader.block_info(), + BlockInfo::BitPacked { + doc_num_bits: 2u8, + tf_num_bits: 0, + tf_sum: 0u32 + } + ); assert!(!skip_reader.advance()); } } diff --git a/src/query/automaton_weight.rs b/src/query/automaton_weight.rs index bf5a0f8df..855ba8f64 100644 --- a/src/query/automaton_weight.rs +++ b/src/query/automaton_weight.rs @@ -51,10 +51,13 @@ where let term_info = term_stream.value(); let mut block_segment_postings = inverted_index .read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic); - while block_segment_postings.advance() { + loop { for &doc in block_segment_postings.docs() { doc_bitset.insert(doc); } + if !block_segment_postings.advance() { + break; + } } } let doc_bitset = BitSetDocSet::from(doc_bitset); diff --git a/src/query/range_query.rs b/src/query/range_query.rs index 26f957af0..4869fba9a 100644 --- a/src/query/range_query.rs +++ b/src/query/range_query.rs @@ -300,10 +300,13 @@ impl Weight for RangeWeight { let term_info = term_range.value(); let mut block_segment_postings = inverted_index .read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic); - while block_segment_postings.advance() { + loop { for &doc in block_segment_postings.docs() { doc_bitset.insert(doc); } + if !block_segment_postings.advance() { + break; + } } } let doc_bitset = BitSetDocSet::from(doc_bitset);