This commit is contained in:
Paul Masurel
2017-08-04 10:28:59 +09:00
parent 447a9361d8
commit aff7e64d4e
12 changed files with 141 additions and 262 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.4.3"
version = "0.5.0-dev"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
build = "build.rs"
license = "MIT"

View File

@@ -1,170 +0,0 @@
use super::{BlockEncoder, BlockDecoder};
use super::NUM_DOCS_PER_BLOCK;
use compression::{VIntEncoder, VIntDecoder};
/// Compresses `u32` sequences of arbitrary length.
///
/// The input is cut into runs of `NUM_DOCS_PER_BLOCK` values that are handed
/// to the block encoder one at a time; whatever is left over at the end is
/// handed to the encoder's vint routine. The compressed bytes are gathered in
/// an internal buffer that is cleared and reused on every call.
pub struct CompositeEncoder {
    block_encoder: BlockEncoder,
    output: Vec<u8>,
}

impl CompositeEncoder {
    /// Creates an encoder whose output buffer starts with a capacity of
    /// 500 000 bytes.
    pub fn new() -> CompositeEncoder {
        CompositeEncoder {
            block_encoder: BlockEncoder::new(),
            output: Vec::with_capacity(500_000),
        }
    }

    /// Compresses `vals` with the "sorted" routines of the block encoder.
    ///
    /// Each full block is encoded with an `offset` equal to the last value of
    /// the previous block (`0` for the first block); the trailing partial
    /// block receives the last value of the final full block.
    ///
    /// The returned slice borrows the internal buffer and is overwritten by
    /// the next call to any `compress_*` method.
    pub fn compress_sorted(&mut self, vals: &[u32]) -> &[u8] {
        self.output.clear();
        let full_len = (vals.len() / NUM_DOCS_PER_BLOCK) * NUM_DOCS_PER_BLOCK;
        let (full_blocks, tail) = vals.split_at(full_len);
        let mut offset = 0u32;
        for block in full_blocks.chunks(NUM_DOCS_PER_BLOCK) {
            let encoded_block = self.block_encoder.compress_block_sorted(block, offset);
            self.output.extend_from_slice(encoded_block);
            // The next block is encoded relative to the last value of this one.
            offset = block[NUM_DOCS_PER_BLOCK - 1];
        }
        let encoded_tail = self.block_encoder.compress_vint_sorted(tail, offset);
        self.output.extend_from_slice(encoded_tail);
        &self.output
    }

    /// Compresses `vals` with the "unsorted" routines of the block encoder:
    /// full blocks first, then the trailing partial block through the vint
    /// routine.
    ///
    /// The returned slice borrows the internal buffer and is overwritten by
    /// the next call to any `compress_*` method.
    pub fn compress_unsorted(&mut self, vals: &[u32]) -> &[u8] {
        self.output.clear();
        let full_len = (vals.len() / NUM_DOCS_PER_BLOCK) * NUM_DOCS_PER_BLOCK;
        let (full_blocks, tail) = vals.split_at(full_len);
        for block in full_blocks.chunks(NUM_DOCS_PER_BLOCK) {
            let encoded_block = self.block_encoder.compress_block_unsorted(block);
            self.output.extend_from_slice(encoded_block);
        }
        let encoded_tail = self.block_encoder.compress_vint_unsorted(tail);
        self.output.extend_from_slice(encoded_tail);
        &self.output
    }
}
/// Decodes byte streams produced by `CompositeEncoder`.
///
/// Decoded values are gathered in an internal `Vec<u32>` that is cleared and
/// reused on every call.
pub struct CompositeDecoder {
    block_decoder: BlockDecoder,
    vals: Vec<u32>,
}

impl CompositeDecoder {
    /// Creates a decoder whose value buffer starts with a capacity of
    /// 500 000 entries.
    pub fn new() -> CompositeDecoder {
        CompositeDecoder {
            block_decoder: BlockDecoder::new(),
            vals: Vec::with_capacity(500_000),
        }
    }

    /// Decodes `uncompressed_len` values from `compressed_data` using the
    /// "sorted" routines of the block decoder.
    ///
    /// `uncompressed_len / NUM_DOCS_PER_BLOCK` full blocks are read first,
    /// each one with an `offset` equal to the last value decoded from the
    /// previous block (`0` for the first one); the remaining
    /// `uncompressed_len % NUM_DOCS_PER_BLOCK` values are read through the
    /// vint routine.
    ///
    /// The returned slice borrows the internal buffer and is overwritten by
    /// the next call to any `uncompress_*` method.
    pub fn uncompress_sorted(&mut self,
                             mut compressed_data: &[u8],
                             uncompressed_len: usize)
                             -> &[u32] {
        // `Vec::reserve(additional)` guarantees `capacity >= len + additional`,
        // i.e. it is relative to the current *length*. Clearing first makes
        // `reserve(uncompressed_len)` size the buffer for the whole result in
        // one go (and it is a no-op when the capacity is already sufficient).
        self.vals.clear();
        self.vals.reserve(uncompressed_len);
        let mut offset = 0u32;
        let num_blocks = uncompressed_len / NUM_DOCS_PER_BLOCK;
        for _ in 0..num_blocks {
            compressed_data = self.block_decoder
                .uncompress_block_sorted(compressed_data, offset);
            offset = self.block_decoder.output(NUM_DOCS_PER_BLOCK - 1);
            self.vals
                .extend_from_slice(self.block_decoder.output_array());
        }
        self.block_decoder
            .uncompress_vint_sorted(compressed_data,
                                    offset,
                                    uncompressed_len % NUM_DOCS_PER_BLOCK);
        self.vals
            .extend_from_slice(self.block_decoder.output_array());
        &self.vals
    }

    /// Decodes `uncompressed_len` values from `compressed_data` using the
    /// "unsorted" routines of the block decoder: full blocks first, then the
    /// remaining `uncompressed_len % NUM_DOCS_PER_BLOCK` values through the
    /// vint routine.
    ///
    /// The returned slice borrows the internal buffer and is overwritten by
    /// the next call to any `uncompress_*` method.
    pub fn uncompress_unsorted(&mut self,
                               mut compressed_data: &[u8],
                               uncompressed_len: usize)
                               -> &[u32] {
        self.vals.clear();
        let num_blocks = uncompressed_len / NUM_DOCS_PER_BLOCK;
        for _ in 0..num_blocks {
            compressed_data = self.block_decoder
                .uncompress_block_unsorted(compressed_data);
            self.vals
                .extend_from_slice(self.block_decoder.output_array());
        }
        self.block_decoder
            .uncompress_vint_unsorted(compressed_data, uncompressed_len % NUM_DOCS_PER_BLOCK);
        self.vals
            .extend_from_slice(self.block_decoder.output_array());
        &self.vals
    }
}
impl Into<Vec<u32>> for CompositeDecoder {
fn into(self) -> Vec<u32> {
self.vals
}
}
#[cfg(test)]
pub mod tests {

    use test::Bencher;
    use super::*;
    use tests;

    /// Round-trips 10 000 generated values through the unsorted codec and
    /// checks both the compressed size bound and the decoded values.
    #[test]
    fn test_composite_unsorted() {
        let data = tests::generate_array(10_000, 0.1);
        let mut encoder = CompositeEncoder::new();
        let compressed = encoder.compress_unsorted(&data);
        assert!(compressed.len() <= 19_794);
        let mut decoder = CompositeDecoder::new();
        let decoded = decoder.uncompress_unsorted(compressed, data.len());
        for (i, expected) in data.iter().enumerate() {
            assert_eq!(*expected, decoded[i]);
        }
    }

    /// Round-trips 10 000 generated values through the sorted codec and
    /// checks both the compressed size bound and the decoded values.
    #[test]
    fn test_composite_sorted() {
        let data = tests::generate_array(10_000, 0.1);
        let mut encoder = CompositeEncoder::new();
        let compressed = encoder.compress_sorted(&data);
        assert!(compressed.len() <= 7_826);
        let mut decoder = CompositeDecoder::new();
        let decoded = decoder.uncompress_sorted(compressed, data.len());
        for (i, expected) in data.iter().enumerate() {
            assert_eq!(*expected, decoded[i]);
        }
    }

    /// Number of integers fed to the benchmarks below.
    const BENCH_NUM_INTS: usize = 99_968;

    /// Measures sorted compression of `BENCH_NUM_INTS` generated values.
    #[bench]
    fn bench_compress(b: &mut Bencher) {
        let mut encoder = CompositeEncoder::new();
        let data = tests::generate_array(BENCH_NUM_INTS, 0.1);
        b.iter(|| {
            encoder.compress_sorted(&data);
        });
    }

    /// Measures sorted decompression of `BENCH_NUM_INTS` generated values.
    #[bench]
    fn bench_uncompress(b: &mut Bencher) {
        let mut encoder = CompositeEncoder::new();
        let data = tests::generate_array(BENCH_NUM_INTS, 0.1);
        let compressed = encoder.compress_sorted(&data);
        let mut decoder = CompositeDecoder::new();
        b.iter(|| {
            decoder.uncompress_sorted(compressed, BENCH_NUM_INTS);
        });
    }
}

View File

@@ -1,10 +1,6 @@
#![allow(dead_code)]
mod composite;
pub use self::composite::{CompositeEncoder, CompositeDecoder};
#[cfg(not(feature="simdcompression"))]
mod pack {
mod compression_pack_nosimd;

View File

@@ -18,7 +18,7 @@ pub fn compress_sorted(vals: &mut [u32], mut output: &mut [u8], offset: u32) ->
local_offset = val;
}
}
let num_bits = compute_num_bits(max_delta);
let num_bits = compute_num_bits(max_delta as u64);
output.write_all(&[num_bits]).unwrap();
let mut bit_packer = BitPacker::new(num_bits as usize);
for val in vals {

View File

@@ -240,7 +240,7 @@ impl SegmentReader {
SegmentPostingsOption::FreqAndPositions => {
let offset = term_info.positions_offset as usize;
let offseted_position_data = &self.positions_data[offset..];
FreqHandler::new_with_freq_and_position(offseted_position_data)
FreqHandler::new_with_freq_and_position(offseted_position_data, term_info.positions_inner_offset)
}
};
BlockSegmentPostings::from_data(term_info.doc_freq as usize, postings_data, freq_handler)

View File

@@ -28,29 +28,6 @@ pub struct IndexMerger {
}
/// Turns a list of positions into the list of gaps between consecutive
/// positions, reusing one scratch buffer across calls.
struct DeltaPositionComputer {
    buffer: Vec<u32>,
}

impl DeltaPositionComputer {
    /// Creates a computer whose scratch buffer initially holds 512 zeroed slots.
    fn new() -> DeltaPositionComputer {
        DeltaPositionComputer { buffer: vec![0u32; 512] }
    }

    /// Returns the deltas of `positions`: the first entry is the first
    /// position itself, every following entry is the difference with the
    /// previous position (`[7, 13, 17]` yields `[7, 6, 4]`).
    ///
    /// The subtraction is a plain `u32` subtraction, so `positions` is
    /// expected to be non-decreasing. The returned slice borrows the scratch
    /// buffer and is overwritten by the next call.
    fn compute_delta_positions(&mut self, positions: &[u32]) -> &[u32] {
        let num_positions = positions.len();
        // Grow the scratch buffer only when the input does not fit.
        if self.buffer.len() < num_positions {
            self.buffer.resize(num_positions, 0u32);
        }
        let mut previous = 0u32;
        for (slot, &position) in self.buffer.iter_mut().zip(positions) {
            *slot = position - previous;
            previous = position;
        }
        &self.buffer[..num_positions]
    }
}
fn compute_min_max_val(u64_reader: &U64FastFieldReader,
max_doc: DocId,
delete_bitset: &DeleteBitSet)
@@ -193,7 +170,6 @@ impl IndexMerger {
fn write_postings(&self, serializer: &mut PostingsSerializer) -> Result<()> {
let mut merged_terms = TermMerger::from(&self.readers[..]);
let mut delta_position_computer = DeltaPositionComputer::new();
let mut max_doc = 0;
@@ -294,9 +270,7 @@ impl IndexMerger {
old_to_new_doc_id[segment_postings.doc() as usize] {
// we make sure to only write the term iff
// there is at least one document.
let delta_positions: &[u32] =
delta_position_computer
.compute_delta_positions(segment_postings.positions());
let delta_positions: &[u32] = segment_postings.delta_positions();
let term_freq = segment_postings.term_freq();
serializer
.write_doc(remapped_doc_id, term_freq, delta_positions)?;

View File

@@ -1,37 +1,26 @@
use compression::BlockDecoder;
use common::VInt;
use common::BinarySerializable;
use compression::{CompositeDecoder, VIntDecoder};
use compression::VIntDecoder;
use postings::SegmentPostingsOption;
use compression::NUM_DOCS_PER_BLOCK;
use std::cell::UnsafeCell;
/// `FreqHandler` is in charge of decompressing
/// frequencies and/or positions.
pub struct FreqHandler {
freq_decoder: BlockDecoder,
positions: Vec<u32>,
positions: UnsafeCell<Vec<u32>>,
option: SegmentPostingsOption,
positions_offsets: [usize; NUM_DOCS_PER_BLOCK + 1],
}
fn read_positions(data: &[u8]) -> Vec<u32> {
let mut composite_reader = CompositeDecoder::new();
let mut readable: &[u8] = data;
let uncompressed_len = VInt::deserialize(&mut readable).unwrap().0 as usize;
composite_reader.uncompress_unsorted(readable, uncompressed_len);
composite_reader.into()
}
impl FreqHandler {
/// Returns a `FreqHandler` that just decodes `DocId`s.
pub fn new_without_freq() -> FreqHandler {
FreqHandler {
freq_decoder: BlockDecoder::with_val(1u32),
positions: Vec::new(),
positions: UnsafeCell::new(Vec::with_capacity(0)),
option: SegmentPostingsOption::NoFreq,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
@@ -41,23 +30,23 @@ impl FreqHandler {
pub fn new_with_freq() -> FreqHandler {
FreqHandler {
freq_decoder: BlockDecoder::new(),
positions: Vec::new(),
positions: UnsafeCell::new(Vec::with_capacity(0)),
option: SegmentPostingsOption::Freq,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
}
/// Returns a `FreqHandler` that decodes `DocId`s, term frequencies, and term positions.
pub fn new_with_freq_and_position(position_data: &[u8]) -> FreqHandler {
let positions = read_positions(position_data);
pub fn new_with_freq_and_position(position_data: &[u8], within_block_offset: u8) -> FreqHandler {
FreqHandler {
freq_decoder: BlockDecoder::new(),
positions: positions,
positions: UnsafeCell::new(Vec::with_capacity(NUM_DOCS_PER_BLOCK)),
option: SegmentPostingsOption::FreqAndPositions,
positions_offsets: [0; NUM_DOCS_PER_BLOCK + 1],
}
}
/*
fn fill_positions_offset(&mut self) {
let mut cur_position: usize = self.positions_offsets[NUM_DOCS_PER_BLOCK];
let mut i: usize = 0;
@@ -75,7 +64,7 @@ impl FreqHandler {
self.positions_offsets[i] = cur_position;
last_cur_position = cur_position;
}
}
}*/
/// Accessor to term frequency
@@ -91,11 +80,31 @@ impl FreqHandler {
/// idx is the offset of the current doc in the block.
/// It takes value between 0 and 128.
pub fn positions(&self, idx: usize) -> &[u32] {
let start = self.positions_offsets[idx];
let stop = self.positions_offsets[idx + 1];
&self.positions[start..stop]
//unsafe { &self.positions.get() }
println!("fix positions");
self.delta_positions(idx)
}
/// Accessor to the delta positions.
/// Delta positions is simply the difference between
/// two consecutive positions.
/// The first delta position is the first position of the
/// term in the document.
///
/// For instance, if positions are `[7,13,17]`
/// then delta positions `[7, 6, 4]`
///
/// idx is the offset of the current doc in the docid/freq block.
/// It takes value between 0 and 128.
///
/// NOTE(review): as written, this does not decode any position data. It only
/// resizes the internal buffer to the term frequency of `idx` (new entries are
/// filled with `0`) and returns that buffer. This looks like a
/// work-in-progress placeholder — confirm before relying on the values.
pub fn delta_positions(&self, idx: usize) -> &[u32] {
let freq = self.freq(idx);
// NOTE(review): a `&mut Vec<u32>` is created through the `UnsafeCell` while
// only `&self` is held. A slice returned by an earlier call can still be alive
// when this runs again, and `resize` may change the length or reallocate,
// which would invalidate that earlier slice. TODO confirm that callers never
// keep two results at once, or move to an API that takes `&mut self`.
let positions: &mut Vec<u32> = unsafe { &mut *self.positions.get() };
// The returned slice always has exactly `freq` entries.
positions.resize(freq as usize, 0u32);
&positions[..]
}
/// Decompresses a complete frequency block
pub fn read_freq_block<'a>(&mut self, data: &'a [u8]) -> &'a [u8] {
match self.option {
@@ -103,7 +112,7 @@ impl FreqHandler {
SegmentPostingsOption::Freq => self.freq_decoder.uncompress_block_unsorted(data),
SegmentPostingsOption::FreqAndPositions => {
let remaining: &'a [u8] = self.freq_decoder.uncompress_block_unsorted(data);
self.fill_positions_offset();
// self.fill_positions_offset();
remaining
}
}
@@ -118,7 +127,7 @@ impl FreqHandler {
}
SegmentPostingsOption::FreqAndPositions => {
self.freq_decoder.uncompress_vint_unsorted(data, num_els);
self.fill_positions_offset();
// self.fill_positions_offset();
}
}
}

View File

@@ -17,6 +17,16 @@ pub trait Postings: DocSet {
/// Returns the list of positions of the term, expressed as a list of
/// token ordinals.
fn positions(&self) -> &[u32];
/// Return the list of delta positions.
///
/// Delta positions is simply the difference between
/// two consecutive positions.
/// The first delta position is the first position of the
/// term in the document.
///
/// For instance, if positions are `[7,13,17]`
/// then delta positions `[7, 6, 4]`
fn delta_positions(&self) -> &[u32];
}
impl<TPostings: Postings> Postings for Box<TPostings> {
@@ -29,6 +39,11 @@ impl<TPostings: Postings> Postings for Box<TPostings> {
let unboxed: &TPostings = self.borrow();
unboxed.positions()
}
fn delta_positions(&self) -> &[u32] {
let unboxed: &TPostings = self.borrow();
unboxed.delta_positions()
}
}
impl<'a, TPostings: Postings> Postings for &'a mut TPostings {
@@ -41,4 +56,9 @@ impl<'a, TPostings: Postings> Postings for &'a mut TPostings {
let unref: &TPostings = *self;
unref.positions()
}
fn delta_positions(&self) -> &[u32] {
let unref: &TPostings = *self;
unref.delta_positions()
}
}

View File

@@ -179,6 +179,11 @@ impl<'a> Postings for SegmentPostings<'a> {
/// Asks the block cursor's frequency handler for the positions of the
/// document at the current in-block index (`self.cur`).
fn positions(&self) -> &[u32] {
    let freq_handler = self.block_cursor.freq_handler();
    freq_handler.positions(self.cur)
}
/// Asks the block cursor's frequency handler for the delta positions of the
/// document at the current in-block index (`self.cur`).
fn delta_positions(&self) -> &[u32] {
    let freq_handler = self.block_cursor.freq_handler();
    freq_handler.delta_positions(self.cur)
}
}
/// `BlockSegmentPostings` is a cursor iterating over blocks

View File

@@ -7,7 +7,7 @@ use schema::FieldType;
use schema::Schema;
use schema::TextIndexingOptions;
use directory::WritePtr;
use compression::{NUM_DOCS_PER_BLOCK, BlockEncoder, CompositeEncoder};
use compression::{NUM_DOCS_PER_BLOCK, BlockEncoder};
use DocId;
use core::Segment;
use std::io::{self, Write};
@@ -16,6 +16,7 @@ use common::VInt;
use common::BinarySerializable;
use common::CountingWriter;
use termdict::TermDictionaryBuilder;
use datastruct::{SkipList, SkipListBuilder};
/// `PostingsSerializer` is in charge of serializing
@@ -52,19 +53,64 @@ use termdict::TermDictionaryBuilder;
pub struct PostingsSerializer {
terms_fst_builder: TermDictionaryBuilderImpl<WritePtr, TermInfo>,
postings_write: CountingWriter<WritePtr>,
positions_write: CountingWriter<WritePtr>,
last_doc_id_encoded: u32,
positions_encoder: CompositeEncoder,
positions_writer: PositionWriter,
block_encoder: BlockEncoder,
doc_ids: Vec<DocId>,
term_freqs: Vec<u32>,
position_deltas: Vec<u32>,
schema: Schema,
text_indexing_options: TextIndexingOptions,
term_open: bool,
current_term_info: TermInfo,
}
/// Writes position deltas to the positions file in compressed blocks of
/// `NUM_DOCS_PER_BLOCK` values.
///
/// Values are staged in `buffer` until a full block is available; each full
/// block is compressed with `block_encoder` and appended to `write`.
struct PositionWriter {
    buffer: Vec<u32>,
    write: CountingWriter<WritePtr>,
    block_encoder: BlockEncoder,
}

impl PositionWriter {
    /// Wraps `write` in a `CountingWriter` and starts with an empty staging
    /// buffer.
    fn new(write: WritePtr) -> PositionWriter {
        let staging = Vec::with_capacity(NUM_DOCS_PER_BLOCK);
        let counting_write = CountingWriter::wrap(write);
        PositionWriter {
            buffer: staging,
            write: counting_write,
            block_encoder: BlockEncoder::new(),
        }
    }

    /// Returns the address of the next value to be written, as a pair
    /// `(bytes written to the file so far, number of values currently staged)`.
    ///
    /// Both components are narrowed with `as` casts (to `u32` and `u8`).
    fn addr(&self) -> (u32, u8) {
        let file_offset = self.write.written_bytes() as u32;
        let inner_offset = self.buffer.len() as u8;
        (file_offset, inner_offset)
    }

    /// Compresses the staged values as one block, appends the result to the
    /// output and empties the staging buffer.
    ///
    /// Panics unless exactly `NUM_DOCS_PER_BLOCK` values are staged.
    fn write_block(&mut self) -> io::Result<()> {
        assert_eq!(self.buffer.len(), NUM_DOCS_PER_BLOCK);
        let encoded: &[u8] = self.block_encoder.compress_block_unsorted(&self.buffer);
        self.write.write_all(encoded)?;
        self.buffer.clear();
        Ok(())
    }

    /// Appends `vals`, emitting a compressed block every time the staging
    /// buffer fills up. Values that do not complete a block stay staged.
    fn write(&mut self, mut vals: &[u32]) -> io::Result<()> {
        while vals.len() + self.buffer.len() >= NUM_DOCS_PER_BLOCK {
            // Take just enough values to complete the current block.
            let missing = NUM_DOCS_PER_BLOCK - self.buffer.len();
            let (head, rest) = vals.split_at(missing);
            self.buffer.extend_from_slice(head);
            self.write_block()?;
            vals = rest;
        }
        self.buffer.extend_from_slice(vals);
        Ok(())
    }

    /// Pads the staging buffer with zeros up to a full block, writes that
    /// final block and flushes the underlying writer.
    fn close(mut self) -> io::Result<()> {
        self.buffer.resize(NUM_DOCS_PER_BLOCK, 0u32);
        self.write_block().and_then(|_| self.write.flush())
    }
}
impl PostingsSerializer {
/// Open a new `PostingsSerializer` for the given segment
pub fn new(terms_write: WritePtr,
@@ -72,17 +118,15 @@ impl PostingsSerializer {
positions_write: WritePtr,
schema: Schema)
-> Result<PostingsSerializer> {
let terms_fst_builder = try!(TermDictionaryBuilderImpl::new(terms_write));
let terms_fst_builder = TermDictionaryBuilderImpl::new(terms_write)?;
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: CountingWriter::wrap(postings_write),
positions_write: CountingWriter::wrap(positions_write),
positions_writer: PositionWriter::new(positions_write),
last_doc_id_encoded: 0u32,
positions_encoder: CompositeEncoder::new(),
block_encoder: BlockEncoder::new(),
doc_ids: Vec::new(),
term_freqs: Vec::new(),
position_deltas: Vec::new(),
schema: schema,
text_indexing_options: TextIndexingOptions::Unindexed,
term_open: false,
@@ -131,11 +175,12 @@ impl PostingsSerializer {
self.doc_ids.clear();
self.last_doc_id_encoded = 0;
self.term_freqs.clear();
self.position_deltas.clear();
let (filepos, offset) = self.positions_writer.addr();
self.current_term_info = TermInfo {
doc_freq: 0,
postings_offset: self.postings_write.written_bytes() as u32,
positions_offset: self.positions_write.written_bytes() as u32,
positions_offset: filepos,
positions_inner_offset: offset,
};
self.terms_fst_builder.insert_key(term)
}
@@ -172,16 +217,6 @@ impl PostingsSerializer {
self.term_freqs.clear();
}
}
// On the other hand, positions are entirely buffered until the
// end of the term, at which point they are compressed and written.
if self.text_indexing_options.is_position_enabled() {
let posdelta_len = VInt(self.position_deltas.len() as u64);
posdelta_len.serialize(&mut self.positions_write)?;
let positions_encoded: &[u8] = self.positions_encoder
.compress_unsorted(&self.position_deltas[..]);
self.positions_write.write_all(positions_encoded)?;
self.position_deltas.clear();
}
self.term_open = false;
}
Ok(())
@@ -208,7 +243,7 @@ impl PostingsSerializer {
self.term_freqs.push(term_freq as u32);
}
if self.text_indexing_options.is_position_enabled() {
self.position_deltas.extend_from_slice(position_deltas);
self.positions_writer.write(position_deltas)?;
}
if self.doc_ids.len() == NUM_DOCS_PER_BLOCK {
{
@@ -233,10 +268,10 @@ impl PostingsSerializer {
/// Closes the serializer.
pub fn close(mut self) -> io::Result<()> {
try!(self.close_term());
try!(self.terms_fst_builder.finish());
try!(self.postings_write.flush());
try!(self.positions_write.flush());
self.close_term()?;
self.terms_fst_builder.finish()?;
self.postings_write.flush()?;
self.positions_writer.close()?;
Ok(())
}
}

View File

@@ -20,6 +20,8 @@ pub struct TermInfo {
pub postings_offset: u32,
/// Offset within the position (`.pos`) file.
pub positions_offset: u32,
/// Offset within the position block.
pub positions_inner_offset: u8,
}
@@ -27,17 +29,20 @@ impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_offset.serialize(writer)
self.positions_offset.serialize(writer)?;
self.positions_inner_offset.serialize(writer)
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = try!(u32::deserialize(reader));
let postings_offset = try!(u32::deserialize(reader));
let positions_offset = try!(u32::deserialize(reader));
let doc_freq = u32::deserialize(reader)?;
let postings_offset = u32::deserialize(reader)?;
let positions_offset = u32::deserialize(reader)?;
let positions_inner_offset = u8::deserialize(reader)?;
Ok(TermInfo {
doc_freq: doc_freq,
postings_offset: postings_offset,
positions_offset: positions_offset,
})
doc_freq: doc_freq,
postings_offset: postings_offset,
positions_offset: positions_offset,
positions_inner_offset: positions_inner_offset,
})
}
}

View File

@@ -54,6 +54,11 @@ impl Postings for VecPostings {
/// Always returns a reference to `EMPTY_ARRAY`, whatever the current document.
// NOTE(review): presumably `EMPTY_ARRAY` is an empty `u32` array and
// `VecPostings` carries no position data — confirm against its definition.
fn positions(&self) -> &[u32] {
&EMPTY_ARRAY
}
/// Always returns a reference to `EMPTY_ARRAY`, whatever the current document.
// NOTE(review): presumably `EMPTY_ARRAY` is an empty `u32` array and
// `VecPostings` carries no position data — confirm against its definition.
fn delta_positions(&self) -> &[u32] {
&EMPTY_ARRAY
}
}
#[cfg(test)]