This commit is contained in:
Paul Masurel
2016-05-05 23:02:07 +09:00
parent 0410908894
commit 8d10e48c74
8 changed files with 297 additions and 128 deletions

View File

@@ -5,8 +5,13 @@ use std::ptr;
extern {
fn encode_sorted_block128_native(data: *mut u32, output: *mut u32, output_capacity: size_t) -> size_t;
fn decode_sorted_block128_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32) -> usize;
fn encode_block128_native(data: *mut u32, output: *mut u32, output_capacity: size_t) -> size_t;
fn decode_block128_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32) -> usize;
fn encode_sorted_vint_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t;
fn decode_sorted_vint_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t;
}
//-------------------------
@@ -71,27 +76,43 @@ impl Block128Decoder {
pub fn decode<'a, 'b>(
&'b mut self,
compressed_data: &'a [u32]) -> (&'a[u32], &'b[u32; 128]) {
compressed_data: &'a [u32]) -> &'a[u32] {
unsafe {
let consumed_num_bytes: usize = decode_block128_native(
compressed_data.as_ptr(),
compressed_data.len() as size_t,
self.output.as_mut_ptr());
(&compressed_data[consumed_num_bytes..], &self.output)
&compressed_data[consumed_num_bytes..]
}
}
pub fn decode_sorted<'a, 'b>(
&'b mut self,
compressed_data: &'a [u32]) -> (&'a[u32], &'b[u32; 128]) {
compressed_data: &'a [u32]) -> &'a [u32] {
unsafe {
let consumed_num_bytes: usize = decode_sorted_block128_native(
compressed_data.as_ptr(),
compressed_data.len() as size_t,
self.output.as_mut_ptr());
(&compressed_data[consumed_num_bytes..], &self.output)
&compressed_data[consumed_num_bytes..]
}
}
pub fn decode_sorted_remaining(&mut self,
compressed_data: &[u32]) -> &[u32] {
unsafe {
let num_uncompressed = decode_sorted_vint_native(
compressed_data.as_ptr(),
compressed_data.len() as size_t,
self.output.as_mut_ptr(),
128);
&self.output[..num_uncompressed]
}
}
pub fn output(&self,) -> &[u32; 128] {
&self.output
}
}
@@ -114,7 +135,8 @@ mod tests {
encoded_vec.push(i);
}
let mut decoder = Block128Decoder::new();
let (remaining_input, uncompressed_values) = decoder.decode_sorted(&encoded_vec[..]);
let remaining_input = decoder.decode_sorted(&encoded_vec[..]);
let uncompressed_values = decoder.output();
assert_eq!(remaining_input.len(), *num_extra_values);
for i in 0..128 {
assert_eq!(uncompressed_values[i], input[i]);
@@ -139,7 +161,8 @@ mod tests {
encoded_vec.push(i);
}
let mut decoder = Block128Decoder::new();
let (remaining_input, uncompressed_values) = decoder.decode(&encoded_vec[..]);
let remaining_input = decoder.decode(&encoded_vec[..]);
let uncompressed_values = decoder.output();
assert_eq!(remaining_input.len(), *num_extra_values);
for i in 0..128 {
assert_eq!(uncompressed_values[i], input[i]);

View File

@@ -7,7 +7,7 @@ use core::codec::SegmentSerializer;
use postings::PostingsSerializer;
use postings::TermInfo;
use postings::Postings;
use std::collections::BinaryHeap;
use datastruct::FstMapIter;
use schema::{Term, Schema, U32Field};
@@ -93,8 +93,9 @@ impl<'a> PostingsMerger<'a> {
{
let offset = self.doc_offsets[heap_item.segment_ord];
let reader = &self.readers[heap_item.segment_ord];
for doc_id in reader.read_postings(&heap_item.term_info) {
self.doc_ids.push(offset + doc_id);
let mut segment_postings = reader.read_postings(&heap_item.term_info);
while segment_postings.next() {
self.doc_ids.push(segment_postings.doc());
}
}
self.push_next_segment_el(heap_item.segment_ord);
@@ -179,6 +180,7 @@ impl IndexMerger {
match postings_merger.next() {
Some((term, doc_ids)) => {
try!(postings_serializer.new_term(&Term::from(&term), doc_ids.len() as DocId));
for doc_id in doc_ids.iter() {
// TODO fix this
// try!(postings_serializer.write_doc(doc_id.clone(), None));

View File

@@ -3,7 +3,6 @@ use schema::Term;
use store::StoreReader;
use schema::Document;
use directory::ReadOnlySource;
use std::io::Cursor;
use DocId;
use core::index::SegmentComponent;
use std::io;
@@ -12,16 +11,14 @@ use postings::TermInfo;
use datastruct::FstMap;
use std::fmt;
use rustc_serialize::json;
use common::VInt;
use core::index::SegmentInfo;
use common::OpenTimer;
use schema::U32Field;
use core::convert_to_ioerror;
use common::BinarySerializable;
use postings::SegmentPostings;
use postings::Postings;
use fastfield::{U32FastFieldsReader, U32FastFieldReader};
use compression;
use compression::{Block128Decoder, VIntsDecoder};
use std::mem;
use postings::intersection;
impl fmt::Debug for SegmentReader {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -29,107 +26,6 @@ impl fmt::Debug for SegmentReader {
}
}
#[inline(never)]
pub fn intersection(mut postings: Vec<SegmentPostings>) -> SegmentPostings {
let min_len = postings.iter()
.map(|v| v.len())
.min()
.unwrap();
let buffer: Vec<u32> = postings.pop().unwrap().0;
let mut output: Vec<u32> = Vec::with_capacity(min_len);
unsafe {
output.set_len(min_len);
}
let mut pair = (output, buffer);
for posting in postings.iter() {
pair = (pair.1, pair.0);
let output_len = compression::intersection(posting.0.as_slice(),
pair.0.as_slice(),
pair.1.as_mut_slice());
unsafe {
pair.1.set_len(output_len);
}
}
SegmentPostings(pair.1)
}
pub struct SegmentPostings(Vec<DocId>);
impl IntoIterator for SegmentPostings {
type Item = DocId;
type IntoIter = ::std::vec::IntoIter<DocId>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
impl SegmentPostings {
pub fn empty() -> SegmentPostings {
SegmentPostings(Vec::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn from_data(doc_freq: DocId, data: &[u8]) -> SegmentPostings {
let mut data_u32: &[u32] = unsafe { mem::transmute(data) };
let mut doc_ids: Vec<u32> = Vec::with_capacity(doc_freq as usize);
{
let mut block_decoder = Block128Decoder::new();
let num_blocks = doc_freq / (compression::NUM_DOCS_PER_BLOCK as u32);
for _ in 0..num_blocks {
let (remaining, uncompressed) = block_decoder.decode_sorted(data_u32);
doc_ids.extend_from_slice(uncompressed);
data_u32 = remaining;
}
if doc_freq % 128 != 0 {
let data_u8: &[u8] = unsafe { mem::transmute(data_u32) };
let mut cursor = Cursor::new(data_u8);
let vint_len: usize = VInt::deserialize(&mut cursor).unwrap().val() as usize;
let cursor_pos = cursor.position() as usize;
let vint_data: &[u32] = unsafe { mem::transmute(&data_u8[cursor_pos..]) };
let mut vints_decoder = VIntsDecoder::new();
doc_ids.extend_from_slice(vints_decoder.decode_sorted(&vint_data[..vint_len]));
}
}
SegmentPostings(doc_ids)
}
}
// impl Postings for SegmentPostings {
// fn skip_next(&mut self, target: DocId) -> Option<DocId> {
// loop {
// match Iterator::next(self) {
// Some(val) if val >= target => {
// return Some(val);
// },
// None => {
// return None;
// },
// _ => {}
// }
// }
// }
// }
// impl Iterator for SegmentPostings {
//
// type Item = DocId;
//
// fn next(&mut self,) -> Option<DocId> {
// if self.doc_id < self.doc_ids.len() {
// let res = Some(self.doc_ids[self.doc_id]);
// self.doc_id += 1;
// return res;
// }
// else {
// None
// }
// }
// }
pub struct SegmentReader {
segment_info: SegmentInfo,
@@ -206,11 +102,16 @@ impl SegmentReader {
/// Returns the list of doc ids containing all of the
/// given terms.
pub fn search<'a>(&self, terms: &Vec<Term>, mut timer: OpenTimer<'a>) -> SegmentPostings {
pub fn search<'a, 'b>(&'b self, terms: &Vec<Term>, mut timer: OpenTimer<'a>) -> Box<Postings + 'b> {
if terms.len() == 1 {
match self.get_term(&terms[0]) {
Some(term_info) => self.read_postings(&term_info),
None => SegmentPostings::empty(),
Some(term_info) => {
let postings: SegmentPostings<'b> = self.read_postings(&term_info);
Box::new(postings)
},
None => {
Box::new(SegmentPostings::empty())
},
}
} else {
let mut segment_postings: Vec<SegmentPostings> = Vec::new();
@@ -225,15 +126,12 @@ impl SegmentReader {
}
None => {
// currently this is a strict intersection.
return SegmentPostings::empty();
return Box::new(SegmentPostings::empty());
}
}
}
}
{
let _intersection_time = timer.open("intersection");
intersection(segment_postings)
}
Box::new(intersection(segment_postings))
}
}
}

View File

@@ -6,6 +6,7 @@ use schema::{Document, Term};
use collector::Collector;
use std::io;
use common::TimerTree;
use postings::Postings;
#[derive(Debug)]
pub struct Searcher {
@@ -58,11 +59,11 @@ impl Searcher {
let _ = segment_search_timer.open("set_segment");
try!(collector.set_segment(segment_ord as SegmentLocalId, &segment));
}
let postings = segment.search(terms, segment_search_timer.open("get_postings"));
let mut postings = segment.search(terms, segment_search_timer.open("get_postings"));
{
let _collection_timer = segment_search_timer.open("collection");
for doc_id in postings {
collector.collect(doc_id);
while postings.next() {
collector.collect(postings.doc());
}
}
}

View File

@@ -0,0 +1,131 @@
use postings::Postings;
use postings::SkipResult;
use postings::SegmentPostings;
use std::cmp::Ordering;
use DocId;
pub struct IntersectionPostings<'a> {
left: Box<Postings + 'a>,
right: Box<Postings + 'a>,
finished: bool,
}
impl<'a> IntersectionPostings<'a> {
fn from_pair(left: Box<Postings + 'a>, right: Box<Postings + 'a>) -> IntersectionPostings<'a> {
IntersectionPostings {
left: left,
right: right,
finished: false,
}
}
pub fn new(mut postings: Vec<Box<Postings + 'a>>) -> IntersectionPostings<'a> {
let left = postings.pop().unwrap();
let right;
if postings.len() == 1 {
right = postings.pop().unwrap();
}
else {
right = Box::new(IntersectionPostings::new(postings));
}
IntersectionPostings::from_pair(left, right)
}
}
impl<'a> Postings for IntersectionPostings<'a> {
fn next(&mut self,) -> bool {
if self.finished {
return false;
}
if !self.left.next() {
self.finished = true;
return false;
}
if !self.right.next() {
self.finished = true;
return false;
}
loop {
match self.left.doc().cmp(&self.right.doc()) {
Ordering::Equal => {
return true;
}
Ordering::Less => {
if !self.left.next() {
self.finished = true;
return false;
}
}
Ordering::Greater => {
if !self.right.next() {
self.finished = true;
return false;
}
}
}
}
}
fn doc(&self,) -> DocId {
self.left.doc()
}
fn skip_next(&mut self, target: DocId) -> SkipResult {
loop {
match self.doc().cmp(&target) {
Ordering::Equal => {
return SkipResult::Reached;
}
Ordering::Greater => {
return SkipResult::OverStep;
}
Ordering::Less => {
//
}
}
if !self.next() {
return SkipResult::End;
}
}
}
}
#[inline(never)]
pub fn intersection<'a>(postings: Vec<SegmentPostings<'a>>) -> IntersectionPostings<'a> {
let boxed_postings: Vec<Box<Postings + 'a>> = postings
.into_iter()
.map(|postings| {
let boxed_p: Box<Postings + 'a> = Box::new(postings);
boxed_p
})
.collect();
IntersectionPostings::new(boxed_postings)
// let min_len = postings.iter()
// .map(|v| v.len())
// .min()
// .unwrap();
// let buffer: Vec<u32> = postings.pop().unwrap().0;
// let mut output: Vec<u32> = Vec::with_capacity(min_len);
// unsafe {
// output.set_len(min_len);
// }
// let mut pair = (output, buffer);
// for posting in postings.iter() {
// pair = (pair.1, pair.0);
// let output_len = compression::intersection(posting.0.as_slice(),
// pair.0.as_slice(),
// pair.1.as_mut_slice());
// unsafe {
// pair.1.set_len(output_len);
// }
// }
// SegmentPostings(pair.1)
}

View File

@@ -5,6 +5,8 @@ mod writer;
mod term_info;
mod chained_postings;
mod vec_postings;
mod segment_postings;
mod intersection;
pub use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
pub use self::serializer::PostingsSerializer;
@@ -13,6 +15,9 @@ pub use self::term_info::TermInfo;
pub use self::postings::{Postings, SkipResult};
pub use self::vec_postings::VecPostings;
pub use self::chained_postings::ChainedPostings;
pub use self::segment_postings::SegmentPostings;
pub use self::intersection::intersection;
pub use self::intersection::IntersectionPostings;
#[cfg(test)]
mod tests {

View File

@@ -0,0 +1,109 @@
use postings::Postings;
use compression::{NUM_DOCS_PER_BLOCK, Block128Decoder};
use DocId;
use std::cmp::Ordering;
use std::mem;
use postings::SkipResult;
use std::num::Wrapping;
// No Term Frequency, no postings.
pub struct SegmentPostings<'a> {
doc_freq: usize,
block_decoder: Block128Decoder,
remaining_data: &'a [u32],
cur: Wrapping<usize>,
}
const EMPTY_ARRAY: [u32; 0] = [];
impl<'a> SegmentPostings<'a> {
pub fn empty() -> SegmentPostings<'a> {
SegmentPostings {
doc_freq: 0,
block_decoder: Block128Decoder::new(),
remaining_data: &EMPTY_ARRAY,
cur: Wrapping(usize::max_value()),
}
}
pub fn load_next_block(&mut self,) {
if self.doc_freq - self.cur.0 >= NUM_DOCS_PER_BLOCK {
self.remaining_data = self.block_decoder.decode_sorted(self.remaining_data);
}
else {
self.block_decoder.decode_sorted_remaining(self.remaining_data);
}
}
pub fn from_data(doc_freq: u32, data: &'a [u8]) -> SegmentPostings<'a> {
SegmentPostings {
doc_freq: doc_freq as usize,
block_decoder: Block128Decoder::new(),
remaining_data: unsafe { mem::transmute(data) },
cur: Wrapping(usize::max_value()),
}
// let mut data_u32: &[u32] = unsafe { mem::transmute(data) };
// let mut doc_ids: Vec<u32> = Vec::with_capacity(doc_freq as usize);
// {
// let mut block_decoder = Block128Decoder::new();
// let num_blocks = doc_freq / (NUM_DOCS_PER_BLOCK as u32);
// for _ in 0..num_blocks {
// let (remaining = block_decoder.decode_sorted(data_u32);
// doc_ids.extend_from_slice(uncompressed);
// data_u32 = remaining;
// }
// if doc_freq % 128 != 0 {
// let data_u8: &[u8] = unsafe { mem::transmute(data_u32) };
// let mut cursor = Cursor::new(data_u8);
// let vint_len: usize = VInt::deserialize(&mut cursor).unwrap().val() as usize;
// let cursor_pos = cursor.position() as usize;
// let vint_data: &[u32] = unsafe { mem::transmute(&data_u8[cursor_pos..]) };
// let mut vints_decoder = VIntsDecoder::new();
// doc_ids.extend_from_slice(vints_decoder.decode_sorted(&vint_data[..vint_len]));
// }
// }
// SegmentPostings(doc_ids)
}
}
impl<'a> Postings for SegmentPostings<'a> {
// goes to the next element.
// next needs to be called a first time to point to the correct element.
fn next(&mut self,) -> bool {
self.cur += Wrapping(1);
if self.cur.0 >= self.doc_freq {
return false;
}
if self.cur.0 % NUM_DOCS_PER_BLOCK == 0 {
self.load_next_block();
}
return true;
}
fn doc(&self,) -> DocId {
self.block_decoder.output()[self.cur.0 % NUM_DOCS_PER_BLOCK]
}
// after skipping position
// the iterator in such a way that doc() will return a
// value greater or equal to target.
fn skip_next(&mut self, target: DocId) -> SkipResult {
loop {
match self.doc().cmp(&target) {
Ordering::Equal => {
return SkipResult::Reached;
}
Ordering::Greater => {
return SkipResult::OverStep;
}
Ordering::Less => {}
}
if !self.next() {
return SkipResult::End;
}
}
}
}

View File

@@ -68,7 +68,7 @@ impl PostingsSerializer {
if !self.doc_ids.is_empty() {
{
let block_encoded = self.vints_encoder.encode_sorted(&self.doc_ids[..]);
self.written_bytes_postings += try!(VInt(block_encoded.len() as u64).serialize(&mut self.postings_write));
// self.written_bytes_postings += try!(VInt(block_encoded.len() as u64).serialize(&mut self.postings_write));
for num in block_encoded {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}