diff --git a/CHANGELOG.md b/CHANGELOG.md index d692de690..cee32c998 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ previous index format.* for int fields. (@fulmicoton) - Added DateTime field (@barrotsteindev) - Added IndexReader. By default, index is reloaded automatically upon new commits (@fulmicoton) - +- SIMD linear search within blocks (@fulmicoton) Tantivy 0.8.2 ===================== diff --git a/src/common/mod.rs b/src/common/mod.rs index 6a1026e54..78ec3c423 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -13,6 +13,12 @@ pub use self::serialize::{BinarySerializable, FixedSize}; pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt}; pub use byteorder::LittleEndian as Endianness; + +/// Segment's max doc must be `< MAX_DOC_LIMIT`. +/// +/// We do not allow segments with more than +pub const MAX_DOC_LIMIT: u32 = 1 << 31; + /// Computes the number of bits that will be used for bitpacking. /// /// In general the target is the minimum number of bits @@ -127,4 +133,11 @@ pub(crate) mod test { assert_eq!(compute_num_bits(256), 9u8); assert_eq!(compute_num_bits(5_000_000_000), 33u8); } + + #[test] + fn test_max_doc() { + // this is the first time I write a unit test for a constant. + assert!(((super::MAX_DOC_LIMIT - 1) as i32) >= 0); + assert!((super::MAX_DOC_LIMIT as i32) < 0); + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 97048546b..404fd9bd4 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,3 +1,4 @@ +use common::MAX_DOC_LIMIT; use core::Segment; use core::SegmentReader; use core::SerializableSegment; @@ -23,6 +24,7 @@ use termdict::TermMerger; use termdict::TermOrdinal; use DocId; use Result; +use TantivyError; fn compute_total_num_tokens(readers: &[SegmentReader], field: Field) -> u64 { let mut total_tokens = 0u64; @@ -150,6 +152,14 @@ impl IndexMerger { readers.push(reader); } } + if max_doc >= MAX_DOC_LIMIT { + let err_msg = format!( + "The segment resulting from this merge would have {} docs,\ + which exceeds the limit {}.", + max_doc, MAX_DOC_LIMIT + ); + return Err(TantivyError::InvalidArgument(err_msg)); + } Ok(IndexMerger { schema, readers, diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 3dc373795..316dbd68a 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -420,6 +420,7 @@ impl SegmentUpdater { }) .collect::>(); merge_candidates.extend(committed_merge_candidates.into_iter()); + for merge_operation in merge_candidates { match self.start_merge_impl(merge_operation) { Ok(merge_future) => { diff --git a/src/postings/block_search.rs b/src/postings/block_search.rs new file mode 100644 index 000000000..825eb9b7d --- /dev/null +++ b/src/postings/block_search.rs @@ -0,0 +1,229 @@ +/// This modules define the logic used to search for a doc in a given +/// block. (at most 128 docs) +/// +/// Searching within a block is a hotspot when running intersection. +/// so it was worth defining it in its own module. + +#[cfg(target_arch = "x86_64")] +mod sse2 { + use postings::compression::COMPRESSION_BLOCK_SIZE; + use std::arch::x86_64::__m128i as DataType; + use std::arch::x86_64::_mm_add_epi32 as op_add; + use std::arch::x86_64::_mm_cmplt_epi32 as op_lt; + use std::arch::x86_64::_mm_load_si128 as op_load; // requires 128-bits alignment + use std::arch::x86_64::_mm_set1_epi32 as set1; + use std::arch::x86_64::_mm_setzero_si128 as set0; + use std::arch::x86_64::_mm_sub_epi32 as op_sub; + use std::arch::x86_64::{_mm_cvtsi128_si32, _mm_shuffle_epi32}; + + const MASK1: i32 = 78; + const MASK2: i32 = 177; + + /// Performs an exhaustive linear search over the + /// + /// There is no early exit here. We simply count the + /// number of elements that are `< target`. + pub fn linear_search_sse2_128(arr: &[u32], target: u32) -> usize { + unsafe { + let ptr = arr.as_ptr() as *const DataType; + let vkey = set1(target as i32); + let mut cnt = set0(); + // We work over 4 `__m128i` at a time. + // A single `__m128i` actual contains 4 `u32`. + for i in 0..(COMPRESSION_BLOCK_SIZE as isize) / (4 * 4) { + let cmp1 = op_lt(op_load(ptr.offset(i * 4)), vkey); + let cmp2 = op_lt(op_load(ptr.offset(i * 4 + 1)), vkey); + let cmp3 = op_lt(op_load(ptr.offset(i * 4 + 2)), vkey); + let cmp4 = op_lt(op_load(ptr.offset(i * 4 + 3)), vkey); + let sum = op_add(op_add(cmp1, cmp2), op_add(cmp3, cmp4)); + cnt = op_sub(cnt, sum); + } + cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK1)); + cnt = op_add(cnt, _mm_shuffle_epi32(cnt, MASK2)); + _mm_cvtsi128_si32(cnt) as usize + } + } + + #[cfg(test)] + mod test { + use super::linear_search_sse2_128; + + #[test] + fn test_linear_search_sse2_128_u32() { + for i in 0..23 { + dbg!(i); + let arr: Vec = (0..128).map(|el| el * 2 + 1 << 18).collect(); + assert_eq!(linear_search_sse2_128(&arr, arr[64] + 1), 65); + } + } + } +} + +/// This `linear search` browser exhaustively through the array. +/// but the early exit is very difficult to predict. +/// +/// Coupled with `exponential search` this function is likely +/// to be called with the same `len` +fn linear_search(arr: &[u32], target: u32) -> usize { + arr.iter().map(|&el| if el < target { 1 } else { 0 }).sum() +} + +fn exponential_search(arr: &[u32], target: u32) -> (usize, usize) { + let end = arr.len(); + let mut begin = 0; + for &pivot in &[1, 3, 7, 15, 31, 63] { + if pivot >= end { + break; + } + if arr[pivot] > target { + return (begin, pivot); + } + begin = pivot; + } + (begin, end) +} + +fn galloping(block_docs: &[u32], target: u32) -> usize { + let (start, end) = exponential_search(&block_docs, target); + start + linear_search(&block_docs[start..end], target) +} + +/// Tantivy may rely on SIMD instructions to search for a specific document within +/// a given block. +#[derive(Clone, Copy, PartialEq)] +pub enum BlockSearcher { + #[cfg(target_arch = "x86_64")] + SSE2, + Scalar, +} + +impl BlockSearcher { + /// Search the first index containing an element greater or equal to + /// the target. + /// + /// The results should be equivalent to + /// ```ignore + /// block[..] + // .iter() + // .take_while(|&&val| val < target) + // .count() + /// ``` + /// + /// The `start` argument is just used to hint that the response is + /// greater than beyond `start`. The implementation may or may not use + /// it for optimization. + /// + /// # Assumption + /// + /// The array len is > start. + /// The block is sorted + /// The target is assumed greater or equal to the `arr[start]`. + /// The target is assumed smaller or equal to the last element of the block. + /// + /// Currently the scalar implementation starts by an exponential search, and + /// then operates a linear search in the result subarray. + /// + /// If SSE2 instructions are available in the `(platform, running CPU)`, + /// then we use a different implementation that does an exhaustive linear search over + /// the full block whenever the block is full (`len == 128`). It is surprisingly faster, most likely because of the lack + /// of branch. + pub fn search_in_block(&self, block_docs: &[u32], start: usize, target: u32) -> usize { + #[cfg(target_arch = "x86_64")] + { + use postings::compression::COMPRESSION_BLOCK_SIZE; + if *self == BlockSearcher::SSE2 { + if block_docs.len() == COMPRESSION_BLOCK_SIZE { + return sse2::linear_search_sse2_128(block_docs, target); + } + } + } + start + galloping(&block_docs[start..], target) + } +} + +impl Default for BlockSearcher { + fn default() -> BlockSearcher { + #[cfg(target_arch = "x86_64")] + { + if is_x86_feature_detected!("sse2") { + return BlockSearcher::SSE2; + } + } + BlockSearcher::Scalar + } +} + +#[cfg(test)] +mod tests { + use super::exponential_search; + use super::linear_search; + use super::BlockSearcher; + + #[test] + fn test_linear_search() { + let len: usize = 50; + let arr: Vec = (0..len).map(|el| 1u32 + (el as u32) * 2).collect(); + for target in 1..*arr.last().unwrap() { + let res = linear_search(&arr[..], target); + if res > 0 { + assert!(arr[res - 1] < target); + } + if res < len { + assert!(arr[res] >= target); + } + } + } + + #[test] + fn test_exponentiel_search() { + assert_eq!(exponential_search(&[1, 2], 0), (0, 1)); + assert_eq!(exponential_search(&[1, 2], 1), (0, 1)); + assert_eq!( + exponential_search(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 7), + (3, 7) + ); + } + + fn util_test_search_in_block(block_searcher: BlockSearcher, block: &[u32], target: u32) { + let cursor = search_in_block_trivial_but_slow(block, target); + for i in 0..cursor { + assert_eq!(block_searcher.search_in_block(block, i, target), cursor); + } + } + + fn util_test_search_in_block_all(block_searcher: BlockSearcher, block: &[u32]) { + use std::collections::HashSet; + let mut targets = HashSet::new(); + for (i, val) in block.iter().cloned().enumerate() { + if i > 0 { + targets.insert(val - 1); + } + targets.insert(val); + } + for target in targets { + util_test_search_in_block(block_searcher, block, target); + } + } + + fn search_in_block_trivial_but_slow(block: &[u32], target: u32) -> usize { + block.iter().take_while(|&&val| val < target).count() + } + + fn test_search_in_block_util(block_searcher: BlockSearcher) { + for len in 1u32..128u32 { + let v: Vec = (0..len).map(|i| i * 2).collect(); + util_test_search_in_block_all(block_searcher, &v[..]); + } + } + + #[test] + fn test_search_in_block_scalar() { + test_search_in_block_util(BlockSearcher::Scalar); + } + + #[cfg(target_arch = "x86_64")] + #[test] + fn test_search_in_block_sse2() { + test_search_in_block_util(BlockSearcher::SSE2); + } +} diff --git a/src/postings/compression/mod.rs b/src/postings/compression/mod.rs index 4da8c1c25..f35b6cde4 100644 --- a/src/postings/compression/mod.rs +++ b/src/postings/compression/mod.rs @@ -43,9 +43,14 @@ impl BlockEncoder { } } +/// We ensure that the OutputBuffer is align on 128 bits +/// in order to run SSE2 linear search on it. +#[repr(align(128))] +struct OutputBuffer([u32; COMPRESSION_BLOCK_SIZE + 1]); + pub struct BlockDecoder { bitpacker: BitPacker4x, - pub output: [u32; COMPRESSION_BLOCK_SIZE + 1], + output: OutputBuffer, pub output_len: usize, } @@ -59,7 +64,7 @@ impl BlockDecoder { output[COMPRESSION_BLOCK_SIZE] = 0u32; BlockDecoder { bitpacker: BitPacker4x::new(), - output, + output: OutputBuffer(output), output_len: 0, } } @@ -72,23 +77,23 @@ impl BlockDecoder { ) -> usize { self.output_len = COMPRESSION_BLOCK_SIZE; self.bitpacker - .decompress_sorted(offset, &compressed_data, &mut self.output, num_bits) + .decompress_sorted(offset, &compressed_data, &mut self.output.0, num_bits) } pub fn uncompress_block_unsorted(&mut self, compressed_data: &[u8], num_bits: u8) -> usize { self.output_len = COMPRESSION_BLOCK_SIZE; self.bitpacker - .decompress(&compressed_data, &mut self.output, num_bits) + .decompress(&compressed_data, &mut self.output.0, num_bits) } #[inline] pub fn output_array(&self) -> &[u32] { - &self.output[..self.output_len] + &self.output.0[..self.output_len] } #[inline] pub fn output(&self, idx: usize) -> u32 { - self.output[idx] + self.output.0[idx] } } @@ -159,12 +164,12 @@ impl VIntDecoder for BlockDecoder { num_els: usize, ) -> usize { self.output_len = num_els; - vint::uncompress_sorted(compressed_data, &mut self.output[..num_els], offset) + vint::uncompress_sorted(compressed_data, &mut self.output.0[..num_els], offset) } fn uncompress_vint_unsorted<'a>(&mut self, compressed_data: &'a [u8], num_els: usize) -> usize { self.output_len = num_els; - vint::uncompress_unsorted(compressed_data, &mut self.output[..num_els]) + vint::uncompress_unsorted(compressed_data, &mut self.output.0[..num_els]) } } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index e571e26df..3e1c03411 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -2,6 +2,7 @@ Postings module (also called inverted index) */ +mod block_search; pub(crate) mod compression; /// Postings module /// @@ -16,6 +17,8 @@ mod skip; mod stacker; mod term_info; +pub(crate) use self::block_search::BlockSearcher; + pub(crate) use self::postings_writer::MultiFieldPostingsWriter; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; @@ -104,9 +107,7 @@ pub mod tests { let searcher = index.reader().unwrap().searcher(); let inverted_index = searcher.segment_reader(0u32).inverted_index(title); let term = Term::from_field_text(title, "abc"); - let mut positions = Vec::new(); - { let mut postings = inverted_index .read_postings(&term, IndexRecordOption::WithFreqsAndPositions) diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index e6eca614b..b12c0b716 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -7,6 +7,7 @@ use positions::PositionReader; use postings::compression::compressed_block_size; use postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; use postings::serializer::PostingsSerializer; +use postings::BlockSearcher; use postings::FreqReadingOption; use postings::Postings; use postings::SkipReader; @@ -60,6 +61,7 @@ pub struct SegmentPostings { block_cursor: BlockSegmentPostings, cur: usize, position_computer: Option, + block_searcher: BlockSearcher, } impl SegmentPostings { @@ -70,6 +72,7 @@ impl SegmentPostings { block_cursor: empty_block_cursor, cur: COMPRESSION_BLOCK_SIZE, position_computer: None, + block_searcher: BlockSearcher::default(), } } @@ -117,41 +120,11 @@ impl SegmentPostings { block_cursor: segment_block_postings, cur: COMPRESSION_BLOCK_SIZE, // cursor within the block position_computer: positions_stream_opt.map(PositionComputer::new), + block_searcher: BlockSearcher::default(), } } } -fn linear_search(arr: &[u32], target: u32) -> usize { - arr.iter().map(|&el| if el < target { 1 } else { 0 }).sum() -} - -fn exponential_search(arr: &[u32], target: u32) -> (usize, usize) { - let end = arr.len(); - let mut begin = 0; - for &pivot in &[1, 3, 7, 15, 31, 63] { - if pivot >= end { - break; - } - if arr[pivot] > target { - return (begin, pivot); - } - begin = pivot; - } - (begin, end) -} - -/// Search the first index containing an element greater or equal to the target. -/// -/// # Assumption -/// -/// The array is assumed non empty. -/// The target is assumed greater or equal to the first element. -/// The target is assumed smaller or equal to the last element. -fn search_within_block(block_docs: &[u32], target: u32) -> usize { - let (start, end) = exponential_search(block_docs, target); - start + linear_search(&block_docs[start..end], target) -} - impl DocSet for SegmentPostings { // goes to the next element. // next needs to be called a first time to point to the correct element. @@ -230,9 +203,8 @@ impl DocSet for SegmentPostings { // we're in the right block now, start with an exponential search let block_docs = self.block_cursor.docs(); let new_cur = self - .cur - .wrapping_add(search_within_block(&block_docs[self.cur..], target)); - + .block_searcher + .search_in_block(&block_docs, self.cur, target); if need_positions { sum_freqs_skipped += self.block_cursor.freqs()[self.cur..new_cur] .iter() @@ -614,10 +586,6 @@ impl<'b> Streamer<'b> for BlockSegmentPostings { #[cfg(test)] mod tests { - - use super::exponential_search; - use super::linear_search; - use super::search_within_block; use super::BlockSegmentPostings; use super::BlockSegmentPostingsSkipResult; use super::SegmentPostings; @@ -632,21 +600,6 @@ mod tests { use DocId; use SkipResult; - #[test] - fn test_linear_search() { - let len: usize = 50; - let arr: Vec = (0..len).map(|el| 1u32 + (el as u32) * 2).collect(); - for target in 1..*arr.last().unwrap() { - let res = linear_search(&arr[..], target); - if res > 0 { - assert!(arr[res - 1] < target); - } - if res < len { - assert!(arr[res] >= target); - } - } - } - #[test] fn test_empty_segment_postings() { let mut postings = SegmentPostings::empty(); @@ -662,56 +615,6 @@ mod tests { assert_eq!(postings.doc_freq(), 0); } - fn search_within_block_trivial_but_slow(block: &[u32], target: u32) -> usize { - block - .iter() - .cloned() - .enumerate() - .filter(|&(_, ref val)| *val >= target) - .next() - .unwrap() - .0 - } - - #[test] - fn test_exponentiel_search() { - assert_eq!(exponential_search(&[1, 2], 0), (0, 1)); - assert_eq!(exponential_search(&[1, 2], 1), (0, 1)); - assert_eq!( - exponential_search(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 7), - (3, 7) - ); - } - - fn util_test_search_within_block(block: &[u32], target: u32) { - assert_eq!( - search_within_block(block, target), - search_within_block_trivial_but_slow(block, target) - ); - } - - fn util_test_search_within_block_all(block: &[u32]) { - use std::collections::HashSet; - let mut targets = HashSet::new(); - for (i, val) in block.iter().cloned().enumerate() { - if i > 0 { - targets.insert(val - 1); - } - targets.insert(val); - } - for target in targets { - util_test_search_within_block(block, target); - } - } - - #[test] - fn test_search_within_block() { - for len in 1u32..128u32 { - let v: Vec = (0..len).map(|i| i * 2).collect(); - util_test_search_within_block_all(&v[..]); - } - } - #[test] fn test_block_segment_postings() { let mut block_segments = build_block_postings(&(0..100_000).collect::>()); diff --git a/src/query/intersection.rs b/src/query/intersection.rs index 1067fe46d..a35a8ef6c 100644 --- a/src/query/intersection.rs +++ b/src/query/intersection.rs @@ -14,41 +14,35 @@ use Score; /// specialized implementation if the two /// shortest scorers are `TermScorer`s. pub fn intersect_scorers(mut scorers: Vec>) -> Box { + if scorers.is_empty() { + return Box::new(EmptyScorer); + } + if scorers.len() == 1 { + return scorers.pop().unwrap(); + } + // We know that we have at least 2 elements. let num_docsets = scorers.len(); scorers.sort_by(|left, right| right.size_hint().cmp(&left.size_hint())); - let rarest_opt = scorers.pop(); - let second_rarest_opt = scorers.pop(); + let left = scorers.pop().unwrap(); + let right = scorers.pop().unwrap(); scorers.reverse(); - match (rarest_opt, second_rarest_opt) { - (None, None) => Box::new(EmptyScorer), - (Some(single_docset), None) => single_docset, - (Some(left), Some(right)) => { - { - let all_term_scorers = [&left, &right] - .iter() - .all(|&scorer| scorer.is::()); - if all_term_scorers { - let left = *(left.downcast::().map_err(|_| ()).unwrap()); - let right = *(right.downcast::().map_err(|_| ()).unwrap()); - return Box::new(Intersection { - left, - right, - others: scorers, - num_docsets, - }); - } - } - Box::new(Intersection { - left, - right, - others: scorers, - num_docsets, - }) - } - _ => { - unreachable!(); - } + let all_term_scorers = [&left, &right] + .iter() + .all(|&scorer| scorer.is::()); + if all_term_scorers { + return Box::new(Intersection { + left: *(left.downcast::().map_err(|_| ()).unwrap()), + right: *(right.downcast::().map_err(|_| ()).unwrap()), + others: scorers, + num_docsets, + }); } + Box::new(Intersection { + left, + right, + others: scorers, + num_docsets, + }) } /// Creates a `DocSet` that iterator through the intersection of two `DocSet`s. @@ -124,7 +118,6 @@ impl DocSet for Intersection { break; @@ -140,35 +133,36 @@ impl DocSet for Intersection {} - SkipResult::OverStep => { - // this is not in the intersection, - // let's update our candidate. - candidate = docset.doc(); - match left.skip_next(candidate) { - SkipResult::Reached => { - other_candidate_ord = ord; - } - SkipResult::OverStep => { - candidate = left.doc(); - other_candidate_ord = usize::max_value(); - } - SkipResult::End => { - return false; - } + if ord == other_candidate_ord { + continue; + } + // `candidate_ord` is already at the + // right position. + // + // Calling `skip_next` would advance this docset + // and miss it. + match docset.skip_next(candidate) { + SkipResult::Reached => {} + SkipResult::OverStep => { + // this is not in the intersection, + // let's update our candidate. + candidate = docset.doc(); + match left.skip_next(candidate) { + SkipResult::Reached => { + other_candidate_ord = ord; + } + SkipResult::OverStep => { + candidate = left.doc(); + other_candidate_ord = usize::max_value(); + } + SkipResult::End => { + return false; } - continue 'outer; - } - SkipResult::End => { - return false; } + continue 'outer; + } + SkipResult::End => { + return false; } } }