diff --git a/cpp/encode.cpp b/cpp/encode.cpp index 91427a8ea..20e4c6064 100644 --- a/cpp/encode.cpp +++ b/cpp/encode.cpp @@ -12,6 +12,8 @@ using namespace SIMDCompressionLib; // sorted static shared_ptr codec_sorted = CODECFactory::getFromName("s4-bp128-dm"); +static CompositeCodec>, VariableByte> composite_codec_unsorted = CompositeCodec>, VariableByte>(); + // variable byte static VariableByte codec_unsorted = VariableByte(); @@ -120,6 +122,29 @@ extern "C" { codec_sorted -> decodeArray(compressed_data, compressed_size, uncompressed, num_ints); return num_ints; } + + size_t encode_composite_native( + uint32_t* begin, + const size_t num_els, + uint32_t* output, + const size_t output_capacity) { + size_t output_length = output_capacity; + composite_codec_unsorted.encodeArray(begin, + num_els, + output, + output_length); + return output_length; + } + + size_t decode_composite_native( + const uint32_t* compressed_data, + const size_t compressed_size, + uint32_t* uncompressed, + const size_t uncompressed_capacity) { + size_t num_ints = uncompressed_capacity; + composite_codec_unsorted.decodeArray(compressed_data, compressed_size, uncompressed, num_ints); + return num_ints; + } size_t encode_unsorted_native( diff --git a/src/compression/s4bp128.rs b/src/compression/s4bp128.rs index 2da125c76..3b57d5e2c 100644 --- a/src/compression/s4bp128.rs +++ b/src/compression/s4bp128.rs @@ -8,6 +8,10 @@ extern { // complete s4-bp128-dm fn encode_s4_bp128_dm_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t; fn decode_s4_bp128_dm_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t; + + fn encode_composite_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t; + fn decode_composite_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t; + } //------------------------- @@ -26,6 +30,28 @@ impl S4BP128Encoder { } } + pub fn encode(&mut self, input: &[u32]) -> &[u32] { + self.input_buffer.clear(); + let input_len = input.len(); + if input_len + 10000 >= self.input_buffer.len() { + let target_length = input_len + 1024; + self.input_buffer.resize(target_length, 0); + self.output_buffer.resize(target_length, 0); + } + // TODO use clone_from when available + let written_size; + unsafe { + ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len); + written_size = encode_composite_native( + self.input_buffer.as_mut_ptr(), + input_len as size_t, + self.output_buffer.as_mut_ptr(), + self.output_buffer.len() as size_t, + ); + } + &self.output_buffer[0..written_size] + } + pub fn encode_sorted(&mut self, input: &[u32]) -> &[u32] { self.input_buffer.clear(); let input_len = input.len(); @@ -35,16 +61,17 @@ impl S4BP128Encoder { self.output_buffer.resize(target_length, 0); } // TODO use clone_from when available + let written_size; unsafe { ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len); - let written_size = encode_s4_bp128_dm_native( + written_size = encode_s4_bp128_dm_native( self.input_buffer.as_mut_ptr(), input_len as size_t, self.output_buffer.as_mut_ptr(), self.output_buffer.len() as size_t, ); - return &self.output_buffer[0..written_size]; } + return &self.output_buffer[0..written_size]; } } @@ -67,18 +94,18 @@ impl S4BP128Decoder { uncompressed_values.len() as size_t); } } - - // pub fn decode_unsorted(&self, - // compressed_data: &[u32], - // uncompressed_values: &mut [u32]) -> size_t { - // unsafe { - // return decode_unsorted_native( - // compressed_data.as_ptr(), - // compressed_data.len() as size_t, - // uncompressed_values.as_mut_ptr(), - // uncompressed_values.len() as size_t); - // } - // } + + pub fn decode(&self, + compressed_data: &[u32], + uncompressed_values: &mut [u32]) -> size_t { + unsafe { + return decode_composite_native( + compressed_data.as_ptr(), + compressed_data.len() as size_t, + uncompressed_values.as_mut_ptr(), + uncompressed_values.len() as size_t); + } + } } @@ -91,7 +118,7 @@ mod tests { use compression::tests::generate_array; #[test] - fn test_encode_big() { + fn test_encode_sorted_big() { let mut encoder = S4BP128Encoder::new(); let num_ints = 10000 as usize; let expected_length = 1274; @@ -105,6 +132,22 @@ mod tests { assert_eq!(num_ints, decoder.decode_sorted(&encoded_data[..], &mut decoded_data)); assert_eq!(decoded_data, input); } + + #[test] + fn test_encode_unsorted_big() { + let mut encoder = S4BP128Encoder::new(); + let num_ints = 10000 as usize; + let expected_length = 1897; + let input: Vec = (0..num_ints as u32) + .map(|i| i * 7 % 37) + .into_iter().collect(); + let encoded_data = encoder.encode(&input); + assert_eq!(encoded_data.len(), expected_length); + let decoder = S4BP128Decoder::new(); + let mut decoded_data: Vec = (0..num_ints as u32).collect(); + assert_eq!(num_ints, decoder.decode(&encoded_data[..], &mut decoded_data)); + assert_eq!(decoded_data, input); + } #[bench] fn bench_decode(b: &mut Bencher) { diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 2cfafb5c6..717afdb0e 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -1,37 +1,10 @@ -// pub mod postings; -// pub mod schema; -// pub mod directory; -// pub mod writer; -// pub mod analyzer; -// pub mod reader; -// pub mod codec; -// pub mod searcher; -// pub mod collector; -// pub mod serialize; -// pub mod store; -// pub mod simdcompression; -// pub mod fstmap; -// pub mod index; -// pub mod fastfield; -// pub mod fastdivide; -// pub mod merger; -// pub mod timer; - -// use std::error; -// use std::io; - -// pub fn convert_to_ioerror(err: E) -> io::Error { -// io::Error::new( -// io::ErrorKind::InvalidData, -// err -// ) -// } - +mod recorder; mod serializer; mod writer; mod term_info; use DocId; +pub use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; pub use self::serializer::PostingsSerializer; pub use self::writer::PostingsWriter; pub use self::term_info::TermInfo; diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index db18f42ed..30b2666a3 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -2,7 +2,7 @@ use datastruct::FstMapBuilder; use super::TermInfo; use schema::Term; use directory::WritePtr; -use compression::{Block128Encoder, VIntsEncoder}; +use compression::{NUM_DOCS_PER_BLOCK, Block128Encoder, VIntsEncoder, S4BP128Encoder}; use DocId; use core::index::Segment; use std::io; @@ -10,17 +10,21 @@ use core::index::SegmentComponent; use common::BinarySerializable; use common::VInt; + pub struct PostingsSerializer { terms_fst_builder: FstMapBuilder, // TODO find an alternative to work around the "move" postings_write: WritePtr, positions_write: WritePtr, written_bytes_postings: usize, written_bytes_positions: usize, + positions_encoder: S4BP128Encoder, block_encoder: Block128Encoder, vints_encoder: VIntsEncoder, doc_ids: Vec, term_freqs: Vec, - positions: Vec, + position_deltas: Vec, + is_termfreq_enabled: bool, + is_positions_enabled: bool, } impl PostingsSerializer { @@ -36,17 +40,22 @@ impl PostingsSerializer { positions_write: positions_write, written_bytes_postings: 0, written_bytes_positions: 0, + positions_encoder: S4BP128Encoder::new(), block_encoder: Block128Encoder::new(), vints_encoder: VIntsEncoder::new(), doc_ids: Vec::new(), term_freqs: Vec::new(), - positions: Vec::new(), + position_deltas: Vec::new(), + is_positions_enabled: false, + is_termfreq_enabled: false, }) } pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { try!(self.close_term()); self.doc_ids.clear(); + self.term_freqs.clear(); + self.position_deltas.clear(); let term_info = TermInfo { doc_freq: doc_freq, postings_offset: self.written_bytes_postings as u32, @@ -55,7 +64,6 @@ impl PostingsSerializer { .insert(term.as_slice(), &term_info) } - pub fn close_term(&mut self,) -> io::Result<()> { if !self.doc_ids.is_empty() { { @@ -72,13 +80,24 @@ impl PostingsSerializer { self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); } } + if self.is_positions_enabled { + let mut num_blocks = self.position_deltas.len() / NUM_DOCS_PER_BLOCK; + let mut offset = 0; + for _ in 0..num_blocks { + let block_encoded = self.positions_encoder.encode(&self.position_deltas[offset..offset + NUM_DOCS_PER_BLOCK]); + offset += NUM_DOCS_PER_BLOCK; + // self.positions_write.wr + } + // self.position_deltas.extend_from_slice(position_deltas); + // let block_encoded &[u32] = self.positions_encoder.encode(&self.positions[..]); + } self.doc_ids.clear(); self.term_freqs.clear(); } Ok(()) } - pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, positions: &[u32]) -> io::Result<()> { + pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32, position_deltas: &[u32]) -> io::Result<()> { self.doc_ids.push(doc_id); self.term_freqs.push(term_freq as u32); if self.doc_ids.len() == 128 { @@ -88,11 +107,14 @@ impl PostingsSerializer { self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); } } - { + if self.is_termfreq_enabled { let block_encoded: &[u32] = self.block_encoder.encode_sorted(&self.term_freqs); for num in block_encoded { self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); - } + } + } + if self.is_positions_enabled { + self.position_deltas.extend_from_slice(position_deltas); } self.doc_ids.clear(); self.term_freqs.clear(); diff --git a/src/postings/writer.rs b/src/postings/writer.rs index ce99de463..d14e75bd7 100644 --- a/src/postings/writer.rs +++ b/src/postings/writer.rs @@ -3,87 +3,28 @@ use std::collections::BTreeMap; use schema::Term; use postings::PostingsSerializer; use std::io; +pub use postings::Recorder; +pub use postings::NothingRecorder; -pub trait U32sRecorder { - fn new() -> Self; - fn record(&mut self, val: u32); - fn get(&self, idx: usize) -> u32; - fn slice(&self, start: usize, stop: usize) -> &[u32]; -} -pub struct VecRecorder(Vec); -impl U32sRecorder for VecRecorder { - fn new() -> VecRecorder { - VecRecorder(Vec::new()) - } - - fn record(&mut self, val: u32) { - self.0.push(val); - } - - fn get(&self, idx: usize) -> u32 { - self.0[idx] - } - - fn slice(&self, start: usize, stop: usize) -> &[u32] { - &self.0[start..stop] - } -} - -const EMPTY_ARRAY: [u32; 0] = []; - -pub struct ObliviousRecorder; - -impl U32sRecorder for ObliviousRecorder { - fn new() -> ObliviousRecorder { - ObliviousRecorder - } - fn record(&mut self, _: u32) { - // do nothing here. - } - - fn get(&self, _idx: usize) -> u32 { - 0u32 - } - - fn slice(&self, _start: usize, _stop: usize) -> &[u32] { - &EMPTY_ARRAY[0..0] - } -} - -struct TermPostingsWriter { +struct TermPostingsWriter { doc_ids: Vec, - term_freqs: TermFreqsRec, - positions: PositionsRec, - current_position: u32, - current_freq: u32, + recorder: Rec, } - -impl TermPostingsWriter { - pub fn new() -> TermPostingsWriter { +impl TermPostingsWriter { + pub fn new() -> TermPostingsWriter { TermPostingsWriter { doc_ids: Vec::new(), - term_freqs: TermFreqsRec::new(), - positions: PositionsRec::new(), - current_position: 0u32, - current_freq: 0u32, + recorder: Recorder::new(), } } fn close_doc(&mut self,) { - self.term_freqs.record(self.current_freq); - self.current_freq = 0; - self.current_position = 0; + self.recorder.close_doc(); } - - fn close(&mut self,) { - if self.current_freq > 0 { - self.close_doc(); - } - } - + fn is_new_doc(&self, doc: &DocId) -> bool { match self.doc_ids.last() { Some(&last_doc) => last_doc != *doc, @@ -102,17 +43,14 @@ impl TermPostingsWriter< self.close_doc(); self.doc_ids.push(doc); } - self.current_freq += 1; - self.positions.record(pos - self.current_position); - self.current_position = pos; + self.recorder.record_position(pos); } pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> { let mut positions_idx = 0; for (i, doc) in self.doc_ids.iter().enumerate() { - let term_freq: u32 = self.term_freqs.get(i); - let positions: &[u32] = self.positions.slice(positions_idx, positions_idx + term_freq as usize); - try!(serializer.write_doc(doc.clone(), term_freq, positions)); + let (term_freq, position_deltas) = self.recorder.get_tf_and_posdeltas(i); + try!(serializer.write_doc(doc.clone(), term_freq, position_deltas)); positions_idx += term_freq as usize; } Ok(()) @@ -121,7 +59,7 @@ impl TermPostingsWriter< pub struct PostingsWriter { - postings: Vec>, + postings: Vec>, term_index: BTreeMap, } @@ -136,16 +74,16 @@ impl PostingsWriter { pub fn close(&mut self,) { for term_postings_writer in self.postings.iter_mut() { - term_postings_writer.close(); + term_postings_writer.close_doc(); } } pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) { - let doc_ids: &mut TermPostingsWriter = self.get_term_postings(term); + let doc_ids: &mut TermPostingsWriter = self.get_term_postings(term); doc_ids.suscribe(doc, pos); } - fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter { + fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter { match self.term_index.get(&term) { Some(unord_id) => { return &mut self.postings[*unord_id];