From b5f3dcdc8b2c04a81e9b8cc2bdae3a8abb01c4ba Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 21 Sep 2020 00:15:09 +0900 Subject: [PATCH] TermInfo contain the end_offset of the postings. We slice the ReadOnlySource tightly. --- doc/src/index-format.md | 50 ++++++++++++++++++++++ src/core/inverted_index_reader.rs | 12 +++--- src/postings/mod.rs | 14 +++---- src/postings/serializer.rs | 9 ++-- src/postings/term_info.rs | 26 +++++++++--- src/termdict/mod.rs | 13 ++++-- src/termdict/term_info_store.rs | 69 +++++++++++++++++++++---------- 7 files changed, 145 insertions(+), 48 deletions(-) create mode 100644 doc/src/index-format.md diff --git a/doc/src/index-format.md b/doc/src/index-format.md new file mode 100644 index 000000000..e21025653 --- /dev/null +++ b/doc/src/index-format.md @@ -0,0 +1,50 @@ + +# Managed files ++----------+-----------+-------------------+ +| content | footer | footer_len: u32 | ++----------+-----------+-------------------+ + +# Term Dictionary (Composite File) + ++---------+---------------------------+------------------------+ +| fst | term_info_store | footer_len: u64 | ++---------+---------------------------+------------------------+ + +During a merge the term info store need to fit in memory. +It has a cost of n bytes per term. + +# term_info_store ++-------------------+---------------------------+------------------------+ +| len_block_meta | block_meta | term_infos | ++-------------------+---------------------------+------------------------+ + +# inverted_index ++------------------------+---------------------------+------------------------+ +| total_num_tokens: u64 | posting_lists.. | term_infos | ++------------------------+---------------------------+------------------------+ + +# postings lists ++------------------------+---------------------------+------------------------+ +| ++ + +# composite file ++----------------+-----+----------------+----------------------+----------------+ +| field file 1 | ... | field field n |composite file footer | footer len: u32| ++----------------+-----+----------------+----------------------+----------------+ + +# composite file footer + ++-----------------+---------------------------------------+ +|num fields: vint | (file_addr, offset_delta: vint) []... | ++-----------------+---------------------------------------+ + +# FileAddr ++--------------+--------------+ +| field: u32 | idx: VInt | ++--------------+--------------+ + +# Posting lists ++-----------------------------------------+ +| skip_reader ++-----------------------------------------+ \ No newline at end of file diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index dec2bf684..b9173bbc7 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -90,9 +90,9 @@ impl InvertedIndexReader { term_info: &TermInfo, block_postings: &mut BlockSegmentPostings, ) -> io::Result<()> { - let postings_slice = self - .postings_file_slice - .slice_from(term_info.postings_offset as usize); + let offset = term_info.postings_start_offset as usize; + let end_source = term_info.postings_end_offset as usize; + let postings_slice = self.postings_file_slice.slice(offset, end_source); block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?); Ok(()) } @@ -121,8 +121,10 @@ impl InvertedIndexReader { term_info: &TermInfo, requested_option: IndexRecordOption, ) -> io::Result { - let offset = term_info.postings_offset as usize; - let postings_data = self.postings_file_slice.slice_from(offset); + let postings_data = self.postings_file_slice.slice( + term_info.postings_start_offset as usize, + term_info.postings_end_offset as usize, + ); BlockSegmentPostings::open( term_info.doc_freq, postings_data, diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 57806109c..226adf46d 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -15,18 +15,14 @@ mod stacker; mod term_info; pub(crate) use self::block_search::BlockSearcher; - -pub(crate) use self::postings_writer::MultiFieldPostingsWriter; -pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; - -pub use self::postings::Postings; -pub(crate) use self::skip::{BlockInfo, SkipReader}; -pub use self::term_info::TermInfo; - pub use self::block_segment_postings::BlockSegmentPostings; +pub use self::postings::Postings; +pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::segment_postings::SegmentPostings; - +pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; +pub(crate) use self::skip::{BlockInfo, SkipReader}; pub(crate) use self::stacker::compute_table_size; +pub use self::term_info::TermInfo; pub(crate) type UnorderedTermId = u64; diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index f6745e64e..e95bfda73 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -184,7 +184,8 @@ impl<'a> FieldSerializer<'a> { .unwrap_or(0u64); TermInfo { doc_freq: 0, - postings_offset: self.postings_serializer.addr(), + postings_start_offset: self.postings_serializer.addr(), + postings_end_offset: 0u64, positions_idx, } } @@ -238,10 +239,12 @@ impl<'a> FieldSerializer<'a> { /// using `VInt` encoding. pub fn close_term(&mut self) -> io::Result<()> { if self.term_open { - self.term_dictionary_builder - .insert_value(&self.current_term_info)?; self.postings_serializer .close_term(self.current_term_info.doc_freq)?; + let end_offset = self.postings_serializer.addr(); + self.current_term_info.postings_end_offset = end_offset; + self.term_dictionary_builder + .insert_value(&self.current_term_info)?; self.term_open = false; } Ok(()) diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs index 55a414955..59278eabc 100644 --- a/src/postings/term_info.rs +++ b/src/postings/term_info.rs @@ -7,35 +7,49 @@ use std::io; pub struct TermInfo { /// Number of documents in the segment containing the term pub doc_freq: u32, - /// Start offset within the postings (`.idx`) file. - pub postings_offset: u64, + /// Start offset of the posting list within the postings (`.idx`) file. + pub postings_start_offset: u64, + /// End offset of the posting list within the postings (`.idx`) file. + pub postings_end_offset: u64, /// Start offset of the first block within the position (`.pos`) file. pub positions_idx: u64, } +impl TermInfo { + pub(crate) fn posting_num_bytes(&self) -> u32 { + let num_bytes = self.postings_end_offset - self.postings_start_offset; + assert!(num_bytes <= std::u32::MAX as u64); + num_bytes as u32 + } +} + impl FixedSize for TermInfo { /// Size required for the binary serialization of a `TermInfo` object. /// This is large, but in practise, `TermInfo` are encoded in blocks and /// only the first `TermInfo` of a block is serialized uncompressed. /// The subsequent `TermInfo` are delta encoded and bitpacked. - const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES; + const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES; } impl BinarySerializable for TermInfo { fn serialize(&self, writer: &mut W) -> io::Result<()> { self.doc_freq.serialize(writer)?; - self.postings_offset.serialize(writer)?; + self.postings_start_offset.serialize(writer)?; + self.posting_num_bytes().serialize(writer)?; self.positions_idx.serialize(writer)?; Ok(()) } fn deserialize(reader: &mut R) -> io::Result { let doc_freq = u32::deserialize(reader)?; - let postings_offset = u64::deserialize(reader)?; + let postings_start_offset = u64::deserialize(reader)?; + let postings_num_bytes = u32::deserialize(reader)?; + let postings_end_offset = postings_start_offset + u64::from(postings_num_bytes); let positions_idx = u64::deserialize(reader)?; Ok(TermInfo { doc_freq, - postings_offset, + postings_start_offset, + postings_end_offset, positions_idx, }) } diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 4a2e4fe2b..d1b2fb564 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -44,11 +44,13 @@ mod tests { const BLOCK_SIZE: usize = 1_500; - fn make_term_info(val: u64) -> TermInfo { + fn make_term_info(term_ord: u64) -> TermInfo { + let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord; TermInfo { - doc_freq: val as u32, - positions_idx: val * 2u64, - postings_offset: val * 3u64, + doc_freq: term_ord as u32, + postings_start_offset: offset(term_ord), + postings_end_offset: offset(term_ord + 1), + positions_idx: offset(term_ord) * 2u64, } } @@ -198,6 +200,7 @@ mod tests { term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?; term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?; term_dictionary_builder.insert("abr", &make_term_info(2))?; + term_dictionary_builder.insert("abr", &make_term_info(3))?; term_dictionary_builder.finish()? }; let term_dict_file = FileSlice::from(buffer); @@ -206,11 +209,13 @@ mod tests { assert!(kv_stream.advance()); assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes()); assert_eq!(kv_stream.value(), &make_term_info(1)); + dbg!(make_term_info(1)); assert!(kv_stream.advance()); assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes()); assert_eq!(kv_stream.value(), &make_term_info(2)); assert!(kv_stream.advance()); assert_eq!(kv_stream.key(), "abr".as_bytes()); + assert_eq!(kv_stream.value(), &make_term_info(3)); assert!(!kv_stream.advance()); Ok(()) } diff --git a/src/termdict/term_info_store.rs b/src/termdict/term_info_store.rs index bf5049d1e..f80773c1a 100644 --- a/src/termdict/term_info_store.rs +++ b/src/termdict/term_info_store.rs @@ -55,21 +55,28 @@ impl TermInfoBlockMeta { self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits } + // Here inner_offset is the offset within the block, WITHOUT the first term_info. + // In other word, term_info #1,#2,#3 gets inner_offset 0,1,2... While term_info #0 + // is encoded without bitpacking. fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo { + assert!(inner_offset < BLOCK_LEN - 1); let num_bits = self.num_bits() as usize; let mut cursor = num_bits * inner_offset; + let postings_start_offset = extract_bits(data, cursor, self.postings_offset_nbits); + let postings_end_offset = self.ref_term_info.postings_start_offset + + extract_bits(data, cursor + num_bits, self.postings_offset_nbits); + cursor += self.postings_offset_nbits as usize; + let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32; cursor += self.doc_freq_nbits as usize; - let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits); - cursor += self.postings_offset_nbits as usize; - let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits); TermInfo { doc_freq, - postings_offset: postings_offset + self.ref_term_info.postings_offset, + postings_start_offset: postings_start_offset + self.ref_term_info.postings_start_offset, + postings_end_offset, positions_idx: positions_idx + self.ref_term_info.positions_idx, } } @@ -152,16 +159,17 @@ fn bitpack_serialize( term_info_block_meta: &TermInfoBlockMeta, term_info: &TermInfo, ) -> io::Result<()> { + bit_packer.write( + term_info.postings_start_offset, + term_info_block_meta.postings_offset_nbits, + write, + )?; bit_packer.write( u64::from(term_info.doc_freq), term_info_block_meta.doc_freq_nbits, write, )?; - bit_packer.write( - term_info.postings_offset, - term_info_block_meta.postings_offset_nbits, - write, - )?; + bit_packer.write( term_info.positions_idx, term_info_block_meta.positions_idx_nbits, @@ -181,23 +189,27 @@ impl TermInfoStoreWriter { } fn flush_block(&mut self) -> io::Result<()> { - if self.term_infos.is_empty() { - return Ok(()); - } let mut bit_packer = BitPacker::new(); let ref_term_info = self.term_infos[0].clone(); + + let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() { + last_term_info + } else { + return Ok(()); + }; + let postings_end_offset = + last_term_info.postings_end_offset - ref_term_info.postings_start_offset; for term_info in &mut self.term_infos[1..] { - term_info.postings_offset -= ref_term_info.postings_offset; + term_info.postings_start_offset -= ref_term_info.postings_start_offset; term_info.positions_idx -= ref_term_info.positions_idx; } let mut max_doc_freq: u32 = 0u32; - let mut max_postings_offset: u64 = 0u64; - let mut max_positions_idx: u64 = 0u64; + let max_postings_offset: u64 = postings_end_offset; + let max_positions_idx: u64 = last_term_info.positions_idx; + for term_info in &self.term_infos[1..] { max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq); - max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset); - max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx); } let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq)); @@ -222,6 +234,12 @@ impl TermInfoStoreWriter { )?; } + bit_packer.write( + postings_end_offset, + term_info_block_meta.postings_offset_nbits, + &mut self.buffer_term_infos, + )?; + // Block need end up at the end of a byte. bit_packer.flush(&mut self.buffer_term_infos)?; self.term_infos.clear(); @@ -230,6 +248,7 @@ impl TermInfoStoreWriter { } pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> { + assert!(term_info.postings_end_offset >= term_info.postings_start_offset); self.num_terms += 1u64; self.term_infos.push(term_info.clone()); if self.term_infos.len() >= BLOCK_LEN { @@ -289,10 +308,11 @@ mod tests { #[test] fn test_term_info_block_meta_serialization() { let term_info_block_meta = TermInfoBlockMeta { - offset: 2009, + offset: 2009u64, ref_term_info: TermInfo { doc_freq: 512, - postings_offset: 51, + postings_start_offset: 51, + postings_end_offset: 57u64, positions_idx: 3584, }, doc_freq_nbits: 10, @@ -310,10 +330,12 @@ mod tests { fn test_pack() -> crate::Result<()> { let mut store_writer = TermInfoStoreWriter::new(); let mut term_infos = vec![]; + let offset = |i| (i * 13 + i * i) as u64; for i in 0..1000 { let term_info = TermInfo { doc_freq: i as u32, - postings_offset: (i / 10) as u64, + postings_start_offset: offset(i), + postings_end_offset: offset(i + 1), positions_idx: (i * 7) as u64, }; store_writer.write_term_info(&term_info)?; @@ -323,7 +345,12 @@ mod tests { store_writer.serialize(&mut buffer)?; let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?; for i in 0..1000 { - assert_eq!(term_info_store.get(i as u64), term_infos[i]); + assert_eq!( + term_info_store.get(i as u64), + term_infos[i], + "term info {}", + i + ); } Ok(()) }