diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 7f228596c..4e303f1d2 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -15,6 +15,8 @@ pub trait Codec: Clone + std::fmt::Debug + Send + Sync + 'static { fn from_json_props(json_value: &serde_json::Value) -> crate::Result; fn to_json_props(&self) -> serde_json::Value; + + fn postings_codec(&self) -> &Self::PostingsCodec; } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/src/codec/postings/mod.rs b/src/codec/postings/mod.rs index 5432617fc..ca09c9886 100644 --- a/src/codec/postings/mod.rs +++ b/src/codec/postings/mod.rs @@ -1,20 +1,39 @@ use std::io; +use common::OwnedBytes; + use crate::fieldnorm::FieldNormReader; +use crate::postings::FreqReadingOption; use crate::schema::IndexRecordOption; use crate::{DocId, Score}; pub trait PostingsCodec { type PostingsSerializer: PostingsSerializer; -} + type PostingsReader: PostingsReader; -pub trait PostingsSerializer { - fn new( + fn new_serializer( + &self, avg_fieldnorm: Score, mode: IndexRecordOption, fieldnorm_reader: Option, - ) -> Self; + ) -> Self::PostingsSerializer; + /// Opens a `BlockSegmentPostings`. + /// `doc_freq` is the number of documents in the posting list. + /// `record_option` represents the amount of data available according to the schema. + /// `requested_option` is the amount of data requested by the user. + /// If for instance, we do not request for term frequencies, this function will not decompress + /// term frequency blocks. + // TODO simplify prototype (record_option + requested_option) + fn open( + doc_freq: u32, + data: common::OwnedBytes, + record_option: IndexRecordOption, + requested_option: IndexRecordOption, + ) -> std::io::Result; +} + +pub trait PostingsSerializer { fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool); fn write_doc(&mut self, doc_id: DocId, term_freq: u32); @@ -23,3 +42,32 @@ pub trait PostingsSerializer { fn clear(&mut self); } + +// TODO docs +// TODO Add blockwand trait +pub trait PostingsReader: Sized { + fn freq_reading_option(&self) -> FreqReadingOption; + + fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()>; + + fn doc_freq(&self) -> u32; + + fn docs(&self) -> &[DocId]; + + fn doc(&self, idx: usize) -> u32; + + fn freqs(&self) -> &[u32]; + + fn freq(&self, idx: usize) -> u32; + + fn block_len(&self) -> usize; + + fn seek(&mut self, target_doc: DocId) -> usize; + + fn position_offset(&self) -> u64; + + fn advance(&mut self); + + // TODO Move to the codec and use the serializer. + fn empty() -> Self; +} diff --git a/src/codec/standard/mod.rs b/src/codec/standard/mod.rs index 9fb5cb12e..ff9f81748 100644 --- a/src/codec/standard/mod.rs +++ b/src/codec/standard/mod.rs @@ -26,4 +26,8 @@ impl Codec for StandardCodec { fn to_json_props(&self) -> serde_json::Value { serde_json::Value::Null } + + fn postings_codec(&self) -> &Self::PostingsCodec { + &StandardPostingsCodec + } } diff --git a/src/codec/standard/postings/block_segment_postings.rs b/src/codec/standard/postings/block_segment_postings.rs new file mode 100644 index 000000000..d6a7b4633 --- /dev/null +++ b/src/codec/standard/postings/block_segment_postings.rs @@ -0,0 +1,564 @@ +use std::io; + +use common::{OwnedBytes, VInt}; + +use crate::codec::postings::PostingsReader; +use crate::codec::standard::postings::skip::{BlockInfo, SkipReader}; +use crate::fieldnorm::FieldNormReader; +use crate::postings::compression::{BlockDecoder, VIntDecoder as _, COMPRESSION_BLOCK_SIZE}; +use crate::postings::FreqReadingOption; +use crate::query::Bm25Weight; +use crate::schema::IndexRecordOption; +use crate::{DocId, Score, TERMINATED}; + +fn max_score>(mut it: I) -> Option { + it.next().map(|first| it.fold(first, Score::max)) +} + +/// `BlockSegmentPostings` is a cursor iterating over blocks +/// of documents. +/// +/// # Warning +/// +/// While it is useful for some very specific high-performance +/// use cases, you should prefer using `SegmentPostings` for most usage. +#[derive(Clone)] +pub struct StandardPostingsReader { + pub(crate) doc_decoder: BlockDecoder, + block_loaded: bool, + freq_decoder: BlockDecoder, + freq_reading_option: FreqReadingOption, + block_max_score_cache: Option, + doc_freq: u32, + data: OwnedBytes, + skip_reader: SkipReader, +} + +fn decode_bitpacked_block( + doc_decoder: &mut BlockDecoder, + freq_decoder_opt: Option<&mut BlockDecoder>, + data: &[u8], + doc_offset: DocId, + doc_num_bits: u8, + tf_num_bits: u8, + strict_delta: bool, +) { + let num_consumed_bytes = + doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits, strict_delta); + if let Some(freq_decoder) = freq_decoder_opt { + freq_decoder.uncompress_block_unsorted( + &data[num_consumed_bytes..], + tf_num_bits, + strict_delta, + ); + } +} + +fn decode_vint_block( + doc_decoder: &mut BlockDecoder, + freq_decoder_opt: Option<&mut BlockDecoder>, + data: &[u8], + doc_offset: DocId, + num_vint_docs: usize, +) { + let num_consumed_bytes = + doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs, TERMINATED); + if let Some(freq_decoder) = freq_decoder_opt { + // if it's a json term with freq, containing less than 256 docs, we can reach here thinking + // we have a freq, despite not really having one. + if data.len() > num_consumed_bytes { + freq_decoder.uncompress_vint_unsorted( + &data[num_consumed_bytes..], + num_vint_docs, + TERMINATED, + ); + } + } +} + +fn split_into_skips_and_postings( + doc_freq: u32, + mut bytes: OwnedBytes, +) -> io::Result<(Option, OwnedBytes)> { + if doc_freq < COMPRESSION_BLOCK_SIZE as u32 { + return Ok((None, bytes)); + } + let skip_len = VInt::deserialize_u64(&mut bytes)? as usize; + let (skip_data, postings_data) = bytes.split(skip_len); + Ok((Some(skip_data), postings_data)) +} + +impl StandardPostingsReader { + /// Opens a `BlockSegmentPostings`. + /// `doc_freq` is the number of documents in the posting list. + /// `record_option` represents the amount of data available according to the schema. + /// `requested_option` is the amount of data requested by the user. + /// If for instance, we do not request for term frequencies, this function will not decompress + /// term frequency blocks. + pub(crate) fn open( + doc_freq: u32, + bytes: OwnedBytes, + mut record_option: IndexRecordOption, + requested_option: IndexRecordOption, + ) -> io::Result { + let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; + let skip_reader = match skip_data_opt { + Some(skip_data) => { + let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE; + // 8 is the minimum size of a block with frequency (can be more if pos are stored + // too) + if skip_data.len() < 8 * block_count { + // the field might be encoded with frequency, but this term in particular isn't. + // This can happen for JSON field with term frequencies: + // - text terms are encoded with term freqs. + // - numerical terms are encoded without term freqs. + record_option = IndexRecordOption::Basic; + } + SkipReader::new(skip_data, doc_freq, record_option) + } + None => SkipReader::new(OwnedBytes::empty(), doc_freq, record_option), + }; + + let freq_reading_option = match (record_option, requested_option) { + (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, + (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, + (_, _) => FreqReadingOption::ReadFreq, + }; + + let mut block_segment_postings = StandardPostingsReader { + doc_decoder: BlockDecoder::with_val(TERMINATED), + block_loaded: false, + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option, + block_max_score_cache: None, + doc_freq, + data: postings_data, + skip_reader, + }; + block_segment_postings.load_block(); + Ok(block_segment_postings) + } +} + +impl PostingsReader for StandardPostingsReader { + fn freq_reading_option(&self) -> FreqReadingOption { + self.freq_reading_option + } + + // Resets the block segment postings on another position + // in the postings file. + // + // This is useful for enumerating through a list of terms, + // and consuming the associated posting lists while avoiding + // reallocating a `BlockSegmentPostings`. + // + // # Warning + // + // This does not reset the positions list. + fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()> { + let (skip_data_opt, postings_data) = + split_into_skips_and_postings(doc_freq, postings_data)?; + self.data = postings_data; + self.block_max_score_cache = None; + self.block_loaded = false; + if let Some(skip_data) = skip_data_opt { + self.skip_reader.reset(skip_data, doc_freq); + } else { + self.skip_reader.reset(OwnedBytes::empty(), doc_freq); + } + self.doc_freq = doc_freq; + self.load_block(); + Ok(()) + } + + /// Returns the overall number of documents in the block postings. + /// It does not take in account whether documents are deleted or not. + /// + /// This `doc_freq` is simply the sum of the length of all of the blocks + /// length, and it does not take in account deleted documents. + fn doc_freq(&self) -> u32 { + self.doc_freq + } + + /// Returns the array of docs in the current block. + /// + /// Before the first call to `.advance()`, the block + /// returned by `.docs()` is empty. + #[inline] + fn docs(&self) -> &[DocId] { + debug_assert!(self.block_is_loaded()); + self.doc_decoder.output_array() + } + + /// Return the document at index `idx` of the block. + #[inline] + fn doc(&self, idx: usize) -> u32 { + self.doc_decoder.output(idx) + } + + /// Return the array of `term freq` in the block. + #[inline] + fn freqs(&self) -> &[u32] { + debug_assert!(self.block_is_loaded()); + self.freq_decoder.output_array() + } + + /// Return the frequency at index `idx` of the block. + #[inline] + fn freq(&self, idx: usize) -> u32 { + debug_assert!(self.block_is_loaded()); + self.freq_decoder.output(idx) + } + + /// Returns the length of the current block. + /// + /// All blocks have a length of `NUM_DOCS_PER_BLOCK`, + /// except the last block that may have a length + /// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1` + #[inline] + fn block_len(&self) -> usize { + debug_assert!(self.block_is_loaded()); + self.doc_decoder.output_len + } + + /// Position on a block that may contains `target_doc`. + /// + /// If all docs are smaller than target, the block loaded may be empty, + /// or be the last an incomplete VInt block. + fn seek(&mut self, target_doc: DocId) -> usize { + // Move to the block that might contain our document. + self.seek_block(target_doc); + self.load_block(); + + // At this point we are on the block that might contain our document. + let doc = self.doc_decoder.seek_within_block(target_doc); + + // The last block is not full and padded with TERMINATED, + // so we are guaranteed to have at least one value (real or padding) + // that is >= target_doc. + debug_assert!(doc < COMPRESSION_BLOCK_SIZE); + + // `doc` is now the first element >= `target_doc`. + // If all docs are smaller than target, the current block is incomplete and padded + // with TERMINATED. After the search, the cursor points to the first TERMINATED. + doc + } + + fn position_offset(&self) -> u64 { + self.skip_reader.position_offset() + } + + /// Advance to the next block. + fn advance(&mut self) { + self.skip_reader.advance(); + self.block_loaded = false; + self.block_max_score_cache = None; + self.load_block(); + } + + /// Returns an empty segment postings object + fn empty() -> StandardPostingsReader { + StandardPostingsReader { + doc_decoder: BlockDecoder::with_val(TERMINATED), + block_loaded: true, + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option: FreqReadingOption::NoFreq, + block_max_score_cache: None, + doc_freq: 0, + data: OwnedBytes::empty(), + skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic), + } + } +} + +impl StandardPostingsReader { + pub(crate) fn skip_reader(&self) -> &SkipReader { + &self.skip_reader + } + + pub(crate) fn block_is_loaded(&self) -> bool { + self.block_loaded + } + + /// Dangerous API! This calls seeks the next block on the skip list, + /// but does not `.load_block()` afterwards. + /// + /// `.load_block()` needs to be called manually afterwards. + /// If all docs are smaller than target, the block loaded may be empty, + /// or be the last an incomplete VInt block. + pub(crate) fn seek_block(&mut self, target_doc: DocId) { + if self.skip_reader.seek(target_doc) { + self.block_max_score_cache = None; + self.block_loaded = false; + } + } + + pub(crate) fn load_block(&mut self) { + let offset = self.skip_reader.byte_offset(); + if self.block_is_loaded() { + return; + } + match self.skip_reader.block_info() { + BlockInfo::BitPacked { + doc_num_bits, + strict_delta_encoded, + tf_num_bits, + .. + } => { + decode_bitpacked_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + &self.data.as_slice()[offset..], + self.skip_reader.last_doc_in_previous_block, + doc_num_bits, + tf_num_bits, + strict_delta_encoded, + ); + } + BlockInfo::VInt { num_docs } => { + let data = { + if num_docs == 0 { + &[] + } else { + &self.data.as_slice()[offset..] + } + }; + decode_vint_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + data, + self.skip_reader.last_doc_in_previous_block, + num_docs as usize, + ); + } + } + self.block_loaded = true; + } + + /// Returns the block_max_score for the current block. + /// It does not require the block to be loaded. For instance, it is ok to call this method + /// after having called `.shallow_advance(..)`. + /// + /// See `TermScorer::block_max_score(..)` for more information. + pub fn block_max_score( + &mut self, + fieldnorm_reader: &FieldNormReader, + bm25_weight: &Bm25Weight, + ) -> Score { + if let Some(score) = self.block_max_score_cache { + return score; + } + if let Some(skip_reader_max_score) = self.skip_reader.block_max_score(bm25_weight) { + // if we are on a full block, the skip reader should have the block max information + // for us + self.block_max_score_cache = Some(skip_reader_max_score); + return skip_reader_max_score; + } + // this is the last block of the segment posting list. + // If it is actually loaded, we can compute block max manually. + if self.block_is_loaded() { + let docs = self.doc_decoder.output_array().iter().cloned(); + let freqs = self.freq_decoder.output_array().iter().cloned(); + let bm25_scores = docs.zip(freqs).map(|(doc, term_freq)| { + let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc); + bm25_weight.score(fieldnorm_id, term_freq) + }); + let block_max_score = max_score(bm25_scores).unwrap_or(0.0); + self.block_max_score_cache = Some(block_max_score); + return block_max_score; + } + // We do not have access to any good block max value. We return bm25_weight.max_score() + // as it is a valid upperbound. + // + // We do not cache it however, so that it gets computed when once block is loaded. + bm25_weight.max_score() + } +} + +#[cfg(test)] +mod tests { + use common::HasLen; + + use super::StandardPostingsReader; + use crate::codec::postings::PostingsReader as _; + use crate::docset::{DocSet, TERMINATED}; + use crate::index::Index; + use crate::postings::compression::COMPRESSION_BLOCK_SIZE; + use crate::postings::{Postings as _, SegmentPostings}; + use crate::schema::{IndexRecordOption, Schema, Term, INDEXED}; + use crate::DocId; + + #[test] + fn test_empty_segment_postings() { + let mut postings = SegmentPostings::empty(); + assert_eq!(postings.doc(), TERMINATED); + assert_eq!(postings.advance(), TERMINATED); + assert_eq!(postings.advance(), TERMINATED); + assert_eq!(postings.doc_freq(), 0); + assert_eq!(postings.len(), 0); + } + + #[test] + fn test_empty_postings_doc_returns_terminated() { + let mut postings = SegmentPostings::empty(); + assert_eq!(postings.doc(), TERMINATED); + assert_eq!(postings.advance(), TERMINATED); + } + + #[test] + fn test_empty_postings_doc_term_freq_returns_0() { + let postings = SegmentPostings::empty(); + assert_eq!(postings.term_freq(), 1); + } + + #[test] + fn test_empty_block_segment_postings() { + let mut postings = StandardPostingsReader::empty(); + assert!(postings.docs().is_empty()); + assert_eq!(postings.doc_freq(), 0); + postings.advance(); + assert!(postings.docs().is_empty()); + assert_eq!(postings.doc_freq(), 0); + } + + #[test] + fn test_block_segment_postings() -> crate::Result<()> { + let mut block_segments = build_block_postings(&(0..100_000).collect::>())?; + let mut offset: u32 = 0u32; + // checking that the `doc_freq` is correct + assert_eq!(block_segments.doc_freq(), 100_000); + loop { + let block = block_segments.docs(); + if block.is_empty() { + break; + } + for (i, doc) in block.iter().cloned().enumerate() { + assert_eq!(offset + (i as u32), doc); + } + offset += block.len() as u32; + block_segments.advance(); + } + Ok(()) + } + + #[test] + fn test_skip_right_at_new_block() -> crate::Result<()> { + let mut doc_ids = (0..128).collect::>(); + // 128 is missing + doc_ids.push(129); + doc_ids.push(130); + { + let block_segments = build_block_postings(&doc_ids)?; + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.seek(128), 129); + assert_eq!(docset.doc(), 129); + assert_eq!(docset.advance(), 130); + assert_eq!(docset.doc(), 130); + assert_eq!(docset.advance(), TERMINATED); + } + { + let block_segments = build_block_postings(&doc_ids).unwrap(); + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.seek(129), 129); + assert_eq!(docset.doc(), 129); + assert_eq!(docset.advance(), 130); + assert_eq!(docset.doc(), 130); + assert_eq!(docset.advance(), TERMINATED); + } + { + let block_segments = build_block_postings(&doc_ids)?; + let mut docset = SegmentPostings::from_block_postings(block_segments, None); + assert_eq!(docset.doc(), 0); + assert_eq!(docset.seek(131), TERMINATED); + assert_eq!(docset.doc(), TERMINATED); + } + Ok(()) + } + + fn build_block_postings(docs: &[DocId]) -> crate::Result { + let mut schema_builder = Schema::builder(); + let int_field = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests()?; + let mut last_doc = 0u32; + for &doc in docs { + for _ in last_doc..doc { + index_writer.add_document(doc!(int_field=>1u64))?; + } + index_writer.add_document(doc!(int_field=>0u64))?; + last_doc = doc + 1; + } + index_writer.commit()?; + let searcher = index.reader()?.searcher(); + let segment_reader = searcher.segment_reader(0); + let inverted_index = segment_reader.inverted_index(int_field).unwrap(); + let term = Term::from_field_u64(int_field, 0u64); + let term_info = inverted_index.get_term_info(&term)?.unwrap(); + let block_postings = inverted_index + .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?; + Ok(block_postings) + } + + #[test] + fn test_block_segment_postings_seek() -> crate::Result<()> { + let mut docs = vec![0]; + for i in 0..1300 { + docs.push((i * i / 100) + i); + } + let mut block_postings = build_block_postings(&docs[..])?; + for i in &[0, 424, 10000] { + block_postings.seek(*i); + let docs = block_postings.docs(); + assert!(docs[0] <= *i); + assert!(docs.last().cloned().unwrap_or(0u32) >= *i); + } + block_postings.seek(100_000); + assert_eq!(block_postings.doc(COMPRESSION_BLOCK_SIZE - 1), TERMINATED); + Ok(()) + } + + #[test] + fn test_reset_block_segment_postings() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let int_field = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests()?; + // create two postings list, one containing even number, + // the other containing odd numbers. + for i in 0..6 { + let doc = doc!(int_field=> (i % 2) as u64); + index_writer.add_document(doc)?; + } + index_writer.commit()?; + let searcher = index.reader()?.searcher(); + let segment_reader = searcher.segment_reader(0); + + let mut block_segments; + { + let term = Term::from_field_u64(int_field, 0u64); + let inverted_index = segment_reader.inverted_index(int_field)?; + let term_info = inverted_index.get_term_info(&term)?.unwrap(); + block_segments = inverted_index + .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?; + } + assert_eq!(block_segments.docs(), &[0, 2, 4]); + { + let term = Term::from_field_u64(int_field, 1u64); + let inverted_index = segment_reader.inverted_index(int_field)?; + let term_info = inverted_index.get_term_info(&term)?.unwrap(); + inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?; + } + assert_eq!(block_segments.docs(), &[1, 3, 5]); + Ok(()) + } +} diff --git a/src/codec/standard/postings/mod.rs b/src/codec/standard/postings/mod.rs index 23eefbe52..745e6b507 100644 --- a/src/codec/standard/postings/mod.rs +++ b/src/codec/standard/postings/mod.rs @@ -1,13 +1,37 @@ use crate::codec::postings::PostingsCodec; +use crate::fieldnorm::FieldNormReader; +use crate::schema::IndexRecordOption; +use crate::Score; mod block; +mod block_segment_postings; mod postings_serializer; mod skip; +pub use block_segment_postings::StandardPostingsReader; pub use postings_serializer::StandardPostingsSerializer; pub struct StandardPostingsCodec; impl PostingsCodec for StandardPostingsCodec { type PostingsSerializer = StandardPostingsSerializer; + type PostingsReader = StandardPostingsReader; + + fn new_serializer( + &self, + avg_fieldnorm: Score, + mode: IndexRecordOption, + fieldnorm_reader: Option, + ) -> Self::PostingsSerializer { + StandardPostingsSerializer::new(avg_fieldnorm, mode, fieldnorm_reader) + } + + fn open( + doc_freq: u32, + data: common::OwnedBytes, + record_option: IndexRecordOption, + requested_option: IndexRecordOption, + ) -> std::io::Result { + StandardPostingsReader::open(doc_freq, data, record_option, requested_option) + } } diff --git a/src/codec/standard/postings/postings_serializer.rs b/src/codec/standard/postings/postings_serializer.rs index b9cf4bc6c..24ddf0e33 100644 --- a/src/codec/standard/postings/postings_serializer.rs +++ b/src/codec/standard/postings/postings_serializer.rs @@ -30,29 +30,28 @@ pub struct StandardPostingsSerializer { term_has_freq: bool, } -impl PostingsSerializer for StandardPostingsSerializer { - fn new( +impl StandardPostingsSerializer { + pub fn new( avg_fieldnorm: Score, mode: IndexRecordOption, fieldnorm_reader: Option, ) -> StandardPostingsSerializer { Self { + last_doc_id_encoded: 0, block_encoder: BlockEncoder::new(), block: Box::new(Block::new()), - postings_write: Vec::new(), skip_write: SkipSerializer::new(), - - last_doc_id_encoded: 0u32, mode, - fieldnorm_reader, bm25_weight: None, avg_fieldnorm, term_has_freq: false, } } +} +impl PostingsSerializer for StandardPostingsSerializer { fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) { self.bm25_weight = None; diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 7314f8741..53e52b3f6 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -208,7 +208,8 @@ impl InvertedIndexReader { ) -> io::Result { let postings_data = self .postings_file_slice - .slice(term_info.postings_range.clone()); + .slice(term_info.postings_range.clone()) + .read_bytes()?; BlockSegmentPostings::open( term_info.doc_freq, postings_data, diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs index cc8894294..26646cdbd 100644 --- a/src/postings/block_segment_postings.rs +++ b/src/postings/block_segment_postings.rs @@ -2,7 +2,7 @@ use std::io; use common::VInt; -use crate::directory::{FileSlice, OwnedBytes}; +use crate::directory::OwnedBytes; use crate::fieldnorm::FieldNormReader; use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; @@ -96,11 +96,10 @@ impl BlockSegmentPostings { /// term frequency blocks. pub(crate) fn open( doc_freq: u32, - data: FileSlice, + bytes: OwnedBytes, mut record_option: IndexRecordOption, requested_option: IndexRecordOption, ) -> io::Result { - let bytes = data.read_bytes()?; let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; let skip_reader = match skip_data_opt { Some(skip_data) => { diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index f2b175a47..7638561ff 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -64,7 +64,8 @@ impl SegmentPostings { /// buffer with the serialized data. #[cfg(test)] pub fn create_from_docs(docs: &[u32]) -> SegmentPostings { - use crate::directory::FileSlice; + use common::OwnedBytes; + use crate::schema::IndexRecordOption; let mut buffer = Vec::new(); { @@ -86,7 +87,7 @@ impl SegmentPostings { } let block_segment_postings = BlockSegmentPostings::open( docs.len() as u32, - FileSlice::from(buffer), + OwnedBytes::new(buffer), IndexRecordOption::Basic, IndexRecordOption::Basic, ) @@ -100,9 +101,10 @@ impl SegmentPostings { doc_and_tfs: &[(u32, u32)], fieldnorms: Option<&[u32]>, ) -> SegmentPostings { + use common::OwnedBytes; + use crate::codec::postings::PostingsSerializer as _; use crate::codec::standard::postings::StandardPostingsSerializer; - use crate::directory::FileSlice; use crate::fieldnorm::FieldNormReader; use crate::schema::IndexRecordOption; use crate::Score; @@ -134,7 +136,7 @@ impl SegmentPostings { .unwrap(); let block_segment_postings = BlockSegmentPostings::open( doc_and_tfs.len() as u32, - FileSlice::from(buffer), + OwnedBytes::new(buffer), IndexRecordOption::WithFreqs, IndexRecordOption::WithFreqs, ) diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index bfead7823..a7b06c150 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -3,7 +3,7 @@ use std::io::{self, Write}; use common::{BinarySerializable, CountingWriter}; use super::TermInfo; -use crate::codec::postings::{PostingsCodec, PostingsSerializer}; +use crate::codec::postings::PostingsSerializer; use crate::codec::Codec; use crate::directory::{CompositeWrite, WritePtr}; use crate::fieldnorm::FieldNormReader; @@ -52,6 +52,8 @@ pub struct InvertedIndexSerializer { codec: C, } +use crate::codec::postings::PostingsCodec; + impl InvertedIndexSerializer { /// Open a new `InvertedIndexSerializer` for the given segment pub fn open(segment: &mut Segment) -> crate::Result> { @@ -89,6 +91,7 @@ impl InvertedIndexSerializer { postings_write, positions_write, fieldnorm_reader, + &self.codec, ) } @@ -121,6 +124,7 @@ impl<'a, C: Codec> FieldSerializer<'a, C> { postings_write: &'a mut CountingWriter, positions_write: &'a mut CountingWriter, fieldnorm_reader: Option, + codec: &C, ) -> io::Result> { total_num_tokens.serialize(postings_write)?; let index_record_option = field_type @@ -131,12 +135,11 @@ impl<'a, C: Codec> FieldSerializer<'a, C> { .as_ref() .map(|ff_reader| total_num_tokens as Score / ff_reader.num_docs() as Score) .unwrap_or(0.0); - let postings_serializer = - <::PostingsSerializer as PostingsSerializer>::new( - average_fieldnorm, - index_record_option, - fieldnorm_reader, - ); + let postings_serializer = codec.postings_codec().new_serializer( + average_fieldnorm, + index_record_option, + fieldnorm_reader, + ); let positions_serializer_opt = if index_record_option.has_positions() { Some(PositionSerializer::new(positions_write)) } else {