From 8d10e48c74bcac8538ffa9ece2ce6f72ba801833 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 5 May 2016 23:02:07 +0900 Subject: [PATCH] bbbb --- src/compression/block128.rs | 35 +++++++-- src/core/merger.rs | 8 +- src/core/reader.rs | 128 +++--------------------------- src/core/searcher.rs | 7 +- src/postings/intersection.rs | 131 +++++++++++++++++++++++++++++++ src/postings/mod.rs | 5 ++ src/postings/segment_postings.rs | 109 +++++++++++++++++++++++++ src/postings/serializer.rs | 2 +- 8 files changed, 297 insertions(+), 128 deletions(-) create mode 100644 src/postings/intersection.rs create mode 100644 src/postings/segment_postings.rs diff --git a/src/compression/block128.rs b/src/compression/block128.rs index 9a4a9f59a..457fa9648 100644 --- a/src/compression/block128.rs +++ b/src/compression/block128.rs @@ -5,8 +5,13 @@ use std::ptr; extern { fn encode_sorted_block128_native(data: *mut u32, output: *mut u32, output_capacity: size_t) -> size_t; fn decode_sorted_block128_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32) -> usize; + fn encode_block128_native(data: *mut u32, output: *mut u32, output_capacity: size_t) -> size_t; fn decode_block128_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32) -> usize; + + fn encode_sorted_vint_native(data: *mut u32, num_els: size_t, output: *mut u32, output_capacity: size_t) -> size_t; + fn decode_sorted_vint_native(compressed_data: *const u32, compressed_size: size_t, uncompressed: *mut u32, output_capacity: size_t) -> size_t; + } //------------------------- @@ -71,27 +76,43 @@ impl Block128Decoder { pub fn decode<'a, 'b>( &'b mut self, - compressed_data: &'a [u32]) -> (&'a[u32], &'b[u32; 128]) { + compressed_data: &'a [u32]) -> &'a[u32] { unsafe { let consumed_num_bytes: usize = decode_block128_native( compressed_data.as_ptr(), compressed_data.len() as size_t, self.output.as_mut_ptr()); - (&compressed_data[consumed_num_bytes..], &self.output) + &compressed_data[consumed_num_bytes..] } } pub fn decode_sorted<'a, 'b>( &'b mut self, - compressed_data: &'a [u32]) -> (&'a[u32], &'b[u32; 128]) { + compressed_data: &'a [u32]) -> &'a [u32] { unsafe { let consumed_num_bytes: usize = decode_sorted_block128_native( compressed_data.as_ptr(), compressed_data.len() as size_t, self.output.as_mut_ptr()); - (&compressed_data[consumed_num_bytes..], &self.output) + &compressed_data[consumed_num_bytes..] } } + + pub fn decode_sorted_remaining(&mut self, + compressed_data: &[u32]) -> &[u32] { + unsafe { + let num_uncompressed = decode_sorted_vint_native( + compressed_data.as_ptr(), + compressed_data.len() as size_t, + self.output.as_mut_ptr(), + 128); + &self.output[..num_uncompressed] + } + } + + pub fn output(&self,) -> &[u32; 128] { + &self.output + } } @@ -114,7 +135,8 @@ mod tests { encoded_vec.push(i); } let mut decoder = Block128Decoder::new(); - let (remaining_input, uncompressed_values) = decoder.decode_sorted(&encoded_vec[..]); + let remaining_input = decoder.decode_sorted(&encoded_vec[..]); + let uncompressed_values = decoder.output(); assert_eq!(remaining_input.len(), *num_extra_values); for i in 0..128 { assert_eq!(uncompressed_values[i], input[i]); @@ -139,7 +161,8 @@ mod tests { encoded_vec.push(i); } let mut decoder = Block128Decoder::new(); - let (remaining_input, uncompressed_values) = decoder.decode(&encoded_vec[..]); + let remaining_input = decoder.decode(&encoded_vec[..]); + let uncompressed_values = decoder.output(); assert_eq!(remaining_input.len(), *num_extra_values); for i in 0..128 { assert_eq!(uncompressed_values[i], input[i]); diff --git a/src/core/merger.rs b/src/core/merger.rs index ca712b94a..6e2a6fd0e 100644 --- a/src/core/merger.rs +++ b/src/core/merger.rs @@ -7,7 +7,7 @@ use core::codec::SegmentSerializer; use postings::PostingsSerializer; use postings::TermInfo; - +use postings::Postings; use std::collections::BinaryHeap; use datastruct::FstMapIter; use schema::{Term, Schema, U32Field}; @@ -93,8 +93,9 @@ impl<'a> PostingsMerger<'a> { { let offset = self.doc_offsets[heap_item.segment_ord]; let reader = &self.readers[heap_item.segment_ord]; - for doc_id in reader.read_postings(&heap_item.term_info) { - self.doc_ids.push(offset + doc_id); + let mut segment_postings = reader.read_postings(&heap_item.term_info); + while segment_postings.next() { + self.doc_ids.push(segment_postings.doc()); } } self.push_next_segment_el(heap_item.segment_ord); @@ -179,6 +180,7 @@ impl IndexMerger { match postings_merger.next() { Some((term, doc_ids)) => { try!(postings_serializer.new_term(&Term::from(&term), doc_ids.len() as DocId)); + for doc_id in doc_ids.iter() { // TODO fix this // try!(postings_serializer.write_doc(doc_id.clone(), None)); diff --git a/src/core/reader.rs b/src/core/reader.rs index cb7f11dc1..53ba2c024 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -3,7 +3,6 @@ use schema::Term; use store::StoreReader; use schema::Document; use directory::ReadOnlySource; -use std::io::Cursor; use DocId; use core::index::SegmentComponent; use std::io; @@ -12,16 +11,14 @@ use postings::TermInfo; use datastruct::FstMap; use std::fmt; use rustc_serialize::json; -use common::VInt; use core::index::SegmentInfo; use common::OpenTimer; use schema::U32Field; use core::convert_to_ioerror; -use common::BinarySerializable; +use postings::SegmentPostings; +use postings::Postings; use fastfield::{U32FastFieldsReader, U32FastFieldReader}; -use compression; -use compression::{Block128Decoder, VIntsDecoder}; -use std::mem; +use postings::intersection; impl fmt::Debug for SegmentReader { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -29,107 +26,6 @@ impl fmt::Debug for SegmentReader { } } -#[inline(never)] -pub fn intersection(mut postings: Vec) -> SegmentPostings { - let min_len = postings.iter() - .map(|v| v.len()) - .min() - .unwrap(); - let buffer: Vec = postings.pop().unwrap().0; - let mut output: Vec = Vec::with_capacity(min_len); - unsafe { - output.set_len(min_len); - } - let mut pair = (output, buffer); - for posting in postings.iter() { - pair = (pair.1, pair.0); - let output_len = compression::intersection(posting.0.as_slice(), - pair.0.as_slice(), - pair.1.as_mut_slice()); - unsafe { - pair.1.set_len(output_len); - } - } - SegmentPostings(pair.1) -} - -pub struct SegmentPostings(Vec); - -impl IntoIterator for SegmentPostings { - type Item = DocId; - type IntoIter = ::std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl SegmentPostings { - pub fn empty() -> SegmentPostings { - SegmentPostings(Vec::new()) - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn from_data(doc_freq: DocId, data: &[u8]) -> SegmentPostings { - let mut data_u32: &[u32] = unsafe { mem::transmute(data) }; - let mut doc_ids: Vec = Vec::with_capacity(doc_freq as usize); - { - let mut block_decoder = Block128Decoder::new(); - let num_blocks = doc_freq / (compression::NUM_DOCS_PER_BLOCK as u32); - for _ in 0..num_blocks { - let (remaining, uncompressed) = block_decoder.decode_sorted(data_u32); - doc_ids.extend_from_slice(uncompressed); - data_u32 = remaining; - } - if doc_freq % 128 != 0 { - let data_u8: &[u8] = unsafe { mem::transmute(data_u32) }; - let mut cursor = Cursor::new(data_u8); - let vint_len: usize = VInt::deserialize(&mut cursor).unwrap().val() as usize; - let cursor_pos = cursor.position() as usize; - let vint_data: &[u32] = unsafe { mem::transmute(&data_u8[cursor_pos..]) }; - let mut vints_decoder = VIntsDecoder::new(); - doc_ids.extend_from_slice(vints_decoder.decode_sorted(&vint_data[..vint_len])); - } - } - SegmentPostings(doc_ids) - - } -} -// impl Postings for SegmentPostings { -// fn skip_next(&mut self, target: DocId) -> Option { -// loop { -// match Iterator::next(self) { -// Some(val) if val >= target => { -// return Some(val); -// }, -// None => { -// return None; -// }, -// _ => {} -// } -// } -// } -// } - -// impl Iterator for SegmentPostings { -// -// type Item = DocId; -// -// fn next(&mut self,) -> Option { -// if self.doc_id < self.doc_ids.len() { -// let res = Some(self.doc_ids[self.doc_id]); -// self.doc_id += 1; -// return res; -// } -// else { -// None -// } -// } -// } - pub struct SegmentReader { segment_info: SegmentInfo, @@ -206,11 +102,16 @@ impl SegmentReader { /// Returns the list of doc ids containing all of the /// given terms. - pub fn search<'a>(&self, terms: &Vec, mut timer: OpenTimer<'a>) -> SegmentPostings { + pub fn search<'a, 'b>(&'b self, terms: &Vec, mut timer: OpenTimer<'a>) -> Box { if terms.len() == 1 { match self.get_term(&terms[0]) { - Some(term_info) => self.read_postings(&term_info), - None => SegmentPostings::empty(), + Some(term_info) => { + let postings: SegmentPostings<'b> = self.read_postings(&term_info); + Box::new(postings) + }, + None => { + Box::new(SegmentPostings::empty()) + }, } } else { let mut segment_postings: Vec = Vec::new(); @@ -225,15 +126,12 @@ impl SegmentReader { } None => { // currently this is a strict intersection. - return SegmentPostings::empty(); + return Box::new(SegmentPostings::empty()); } } } } - { - let _intersection_time = timer.open("intersection"); - intersection(segment_postings) - } + Box::new(intersection(segment_postings)) } } } diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 35857dd80..1bd0d1109 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -6,6 +6,7 @@ use schema::{Document, Term}; use collector::Collector; use std::io; use common::TimerTree; +use postings::Postings; #[derive(Debug)] pub struct Searcher { @@ -58,11 +59,11 @@ impl Searcher { let _ = segment_search_timer.open("set_segment"); try!(collector.set_segment(segment_ord as SegmentLocalId, &segment)); } - let postings = segment.search(terms, segment_search_timer.open("get_postings")); + let mut postings = segment.search(terms, segment_search_timer.open("get_postings")); { let _collection_timer = segment_search_timer.open("collection"); - for doc_id in postings { - collector.collect(doc_id); + while postings.next() { + collector.collect(postings.doc()); } } } diff --git a/src/postings/intersection.rs b/src/postings/intersection.rs new file mode 100644 index 000000000..7dd3bf1e3 --- /dev/null +++ b/src/postings/intersection.rs @@ -0,0 +1,131 @@ +use postings::Postings; +use postings::SkipResult; +use postings::SegmentPostings; +use std::cmp::Ordering; +use DocId; + + +pub struct IntersectionPostings<'a> { + left: Box, + right: Box, + finished: bool, +} + +impl<'a> IntersectionPostings<'a> { + + fn from_pair(left: Box, right: Box) -> IntersectionPostings<'a> { + IntersectionPostings { + left: left, + right: right, + finished: false, + } + } + + pub fn new(mut postings: Vec>) -> IntersectionPostings<'a> { + let left = postings.pop().unwrap(); + let right; + if postings.len() == 1 { + right = postings.pop().unwrap(); + } + else { + right = Box::new(IntersectionPostings::new(postings)); + } + IntersectionPostings::from_pair(left, right) + } +} + + +impl<'a> Postings for IntersectionPostings<'a> { + + fn next(&mut self,) -> bool { + if self.finished { + return false; + } + + if !self.left.next() { + self.finished = true; + return false; + } + if !self.right.next() { + self.finished = true; + return false; + } + loop { + match self.left.doc().cmp(&self.right.doc()) { + Ordering::Equal => { + return true; + } + Ordering::Less => { + if !self.left.next() { + self.finished = true; + return false; + } + } + Ordering::Greater => { + if !self.right.next() { + self.finished = true; + return false; + } + } + } + } + } + + fn doc(&self,) -> DocId { + self.left.doc() + } + + fn skip_next(&mut self, target: DocId) -> SkipResult { + loop { + match self.doc().cmp(&target) { + Ordering::Equal => { + return SkipResult::Reached; + } + Ordering::Greater => { + return SkipResult::OverStep; + } + Ordering::Less => { + // + } + } + if !self.next() { + return SkipResult::End; + } + } + } +} + + + + +#[inline(never)] +pub fn intersection<'a>(postings: Vec>) -> IntersectionPostings<'a> { + let boxed_postings: Vec> = postings + .into_iter() + .map(|postings| { + let boxed_p: Box = Box::new(postings); + boxed_p + }) + .collect(); + IntersectionPostings::new(boxed_postings) + // let min_len = postings.iter() + // .map(|v| v.len()) + // .min() + // .unwrap(); + // let buffer: Vec = postings.pop().unwrap().0; + // let mut output: Vec = Vec::with_capacity(min_len); + // unsafe { + // output.set_len(min_len); + // } + // let mut pair = (output, buffer); + // for posting in postings.iter() { + // pair = (pair.1, pair.0); + // let output_len = compression::intersection(posting.0.as_slice(), + // pair.0.as_slice(), + // pair.1.as_mut_slice()); + // unsafe { + // pair.1.set_len(output_len); + // } + // } + // SegmentPostings(pair.1) +} \ No newline at end of file diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 048d33319..170b49b66 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -5,6 +5,8 @@ mod writer; mod term_info; mod chained_postings; mod vec_postings; +mod segment_postings; +mod intersection; pub use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; pub use self::serializer::PostingsSerializer; @@ -13,6 +15,9 @@ pub use self::term_info::TermInfo; pub use self::postings::{Postings, SkipResult}; pub use self::vec_postings::VecPostings; pub use self::chained_postings::ChainedPostings; +pub use self::segment_postings::SegmentPostings; +pub use self::intersection::intersection; +pub use self::intersection::IntersectionPostings; #[cfg(test)] mod tests { diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs new file mode 100644 index 000000000..41df94190 --- /dev/null +++ b/src/postings/segment_postings.rs @@ -0,0 +1,109 @@ +use postings::Postings; +use compression::{NUM_DOCS_PER_BLOCK, Block128Decoder}; +use DocId; +use std::cmp::Ordering; +use std::mem; +use postings::SkipResult; +use std::num::Wrapping; + +// No Term Frequency, no postings. +pub struct SegmentPostings<'a> { + doc_freq: usize, + block_decoder: Block128Decoder, + remaining_data: &'a [u32], + cur: Wrapping, +} + +const EMPTY_ARRAY: [u32; 0] = []; + +impl<'a> SegmentPostings<'a> { + + pub fn empty() -> SegmentPostings<'a> { + SegmentPostings { + doc_freq: 0, + block_decoder: Block128Decoder::new(), + remaining_data: &EMPTY_ARRAY, + cur: Wrapping(usize::max_value()), + } + } + + pub fn load_next_block(&mut self,) { + if self.doc_freq - self.cur.0 >= NUM_DOCS_PER_BLOCK { + self.remaining_data = self.block_decoder.decode_sorted(self.remaining_data); + } + else { + self.block_decoder.decode_sorted_remaining(self.remaining_data); + } + } + + pub fn from_data(doc_freq: u32, data: &'a [u8]) -> SegmentPostings<'a> { + SegmentPostings { + doc_freq: doc_freq as usize, + block_decoder: Block128Decoder::new(), + remaining_data: unsafe { mem::transmute(data) }, + cur: Wrapping(usize::max_value()), + } + // let mut data_u32: &[u32] = unsafe { mem::transmute(data) }; + // let mut doc_ids: Vec = Vec::with_capacity(doc_freq as usize); + // { + // let mut block_decoder = Block128Decoder::new(); + // let num_blocks = doc_freq / (NUM_DOCS_PER_BLOCK as u32); + // for _ in 0..num_blocks { + // let (remaining = block_decoder.decode_sorted(data_u32); + // doc_ids.extend_from_slice(uncompressed); + // data_u32 = remaining; + // } + // if doc_freq % 128 != 0 { + // let data_u8: &[u8] = unsafe { mem::transmute(data_u32) }; + // let mut cursor = Cursor::new(data_u8); + // let vint_len: usize = VInt::deserialize(&mut cursor).unwrap().val() as usize; + // let cursor_pos = cursor.position() as usize; + // let vint_data: &[u32] = unsafe { mem::transmute(&data_u8[cursor_pos..]) }; + // let mut vints_decoder = VIntsDecoder::new(); + // doc_ids.extend_from_slice(vints_decoder.decode_sorted(&vint_data[..vint_len])); + // } + // } + // SegmentPostings(doc_ids) + + } +} + +impl<'a> Postings for SegmentPostings<'a> { + + // goes to the next element. + // next needs to be called a first time to point to the correct element. + fn next(&mut self,) -> bool { + self.cur += Wrapping(1); + if self.cur.0 >= self.doc_freq { + return false; + } + if self.cur.0 % NUM_DOCS_PER_BLOCK == 0 { + self.load_next_block(); + } + return true; + } + + fn doc(&self,) -> DocId { + self.block_decoder.output()[self.cur.0 % NUM_DOCS_PER_BLOCK] + } + + // after skipping position + // the iterator in such a way that doc() will return a + // value greater or equal to target. + fn skip_next(&mut self, target: DocId) -> SkipResult { + loop { + match self.doc().cmp(&target) { + Ordering::Equal => { + return SkipResult::Reached; + } + Ordering::Greater => { + return SkipResult::OverStep; + } + Ordering::Less => {} + } + if !self.next() { + return SkipResult::End; + } + } + } +} \ No newline at end of file diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 70b483ce1..e4857dd2a 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -68,7 +68,7 @@ impl PostingsSerializer { if !self.doc_ids.is_empty() { { let block_encoded = self.vints_encoder.encode_sorted(&self.doc_ids[..]); - self.written_bytes_postings += try!(VInt(block_encoded.len() as u64).serialize(&mut self.postings_write)); + // self.written_bytes_postings += try!(VInt(block_encoded.len() as u64).serialize(&mut self.postings_write)); for num in block_encoded { self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); }