From 95f11148702e67fe81f10c116e5fe54ef244bf1b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 5 Apr 2020 16:20:45 +0900 Subject: [PATCH] blop --- src/lib.rs | 138 +++++++++++++++++++++++++++++++ src/postings/block_search.rs | 4 +- src/postings/compression/mod.rs | 19 ++++- src/postings/mod.rs | 2 +- src/postings/segment_postings.rs | 45 +++++++++- src/postings/skip.rs | 2 +- src/query/query.rs | 29 ++++++- src/query/scorer.rs | 8 ++ 8 files changed, 240 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 55d71bd5f..d2dcb9fbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -269,6 +269,144 @@ impl DocAddress { } } + + +mod sse2 { + use crate::postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE}; + use std::arch::x86_64::__m128i as DataType; + use std::arch::x86_64::_mm_add_epi32 as op_add; + use std::arch::x86_64::_mm_cmplt_epi32 as op_lt; + use std::arch::x86_64::_mm_load_si128 as op_load; + // requires 128-bits alignment + use std::arch::x86_64::_mm_set1_epi32 as set1; + use std::arch::x86_64::_mm_setzero_si128 as set0; + use std::arch::x86_64::_mm_sub_epi32 as op_sub; + use std::arch::x86_64::{_mm_cvtsi128_si32, _mm_shuffle_epi32}; + + const MASK1: i32 = 78; + const MASK2: i32 = 177; + + /// Performs an exhaustive linear search over the + /// + /// There is no early exit here. We simply count the + /// number of elements that are `< target`. + pub unsafe fn linear_search_sse2_128(arr: &AlignedBuffer, target: u32) -> usize { + let ptr = arr as *const AlignedBuffer as *const DataType; + let vkey = set1(target as i32); + let mut cnt = set0(); + // We work over 4 `__m128i` at a time. + // A single `__m128i` actual contains 4 `u32`. + for i in 0..(COMPRESSION_BLOCK_SIZE as isize) / (4 * 4) { + let cmp1 = op_lt(op_load(ptr.offset(i * 4)), vkey); + let cmp2 = op_lt(op_load(ptr.offset(i * 4 + 1)), vkey); + let cmp3 = op_lt(op_load(ptr.offset(i * 4 + 2)), vkey); + let cmp4 = op_lt(op_load(ptr.offset(i * 4 + 3)), vkey); + let sum = op_add(op_add(cmp1, cmp2), op_add(cmp3, cmp4)); + cnt = op_sub(cnt, sum); + } + cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK1)); + cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK2)); + _mm_cvtsi128_si32(cnt) as usize + } +} + + +const MAX_DOC: u32 = 2_000_000_000u32; + +use sse2::linear_search_sse2_128; +use crate::postings::BlockSegmentPostings; +use crate::schema::IndexRecordOption; + +struct DocCursor { + block_segment_postings: BlockSegmentPostings, + cursor: usize, +} + +impl DocCursor { + fn new(mut block_segment_postings: BlockSegmentPostings) -> DocCursor { + block_segment_postings.advance(); + DocCursor { + block_segment_postings, + cursor: 0, + } + } + fn doc(&self,) -> DocId { + self.block_segment_postings.doc(self.cursor) + } + + fn len(&self,) -> usize { + self.block_segment_postings.doc_freq() + } + + fn advance(&mut self) { + if self.cursor == 127 { + self.block_segment_postings.advance(); + self.cursor = 0; + } else { + self.cursor += 1; + } + } + + fn skip_to(&mut self, target: DocId) { + if self.block_segment_postings.skip_reader.doc() >= target { + unsafe { + let mut ptr = self.block_segment_postings.docs_aligned_b().0.get_unchecked(self.cursor) as *const u32; + while *ptr < target { + self.cursor += 1; + ptr = ptr.offset(1); + } + } + return; + } + self.block_segment_postings.skip_to_b(target); + let block= self.block_segment_postings.docs_aligned_b(); + self.cursor = unsafe { linear_search_sse2_128(&block, target) }; + + } +} + +pub fn intersection(idx: &InvertedIndexReader, terms: &[Term]) -> crate::Result { + let mut posting_lists: Vec = Vec::with_capacity(terms.len()); + for term in terms { + if let Some(mut block_postings) = idx.read_block_postings(term, IndexRecordOption::Basic) { + posting_lists.push(DocCursor::new(block_postings )); + } else { + return Ok(0); + } + } + posting_lists.sort_by_key(|posting_list| posting_list.len()); + Ok({ intersection_idx(&mut posting_lists[..]) }) +} + +fn intersection_idx(posting_lists: &mut [DocCursor]) -> DocId { + let mut candidate = posting_lists[0].doc(); + let mut i = 1; + + let mut count = 0u32; + + let num_posting_lists = posting_lists.len(); + loop { + while i < num_posting_lists { + posting_lists[i].skip_to(candidate); + if posting_lists[i].doc() != candidate { + candidate = posting_lists[i].doc(); + i = 0; + continue; + } + i += 1; + } + count += 1; + if candidate == MAX_DOC { + break; + } + posting_lists[0].advance(); + candidate = posting_lists[0].doc(); + i = 1; + } + count - 1u32 +} + + /// `DocAddress` contains all the necessary information /// to identify a document given a `Searcher` object. /// diff --git a/src/postings/block_search.rs b/src/postings/block_search.rs index b5b3fca4e..b8287a043 100644 --- a/src/postings/block_search.rs +++ b/src/postings/block_search.rs @@ -25,7 +25,7 @@ mod sse2 { /// /// There is no early exit here. We simply count the /// number of elements that are `< target`. - pub(crate) fn linear_search_sse2_128(arr: &AlignedBuffer, target: u32) -> usize { + pub fn linear_search_sse2_128(arr: &AlignedBuffer, target: u32) -> usize { unsafe { let ptr = arr as *const AlignedBuffer as *const DataType; let vkey = set1(target as i32); @@ -63,6 +63,8 @@ mod sse2 { } } +pub use self::sse2::linear_search_sse2_128; + /// This `linear search` browser exhaustively through the array. /// but the early exit is very difficult to predict. /// diff --git a/src/postings/compression/mod.rs b/src/postings/compression/mod.rs index 342ba80cc..c6314ecc2 100644 --- a/src/postings/compression/mod.rs +++ b/src/postings/compression/mod.rs @@ -46,7 +46,7 @@ impl BlockEncoder { /// We ensure that the OutputBuffer is align on 128 bits /// in order to run SSE2 linear search on it. #[repr(align(128))] -pub(crate) struct AlignedBuffer(pub [u32; COMPRESSION_BLOCK_SIZE]); +pub struct AlignedBuffer(pub [u32; COMPRESSION_BLOCK_SIZE]); pub struct BlockDecoder { bitpacker: BitPacker4x, @@ -67,6 +67,18 @@ impl BlockDecoder { } } + pub fn uncompress_vint_sorted_b<'a>( + &mut self, + compressed_data: &'a [u8], + offset: u32, + num_els: usize, + ) { + if num_els > 0 { + vint::uncompress_sorted(compressed_data, &mut self.output.0[..num_els], offset); + } + self.output.0[num_els..].iter_mut().for_each(|val| *val = 2_000_000_000u32); + } + pub fn uncompress_block_sorted( &mut self, compressed_data: &[u8], @@ -94,6 +106,11 @@ impl BlockDecoder { (&self.output, self.output_len) } + #[inline] + pub(crate) fn output_aligned_2(&self) -> &AlignedBuffer { + &self.output + } + #[inline] pub fn output(&self, idx: usize) -> u32 { self.output.0[idx] diff --git a/src/postings/mod.rs b/src/postings/mod.rs index b66beb413..0d3a61e67 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -3,7 +3,7 @@ Postings module (also called inverted index) */ mod block_search; -pub(crate) mod compression; +pub mod compression; /// Postings module /// /// Postings, also called inverted lists, is the key datastructure diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index f70e5f429..21beea830 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -317,7 +317,7 @@ pub struct BlockSegmentPostings { num_vint_docs: usize, remaining_data: OwnedRead, - skip_reader: SkipReader, + pub skip_reader: SkipReader, } fn split_into_skips_and_postings( @@ -414,7 +414,12 @@ impl BlockSegmentPostings { self.doc_decoder.output_array() } - pub(crate) fn docs_aligned(&self) -> (&AlignedBuffer, usize) { + #[inline(always)] + pub fn docs_aligned_b(&self) -> &AlignedBuffer { + self.doc_decoder.output_aligned_2() + } + + pub fn docs_aligned(&self) -> (&AlignedBuffer, usize) { self.doc_decoder.output_aligned() } @@ -486,6 +491,7 @@ impl BlockSegmentPostings { } } self.doc_offset = self.skip_reader.doc(); + unsafe { core::arch::x86_64::_mm_prefetch(self.remaining_data.as_ref() as *mut i8, _MM_HINT_T1) }; return BlockSegmentPostingsSkipResult::Success(skip_freqs); } else { skip_freqs += self.skip_reader.tf_sum(); @@ -526,6 +532,41 @@ impl BlockSegmentPostings { BlockSegmentPostingsSkipResult::Terminated } + /// Ensure we are located on the right block. + #[inline(never)] + pub fn skip_to_b(&mut self, target_doc: DocId) { + while self.skip_reader.advance() { + if self.skip_reader.doc() >= target_doc { + // the last document of the current block is larger + // than the target. + // + // We found our block! + let num_bits = self.skip_reader.doc_num_bits(); + let num_consumed_bytes = self.doc_decoder.uncompress_block_sorted( + self.remaining_data.as_ref(), + self.doc_offset, + num_bits, + ); + let tf_num_bits = self.skip_reader.tf_num_bits(); + let num_bytes_to_skip = compressed_block_size(tf_num_bits); + self.remaining_data.advance(num_consumed_bytes + num_bytes_to_skip); + self.doc_offset = self.skip_reader.doc(); + return; + } else { + let advance_len = self.skip_reader.total_block_len(); + self.doc_offset = self.skip_reader.doc(); + self.remaining_data.advance(advance_len); + } + } + + // we are now on the last, incomplete, variable encoded block. + self.doc_decoder.uncompress_vint_sorted_b( + self.remaining_data.as_ref(), + self.doc_offset, + self.num_vint_docs + ); + } + /// Advance to the next block. /// /// Returns false iff there was no remaining blocks. diff --git a/src/postings/skip.rs b/src/postings/skip.rs index 165664847..b79c785e3 100644 --- a/src/postings/skip.rs +++ b/src/postings/skip.rs @@ -49,7 +49,7 @@ impl SkipSerializer { } } -pub(crate) struct SkipReader { +pub struct SkipReader { doc: DocId, owned_read: OwnedRead, doc_num_bits: u8, diff --git a/src/query/query.rs b/src/query/query.rs index 591461623..aed9c4d37 100644 --- a/src/query/query.rs +++ b/src/query/query.rs @@ -1,11 +1,17 @@ use super::Weight; use crate::core::searcher::Searcher; use crate::query::Explanation; -use crate::DocAddress; +use crate::{DocAddress, Score}; use crate::Term; use downcast_rs::impl_downcast; use std::collections::BTreeSet; use std::fmt; +use crate::collector::{TopDocs, Collector, SegmentCollector, Count}; + +pub struct TopKResult { + pub count: u64, + docs: Vec<(Score, DocAddress)> +} /// The `Query` trait defines a set of documents and a scoring method /// for those documents. @@ -56,6 +62,27 @@ pub trait Query: QueryClone + downcast_rs::Downcast + fmt::Debug { weight.explain(reader, doc_address.doc()) } + fn top_k(&self, searcher: &Searcher, num_hits: usize) -> crate::Result { + let top_docs = TopDocs::with_limit(num_hits); + let collector = (Count, top_docs); + let weight = self.weight(searcher, false)?; + let mut count = 0u64; + let mut result = 0; + let mut child_fruits = Vec::with_capacity(searcher.segment_readers().len()); + for (segment_ord, reader) in searcher.segment_readers().iter().enumerate() { + let mut child_top_k = collector.for_segment(segment_ord as u32, reader)?; + let mut scorer = weight.scorer(reader, 1.0f32)?; + // handle deletes + scorer.top_k(&mut child_top_k); + child_fruits.push(child_top_k.harvest()); + } + let (count, docs) = collector.merge_fruits(child_fruits)?; + Ok(TopKResult { + count: count as u64, + docs + }) + } + /// Returns the number of documents matching the query. fn count(&self, searcher: &Searcher) -> crate::Result { let weight = self.weight(searcher, false)?; diff --git a/src/query/scorer.rs b/src/query/scorer.rs index 02a4fb021..ec3966f85 100644 --- a/src/query/scorer.rs +++ b/src/query/scorer.rs @@ -4,6 +4,8 @@ use crate::DocId; use crate::Score; use downcast_rs::impl_downcast; use std::ops::DerefMut; +use crate::collector::{SegmentCollector, Collector, TopDocs, Count}; + /// Scored set of documents matching a query within a specific segment. /// @@ -21,6 +23,12 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static { callback(self.doc(), self.score()); } } + + fn top_k(&mut self, collector: &mut <(Count, TopDocs) as Collector>::Child) { + while self.advance() { + collector.collect(self.doc(), self.score()); + } + } } impl_downcast!(Scorer);