This commit is contained in:
Paul Masurel
2016-05-04 18:42:06 +09:00
parent 84e8919c1f
commit 042d2f175a
5 changed files with 130 additions and 129 deletions

View File

@@ -12,6 +12,8 @@ using namespace SIMDCompressionLib;
// sorted
static shared_ptr<IntegerCODEC> codec_sorted = CODECFactory::getFromName("s4-bp128-dm");
static CompositeCodec<SIMDBinaryPacking<SIMDBlockPacker<NoDelta, false>>, VariableByte<false>> composite_codec_unsorted = CompositeCodec<SIMDBinaryPacking<SIMDBlockPacker<NoDelta, false>>, VariableByte<false>>();
// variable byte
static VariableByte<false> codec_unsorted = VariableByte<false>();
@@ -120,6 +122,29 @@ extern "C" {
codec_sorted -> decodeArray(compressed_data, compressed_size, uncompressed, num_ints);
return num_ints;
}
// Compresses `num_els` unsorted 32-bit integers from `begin` into `output`
// using the composite codec (SIMD binary packing with a variable-byte
// fallback, per the codec declared above). `output_capacity` is the size of
// `output` in uint32 words; returns the number of words actually written.
size_t encode_composite_native(
        uint32_t* begin,
        const size_t num_els,
        uint32_t* output,
        const size_t output_capacity) {
    // encodeArray's last argument is in/out: initialized with the capacity,
    // it holds the written length on return (SIMDCompressionLib convention).
    size_t written = output_capacity;
    composite_codec_unsorted.encodeArray(begin, num_els, output, written);
    return written;
}
// Decompresses `compressed_size` words of composite-codec data from
// `compressed_data` into `uncompressed`. `uncompressed_capacity` is the size
// of the destination in uint32 words; returns the number of integers decoded.
size_t decode_composite_native(
        const uint32_t* compressed_data,
        const size_t compressed_size,
        uint32_t* uncompressed,
        const size_t uncompressed_capacity) {
    // decodeArray's last argument is in/out: capacity on entry, decoded
    // count on return (SIMDCompressionLib convention).
    size_t decoded = uncompressed_capacity;
    composite_codec_unsorted.decodeArray(compressed_data, compressed_size,
                                         uncompressed, decoded);
    return decoded;
}
size_t encode_unsorted_native(

View File

@@ -8,6 +8,10 @@ extern {
// complete s4-bp128-dm
fn encode_s4_bp128_dm_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t;
fn decode_s4_bp128_dm_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t;
fn encode_composite_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t;
fn decode_composite_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t;
}
//-------------------------
@@ -26,6 +30,28 @@ impl S4BP128Encoder {
}
}
/// Compresses `input` with the composite (unsorted) codec via the native
/// `encode_composite_native` FFI call and returns a slice over the internal
/// output buffer containing the encoded words. The returned slice is only
/// valid until the next call on this encoder (it borrows `self`).
pub fn encode(&mut self, input: &[u32]) -> &[u32] {
self.input_buffer.clear();
let input_len = input.len();
// NOTE(review): `clear()` just set `len()` to 0, so this condition is
// always true and both buffers are resized on every call — presumably the
// intent was to test capacity (as the +10000 slack suggests); confirm
// before changing. The resize does guarantee both buffers hold at least
// `input_len` elements, which the unsafe copy below relies on.
if input_len + 10000 >= self.input_buffer.len() {
let target_length = input_len + 1024;
self.input_buffer.resize(target_length, 0);
self.output_buffer.resize(target_length, 0);
}
// TODO use clone_from when available
let written_size;
unsafe {
// SAFETY: input_buffer was resized to at least input_len just above, and
// `input` and `input_buffer` are distinct allocations, so the copy of
// `input_len` elements cannot overlap or write out of bounds.
ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len);
// Native call returns the number of u32 words written into output_buffer;
// assumed not to exceed the capacity passed in — TODO confirm the C++
// side honours output_capacity.
written_size = encode_composite_native(
self.input_buffer.as_mut_ptr(),
input_len as size_t,
self.output_buffer.as_mut_ptr(),
self.output_buffer.len() as size_t,
);
}
&self.output_buffer[0..written_size]
}
pub fn encode_sorted(&mut self, input: &[u32]) -> &[u32] {
self.input_buffer.clear();
let input_len = input.len();
@@ -35,16 +61,17 @@ impl S4BP128Encoder {
self.output_buffer.resize(target_length, 0);
}
// TODO use clone_from when available
let written_size;
unsafe {
ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len);
let written_size = encode_s4_bp128_dm_native(
written_size = encode_s4_bp128_dm_native(
self.input_buffer.as_mut_ptr(),
input_len as size_t,
self.output_buffer.as_mut_ptr(),
self.output_buffer.len() as size_t,
);
return &self.output_buffer[0..written_size];
}
return &self.output_buffer[0..written_size];
}
}
@@ -67,18 +94,18 @@ impl S4BP128Decoder {
uncompressed_values.len() as size_t);
}
}
// pub fn decode_unsorted(&self,
// compressed_data: &[u32],
// uncompressed_values: &mut [u32]) -> size_t {
// unsafe {
// return decode_unsorted_native(
// compressed_data.as_ptr(),
// compressed_data.len() as size_t,
// uncompressed_values.as_mut_ptr(),
// uncompressed_values.len() as size_t);
// }
// }
/// Decompresses composite-codec data from `compressed_data` into
/// `uncompressed_values`, returning the number of integers decoded.
pub fn decode(&self,
              compressed_data: &[u32],
              uncompressed_values: &mut [u32]) -> size_t {
    // SAFETY: both slices outlive the call and the native function receives
    // their exact lengths, so it has no reason to read or write out of
    // bounds (assuming the C++ side honours the capacity — TODO confirm).
    unsafe {
        decode_composite_native(compressed_data.as_ptr(),
                                compressed_data.len() as size_t,
                                uncompressed_values.as_mut_ptr(),
                                uncompressed_values.len() as size_t)
    }
}
}
@@ -91,7 +118,7 @@ mod tests {
use compression::tests::generate_array;
#[test]
fn test_encode_big() {
fn test_encode_sorted_big() {
let mut encoder = S4BP128Encoder::new();
let num_ints = 10000 as usize;
let expected_length = 1274;
@@ -105,6 +132,22 @@ mod tests {
assert_eq!(num_ints, decoder.decode_sorted(&encoded_data[..], &mut decoded_data));
assert_eq!(decoded_data, input);
}
#[test]
fn test_encode_unsorted_big() {
    // Round-trips 10_000 non-monotonic values through the composite
    // (unsorted) codec and pins the expected compressed length.
    let mut encoder = S4BP128Encoder::new();
    let num_ints: usize = 10000;
    let expected_length = 1897;
    // `i * 7 % 37` yields a small, repeating, unsorted sequence.
    // (`map` already produces an iterator; the redundant `.into_iter()`
    // from the original has been dropped.)
    let input: Vec<u32> = (0..num_ints as u32)
        .map(|i| i * 7 % 37)
        .collect();
    let encoded_data = encoder.encode(&input);
    assert_eq!(encoded_data.len(), expected_length);
    let decoder = S4BP128Decoder::new();
    // Pre-fill the destination with non-zero garbage so a short decode
    // would be caught by the equality check below.
    let mut decoded_data: Vec<u32> = (0..num_ints as u32).collect();
    assert_eq!(num_ints, decoder.decode(&encoded_data[..], &mut decoded_data));
    assert_eq!(decoded_data, input);
}
#[bench]
fn bench_decode(b: &mut Bencher) {

View File

@@ -1,37 +1,10 @@
// pub mod postings;
// pub mod schema;
// pub mod directory;
// pub mod writer;
// pub mod analyzer;
// pub mod reader;
// pub mod codec;
// pub mod searcher;
// pub mod collector;
// pub mod serialize;
// pub mod store;
// pub mod simdcompression;
// pub mod fstmap;
// pub mod index;
// pub mod fastfield;
// pub mod fastdivide;
// pub mod merger;
// pub mod timer;
// use std::error;
// use std::io;
// pub fn convert_to_ioerror<E: 'static + error::Error + Send + Sync>(err: E) -> io::Error {
// io::Error::new(
// io::ErrorKind::InvalidData,
// err
// )
// }
mod recorder;
mod serializer;
mod writer;
mod term_info;
use DocId;
pub use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
pub use self::serializer::PostingsSerializer;
pub use self::writer::PostingsWriter;
pub use self::term_info::TermInfo;

View File

@@ -2,7 +2,7 @@ use datastruct::FstMapBuilder;
use super::TermInfo;
use schema::Term;
use directory::WritePtr;
use compression::{Block128Encoder, VIntsEncoder};
use compression::{NUM_DOCS_PER_BLOCK, Block128Encoder, VIntsEncoder, S4BP128Encoder};
use DocId;
use core::index::Segment;
use std::io;
@@ -10,17 +10,21 @@ use core::index::SegmentComponent;
use common::BinarySerializable;
use common::VInt;
pub struct PostingsSerializer {
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>, // TODO find an alternative to work around the "move"
postings_write: WritePtr,
positions_write: WritePtr,
written_bytes_postings: usize,
written_bytes_positions: usize,
positions_encoder: S4BP128Encoder,
block_encoder: Block128Encoder,
vints_encoder: VIntsEncoder,
doc_ids: Vec<DocId>,
term_freqs: Vec<u32>,
positions: Vec<u32>,
position_deltas: Vec<u32>,
is_termfreq_enabled: bool,
is_positions_enabled: bool,
}
impl PostingsSerializer {
@@ -36,17 +40,22 @@ impl PostingsSerializer {
positions_write: positions_write,
written_bytes_postings: 0,
written_bytes_positions: 0,
positions_encoder: S4BP128Encoder::new(),
block_encoder: Block128Encoder::new(),
vints_encoder: VIntsEncoder::new(),
doc_ids: Vec::new(),
term_freqs: Vec::new(),
positions: Vec::new(),
position_deltas: Vec::new(),
is_positions_enabled: false,
is_termfreq_enabled: false,
})
}
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
try!(self.close_term());
self.doc_ids.clear();
self.term_freqs.clear();
self.position_deltas.clear();
let term_info = TermInfo {
doc_freq: doc_freq,
postings_offset: self.written_bytes_postings as u32,
@@ -55,7 +64,6 @@ impl PostingsSerializer {
.insert(term.as_slice(), &term_info)
}
pub fn close_term(&mut self,) -> io::Result<()> {
if !self.doc_ids.is_empty() {
{
@@ -72,13 +80,24 @@ impl PostingsSerializer {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}
}
if self.is_positions_enabled {
let mut num_blocks = self.position_deltas.len() / NUM_DOCS_PER_BLOCK;
let mut offset = 0;
for _ in 0..num_blocks {
let block_encoded = self.positions_encoder.encode(&self.position_deltas[offset..offset + NUM_DOCS_PER_BLOCK]);
offset += NUM_DOCS_PER_BLOCK;
// self.positions_write.wr
}
// self.position_deltas.extend_from_slice(position_deltas);
// let block_encoded &[u32] = self.positions_encoder.encode(&self.positions[..]);
}
self.doc_ids.clear();
self.term_freqs.clear();
}
Ok(())
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, positions: &[u32]) -> io::Result<()> {
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, position_deltas: &[u32]) -> io::Result<()> {
self.doc_ids.push(doc_id);
self.term_freqs.push(term_freq as u32);
if self.doc_ids.len() == 128 {
@@ -88,11 +107,14 @@ impl PostingsSerializer {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}
}
{
if self.is_termfreq_enabled {
let block_encoded: &[u32] = self.block_encoder.encode_sorted(&self.term_freqs);
for num in block_encoded {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}
}
}
if self.is_positions_enabled {
self.position_deltas.extend_from_slice(position_deltas);
}
self.doc_ids.clear();
self.term_freqs.clear();

View File

@@ -3,87 +3,28 @@ use std::collections::BTreeMap;
use schema::Term;
use postings::PostingsSerializer;
use std::io;
pub use postings::Recorder;
pub use postings::NothingRecorder;
/// Abstraction over how a sequence of `u32` values is stored while
/// postings are being built (e.g. term frequencies or position deltas).
pub trait U32sRecorder {
    /// Creates an empty recorder.
    fn new() -> Self;
    /// Appends one value.
    fn record(&mut self, val: u32);
    /// Returns the value recorded at position `idx`.
    fn get(&self, idx: usize) -> u32;
    /// Returns the recorded values in the half-open range `[start, stop)`.
    fn slice(&self, start: usize, stop: usize) -> &[u32];
}

/// Recorder that actually keeps every recorded value in a `Vec<u32>`.
pub struct VecRecorder(Vec<u32>);

impl U32sRecorder for VecRecorder {
    fn new() -> VecRecorder {
        VecRecorder(Vec::new())
    }

    fn record(&mut self, value: u32) {
        self.0.push(value);
    }

    fn get(&self, index: usize) -> u32 {
        self.0[index]
    }

    fn slice(&self, start: usize, stop: usize) -> &[u32] {
        &self.0[start..stop]
    }
}

/// Backing storage for the empty slices handed out by `ObliviousRecorder`.
const EMPTY_ARRAY: [u32; 0] = [];

/// Recorder that discards everything it is given: `record` is a no-op,
/// `get` always yields 0, and `slice` always yields an empty slice.
pub struct ObliviousRecorder;

impl U32sRecorder for ObliviousRecorder {
    fn new() -> ObliviousRecorder {
        ObliviousRecorder
    }

    fn record(&mut self, _value: u32) {
        // Intentionally a no-op: nothing is stored.
    }

    fn get(&self, _index: usize) -> u32 {
        0u32
    }

    fn slice(&self, _start: usize, _stop: usize) -> &[u32] {
        // &[u32; 0] coerces to the empty &[u32] slice.
        &EMPTY_ARRAY
    }
}
struct TermPostingsWriter<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> {
struct TermPostingsWriter<Rec: Recorder> {
doc_ids: Vec<DocId>,
term_freqs: TermFreqsRec,
positions: PositionsRec,
current_position: u32,
current_freq: u32,
recorder: Rec,
}
impl<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> TermPostingsWriter<TermFreqsRec, PositionsRec> {
pub fn new() -> TermPostingsWriter<TermFreqsRec, PositionsRec> {
impl<Rec: Recorder> TermPostingsWriter<Rec> {
pub fn new() -> TermPostingsWriter<Rec> {
TermPostingsWriter {
doc_ids: Vec::new(),
term_freqs: TermFreqsRec::new(),
positions: PositionsRec::new(),
current_position: 0u32,
current_freq: 0u32,
recorder: Recorder::new(),
}
}
fn close_doc(&mut self,) {
self.term_freqs.record(self.current_freq);
self.current_freq = 0;
self.current_position = 0;
self.recorder.close_doc();
}
fn close(&mut self,) {
if self.current_freq > 0 {
self.close_doc();
}
}
fn is_new_doc(&self, doc: &DocId) -> bool {
match self.doc_ids.last() {
Some(&last_doc) => last_doc != *doc,
@@ -102,17 +43,14 @@ impl<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> TermPostingsWriter<
self.close_doc();
self.doc_ids.push(doc);
}
self.current_freq += 1;
self.positions.record(pos - self.current_position);
self.current_position = pos;
self.recorder.record_position(pos);
}
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
let mut positions_idx = 0;
for (i, doc) in self.doc_ids.iter().enumerate() {
let term_freq: u32 = self.term_freqs.get(i);
let positions: &[u32] = self.positions.slice(positions_idx, positions_idx + term_freq as usize);
try!(serializer.write_doc(doc.clone(), term_freq, positions));
let (term_freq, position_deltas) = self.recorder.get_tf_and_posdeltas(i);
try!(serializer.write_doc(doc.clone(), term_freq, position_deltas));
positions_idx += term_freq as usize;
}
Ok(())
@@ -121,7 +59,7 @@ impl<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> TermPostingsWriter<
pub struct PostingsWriter {
postings: Vec<TermPostingsWriter<ObliviousRecorder, ObliviousRecorder>>,
postings: Vec<TermPostingsWriter<NothingRecorder>>,
term_index: BTreeMap<Term, usize>,
}
@@ -136,16 +74,16 @@ impl PostingsWriter {
pub fn close(&mut self,) {
for term_postings_writer in self.postings.iter_mut() {
term_postings_writer.close();
term_postings_writer.close_doc();
}
}
pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) {
let doc_ids: &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> = self.get_term_postings(term);
let doc_ids: &mut TermPostingsWriter<NothingRecorder> = self.get_term_postings(term);
doc_ids.suscribe(doc, pos);
}
fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> {
fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter<NothingRecorder> {
match self.term_index.get(&term) {
Some(unord_id) => {
return &mut self.postings[*unord_id];