mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-23 02:29:57 +00:00
Introduced a select cursor.
This commit is contained in:
@@ -5,8 +5,8 @@ use std::sync::Arc;
|
||||
mod set;
|
||||
mod set_block;
|
||||
|
||||
use common::{BinarySerializable, GroupByIteratorExtended, OwnedBytes, VInt};
|
||||
pub use set::{Set, SetCodec};
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
pub use set::{Set, SetCodec, SelectCursor};
|
||||
use set_block::{
|
||||
DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
|
||||
};
|
||||
@@ -115,7 +115,59 @@ fn row_addr_from_row_id(row_id: RowId) -> RowAddr {
|
||||
}
|
||||
}
|
||||
|
||||
enum BlockSelectCursor<'a> {
|
||||
Dense(<DenseBlock<'a> as Set<u16>>::SelectCursor<'a>),
|
||||
Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
|
||||
}
|
||||
|
||||
impl<'a> BlockSelectCursor<'a> {
|
||||
fn select(&mut self, rank: u16) -> u16 {
|
||||
match self {
|
||||
BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
|
||||
BlockSelectCursor::Sparse(sparse_select_cursor) => sparse_select_cursor.select(rank),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
pub struct OptionalIndexSelectCursor<'a> {
|
||||
current_block_cursor: BlockSelectCursor<'a>,
|
||||
current_block_id: u16,
|
||||
// The current block is guaranteed to contain ranks < end_rank.
|
||||
current_block_end_rank: RowId,
|
||||
optional_index: &'a OptionalIndex,
|
||||
block_doc_idx_start: RowId,
|
||||
num_null_rows_before_block: RowId,
|
||||
}
|
||||
|
||||
impl<'a> OptionalIndexSelectCursor<'a> {
|
||||
fn search_and_load_block(&mut self, rank: RowId) {
|
||||
if rank < self.current_block_end_rank {
|
||||
// we are already in the right block
|
||||
return;
|
||||
}
|
||||
self.current_block_id = self.optional_index.find_block(rank, self.current_block_id);
|
||||
self.current_block_end_rank = self.optional_index.block_metas.get(self.current_block_id as usize + 1).map(|block_meta| block_meta.non_null_rows_before_block).unwrap_or(u32::MAX);
|
||||
self.block_doc_idx_start = (self.current_block_id as u32) * ELEMENTS_PER_BLOCK;
|
||||
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
||||
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
||||
let block: Block<'_> = self.optional_index.block(block_meta);
|
||||
self.current_block_cursor = match block {
|
||||
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
||||
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
||||
fn select(&mut self, rank: RowId) -> RowId {
|
||||
self.search_and_load_block(rank);
|
||||
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
||||
self.current_block_cursor.select(index_in_block) as RowId + self.block_doc_idx_start
|
||||
}
|
||||
}
|
||||
|
||||
impl Set<RowId> for OptionalIndex {
|
||||
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
|
||||
// Check if value at position is not null.
|
||||
#[inline]
|
||||
fn contains(&self, row_id: RowId) -> bool {
|
||||
@@ -148,7 +200,7 @@ impl Set<RowId> for OptionalIndex {
|
||||
#[inline]
|
||||
fn select(&self, rank: RowId) -> RowId {
|
||||
let block_pos = self.find_block(rank, 0);
|
||||
let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK;
|
||||
let block_doc_idx_start = (block_pos as u32) * ELEMENTS_PER_BLOCK;
|
||||
let block_meta = self.block_metas[block_pos as usize];
|
||||
let block: Block<'_> = self.block(block_meta);
|
||||
let index_in_block = (rank - block_meta.non_null_rows_before_block) as u16;
|
||||
@@ -159,39 +211,27 @@ impl Set<RowId> for OptionalIndex {
|
||||
block_doc_idx_start + in_block_rank as u32
|
||||
}
|
||||
|
||||
fn select_batch(&self, ranks: &[u32], output_idxs: &mut [u32]) {
|
||||
let mut block_pos = 0u32;
|
||||
let mut start = 0;
|
||||
let group_by_it = ranks.iter().copied().group_by(move |codec_idx| {
|
||||
block_pos = self.find_block(*codec_idx, block_pos);
|
||||
block_pos
|
||||
});
|
||||
for (block_pos, block_iter) in group_by_it {
|
||||
let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK;
|
||||
let block_meta = self.block_metas[block_pos as usize];
|
||||
let block: Block<'_> = self.block(block_meta);
|
||||
let offset = block_meta.non_null_rows_before_block;
|
||||
let indexes_in_block_iter =
|
||||
block_iter.map(move |codec_idx| (codec_idx - offset) as u16);
|
||||
match block {
|
||||
Block::Dense(dense_block) => {
|
||||
for in_offset in dense_block.select_iter(indexes_in_block_iter) {
|
||||
output_idxs[start] = in_offset as u32 + block_doc_idx_start;
|
||||
start += 1;
|
||||
}
|
||||
}
|
||||
Block::Sparse(sparse_block) => {
|
||||
for in_offset in sparse_block.select_iter(indexes_in_block_iter) {
|
||||
output_idxs[start] = in_offset as u32 + block_doc_idx_start;
|
||||
start += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
|
||||
OptionalIndexSelectCursor {
|
||||
current_block_cursor: BlockSelectCursor::Sparse(SparseBlockCodec::open(b"").select_cursor()),
|
||||
current_block_id: 0u16,
|
||||
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
||||
optional_index: self,
|
||||
block_doc_idx_start: 0u32,
|
||||
num_null_rows_before_block: 0u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OptionalIndex {
|
||||
|
||||
pub fn select_batch(&self, ranks: &mut [RowId]) {
|
||||
let mut select_cursor = self.select_cursor();
|
||||
for rank in ranks.iter_mut() {
|
||||
*rank = select_cursor.select(*rank);
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn block<'a>(&'a self, block_meta: BlockMeta) -> Block<'a> {
|
||||
let BlockMeta {
|
||||
@@ -214,14 +254,14 @@ impl OptionalIndex {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn find_block(&self, dense_idx: u32, start_block_pos: u32) -> u32 {
|
||||
for block_pos in start_block_pos..self.block_metas.len() as u32 {
|
||||
fn find_block(&self, dense_idx: u32, start_block_pos: u16) -> u16 {
|
||||
for block_pos in start_block_pos..self.block_metas.len() as u16 {
|
||||
let offset = self.block_metas[block_pos as usize].non_null_rows_before_block;
|
||||
if offset > dense_idx {
|
||||
return block_pos - 1;
|
||||
return block_pos - 1u16;
|
||||
}
|
||||
}
|
||||
self.block_metas.len() as u32 - 1u32
|
||||
self.block_metas.len() as u16 - 1u16
|
||||
}
|
||||
|
||||
// TODO Add a good API for the codec_idx to original_idx translation.
|
||||
|
||||
@@ -13,7 +13,19 @@ pub trait SetCodec {
|
||||
fn open<'a>(data: &'a [u8]) -> Self::Reader<'a>;
|
||||
}
|
||||
|
||||
|
||||
/// Stateful object that makes it possible to compute several select in a row,
|
||||
/// provided the rank passed as argument are increasing.
|
||||
pub trait SelectCursor<T> {
|
||||
// May panic if rank is greater than the number of elements in the Set,
|
||||
// or if rank is < than value provided in the previous call.
|
||||
fn select(&mut self, rank: T) -> T;
|
||||
}
|
||||
|
||||
pub trait Set<T> {
|
||||
type SelectCursor<'b>: SelectCursor<T> where Self: 'b;
|
||||
|
||||
|
||||
/// Returns true if the elements is contained in the Set
|
||||
fn contains(&self, el: T) -> bool;
|
||||
|
||||
@@ -28,11 +40,6 @@ pub trait Set<T> {
|
||||
/// May panic if rank is greater than the number of elements in the Set.
|
||||
fn select(&self, rank: T) -> T;
|
||||
|
||||
/// Batch version of select.
|
||||
/// `ranks` is assumed to be sorted.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// May panic if rank is greater than the number of elements in the Set.
|
||||
fn select_batch(&self, ranks: &[T], outputs: &mut [T]);
|
||||
/// Creates a brand new select cursor.
|
||||
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b>;
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::io::{self, Write};
|
||||
|
||||
use common::BinarySerializable;
|
||||
|
||||
use crate::column_index::optional_index::{Set, SetCodec, ELEMENTS_PER_BLOCK};
|
||||
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor, ELEMENTS_PER_BLOCK};
|
||||
|
||||
#[inline(always)]
|
||||
fn get_bit_at(input: u64, n: u16) -> bool {
|
||||
@@ -105,7 +105,24 @@ impl DenseMiniBlock {
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct DenseBlock<'a>(&'a [u8]);
|
||||
|
||||
pub struct DenseBlockSelectCursor<'a> {
|
||||
block_id: u16,
|
||||
dense_block: DenseBlock<'a>,
|
||||
}
|
||||
|
||||
impl<'a> SelectCursor<u16> for DenseBlockSelectCursor<'a> {
|
||||
#[inline]
|
||||
fn select(&mut self, rank: u16) -> u16 {
|
||||
self.block_id = self.dense_block.find_miniblock_containing_rank(rank, self.block_id).unwrap();
|
||||
let index_block = self.dense_block.mini_block(self.block_id);
|
||||
let in_block_rank = rank - index_block.rank;
|
||||
self.block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Set<u16> for DenseBlock<'a> {
|
||||
type SelectCursor<'b> = DenseBlockSelectCursor<'a> where Self: 'b;
|
||||
|
||||
#[inline(always)]
|
||||
fn contains(&self, el: u16) -> bool {
|
||||
let mini_block_id = el / ELEMENTS_PER_MINI_BLOCK;
|
||||
@@ -136,37 +153,15 @@ impl<'a> Set<u16> for DenseBlock<'a> {
|
||||
block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
|
||||
}
|
||||
|
||||
fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) {
|
||||
let orig_ids = self.select_iter(ranks.iter().copied());
|
||||
for (output, original_id) in outputs.iter_mut().zip(orig_ids) {
|
||||
*output = original_id;
|
||||
#[inline(always)]
|
||||
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> {
|
||||
DenseBlockSelectCursor {
|
||||
block_id: 0,
|
||||
dense_block: *self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DenseBlock<'a> {
|
||||
/// Iterator verison of select.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if one of the rank is higher than the number of elements in the set.
|
||||
pub fn select_iter<'b>(
|
||||
&self,
|
||||
rank_it: impl Iterator<Item = u16> + 'b,
|
||||
) -> impl Iterator<Item = u16> + 'b
|
||||
where
|
||||
Self: 'b,
|
||||
{
|
||||
let mut block_id = 0u16;
|
||||
let me = *self;
|
||||
rank_it.map(move |rank| {
|
||||
block_id = me.find_miniblock_containing_rank(rank, block_id).unwrap();
|
||||
let index_block = me.mini_block(block_id);
|
||||
let in_block_rank = rank - index_block.rank;
|
||||
block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DenseBlock<'a> {
|
||||
#[inline]
|
||||
fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::column_index::optional_index::{Set, SetCodec};
|
||||
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor};
|
||||
|
||||
pub struct SparseBlockCodec;
|
||||
|
||||
@@ -24,7 +24,17 @@ impl SetCodec for SparseBlockCodec {
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct SparseBlock<'a>(&'a [u8]);
|
||||
|
||||
impl<'a> SelectCursor<u16> for SparseBlock<'a> {
|
||||
#[inline]
|
||||
fn select(&mut self, rank: u16) -> u16 {
|
||||
<SparseBlock<'a> as Set<u16>>::select(self, rank)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Set<u16> for SparseBlock<'a> {
|
||||
|
||||
type SelectCursor<'b> = Self where Self: 'b;
|
||||
|
||||
#[inline(always)]
|
||||
fn contains(&self, el: u16) -> bool {
|
||||
self.binary_search(el).is_ok()
|
||||
@@ -41,12 +51,11 @@ impl<'a> Set<u16> for SparseBlock<'a> {
|
||||
u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap())
|
||||
}
|
||||
|
||||
fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) {
|
||||
let orig_ids = self.select_iter(ranks.iter().copied());
|
||||
for (output, original_id) in outputs.iter_mut().zip(orig_ids) {
|
||||
*output = original_id;
|
||||
}
|
||||
#[inline(always)]
|
||||
fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> {
|
||||
*self
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@@ -96,17 +105,4 @@ impl<'a> SparseBlock<'a> {
|
||||
}
|
||||
Err(left)
|
||||
}
|
||||
|
||||
pub fn select_iter<'b>(
|
||||
&self,
|
||||
iter: impl Iterator<Item = u16> + 'b,
|
||||
) -> impl Iterator<Item = u16> + 'b
|
||||
where
|
||||
Self: 'b,
|
||||
{
|
||||
iter.map(|codec_id| {
|
||||
let offset = codec_id as usize * 2;
|
||||
u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES;
|
||||
use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec};
|
||||
use crate::column_index::optional_index::{Set, SetCodec};
|
||||
use crate::column_index::optional_index::{Set, SetCodec, SelectCursor};
|
||||
|
||||
fn test_set_helper<C: SetCodec<Item = u16>>(vals: &[u16]) -> usize {
|
||||
let mut buffer = Vec::new();
|
||||
@@ -74,12 +74,10 @@ fn test_simple_translate_codec_codec_idx_to_original_idx_dense() {
|
||||
.unwrap();
|
||||
let tested_set = DenseBlockCodec::open(buffer.as_slice());
|
||||
assert!(tested_set.contains(1));
|
||||
assert_eq!(
|
||||
&tested_set
|
||||
.select_iter([0, 1, 2, 5].iter().copied())
|
||||
.collect::<Vec<u16>>(),
|
||||
&[1, 3, 17, 30_001]
|
||||
);
|
||||
let mut select_cursor = tested_set.select_cursor();
|
||||
assert_eq!(select_cursor.select(0), 1);
|
||||
assert_eq!(select_cursor.select(1), 3);
|
||||
assert_eq!(select_cursor.select(2), 17);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -88,12 +86,10 @@ fn test_simple_translate_codec_idx_to_original_idx_sparse() {
|
||||
SparseBlockCodec::serialize([1, 3, 17].iter().copied(), &mut buffer).unwrap();
|
||||
let tested_set = SparseBlockCodec::open(buffer.as_slice());
|
||||
assert!(tested_set.contains(1));
|
||||
assert_eq!(
|
||||
&tested_set
|
||||
.select_iter([0, 1, 2].iter().copied())
|
||||
.collect::<Vec<u16>>(),
|
||||
&[1, 3, 17]
|
||||
);
|
||||
let mut select_cursor = tested_set.select_cursor();
|
||||
assert_eq!(SelectCursor::select(&mut select_cursor, 0), 1);
|
||||
assert_eq!(SelectCursor::select(&mut select_cursor, 1), 3);
|
||||
assert_eq!(SelectCursor::select(&mut select_cursor, 2), 17);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -102,10 +98,8 @@ fn test_simple_translate_codec_idx_to_original_idx_dense() {
|
||||
DenseBlockCodec::serialize(0u16..150u16, &mut buffer).unwrap();
|
||||
let tested_set = DenseBlockCodec::open(buffer.as_slice());
|
||||
assert!(tested_set.contains(1));
|
||||
let rg = 0u16..150u16;
|
||||
let els: Vec<u16> = rg.clone().collect();
|
||||
assert_eq!(
|
||||
&tested_set.select_iter(rg.clone()).collect::<Vec<u16>>(),
|
||||
&els
|
||||
);
|
||||
let mut select_cursor = tested_set.select_cursor();
|
||||
for i in 0..150 {
|
||||
assert_eq!(i, select_cursor.select(i));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,9 +41,10 @@ fn test_with_random_sets_simple() {
|
||||
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
|
||||
let ranks: Vec<u32> = (65_472u32..65_473u32).collect();
|
||||
let els: Vec<u32> = ranks.iter().copied().map(|rank| rank + 10).collect();
|
||||
let mut output = vec![0u32; ranks.len()];
|
||||
null_index.select_batch(&ranks[..], &mut output[..]);
|
||||
assert_eq!(&output, &els);
|
||||
let mut select_cursor = null_index.select_cursor();
|
||||
for (rank, el) in ranks.iter().copied().zip(els.iter().copied()) {
|
||||
assert_eq!(select_cursor.select(rank), el);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -91,11 +92,10 @@ fn test_null_index(data: &[bool]) {
|
||||
.filter(|(_pos, val)| **val)
|
||||
.map(|(pos, _val)| pos as u32)
|
||||
.collect();
|
||||
let ids: Vec<u32> = (0..orig_idx_with_value.len() as u32).collect();
|
||||
let mut output = vec![0u32; ids.len()];
|
||||
null_index.select_batch(&ids[..], &mut output);
|
||||
// assert_eq!(&output[0..100], &orig_idx_with_value[0..100]);
|
||||
assert_eq!(output, orig_idx_with_value);
|
||||
let mut select_iter = null_index.select_cursor();
|
||||
for i in 0..orig_idx_with_value.len() {
|
||||
assert_eq!(select_iter.select(i as u32), orig_idx_with_value[i]);
|
||||
}
|
||||
|
||||
let step_size = (orig_idx_with_value.len() / 100).max(1);
|
||||
for (dense_idx, orig_idx) in orig_idx_with_value.iter().enumerate().step_by(step_size) {
|
||||
@@ -115,9 +115,9 @@ fn test_optional_index_test_translation() {
|
||||
let iter = &[true, false, true, false];
|
||||
serialize_optional_index(&&iter[..], &mut out).unwrap();
|
||||
let null_index = open_optional_index(OwnedBytes::new(out)).unwrap();
|
||||
let mut output = vec![0u32; 2];
|
||||
null_index.select_batch(&[0, 1], &mut output);
|
||||
assert_eq!(output, &[0, 2]);
|
||||
let mut select_cursor = null_index.select_cursor();
|
||||
assert_eq!(select_cursor.select(0), 0);
|
||||
assert_eq!(select_cursor.select(1), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -175,7 +175,6 @@ mod bench {
|
||||
.map(|_| rng.gen_bool(fill_ratio))
|
||||
.collect();
|
||||
serialize_optional_index(&&vals[..], &mut out).unwrap();
|
||||
|
||||
let codec = open_optional_index(OwnedBytes::new(out)).unwrap();
|
||||
codec
|
||||
}
|
||||
@@ -311,7 +310,8 @@ mod bench {
|
||||
};
|
||||
let mut output = vec![0u32; idxs.len()];
|
||||
bench.iter(|| {
|
||||
codec.select_batch(&idxs[..], &mut output);
|
||||
output.copy_from_slice(&idxs[..]);
|
||||
codec.select_batch(&mut output);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user