Compare commits

...

2 Commits

Author SHA1 Message Date
Paul Masurel
9eb87e91cc TermInfo contain the end_offset of the postings.
We slice the ReadOnlySource tightly.
2020-09-21 00:42:15 +09:00
Paul Masurel
36f43da4d8 Added Field stats to remove total num tokens from the beginning of posting list files 2020-09-19 23:23:03 +09:00
11 changed files with 193 additions and 68 deletions

50
doc/src/index-format.md Normal file
View File

@@ -0,0 +1,50 @@
# Managed files
+----------+-----------+-------------------+
| content | footer | footer_len: u32 |
+----------+-----------+-------------------+
# Term Dictionary (Composite File)
+---------+---------------------------+------------------------+
| fst | term_info_store | footer_len: u64 |
+---------+---------------------------+------------------------+
During a merge, the term info store needs to fit in memory.
It has a cost of n bytes per term.
# term_info_store
+-------------------+---------------------------+------------------------+
| len_block_meta | block_meta | term_infos |
+-------------------+---------------------------+------------------------+
# inverted_index
+------------------------+---------------------------+------------------------+
| total_num_tokens: u64 | posting_lists.. | term_infos |
+------------------------+---------------------------+------------------------+
# postings lists
# composite file
+--------------+-----+--------------+-----------------------+-----------------+
| field file 1 | ... | field file n | composite file footer | footer len: u32 |
+--------------+-----+--------------+-----------------------+-----------------+
# composite file footer
+-----------------+---------------------------------------+
|num fields: vint | (file_addr, offset_delta: vint) []... |
+-----------------+---------------------------------------+
# FileAddr
+--------------+--------------+
| field: u32 | idx: VInt |
+--------------+--------------+
# Posting lists
+-----------------------------------------+
| skip_reader                             |
+-----------------------------------------+
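
The managed-file layout above ends with the footer followed by a trailing `footer_len: u32`, so a reader locates the footer by working backwards from the end of the file. Below is a minimal standalone sketch of that idea, assuming the length is little-endian and counts only the footer bytes; `split_managed_file` is a hypothetical helper for illustration, not tantivy's API.

```rust
use std::convert::TryInto;

/// Split `| content | footer | footer_len: u32 |` into (content, footer).
/// Assumes `footer_len` is a little-endian u32 counting only the footer bytes.
fn split_managed_file(data: &[u8]) -> Option<(&[u8], &[u8])> {
    if data.len() < 4 {
        return None;
    }
    // The last 4 bytes hold the footer length.
    let (body, len_bytes) = data.split_at(data.len() - 4);
    let footer_len = u32::from_le_bytes(len_bytes.try_into().ok()?) as usize;
    if footer_len > body.len() {
        return None;
    }
    // Everything before the footer is the file's content.
    let (content, footer) = body.split_at(body.len() - footer_len);
    Some((content, footer))
}
```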

View File

@@ -116,6 +116,7 @@ impl SegmentMeta {
SegmentComponent::FASTFIELDS => ".fast".to_string(),
SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
SegmentComponent::FIELDSTATS => ".fieldstats".to_string(),
});
PathBuf::from(path)
}

View File

@@ -1,4 +1,3 @@
use crate::common::BinarySerializable;
use crate::directory::ReadOnlySource;
use crate::positions::PositionReader;
use crate::postings::TermInfo;
@@ -36,14 +35,12 @@ impl InvertedIndexReader {
postings_source: ReadOnlySource,
positions_source: ReadOnlySource,
positions_idx_source: ReadOnlySource,
total_num_tokens: u64,
record_option: IndexRecordOption,
) -> InvertedIndexReader {
let total_num_tokens_data = postings_source.slice(0, 8);
let mut total_num_tokens_cursor = total_num_tokens_data.as_slice();
let total_num_tokens = u64::deserialize(&mut total_num_tokens_cursor).unwrap_or(0u64);
InvertedIndexReader {
termdict,
postings_source: postings_source.slice_from(8),
postings_source,
positions_source,
positions_idx_source,
record_option,
@@ -89,7 +86,7 @@ impl InvertedIndexReader {
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings,
) {
let offset = term_info.postings_offset as usize;
let offset = term_info.postings_start_offset as usize;
let end_source = self.postings_source.len();
let postings_slice = self.postings_source.slice(offset, end_source);
block_postings.reset(term_info.doc_freq, postings_slice);
@@ -117,8 +114,10 @@ impl InvertedIndexReader {
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> BlockSegmentPostings {
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_source.slice_from(offset);
let postings_data = self.postings_source.slice(
term_info.postings_start_offset as usize,
term_info.postings_end_offset as usize,
);
BlockSegmentPostings::from_data(
term_info.doc_freq,
postings_data,

View File

@@ -24,14 +24,17 @@ pub enum SegmentComponent {
/// Accessing a document from the store is relatively slow, as it
/// requires decompressing the entire block it belongs to.
STORE,
/// Bitset describing which documents of the segment are deleted.
DELETE,
/// Field statistics: currently the total number of tokens per field.
FIELDSTATS,
}
impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 9] = [
SegmentComponent::POSTINGS,
SegmentComponent::POSITIONS,
SegmentComponent::POSITIONSSKIP,
@@ -40,6 +43,7 @@ impl SegmentComponent {
SegmentComponent::TERMS,
SegmentComponent::STORE,
SegmentComponent::DELETE,
SegmentComponent::FIELDSTATS,
];
SEGMENT_COMPONENTS.iter()
}

View File

@@ -1,4 +1,3 @@
use crate::common::CompositeFile;
use crate::common::HasLen;
use crate::core::InvertedIndexReader;
use crate::core::Segment;
@@ -16,6 +15,7 @@ use crate::space_usage::SegmentSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermDictionary;
use crate::DocId;
use crate::{common::CompositeFile, postings::FieldStats};
use fail::fail_point;
use std::collections::HashMap;
use std::fmt;
@@ -49,6 +49,7 @@ pub struct SegmentReader {
positions_idx_composite: CompositeFile,
fast_fields_readers: Arc<FastFieldReaders>,
fieldnorm_readers: FieldNormReaders,
field_stats: FieldStats,
store_source: ReadOnlySource,
delete_bitset_opt: Option<DeleteBitSet>,
@@ -179,6 +180,9 @@ impl SegmentReader {
let fieldnorm_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let field_stats_data = segment.open_read(SegmentComponent::FIELDSTATS)?;
let field_stats = FieldStats::from_source(field_stats_data.as_slice())?;
let delete_bitset_opt = if segment.meta().has_deletes() {
let delete_data = segment.open_read(SegmentComponent::DELETE)?;
Some(DeleteBitSet::open(delete_data))
@@ -194,6 +198,7 @@ impl SegmentReader {
postings_composite,
fast_fields_readers: fast_field_readers,
fieldnorm_readers,
field_stats,
segment_id: segment.id(),
store_source,
delete_bitset_opt,
@@ -260,11 +265,17 @@ impl SegmentReader {
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");
let total_num_tokens = self
.field_stats
.get(field)
.map(|field_stat| field_stat.num_tokens())
.unwrap_or(0u64);
let inv_idx_reader = Arc::new(InvertedIndexReader::new(
TermDictionary::from_source(&termdict_source),
postings_source,
positions_source,
positions_idx_source,
total_num_tokens,
record_option,
));

View File

@@ -5,6 +5,7 @@ Postings module (also called inverted index)
mod block_search;
mod block_segment_postings;
pub(crate) mod compression;
mod field_stats;
mod postings;
mod postings_writer;
mod recorder;
@@ -15,6 +16,7 @@ mod stacker;
mod term_info;
pub(crate) use self::block_search::BlockSearcher;
pub(crate) use self::field_stats::{FieldStat, FieldStats};
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};

View File

@@ -1,5 +1,4 @@
use super::TermInfo;
use crate::common::{BinarySerializable, VInt};
use super::{FieldStat, FieldStats, TermInfo};
use crate::common::{CompositeWrite, CountingWriter};
use crate::core::Segment;
use crate::directory::WritePtr;
@@ -11,6 +10,10 @@ use crate::query::BM25Weight;
use crate::schema::Schema;
use crate::schema::{Field, FieldEntry, FieldType};
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
use crate::{
common::{BinarySerializable, VInt},
directory::TerminatingWrite,
};
use crate::{DocId, Score};
use std::cmp::Ordering;
use std::io::{self, Write};
@@ -51,6 +54,8 @@ pub struct InvertedIndexSerializer {
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
positionsidx_write: CompositeWrite<WritePtr>,
field_stats: FieldStats,
field_stats_write: WritePtr,
schema: Schema,
}
@@ -61,6 +66,7 @@ impl InvertedIndexSerializer {
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
positionsidx_write: CompositeWrite<WritePtr>,
field_stats_write: WritePtr,
schema: Schema,
) -> crate::Result<InvertedIndexSerializer> {
Ok(InvertedIndexSerializer {
@@ -68,18 +74,21 @@ impl InvertedIndexSerializer {
postings_write,
positions_write,
positionsidx_write,
field_stats: FieldStats::default(),
field_stats_write,
schema,
})
}
/// Open a new `PostingsSerializer` for the given segment
pub fn open(segment: &mut Segment) -> crate::Result<InvertedIndexSerializer> {
use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
use crate::SegmentComponent::{FIELDSTATS, POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
InvertedIndexSerializer::create(
CompositeWrite::wrap(segment.open_write(TERMS)?),
CompositeWrite::wrap(segment.open_write(POSTINGS)?),
CompositeWrite::wrap(segment.open_write(POSITIONS)?),
CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
segment.open_write(FIELDSTATS)?,
segment.schema(),
)
}
@@ -94,6 +103,8 @@ impl InvertedIndexSerializer {
total_num_tokens: u64,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'_>> {
self.field_stats
.insert(field, FieldStat::new(total_num_tokens));
let field_entry: &FieldEntry = self.schema.get_field_entry(field);
let term_dictionary_write = self.terms_write.for_field(field);
let postings_write = self.postings_write.for_field(field);
@@ -112,7 +123,10 @@ impl InvertedIndexSerializer {
}
/// Closes the serializer.
pub fn close(self) -> io::Result<()> {
pub fn close(mut self) -> io::Result<()> {
self.field_stats
.serialize(self.field_stats_write.get_mut())?;
self.field_stats_write.terminate()?;
self.terms_write.close()?;
self.postings_write.close()?;
self.positions_write.close()?;
@@ -142,7 +156,6 @@ impl<'a> FieldSerializer<'a> {
positionsidx_write: &'a mut CountingWriter<WritePtr>,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'a>> {
total_num_tokens.serialize(postings_write)?;
let (term_freq_enabled, position_enabled): (bool, bool) = match field_type {
FieldType::Str(ref text_options) => {
if let Some(text_indexing_options) = text_options.get_indexing_options() {
@@ -190,7 +203,8 @@ impl<'a> FieldSerializer<'a> {
.unwrap_or(0u64);
TermInfo {
doc_freq: 0,
postings_offset: self.postings_serializer.addr(),
postings_start_offset: self.postings_serializer.addr(),
postings_end_offset: 0u64,
positions_idx,
}
}
@@ -244,10 +258,12 @@ impl<'a> FieldSerializer<'a> {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
let end_offset = self.postings_serializer.addr();
self.current_term_info.postings_end_offset = end_offset;
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
}
Ok(())

View File

@@ -7,35 +7,49 @@ use std::io;
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Start offset of the posting list within the postings (`.idx`) file.
pub postings_start_offset: u64,
/// End offset of the posting list within the postings (`.idx`) file.
pub postings_end_offset: u64,
/// Start offset of the first block within the position (`.pos`) file.
pub positions_idx: u64,
}
impl TermInfo {
pub(crate) fn posting_num_bytes(&self) -> u32 {
let num_bytes = self.postings_end_offset - self.postings_start_offset;
assert!(num_bytes <= std::u32::MAX as u64);
num_bytes as u32
}
}
impl FixedSize for TermInfo {
/// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practice `TermInfo`s are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo`s are delta-encoded and bitpacked.
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}
impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.postings_start_offset.serialize(writer)?;
self.posting_num_bytes().serialize(writer)?;
self.positions_idx.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?;
let postings_offset = u64::deserialize(reader)?;
let postings_start_offset = u64::deserialize(reader)?;
let postings_num_bytes = u32::deserialize(reader)?;
let postings_end_offset = postings_start_offset + u64::from(postings_num_bytes);
let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo {
doc_freq,
postings_offset,
postings_start_offset,
postings_end_offset,
positions_idx,
})
}
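
Since only `posting_num_bytes` (a `u32`) is persisted alongside the start offset, the end offset is reconstructed on read as `postings_start_offset + num_bytes`; the fixed size therefore grows from 20 to 24 bytes rather than the 28 a second full `u64` would cost. For illustration only, here is a self-contained round trip of that layout in plain `std` (little-endian integers assumed; `TermInfoLike` is a stand-in, not the real struct):

```rust
use std::convert::TryInto;

#[derive(Debug, PartialEq)]
struct TermInfoLike {
    doc_freq: u32,
    postings_start_offset: u64,
    postings_end_offset: u64,
    positions_idx: u64,
}

fn encode(t: &TermInfoLike) -> Vec<u8> {
    let mut buf = Vec::with_capacity(24);
    buf.extend_from_slice(&t.doc_freq.to_le_bytes());
    buf.extend_from_slice(&t.postings_start_offset.to_le_bytes());
    // Only the posting list length is stored, as a u32.
    let num_bytes = (t.postings_end_offset - t.postings_start_offset) as u32;
    buf.extend_from_slice(&num_bytes.to_le_bytes());
    buf.extend_from_slice(&t.positions_idx.to_le_bytes());
    buf
}

fn decode(buf: &[u8]) -> TermInfoLike {
    let doc_freq = u32::from_le_bytes(buf[0..4].try_into().unwrap());
    let start = u64::from_le_bytes(buf[4..12].try_into().unwrap());
    let num_bytes = u32::from_le_bytes(buf[12..16].try_into().unwrap());
    let positions_idx = u64::from_le_bytes(buf[16..24].try_into().unwrap());
    TermInfoLike {
        doc_freq,
        postings_start_offset: start,
        // The end offset is reconstructed rather than stored.
        postings_end_offset: start + u64::from(num_bytes),
        positions_idx,
    }
}
```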

View File

@@ -25,6 +25,8 @@ pub enum ComponentSpaceUsage {
Store(StoreSpaceUsage),
/// Some sort of raw byte count
Basic(ByteCount),
/// Space usage for this component is not tracked yet.
Unimplemented,
}
/// Represents combined space usage of an entire searcher and its component segments.
@@ -119,7 +121,7 @@ impl SegmentSpaceUsage {
/// Clones the underlying data.
/// Use the components directly if this is somehow in performance critical code.
pub fn component(&self, component: SegmentComponent) -> ComponentSpaceUsage {
use self::ComponentSpaceUsage::*;
use self::ComponentSpaceUsage::{Basic, PerField, Store, Unimplemented};
use crate::SegmentComponent::*;
match component {
POSTINGS => PerField(self.postings().clone()),
@@ -130,6 +132,7 @@ impl SegmentSpaceUsage {
TERMS => PerField(self.termdict().clone()),
STORE => Store(self.store().clone()),
DELETE => Basic(self.deletes()),
FIELDSTATS => Unimplemented,
}
}

View File

@@ -44,11 +44,13 @@ mod tests {
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(val: u64) -> TermInfo {
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: val as u32,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_end_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
@@ -208,20 +210,14 @@ mod tests {
}
#[test]
fn test_stream_high_range_prefix_suffix() {
fn test_stream_high_range_prefix_suffix() -> std::io::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder
.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))
.unwrap();
term_dictionary_builder
.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))
.unwrap();
term_dictionary_builder
.insert("abr", &make_term_info(2))
.unwrap();
term_dictionary_builder.finish().unwrap()
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let source = ReadOnlySource::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::from_source(&source);
@@ -229,12 +225,15 @@ mod tests {
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
dbg!(make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}
#[test]

View File

@@ -57,21 +57,28 @@ impl TermInfoBlockMeta {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
}
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
// In other words, term_infos #1, #2, #3 get inner_offset 0, 1, 2..., while term_info #0
// is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
assert!(inner_offset < BLOCK_LEN - 1);
let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
let postings_start_offset = extract_bits(data, cursor, self.postings_offset_nbits);
let postings_end_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, cursor + num_bits, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
cursor += self.doc_freq_nbits as usize;
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
TermInfo {
doc_freq,
postings_offset: postings_offset + self.ref_term_info.postings_offset,
postings_start_offset: postings_start_offset + self.ref_term_info.postings_start_offset,
postings_end_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx,
}
}
@@ -126,14 +133,13 @@ impl TermInfoStore {
.expect("Failed to deserialize terminfoblockmeta");
let inner_offset = (term_ord as usize) % BLOCK_LEN;
if inner_offset == 0 {
term_info_block_data.ref_term_info
} else {
let term_info_data = self.term_info_source.as_slice();
term_info_block_data.deserialize_term_info(
&term_info_data[term_info_block_data.offset as usize..],
inner_offset - 1,
)
return term_info_block_data.ref_term_info;
}
let term_info_data = self.term_info_source.as_slice();
term_info_block_data.deserialize_term_info(
&term_info_data[term_info_block_data.offset as usize..],
inner_offset - 1,
)
}
pub fn num_terms(&self) -> usize {
@@ -154,16 +160,17 @@ fn bitpack_serialize<W: Write>(
term_info_block_meta: &TermInfoBlockMeta,
term_info: &TermInfo,
) -> io::Result<()> {
bit_packer.write(
term_info.postings_start_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
u64::from(term_info.doc_freq),
term_info_block_meta.doc_freq_nbits,
write,
)?;
bit_packer.write(
term_info.postings_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
term_info.positions_idx,
term_info_block_meta.positions_idx_nbits,
@@ -183,23 +190,27 @@ impl TermInfoStoreWriter {
}
fn flush_block(&mut self) -> io::Result<()> {
if self.term_infos.is_empty() {
return Ok(());
}
let mut bit_packer = BitPacker::new();
let ref_term_info = self.term_infos[0].clone();
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
last_term_info
} else {
return Ok(());
};
let postings_end_offset =
last_term_info.postings_end_offset - ref_term_info.postings_start_offset;
for term_info in &mut self.term_infos[1..] {
term_info.postings_offset -= ref_term_info.postings_offset;
term_info.postings_start_offset -= ref_term_info.postings_start_offset;
term_info.positions_idx -= ref_term_info.positions_idx;
}
let mut max_doc_freq: u32 = 0u32;
let mut max_postings_offset: u64 = 0u64;
let mut max_positions_idx: u64 = 0u64;
let max_postings_offset: u64 = postings_end_offset;
let max_positions_idx: u64 = last_term_info.positions_idx;
for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
}
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -224,6 +235,12 @@ impl TermInfoStoreWriter {
)?;
}
bit_packer.write(
postings_end_offset,
term_info_block_meta.postings_offset_nbits,
&mut self.buffer_term_infos,
)?;
// The block needs to end on a byte boundary.
bit_packer.flush(&mut self.buffer_term_infos)?;
self.term_infos.clear();
@@ -232,6 +249,7 @@ impl TermInfoStoreWriter {
}
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
assert!(term_info.postings_end_offset >= term_info.postings_start_offset);
self.num_terms += 1u64;
self.term_infos.push(term_info.clone());
if self.term_infos.len() >= BLOCK_LEN {
@@ -291,10 +309,11 @@ mod tests {
#[test]
fn test_term_info_block_meta_serialization() {
let term_info_block_meta = TermInfoBlockMeta {
offset: 2009,
offset: 2009u64,
ref_term_info: TermInfo {
doc_freq: 512,
postings_offset: 51,
postings_start_offset: 51,
postings_end_offset: 57u64,
positions_idx: 3584,
},
doc_freq_nbits: 10,
@@ -312,10 +331,12 @@ mod tests {
fn test_pack() {
let mut store_writer = TermInfoStoreWriter::new();
let mut term_infos = vec![];
let offset = |i| (i * 13 + i * i) as u64;
for i in 0..1000 {
let term_info = TermInfo {
doc_freq: i as u32,
postings_offset: (i / 10) as u64,
postings_start_offset: offset(i),
postings_end_offset: offset(i + 1),
positions_idx: (i * 7) as u64,
};
store_writer.write_term_info(&term_info).unwrap();
@@ -325,7 +346,12 @@ mod tests {
store_writer.serialize(&mut buffer).unwrap();
let term_info_store = TermInfoStore::open(&ReadOnlySource::from(buffer));
for i in 0..1000 {
assert_eq!(term_info_store.get(i as u64), term_infos[i]);
assert_eq!(
term_info_store.get(i as u64),
term_infos[i],
"term info {}",
i
);
}
}
}