From 9eb87e91cc8e5dbeb36d495da2b6b0becc9829c9 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 21 Sep 2020 00:15:09 +0900
Subject: [PATCH] TermInfo contains the end_offset of the postings.

We slice the ReadOnlySource tightly.
---
 doc/src/index-format.md           | 50 +++++++++++++++++++
 src/core/inverted_index_reader.rs |  8 +--
 src/core/segment_component.rs     |  1 +
 src/core/segment_reader.rs        |  6 ++-
 src/postings/mod.rs               |  4 +-
 src/postings/serializer.rs        | 24 ++++++---
 src/postings/term_info.rs         | 26 +++++++---
 src/space_usage/mod.rs            |  4 +-
 src/termdict/mod.rs               | 29 ++++++-----
 src/termdict/term_info_store.rs   | 82 ++++++++++++++++++++-----------
 10 files changed, 168 insertions(+), 66 deletions(-)
 create mode 100644 doc/src/index-format.md

diff --git a/doc/src/index-format.md b/doc/src/index-format.md
new file mode 100644
index 000000000..e21025653
--- /dev/null
+++ b/doc/src/index-format.md
@@ -0,0 +1,50 @@
+
+# Managed files
++----------+-----------+-------------------+
+| content  | footer    | footer_len: u32   |
++----------+-----------+-------------------+
+
+# Term Dictionary (Composite File)
+
++---------+---------------------------+------------------------+
+| fst     | term_info_store           | footer_len: u64        |
++---------+---------------------------+------------------------+
+
+During a merge, the term info store needs to fit in memory.
+It has a cost of n bytes per term.
+
+# term_info_store
++-------------------+---------------------------+------------------------+
+| len_block_meta    | block_meta                | term_infos             |
++-------------------+---------------------------+------------------------+
+
+# inverted_index
++------------------------+---------------------------+------------------------+
+| total_num_tokens: u64  | posting_lists...          | term_infos             |
++------------------------+---------------------------+------------------------+
+
+# composite file
++----------------+-----+----------------+-----------------------+-----------------+
+| field file 1   | ... | field file n   | composite file footer | footer_len: u32 |
++----------------+-----+----------------+-----------------------+-----------------+
+
+# composite file footer
+
++-----------------+---------------------------------------+
+|num fields: vint | (file_addr, offset_delta: vint) []... 
| ++-----------------+---------------------------------------+ + +# FileAddr ++--------------+--------------+ +| field: u32 | idx: VInt | ++--------------+--------------+ + +# Posting lists ++-----------------------------------------+ +| skip_reader ++-----------------------------------------+ \ No newline at end of file diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 28556d9cb..7f86b9e9d 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -86,7 +86,7 @@ impl InvertedIndexReader { term_info: &TermInfo, block_postings: &mut BlockSegmentPostings, ) { - let offset = term_info.postings_offset as usize; + let offset = term_info.postings_start_offset as usize; let end_source = self.postings_source.len(); let postings_slice = self.postings_source.slice(offset, end_source); block_postings.reset(term_info.doc_freq, postings_slice); @@ -114,8 +114,10 @@ impl InvertedIndexReader { term_info: &TermInfo, requested_option: IndexRecordOption, ) -> BlockSegmentPostings { - let offset = term_info.postings_offset as usize; - let postings_data = self.postings_source.slice_from(offset); + let postings_data = self.postings_source.slice( + term_info.postings_start_offset as usize, + term_info.postings_end_offset as usize, + ); BlockSegmentPostings::from_data( term_info.doc_freq, postings_data, diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index 98c74d3d9..ba07a5a84 100644 --- a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -24,6 +24,7 @@ pub enum SegmentComponent { /// Accessing a document from the store is relatively slow, as it /// requires to decompress the entire block it belongs to. STORE, + /// Bitset describing which document of the segment is deleted. DELETE, diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index ed8f33e3c..6e606e238 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -1,4 +1,3 @@ -use crate::{common::CompositeFile, postings::FieldStats}; use crate::common::HasLen; use crate::core::InvertedIndexReader; use crate::core::Segment; @@ -16,6 +15,7 @@ use crate::space_usage::SegmentSpaceUsage; use crate::store::StoreReader; use crate::termdict::TermDictionary; use crate::DocId; +use crate::{common::CompositeFile, postings::FieldStats}; use fail::fail_point; use std::collections::HashMap; use std::fmt; @@ -265,7 +265,9 @@ impl SegmentReader { .open_read(field) .expect("Index corrupted. 
Failed to open field positions in composite file."); - let total_num_tokens = self.field_stats.get(field) + let total_num_tokens = self + .field_stats + .get(field) .map(|field_stat| field_stat.num_tokens()) .unwrap_or(0u64); let inv_idx_reader = Arc::new(InvertedIndexReader::new( diff --git a/src/postings/mod.rs b/src/postings/mod.rs index ebb8ab942..424f12d1e 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -5,6 +5,7 @@ Postings module (also called inverted index) mod block_search; mod block_segment_postings; pub(crate) mod compression; +mod field_stats; mod postings; mod postings_writer; mod recorder; @@ -13,10 +14,9 @@ mod serializer; mod skip; mod stacker; mod term_info; -mod field_stats; -pub(crate) use self::field_stats::{FieldStats, FieldStat}; pub(crate) use self::block_search::BlockSearcher; +pub(crate) use self::field_stats::{FieldStat, FieldStats}; pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index b0a333131..4b6a3b2cd 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -1,5 +1,4 @@ -use super::{TermInfo, FieldStats, FieldStat}; -use crate::{common::{BinarySerializable, VInt}, directory::TerminatingWrite}; +use super::{FieldStat, FieldStats, TermInfo}; use crate::common::{CompositeWrite, CountingWriter}; use crate::core::Segment; use crate::directory::WritePtr; @@ -11,6 +10,10 @@ use crate::query::BM25Weight; use crate::schema::Schema; use crate::schema::{Field, FieldEntry, FieldType}; use crate::termdict::{TermDictionaryBuilder, TermOrdinal}; +use crate::{ + common::{BinarySerializable, VInt}, + directory::TerminatingWrite, +}; use crate::{DocId, Score}; use std::cmp::Ordering; use std::io::{self, Write}; @@ -79,7 +82,7 @@ impl InvertedIndexSerializer { /// Open a new `PostingsSerializer` for the given segment pub fn open(segment: &mut Segment) -> crate::Result { - use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS, FIELDSTATS}; + use crate::SegmentComponent::{FIELDSTATS, POSITIONS, POSITIONSSKIP, POSTINGS, TERMS}; InvertedIndexSerializer::create( CompositeWrite::wrap(segment.open_write(TERMS)?), CompositeWrite::wrap(segment.open_write(POSTINGS)?), @@ -100,7 +103,8 @@ impl InvertedIndexSerializer { total_num_tokens: u64, fieldnorm_reader: Option, ) -> io::Result> { - self.field_stats.insert(field, FieldStat::new(total_num_tokens)); + self.field_stats + .insert(field, FieldStat::new(total_num_tokens)); let field_entry: &FieldEntry = self.schema.get_field_entry(field); let term_dictionary_write = self.terms_write.for_field(field); let postings_write = self.postings_write.for_field(field); @@ -120,7 +124,8 @@ impl InvertedIndexSerializer { /// Closes the serializer. pub fn close(mut self) -> io::Result<()> { - self.field_stats.serialize(self.field_stats_write.get_mut())?; + self.field_stats + .serialize(self.field_stats_write.get_mut())?; self.field_stats_write.terminate()?; self.terms_write.close()?; self.postings_write.close()?; @@ -198,7 +203,8 @@ impl<'a> FieldSerializer<'a> { .unwrap_or(0u64); TermInfo { doc_freq: 0, - postings_offset: self.postings_serializer.addr(), + postings_start_offset: self.postings_serializer.addr(), + postings_end_offset: 0u64, positions_idx, } } @@ -252,10 +258,12 @@ impl<'a> FieldSerializer<'a> { /// using `VInt` encoding. 
pub fn close_term(&mut self) -> io::Result<()> { if self.term_open { - self.term_dictionary_builder - .insert_value(&self.current_term_info)?; self.postings_serializer .close_term(self.current_term_info.doc_freq)?; + let end_offset = self.postings_serializer.addr(); + self.current_term_info.postings_end_offset = end_offset; + self.term_dictionary_builder + .insert_value(&self.current_term_info)?; self.term_open = false; } Ok(()) diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs index 55a414955..59278eabc 100644 --- a/src/postings/term_info.rs +++ b/src/postings/term_info.rs @@ -7,35 +7,49 @@ use std::io; pub struct TermInfo { /// Number of documents in the segment containing the term pub doc_freq: u32, - /// Start offset within the postings (`.idx`) file. - pub postings_offset: u64, + /// Start offset of the posting list within the postings (`.idx`) file. + pub postings_start_offset: u64, + /// End offset of the posting list within the postings (`.idx`) file. + pub postings_end_offset: u64, /// Start offset of the first block within the position (`.pos`) file. pub positions_idx: u64, } +impl TermInfo { + pub(crate) fn posting_num_bytes(&self) -> u32 { + let num_bytes = self.postings_end_offset - self.postings_start_offset; + assert!(num_bytes <= std::u32::MAX as u64); + num_bytes as u32 + } +} + impl FixedSize for TermInfo { /// Size required for the binary serialization of a `TermInfo` object. /// This is large, but in practise, `TermInfo` are encoded in blocks and /// only the first `TermInfo` of a block is serialized uncompressed. /// The subsequent `TermInfo` are delta encoded and bitpacked. - const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES; + const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES; } impl BinarySerializable for TermInfo { fn serialize(&self, writer: &mut W) -> io::Result<()> { self.doc_freq.serialize(writer)?; - self.postings_offset.serialize(writer)?; + self.postings_start_offset.serialize(writer)?; + self.posting_num_bytes().serialize(writer)?; self.positions_idx.serialize(writer)?; Ok(()) } fn deserialize(reader: &mut R) -> io::Result { let doc_freq = u32::deserialize(reader)?; - let postings_offset = u64::deserialize(reader)?; + let postings_start_offset = u64::deserialize(reader)?; + let postings_num_bytes = u32::deserialize(reader)?; + let postings_end_offset = postings_start_offset + u64::from(postings_num_bytes); let positions_idx = u64::deserialize(reader)?; Ok(TermInfo { doc_freq, - postings_offset, + postings_start_offset, + postings_end_offset, positions_idx, }) } diff --git a/src/space_usage/mod.rs b/src/space_usage/mod.rs index 2f2ed24fa..37a90d0d8 100644 --- a/src/space_usage/mod.rs +++ b/src/space_usage/mod.rs @@ -121,7 +121,7 @@ impl SegmentSpaceUsage { /// Clones the underlying data. /// Use the components directly if this is somehow in performance critical code. 
     pub fn component(&self, component: SegmentComponent) -> ComponentSpaceUsage {
-        use self::ComponentSpaceUsage::{PerField, Basic, Store, Unimplemented};
+        use self::ComponentSpaceUsage::{Basic, PerField, Store, Unimplemented};
         use crate::SegmentComponent::*;
         match component {
             POSTINGS => PerField(self.postings().clone()),
@@ -132,7 +132,7 @@ impl SegmentSpaceUsage {
             TERMS => PerField(self.termdict().clone()),
             STORE => Store(self.store().clone()),
             DELETE => Basic(self.deletes()),
-            FIELDSTATS => Unimplemented
+            FIELDSTATS => Unimplemented,
         }
     }
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index fd1c4fa19..d4babecdb 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -44,11 +44,13 @@ mod tests {

     const BLOCK_SIZE: usize = 1_500;

-    fn make_term_info(val: u64) -> TermInfo {
+    fn make_term_info(term_ord: u64) -> TermInfo {
+        let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
         TermInfo {
-            doc_freq: val as u32,
-            positions_idx: val * 2u64,
-            postings_offset: val * 3u64,
+            doc_freq: term_ord as u32,
+            postings_start_offset: offset(term_ord),
+            postings_end_offset: offset(term_ord + 1),
+            positions_idx: offset(term_ord) * 2u64,
         }
     }

@@ -208,20 +210,14 @@ mod tests {
     }

     #[test]
-    fn test_stream_high_range_prefix_suffix() {
+    fn test_stream_high_range_prefix_suffix() -> std::io::Result<()> {
         let buffer: Vec<u8> = {
             let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
             // term requires more than 16bits
-            term_dictionary_builder
-                .insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))
-                .unwrap();
-            term_dictionary_builder
-                .insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))
-                .unwrap();
-            term_dictionary_builder
-                .insert("abr", &make_term_info(2))
-                .unwrap();
-            term_dictionary_builder.finish().unwrap()
+            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
+            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
+            term_dictionary_builder.insert("abr", &make_term_info(3))?;
+            term_dictionary_builder.finish()?
         };
         let source = ReadOnlySource::from(buffer);
         let term_dictionary: TermDictionary = TermDictionary::from_source(&source);
         let mut kv_stream = term_dictionary.stream();
         assert!(kv_stream.advance());
         assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
         assert_eq!(kv_stream.value(), &make_term_info(1));
         assert!(kv_stream.advance());
         assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
         assert_eq!(kv_stream.value(), &make_term_info(2));
         assert!(kv_stream.advance());
         assert_eq!(kv_stream.key(), "abr".as_bytes());
+        assert_eq!(kv_stream.value(), &make_term_info(3));
         assert!(!kv_stream.advance());
+        Ok(())
     }

     #[test]
diff --git a/src/termdict/term_info_store.rs b/src/termdict/term_info_store.rs
index cba4de2ca..288bc0198 100644
--- a/src/termdict/term_info_store.rs
+++ b/src/termdict/term_info_store.rs
@@ -57,21 +57,28 @@ impl TermInfoBlockMeta {
         self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
     }

+    // Here, inner_offset is the offset within the block, WITHOUT the first term_info.
+    // In other words, term_infos #1, #2, #3 get inner_offset 0, 1, 2..., while term_info #0
+    // is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo { + assert!(inner_offset < BLOCK_LEN - 1); let num_bits = self.num_bits() as usize; let mut cursor = num_bits * inner_offset; + let postings_start_offset = extract_bits(data, cursor, self.postings_offset_nbits); + let postings_end_offset = self.ref_term_info.postings_start_offset + + extract_bits(data, cursor + num_bits, self.postings_offset_nbits); + cursor += self.postings_offset_nbits as usize; + let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32; cursor += self.doc_freq_nbits as usize; - let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits); - cursor += self.postings_offset_nbits as usize; - let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits); TermInfo { doc_freq, - postings_offset: postings_offset + self.ref_term_info.postings_offset, + postings_start_offset: postings_start_offset + self.ref_term_info.postings_start_offset, + postings_end_offset, positions_idx: positions_idx + self.ref_term_info.positions_idx, } } @@ -126,14 +133,13 @@ impl TermInfoStore { .expect("Failed to deserialize terminfoblockmeta"); let inner_offset = (term_ord as usize) % BLOCK_LEN; if inner_offset == 0 { - term_info_block_data.ref_term_info - } else { - let term_info_data = self.term_info_source.as_slice(); - term_info_block_data.deserialize_term_info( - &term_info_data[term_info_block_data.offset as usize..], - inner_offset - 1, - ) + return term_info_block_data.ref_term_info; } + let term_info_data = self.term_info_source.as_slice(); + term_info_block_data.deserialize_term_info( + &term_info_data[term_info_block_data.offset as usize..], + inner_offset - 1, + ) } pub fn num_terms(&self) -> usize { @@ -154,16 +160,17 @@ fn bitpack_serialize( term_info_block_meta: &TermInfoBlockMeta, term_info: &TermInfo, ) -> io::Result<()> { + bit_packer.write( + term_info.postings_start_offset, + term_info_block_meta.postings_offset_nbits, + write, + )?; bit_packer.write( u64::from(term_info.doc_freq), term_info_block_meta.doc_freq_nbits, write, )?; - bit_packer.write( - term_info.postings_offset, - term_info_block_meta.postings_offset_nbits, - write, - )?; + bit_packer.write( term_info.positions_idx, term_info_block_meta.positions_idx_nbits, @@ -183,23 +190,27 @@ impl TermInfoStoreWriter { } fn flush_block(&mut self) -> io::Result<()> { - if self.term_infos.is_empty() { - return Ok(()); - } let mut bit_packer = BitPacker::new(); let ref_term_info = self.term_infos[0].clone(); + + let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() { + last_term_info + } else { + return Ok(()); + }; + let postings_end_offset = + last_term_info.postings_end_offset - ref_term_info.postings_start_offset; for term_info in &mut self.term_infos[1..] { - term_info.postings_offset -= ref_term_info.postings_offset; + term_info.postings_start_offset -= ref_term_info.postings_start_offset; term_info.positions_idx -= ref_term_info.positions_idx; } let mut max_doc_freq: u32 = 0u32; - let mut max_postings_offset: u64 = 0u64; - let mut max_positions_idx: u64 = 0u64; + let max_postings_offset: u64 = postings_end_offset; + let max_positions_idx: u64 = last_term_info.positions_idx; + for term_info in &self.term_infos[1..] 
{ max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq); - max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset); - max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx); } let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq)); @@ -224,6 +235,12 @@ impl TermInfoStoreWriter { )?; } + bit_packer.write( + postings_end_offset, + term_info_block_meta.postings_offset_nbits, + &mut self.buffer_term_infos, + )?; + // Block need end up at the end of a byte. bit_packer.flush(&mut self.buffer_term_infos)?; self.term_infos.clear(); @@ -232,6 +249,7 @@ impl TermInfoStoreWriter { } pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> { + assert!(term_info.postings_end_offset >= term_info.postings_start_offset); self.num_terms += 1u64; self.term_infos.push(term_info.clone()); if self.term_infos.len() >= BLOCK_LEN { @@ -291,10 +309,11 @@ mod tests { #[test] fn test_term_info_block_meta_serialization() { let term_info_block_meta = TermInfoBlockMeta { - offset: 2009, + offset: 2009u64, ref_term_info: TermInfo { doc_freq: 512, - postings_offset: 51, + postings_start_offset: 51, + postings_end_offset: 57u64, positions_idx: 3584, }, doc_freq_nbits: 10, @@ -312,10 +331,12 @@ mod tests { fn test_pack() { let mut store_writer = TermInfoStoreWriter::new(); let mut term_infos = vec![]; + let offset = |i| (i * 13 + i * i) as u64; for i in 0..1000 { let term_info = TermInfo { doc_freq: i as u32, - postings_offset: (i / 10) as u64, + postings_start_offset: offset(i), + postings_end_offset: offset(i + 1), positions_idx: (i * 7) as u64, }; store_writer.write_term_info(&term_info).unwrap(); @@ -325,7 +346,12 @@ mod tests { store_writer.serialize(&mut buffer).unwrap(); let term_info_store = TermInfoStore::open(&ReadOnlySource::from(buffer)); for i in 0..1000 { - assert_eq!(term_info_store.get(i as u64), term_infos[i]); + assert_eq!( + term_info_store.get(i as u64), + term_infos[i], + "term info {}", + i + ); } } }
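The central change of this patch is the on-disk encoding of `TermInfo`: a term now carries the `[postings_start_offset, postings_end_offset)` byte range of its posting list, serialized as the start offset plus a `u32` length (`posting_num_bytes`), with the end offset rebuilt on read. Below is a minimal, self-contained sketch of that round trip. It is an illustration only: it writes little-endian integers with plain `std::io` instead of tantivy's `BinarySerializable`/`FixedSize` machinery, and the struct and methods are simplified stand-ins for the ones touched by the patch.

```rust
use std::io::{self, Read, Write};

/// Simplified stand-in for the patched `TermInfo`: the posting list is
/// addressed by a [start, end) byte range instead of a single start offset.
#[derive(Debug, Clone, PartialEq)]
struct TermInfo {
    doc_freq: u32,
    postings_start_offset: u64,
    postings_end_offset: u64,
    positions_idx: u64,
}

impl TermInfo {
    /// Length of the posting list in bytes, asserted to fit in a `u32`
    /// exactly as the patch does.
    fn posting_num_bytes(&self) -> u32 {
        let num_bytes = self.postings_end_offset - self.postings_start_offset;
        assert!(num_bytes <= u64::from(u32::MAX));
        num_bytes as u32
    }

    /// On disk we write the start offset plus the length, not the end offset.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_all(&self.doc_freq.to_le_bytes())?;
        writer.write_all(&self.postings_start_offset.to_le_bytes())?;
        writer.write_all(&self.posting_num_bytes().to_le_bytes())?;
        writer.write_all(&self.positions_idx.to_le_bytes())?;
        Ok(())
    }

    /// The end offset is reconstructed as `start + num_bytes`.
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<TermInfo> {
        let mut u32_buf = [0u8; 4];
        let mut u64_buf = [0u8; 8];
        reader.read_exact(&mut u32_buf)?;
        let doc_freq = u32::from_le_bytes(u32_buf);
        reader.read_exact(&mut u64_buf)?;
        let postings_start_offset = u64::from_le_bytes(u64_buf);
        reader.read_exact(&mut u32_buf)?;
        let postings_num_bytes = u32::from_le_bytes(u32_buf);
        reader.read_exact(&mut u64_buf)?;
        let positions_idx = u64::from_le_bytes(u64_buf);
        Ok(TermInfo {
            doc_freq,
            postings_start_offset,
            postings_end_offset: postings_start_offset + u64::from(postings_num_bytes),
            positions_idx,
        })
    }
}

fn main() -> io::Result<()> {
    let term_info = TermInfo {
        doc_freq: 12,
        postings_start_offset: 1_024,
        postings_end_offset: 1_536,
        positions_idx: 4_096,
    };
    let mut buffer = Vec::new();
    term_info.serialize(&mut buffer)?;
    // 4 + 8 + 4 + 8 bytes, matching the new `SIZE_IN_BYTES`.
    assert_eq!(buffer.len(), 24);
    let decoded = TermInfo::deserialize(&mut buffer.as_slice())?;
    assert_eq!(decoded, term_info);
    Ok(())
}
```

Encoding the length as a `u32` rather than a second `u64` offset keeps the uncompressed entry at 24 bytes (the new `SIZE_IN_BYTES`) while still letting `InvertedIndexReader` slice the postings source tightly with `slice(start, end)`.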
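Within `term_info_store`, only posting-list start offsets are stored per term: `flush_block` delta-encodes each `postings_start_offset` against the block's reference `TermInfo` and appends one trailing end offset for the whole block, and `deserialize_term_info` recovers a term's end offset from the next entry's start offset. The sketch below replays that recovery on plain `u64` deltas; it leaves out the `BitPacker`/`extract_bits` bit-level encoding, and the helper name and signature are invented for illustration.

```rust
/// Recover the [start, end) postings ranges of one block.
/// `delta_start_offsets` are the per-term deltas written for entries #1..,
/// and `block_end_delta` is the single trailing end offset written after the
/// bitpacked records. In the real code the reference entry's end offset is
/// stored uncompressed in the block metadata; this sketch assumes posting
/// lists are laid out back to back, so it coincides with the next entry's start.
fn block_start_and_end_offsets(
    ref_start_offset: u64,       // `ref_term_info.postings_start_offset`
    delta_start_offsets: &[u64], // deltas for terms #1.. within the block
    block_end_delta: u64,        // trailing `postings_end_offset` delta
) -> Vec<(u64, u64)> {
    let mut starts: Vec<u64> = Vec::with_capacity(delta_start_offsets.len() + 1);
    starts.push(ref_start_offset);
    starts.extend(delta_start_offsets.iter().map(|d| ref_start_offset + d));
    let block_end = ref_start_offset + block_end_delta;
    let mut ranges = Vec::with_capacity(starts.len());
    for (i, &start) in starts.iter().enumerate() {
        // The end of entry i is the start of entry i + 1; the last entry
        // falls back to the block-wide end offset.
        let end = starts.get(i + 1).copied().unwrap_or(block_end);
        ranges.push((start, end));
    }
    ranges
}

fn main() {
    // Reference term starts at byte 100; two more terms start at +40 and +90,
    // and the block's posting data ends at +130.
    let ranges = block_start_and_end_offsets(100, &[40, 90], 130);
    assert_eq!(ranges, vec![(100, 140), (140, 190), (190, 230)]);
}
```

Storing a single block-wide end offset instead of one per term keeps the per-term cost at one bitpacked start offset, one doc_freq and one positions_idx.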