This commit is contained in:
Paul Masurel
2017-08-07 19:16:49 +09:00
parent d1f61a50c1
commit 1e89f86267
6 changed files with 159 additions and 108 deletions

View File

@@ -5,7 +5,7 @@ use DocId;
use core::SerializableSegment;
use schema::FieldValue;
use indexer::SegmentSerializer;
use postings::PostingsSerializer;
use postings::InvertedIndexSerializer;
use fastfield::U64FastFieldReader;
use itertools::Itertools;
use postings::Postings;
@@ -192,7 +192,7 @@ impl IndexMerger {
Ok(())
}
fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> {
fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> {
let mut delta_computer = DeltaComputer::new();
let mut merged_terms = TermMerger::from(&self.readers[..]);

View File

@@ -4,7 +4,7 @@ use core::Segment;
use core::SegmentComponent;
use fastfield::FastFieldSerializer;
use store::StoreWriter;
use postings::PostingsSerializer;
use postings::InvertedIndexSerializer;
/// Segment serializer is in charge of laying out on disk
@@ -13,7 +13,7 @@ pub struct SegmentSerializer {
store_writer: StoreWriter,
fast_field_serializer: FastFieldSerializer,
fieldnorms_serializer: FastFieldSerializer,
postings_serializer: PostingsSerializer,
postings_serializer: InvertedIndexSerializer,
}
impl SegmentSerializer {
@@ -27,7 +27,7 @@ impl SegmentSerializer {
let fieldnorms_write = try!(segment.open_write(SegmentComponent::FIELDNORMS));
let fieldnorms_serializer = try!(FastFieldSerializer::new(fieldnorms_write));
let postings_serializer = try!(PostingsSerializer::open(segment));
let postings_serializer = try!(InvertedIndexSerializer::open(segment));
Ok(SegmentSerializer {
postings_serializer: postings_serializer,
store_writer: StoreWriter::new(store_write),
@@ -37,7 +37,7 @@ impl SegmentSerializer {
}
/// Accessor to the `InvertedIndexSerializer`.
pub fn get_postings_serializer(&mut self) -> &mut PostingsSerializer {
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer {
&mut self.postings_serializer
}

View File

@@ -17,7 +17,7 @@ mod segment_postings_option;
pub use self::docset::{SkipResult, DocSet};
use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
pub use self::serializer::PostingsSerializer;
pub use self::serializer::InvertedIndexSerializer;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::term_info::TermInfo;
pub use self::postings::Postings;
@@ -58,7 +58,7 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut segment = index.new_segment();
let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap();
let mut posting_serializer = InvertedIndexSerializer::open(&mut segment).unwrap();
posting_serializer.new_field(text_field);
posting_serializer.new_term("abc".as_bytes()).unwrap();
for doc_id in 0u32..120u32 {

View File

@@ -1,7 +1,7 @@
use DocId;
use schema::Term;
use schema::FieldValue;
use postings::PostingsSerializer;
use postings::InvertedIndexSerializer;
use std::io;
use postings::Recorder;
use analyzer::SimpleTokenizer;
@@ -78,7 +78,7 @@ impl<'a> MultiFieldPostingsWriter<'a> {
/// It pushes all terms, one field at a time, towards the
/// postings serializer.
#[allow(needless_range_loop)]
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> Result<()> {
pub fn serialize(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> {
let mut term_offsets: Vec<(&[u8], u32)> = self.term_index.iter().collect();
term_offsets.sort_by_key(|&(k, _v)| k);
@@ -138,7 +138,7 @@ pub trait PostingsWriter {
fn serialize(&self,
field: Field,
term_addrs: &[(&[u8], u32)],
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()>;
@@ -216,7 +216,7 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'
fn serialize(&self,
field: Field,
term_addrs: &[(&[u8], u32)],
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()> {
serializer.new_field(field);

View File

@@ -1,6 +1,6 @@
use DocId;
use std::io;
use postings::PostingsSerializer;
use postings::InvertedIndexSerializer;
use datastruct::stacker::{ExpUnrolledLinkedList, Heap, HeapAllocable};
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
@@ -29,7 +29,7 @@ pub trait Recorder: HeapAllocable {
/// Pushes the postings information to the serializer.
fn serialize(&self,
self_addr: u32,
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()>;
}
@@ -66,7 +66,7 @@ impl Recorder for NothingRecorder {
fn serialize(&self,
self_addr: u32,
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()> {
for doc in self.stack.iter(self_addr, heap) {
@@ -118,7 +118,7 @@ impl Recorder for TermFrequencyRecorder {
fn serialize(&self,
self_addr: u32,
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()> {
// the last document has not been closed...
@@ -173,7 +173,7 @@ impl Recorder for TFAndPositionRecorder {
fn serialize(&self,
self_addr: u32,
serializer: &mut PostingsSerializer,
serializer: &mut InvertedIndexSerializer,
heap: &Heap)
-> io::Result<()> {
let mut doc_positions = Vec::with_capacity(100);

View File

@@ -47,29 +47,125 @@ use termdict::TermDictionaryBuilder;
///
/// A description of the serialization format is
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
pub struct PostingsSerializer {
///
/// Ties together the term dictionary builder, the postings serializer
/// and the positions serializer.
pub struct InvertedIndexSerializer {
// Term dictionary builder: receives each term key (`insert_key`)
// and its associated `TermInfo` (`insert_value`).
terms_fst_builder: TermDictionaryBuilderImpl<WritePtr, TermInfo>,
// Receives the (doc id, term frequency) pairs of the current term.
postings_serializer: PostingsSerializer,
// Receives the position deltas, only when the current field's
// indexing options have positions enabled.
positions_serializer: PositionSerializer,
// Schema handed to the constructor.
// NOTE(review): presumably consulted by `new_field` to resolve the
// field's indexing options; that code is not visible here, confirm.
schema: Schema,
// `true` between a call to `new_term` and the matching `close_term`.
term_open: bool,
// Indexing options in effect for the current field; they decide
// whether term frequencies and positions are written.
text_indexing_options: TextIndexingOptions,
// `TermInfo` of the term being written. Its `doc_freq` is incremented
// for every document written, and the value is stored in the term
// dictionary when the term is closed.
current_term_info: TermInfo,
}
struct PostingsSerializer {
postings_write: CountingWriter<WritePtr>,
last_doc_id_encoded: u32,
positions_writer: PositionWriter,
block_encoder: BlockEncoder,
doc_ids: Vec<DocId>,
term_freqs: Vec<u32>,
schema: Schema,
text_indexing_options: TextIndexingOptions,
term_open: bool,
current_term_info: TermInfo,
termfreq_enabled: bool,
}
struct PositionWriter {
impl PostingsSerializer {
    /// Creates a serializer that appends its encoded output to `write`.
    ///
    /// Term frequencies are disabled until `set_termfreq_enabled(true)`
    /// is called.
    fn new(write: WritePtr) -> PostingsSerializer {
        PostingsSerializer {
            postings_write: CountingWriter::wrap(write),
            block_encoder: BlockEncoder::new(),
            doc_ids: Vec::new(),
            term_freqs: Vec::new(),
            last_doc_id_encoded: 0u32,
            termfreq_enabled: false,
        }
    }

    /// Buffers one document for the current term.
    ///
    /// The term frequency is only buffered when term frequencies are
    /// enabled. Once `NUM_DOCS_PER_BLOCK` doc ids are pending, the doc ids
    /// (and then the term frequencies, if enabled) are block-encoded,
    /// written out, and the buffers are emptied.
    fn write_doc(&mut self, doc_id: DocId, term_freq: u32) -> io::Result<()> {
        self.doc_ids.push(doc_id);
        if self.termfreq_enabled {
            self.term_freqs.push(term_freq);
        }

        // Nothing to flush until a full block is pending.
        if self.doc_ids.len() != NUM_DOCS_PER_BLOCK {
            return Ok(());
        }

        // Doc ids first. The encoder is handed the last doc id of the
        // previously encoded block; the new "last doc id" is recorded
        // before the write is attempted.
        let previous_last_doc_id = self.last_doc_id_encoded;
        self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
        {
            let encoded_doc_ids: &[u8] = self.block_encoder
                .compress_block_sorted(&self.doc_ids, previous_last_doc_id);
            self.postings_write.write_all(encoded_doc_ids)?;
        }

        // Then the matching block of term frequencies.
        if self.termfreq_enabled {
            let encoded_term_freqs: &[u8] = self.block_encoder
                .compress_block_unsorted(&self.term_freqs);
            self.postings_write.write_all(encoded_term_freqs)?;
            self.term_freqs.clear();
        }

        self.doc_ids.clear();
        Ok(())
    }

    /// Turns the recording of term frequencies on or off.
    fn set_termfreq_enabled(&mut self, termfreq_enabled: bool) {
        self.termfreq_enabled = termfreq_enabled;
    }

    /// Writes out whatever is still buffered for the current term.
    ///
    /// Pending doc ids exist when the number of documents of the term is
    /// not a multiple of the block size. That remainder is encoded with
    /// variable int encoding, followed by the remaining term frequencies
    /// when they are enabled. Does nothing if no doc id is pending.
    fn close_term(&mut self) -> io::Result<()> {
        if self.doc_ids.is_empty() {
            return Ok(());
        }

        {
            let remaining_doc_ids = self.block_encoder
                .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
            self.postings_write.write_all(remaining_doc_ids)?;
            self.doc_ids.clear();
        }

        // Same treatment for the leftover term frequencies.
        if self.termfreq_enabled {
            let remaining_term_freqs = self.block_encoder
                .compress_vint_unsorted(&self.term_freqs[..]);
            self.postings_write.write_all(remaining_term_freqs)?;
            self.term_freqs.clear();
        }

        Ok(())
    }

    /// Consumes the serializer and flushes the underlying writer.
    fn close(mut self) -> io::Result<()> {
        self.postings_write.flush()?;
        Ok(())
    }

    /// Value reported by the writer's `written_bytes()`, cast to `u32`.
    fn addr(&self) -> u32 {
        let written = self.postings_write.written_bytes();
        written as u32
    }

    /// Resets the per-term state: the "last encoded doc id" goes back to
    /// zero and both pending buffers are emptied.
    fn clear(&mut self) {
        self.last_doc_id_encoded = 0;
        self.term_freqs.clear();
        self.doc_ids.clear();
    }
}
// Serializer for the positions output.
struct PositionSerializer {
// Buffer of `u32` values, created with a capacity of
// `NUM_DOCS_PER_BLOCK`.
// NOTE(review): presumably the position deltas awaiting encoding;
// the write logic is not visible here, confirm.
buffer: Vec<u32>,
// Counting wrapper around the output handed to the constructor.
write: CountingWriter<WritePtr>,
// Encoder instance owned by this serializer.
block_encoder: BlockEncoder,
}
impl PositionWriter {
fn new(write: WritePtr) -> PositionWriter {
PositionWriter {
impl PositionSerializer {
fn new(write: WritePtr) -> PositionSerializer {
PositionSerializer {
buffer: Vec::with_capacity(NUM_DOCS_PER_BLOCK),
write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
@@ -108,37 +204,33 @@ impl PositionWriter {
}
}
impl PostingsSerializer {
impl InvertedIndexSerializer {
/// Creates a new `InvertedIndexSerializer` from the terms, postings and positions writers
pub fn new(terms_write: WritePtr,
postings_write: WritePtr,
positions_write: WritePtr,
schema: Schema)
-> Result<PostingsSerializer> {
-> Result<InvertedIndexSerializer> {
let terms_fst_builder = TermDictionaryBuilderImpl::new(terms_write)?;
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: CountingWriter::wrap(postings_write),
positions_writer: PositionWriter::new(positions_write),
last_doc_id_encoded: 0u32,
block_encoder: BlockEncoder::new(),
doc_ids: Vec::new(),
term_freqs: Vec::new(),
schema: schema,
text_indexing_options: TextIndexingOptions::Unindexed,
term_open: false,
current_term_info: TermInfo::default(),
})
Ok(InvertedIndexSerializer {
terms_fst_builder: terms_fst_builder,
positions_serializer: PositionSerializer::new(positions_write),
postings_serializer: PostingsSerializer::new(postings_write),
schema: schema,
term_open: false,
current_term_info: TermInfo::default(),
text_indexing_options: TextIndexingOptions::Untokenized,
})
}
/// Open a new `InvertedIndexSerializer` for the given segment
pub fn open(segment: &mut Segment) -> Result<PostingsSerializer> {
pub fn open(segment: &mut Segment) -> Result<InvertedIndexSerializer> {
use SegmentComponent::{TERMS, POSTINGS, POSITIONS};
PostingsSerializer::new(segment.open_write(TERMS)?,
segment.open_write(POSTINGS)?,
segment.open_write(POSITIONS)?,
segment.schema())
InvertedIndexSerializer::new(segment.open_write(TERMS)?,
segment.open_write(POSTINGS)?,
segment.open_write(POSITIONS)?,
segment.schema())
}
/// Must be called before starting pushing terms of
@@ -158,6 +250,17 @@ impl PostingsSerializer {
}
}
};
self.postings_serializer.set_termfreq_enabled(self.text_indexing_options.is_termfreq_enabled());
}
/// Builds a fresh `TermInfo` from the serializers' current addresses.
///
/// The returned value has a `doc_freq` of zero, the postings
/// serializer's current address as `postings_offset`, and the pair
/// returned by the positions serializer's `addr()` as
/// `positions_offset` / `positions_inner_offset`.
fn current_term_info(&self) -> TermInfo {
    let (positions_offset, positions_inner_offset) = self.positions_serializer.addr();
    let postings_offset = self.postings_serializer.addr();
    TermInfo {
        doc_freq: 0,
        postings_offset: postings_offset,
        positions_offset: positions_offset,
        positions_inner_offset: positions_inner_offset,
    }
}
/// Starts the postings for a new term.
@@ -169,16 +272,8 @@ impl PostingsSerializer {
panic!("Called new_term, while the previous term was not closed.");
}
self.term_open = true;
self.doc_ids.clear();
self.last_doc_id_encoded = 0;
self.term_freqs.clear();
let (filepos, offset) = self.positions_writer.addr();
self.current_term_info = TermInfo {
doc_freq: 0,
postings_offset: self.postings_write.written_bytes() as u32,
positions_offset: filepos,
positions_inner_offset: offset,
};
self.postings_serializer.clear();
self.current_term_info = self.current_term_info();
self.terms_fst_builder.insert_key(term)
}
@@ -188,32 +283,8 @@ impl PostingsSerializer {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.terms_fst_builder
.insert_value(&self.current_term_info)?;
if !self.doc_ids.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
//
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded =
self.block_encoder
.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.text_indexing_options.is_termfreq_enabled() {
let block_encoded = self.block_encoder
.compress_vint_unsorted(&self.term_freqs[..]);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
}
self.terms_fst_builder.insert_value(&self.current_term_info)?;
self.postings_serializer.close_term()?;
self.term_open = false;
}
Ok(())
@@ -235,31 +306,11 @@ impl PostingsSerializer {
position_deltas: &[u32])
-> io::Result<()> {
self.current_term_info.doc_freq += 1;
self.doc_ids.push(doc_id);
if self.text_indexing_options.is_termfreq_enabled() {
self.term_freqs.push(term_freq as u32);
}
self.postings_serializer.write_doc(doc_id, term_freq)?;
if self.text_indexing_options.is_position_enabled() {
self.positions_writer.write(position_deltas)?;
}
if self.doc_ids.len() == NUM_DOCS_PER_BLOCK {
{
// encode the doc ids
let block_encoded: &[u8] =
self.block_encoder
.compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
self.postings_write.write_all(block_encoded)?;
}
if self.text_indexing_options.is_termfreq_enabled() {
// encode the term_freqs
let block_encoded: &[u8] = self.block_encoder
.compress_block_unsorted(&self.term_freqs);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();
}
self.doc_ids.clear();
self.positions_serializer.write(position_deltas)?;
}
Ok(())
}
@@ -267,8 +318,8 @@ impl PostingsSerializer {
pub fn close(mut self) -> io::Result<()> {
self.close_term()?;
self.terms_fst_builder.finish()?;
self.postings_write.flush()?;
self.positions_writer.close()?;
self.postings_serializer.close()?;
self.positions_serializer.close()?;
Ok(())
}
}