mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-27 20:42:54 +00:00
Compare commits
1 Commits
remove-byt
...
fast-u64-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d30cafa80a |
@@ -22,6 +22,11 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
||||
proptest = "1"
|
||||
more-asserts = "0.3.1"
|
||||
rand = "0.8.5"
|
||||
criterion = "0.4"
|
||||
|
||||
[features]
|
||||
unstable = []
|
||||
|
||||
[[bench]]
|
||||
name = "bench_index"
|
||||
harness = false
|
||||
|
||||
91
columnar/benches/bench_index.rs
Normal file
91
columnar/benches/bench_index.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use criterion::*;
|
||||
use rand::prelude::*;
|
||||
use tantivy_columnar::column_index::MultiValueIndex;
|
||||
use tantivy_columnar::RowId;
|
||||
|
||||
const WINDOW: usize = 40;
|
||||
|
||||
fn bench_multi_value_index_util(
|
||||
len_range: Range<u32>,
|
||||
num_rows: RowId,
|
||||
select_value_ratio: f64,
|
||||
b: &mut criterion::Bencher,
|
||||
) {
|
||||
let mut start_index: Vec<RowId> = vec![0u32];
|
||||
let mut cursor: u32 = 0u32;
|
||||
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||
for i in 0..num_rows {
|
||||
let num_vals = rng.gen_range(len_range.clone());
|
||||
cursor += num_vals;
|
||||
start_index.push(cursor);
|
||||
}
|
||||
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||
.collect();
|
||||
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||
|
||||
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||
let mut buffer = Vec::new();
|
||||
b.iter(|| {
|
||||
let mut start_row = 0u32;
|
||||
let mut len = 0;
|
||||
for chunk in select_rows.chunks(WINDOW) {
|
||||
buffer.clear();
|
||||
buffer.extend_from_slice(chunk);
|
||||
mv_index.select_batch_in_place(start_row, &mut buffer);
|
||||
start_row = buffer.last().copied().unwrap();
|
||||
len += buffer.len()
|
||||
}
|
||||
assert_eq!(len, 4303);
|
||||
len
|
||||
});
|
||||
}
|
||||
|
||||
fn bench_multi_value_index_util2(
|
||||
len_range: Range<u32>,
|
||||
num_rows: RowId,
|
||||
select_value_ratio: f64,
|
||||
b: &mut criterion::Bencher,
|
||||
) {
|
||||
let mut start_index: Vec<RowId> = vec![0u32];
|
||||
let mut cursor: u32 = 0u32;
|
||||
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||
for i in 0..num_rows {
|
||||
let num_vals = rng.gen_range(len_range.clone());
|
||||
cursor += num_vals;
|
||||
start_index.push(cursor);
|
||||
}
|
||||
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||
.collect();
|
||||
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||
|
||||
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||
let mut buffer = Vec::new();
|
||||
b.iter(|| {
|
||||
let mut mv_index_cursor = mv_index.select_cursor();
|
||||
let mut len = 0;
|
||||
for chunk in select_rows.chunks(WINDOW) {
|
||||
buffer.clear();
|
||||
buffer.extend_from_slice(chunk);
|
||||
mv_index_cursor.select_batch_in_place(&mut buffer);
|
||||
len += buffer.len();
|
||||
}
|
||||
assert_eq!(len, 4303);
|
||||
len
|
||||
});
|
||||
}
|
||||
|
||||
fn select_benchmark(c: &mut criterion::Criterion) {
|
||||
c.bench_function("bench_multi_value_index_10_100", |b| {
|
||||
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
|
||||
});
|
||||
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
|
||||
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, select_benchmark);
|
||||
criterion_main!(benches);
|
||||
@@ -11,7 +11,6 @@
|
||||
|
||||
# Perf and Size
|
||||
* remove alloc in `ord_to_term`
|
||||
+ multivalued range queries restart from the beginning all of the time.
|
||||
* re-add ZSTD compression for dictionaries
|
||||
no systematic monotonic mapping
|
||||
consider removing multilinear
|
||||
|
||||
@@ -9,9 +9,90 @@ pub use merge::merge_column_index;
|
||||
pub use optional_index::{OptionalIndex, Set};
|
||||
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
|
||||
|
||||
use crate::column_index::multivalued_index::MultiValueIndex;
|
||||
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
|
||||
use crate::column_index::optional_index::OptionalIndexSelectCursor;
|
||||
use crate::{Cardinality, RowId};
|
||||
|
||||
/// Cursor computing batches of `select` operations over a column index.
///
/// It pairs a cardinality specific cursor with the last rank it has been given.
pub struct ColumnIndexSelectCursor {
|
||||
// Last rank of the most recent non-empty batch, `None` as long as no rank was supplied.
// `select_batch_in_place` uses it to check that ranks keep increasing across calls.
last_rank: Option<RowId>,
|
||||
// Cursor doing the actual work for the cardinality of the column.
cardinality_specific_impl: CardinalitySpecificSelectCursor,
|
||||
}
|
||||
|
||||
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
    /// Wraps a cardinality specific cursor into a cursor that has not received any rank yet.
    fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
        Self {
            cardinality_specific_impl,
            last_rank: None,
        }
    }
}
|
||||
|
||||
/// Cardinality specific state backing a `ColumnIndexSelectCursor`.
enum CardinalitySpecificSelectCursor {
|
||||
/// Full cardinality: value ids and row ids are the same, so no state is required.
Full,
|
||||
/// Optional cardinality: delegates to the cursor of the optional index.
Optional(OptionalIndexSelectCursor),
|
||||
/// Multivalued cardinality: delegates to the cursor of the multivalued index.
Multivalued(MultiValueIndexCursor),
|
||||
}
|
||||
|
||||
/// The point of this cursor object is to compute batches of `select` operations.
|
||||
///
|
||||
/// Regardless of cardinality, a column index can always be seen as a mapping
|
||||
/// from row_id -> start_value_row_id. By definition, it is increasing.
|
||||
/// If `left <= right, column_index[left] <= column_index[right]`.
|
||||
///
|
||||
/// The select operation then identifies, given a value row id, which row it
|
||||
/// belongs to: it is the inverse mapping.
|
||||
///
|
||||
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
|
||||
/// `mapping[i] <= rank` and `rank < mapping[i+1]`.
|
||||
/// Another way to define it is to say that it is the last i such that
|
||||
/// mapping[i] <= rank.
|
||||
/// Finally it can be defined as the number of `row_id` such that
|
||||
/// mapping[i] <= rank, minus one.
|
||||
///
|
||||
/// `select_batch_in_place` is a complex function that computes
|
||||
/// select operation in batches and in place.
|
||||
///
|
||||
/// For optimization reasons, it only supports supplying strictly increasing
|
||||
/// values of `rank_ids`, even across calls.
|
||||
///
|
||||
/// It is also required from the caller, to only supply rank_ids lower than max(mapping).
|
||||
/// Under those conditions, the returned `row_ids` are guaranteed to be unique.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the supplied rank_ids are not increasing from one call to another.
|
||||
/// We only check that the `rank_ids` Vec is increasing in debug mode for
|
||||
/// performance reason.
|
||||
impl ColumnIndexSelectCursor {
|
||||
/// Returns a list of
|
||||
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
|
||||
// `rank_ids` has to be sorted.
|
||||
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
|
||||
// Two consecutive calls must pass strictly increasing `rank_ids`.
|
||||
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
|
||||
// rank_ids is empty, there is nothing to do.
|
||||
return;
|
||||
};
|
||||
if let Some(last_rank) = self.last_rank {
|
||||
assert!(last_rank < first_rank);
|
||||
}
|
||||
self.last_rank = Some(new_last_rank);
|
||||
match &mut self.cardinality_specific_impl {
|
||||
CardinalitySpecificSelectCursor::Full => {
|
||||
// No need to do anything:
|
||||
// `value_idx` and `row_idx` are the same.
|
||||
}
|
||||
CardinalitySpecificSelectCursor::Optional(optional_index) => {
|
||||
optional_index.select_batch_in_place(&mut rank_ids[..]);
|
||||
}
|
||||
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
|
||||
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
|
||||
multivalued_index.select_batch_in_place(rank_ids)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum ColumnIndex {
|
||||
Full,
|
||||
@@ -67,18 +148,15 @@ impl ColumnIndex {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>) {
|
||||
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
|
||||
match self {
|
||||
ColumnIndex::Full => {
|
||||
// No need to do anything:
|
||||
// value_idx and row_idx are the same.
|
||||
}
|
||||
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
|
||||
ColumnIndex::Optional(optional_index) => {
|
||||
optional_index.select_batch(&mut rank_ids[..]);
|
||||
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
|
||||
}
|
||||
ColumnIndex::Multivalued(multivalued_index) => {
|
||||
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
|
||||
multivalued_index.select_batch_in_place(0u32, rank_ids)
|
||||
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
|
||||
.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,12 +35,6 @@ pub struct MultiValueIndex {
|
||||
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
||||
}
|
||||
|
||||
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
|
||||
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
|
||||
MultiValueIndex { start_index_column }
|
||||
}
|
||||
}
|
||||
|
||||
impl MultiValueIndex {
|
||||
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
|
||||
let mut buffer = Vec::new();
|
||||
@@ -64,78 +58,302 @@ impl MultiValueIndex {
|
||||
self.start_index_column.num_vals() - 1
|
||||
}
|
||||
|
||||
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
|
||||
/// row_ids. Positions are converted inplace to docids.
|
||||
pub fn select_cursor(&self) -> MultiValueIndexCursor {
|
||||
MultiValueIndexCursor {
|
||||
multivalued_index: self.clone(),
|
||||
row_cursor: 0u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cursor computing batches of `select` operations over a `MultiValueIndex`.
pub struct MultiValueIndexCursor {
|
||||
// Index whose start offsets are scanned.
multivalued_index: MultiValueIndex,
|
||||
// Row from which the next batch resumes its scan; starts at 0.
row_cursor: RowId,
|
||||
}
|
||||
|
||||
impl MultiValueIndexCursor {
|
||||
/// See contract in `ColumnIndexSelectCursor`.
|
||||
///
|
||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
|
||||
/// index.
|
||||
/// Multi valued cardinality is special for two different
|
||||
/// ranks `rank_left` and `rank_right`, we can end up with
|
||||
/// the same `select(rank_left)` and `select(rank_right)`.
|
||||
///
|
||||
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
|
||||
/// increasing positions.
|
||||
///
|
||||
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
|
||||
/// match a docid to its value position.
|
||||
#[allow(clippy::bool_to_int_with_if)]
|
||||
pub(crate) fn select_batch_in_place(&self, row_start: RowId, ranks: &mut Vec<u32>) {
|
||||
/// For this reason, this function includes extra complexity
|
||||
/// to prevent the cursor from emitting the same row_id.
|
||||
/// - From a last call, by skipping ranks mapping to
|
||||
/// the same row_id
|
||||
/// - With the batch, by simply deduplicating the output.
|
||||
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||
if ranks.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut cur_doc = row_start;
|
||||
let mut last_doc = None;
|
||||
let mut row_cursor = self.row_cursor;
|
||||
|
||||
assert!(self.start_index_column.get_val(row_start) as u32 <= ranks[0]);
|
||||
let mut write_cursor_id = usize::MAX;
|
||||
let mut last_written_row_id = u32::MAX;
|
||||
|
||||
let mut write_doc_pos = 0;
|
||||
for i in 0..ranks.len() {
|
||||
let pos = ranks[i];
|
||||
loop {
|
||||
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
|
||||
if end > pos {
|
||||
ranks[write_doc_pos] = cur_doc;
|
||||
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
|
||||
last_doc = Some(cur_doc);
|
||||
break;
|
||||
}
|
||||
cur_doc += 1;
|
||||
// We skip all of the ranks that we already passed.
|
||||
//
|
||||
// It is possible in the case of multivalued, for a the first
|
||||
// few rank to belong to the same row_id as the last rank
|
||||
// of the previous call.
|
||||
let start_bound = self
|
||||
.multivalued_index
|
||||
.start_index_column
|
||||
.get_val(row_cursor);
|
||||
|
||||
let mut skip = 0;
|
||||
while ranks[skip] < start_bound {
|
||||
skip += 1;
|
||||
if skip == ranks.len() {
|
||||
ranks.clear();
|
||||
return;
|
||||
}
|
||||
}
|
||||
ranks.truncate(write_doc_pos);
|
||||
|
||||
for i in skip..ranks.len() {
|
||||
let rank = ranks[i];
|
||||
let row_id = loop {
|
||||
// TODO See if we can find a way to introduce a function in
|
||||
// ColumnValue to remove dynamic dispatch.
|
||||
// This is tricky however... because it only applies to T=u32.
|
||||
//
|
||||
// TODO consider using exponential search.
|
||||
let end = self
|
||||
.multivalued_index
|
||||
.start_index_column
|
||||
.get_val(row_cursor + 1) as u32;
|
||||
if end > rank {
|
||||
break row_cursor;
|
||||
}
|
||||
row_cursor += 1;
|
||||
};
|
||||
// We remove duplicates in a branchless fashion: we only advance
|
||||
// the write cursor when we are writing a value different from
|
||||
// the last written value.
|
||||
write_cursor_id =
|
||||
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
|
||||
ranks[write_cursor_id] = row_id;
|
||||
last_written_row_id = row_id;
|
||||
}
|
||||
|
||||
self.row_cursor = row_cursor + 1;
|
||||
ranks.truncate(write_cursor_id + 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::MultiValueIndex;
|
||||
use crate::column_values::IterColumn;
|
||||
use crate::{ColumnValues, RowId};
|
||||
use proptest::prelude::*;
|
||||
|
||||
fn index_to_pos_helper(
|
||||
index: &MultiValueIndex,
|
||||
doc_id_range: Range<u32>,
|
||||
positions: &[u32],
|
||||
) -> Vec<u32> {
|
||||
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
|
||||
let mut positions = positions.to_vec();
|
||||
index.select_batch_in_place(doc_id_range.start, &mut positions);
|
||||
let mut cursor = index.select_cursor();
|
||||
cursor.select_batch_in_place(&mut positions);
|
||||
positions
|
||||
}
|
||||
|
||||
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
|
||||
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
|
||||
|
||||
#[track_caller]
|
||||
fn test_multivalue_select_cursor_aux(
|
||||
start_offsets: &'static [RowId],
|
||||
ranks: &[RowId],
|
||||
expected: &[RowId],
|
||||
) {
|
||||
let column: Arc<dyn ColumnValues<RowId>> =
|
||||
Arc::new(IterColumn::from(start_offsets.iter().copied()));
|
||||
let index = MultiValueIndex {
|
||||
start_index_column: column,
|
||||
};
|
||||
assert_eq!(&index_to_pos_helper(&index, &ranks), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_positions_to_docid() {
|
||||
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
|
||||
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
|
||||
let index = MultiValueIndex::from(column);
|
||||
assert_eq!(index.num_rows(), 5);
|
||||
let positions = &[10u32, 11, 15, 20, 21, 22];
|
||||
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
|
||||
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
|
||||
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
|
||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
|
||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
|
||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
|
||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
|
||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
|
||||
fn test_multivalue_select_cursor_empty() {
|
||||
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
|
||||
}
|
||||
|
||||
/// Single-rank batches: each rank maps to the row whose value range contains it.
#[test]
fn test_multivalue_select_cursor_single() {
    test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
    test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
}
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_select_cursor_duplicates() {
|
||||
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_select_cursor_complex() {
|
||||
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_select_corner_case_skip_all() {
|
||||
let column: Arc<dyn ColumnValues<RowId>> =
|
||||
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||
let index = MultiValueIndex {
|
||||
start_index_column: column,
|
||||
};
|
||||
let mut cursor = index.select_cursor();
|
||||
{
|
||||
let mut ranks = vec![0];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[0]);
|
||||
}
|
||||
{
|
||||
let mut ranks = vec![5];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_value_index_cursor_bug() {
|
||||
let column: Arc<dyn ColumnValues<RowId>> =
|
||||
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||
let index = MultiValueIndex {
|
||||
start_index_column: column,
|
||||
};
|
||||
let mut cursor = index.select_cursor();
|
||||
{
|
||||
let mut ranks = vec![0];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[0]);
|
||||
}
|
||||
{
|
||||
let mut ranks = vec![4];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[]);
|
||||
}
|
||||
{
|
||||
let mut ranks = vec![9];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multivalue_select_cursor_skip_already_emitted() {
|
||||
let column: Arc<dyn ColumnValues<RowId>> =
|
||||
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
|
||||
let index = MultiValueIndex {
|
||||
start_index_column: column,
|
||||
};
|
||||
let mut cursor = index.select_cursor();
|
||||
{
|
||||
let mut ranks = vec![1, 10];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[0, 1]);
|
||||
}
|
||||
{
|
||||
// Here we skip row_id = 1.
|
||||
let mut ranks = vec![11, 12];
|
||||
cursor.select_batch_in_place(&mut ranks);
|
||||
assert_eq!(ranks, &[2]);
|
||||
}
|
||||
}
|
||||
|
||||
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
|
||||
proptest::collection::vec(0u32..3u32, 1..6)
|
||||
.prop_map(|deltas: Vec<u32>| {
|
||||
let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
|
||||
let mut cumul = 0u32;
|
||||
start_offsets.push(cumul);
|
||||
for delta in deltas {
|
||||
cumul += delta;
|
||||
if cumul >= 10 {
|
||||
break;
|
||||
}
|
||||
start_offsets.push(cumul);
|
||||
}
|
||||
start_offsets.push(10);
|
||||
start_offsets
|
||||
})
|
||||
}
|
||||
|
||||
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
|
||||
proptest::collection::btree_set(0u32..10u32, 1..=10)
|
||||
.prop_flat_map(|els| {
|
||||
let els: Vec<RowId> = els.into_iter().collect();
|
||||
proptest::collection::btree_set(0..els.len(), 0..els.len())
|
||||
.prop_map(move |mut split_positions| {
|
||||
split_positions.insert(els.len());
|
||||
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
|
||||
let mut cursor = 0;
|
||||
for split_position in split_positions {
|
||||
queries.push(els[cursor..split_position].to_vec());
|
||||
cursor = split_position;
|
||||
}
|
||||
queries
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Simple inefficient implementation used for reference.
|
||||
struct SimpleSelectCursor {
|
||||
start_indexes: Vec<RowId>,
|
||||
last_emitted_row_id: Option<RowId>,
|
||||
}
|
||||
|
||||
impl SimpleSelectCursor {
|
||||
fn select(&self, rank: u32) -> RowId {
|
||||
for i in 0..self.start_indexes.len() - 1 {
|
||||
if self.start_indexes[i] <= rank && self.start_indexes[i + 1] > rank{
|
||||
return i as u32;
|
||||
}
|
||||
}
|
||||
panic!();
|
||||
}
|
||||
|
||||
fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||
if ranks.is_empty() {
|
||||
return;
|
||||
}
|
||||
for rank in ranks.iter_mut() {
|
||||
*rank = self.select(*rank);
|
||||
}
|
||||
ranks.dedup();
|
||||
if ranks.first().copied() == self.last_emitted_row_id {
|
||||
ranks.remove(0);
|
||||
}
|
||||
if let Some(last_emitted) = ranks.last().copied() {
|
||||
self.last_emitted_row_id = Some(last_emitted);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
|
||||
let mut simple_select_cursor = SimpleSelectCursor {
|
||||
start_indexes: start_indexes.clone(),
|
||||
last_emitted_row_id: None
|
||||
};
|
||||
let column: Arc<dyn ColumnValues<RowId>> =
|
||||
Arc::new(IterColumn::from(start_indexes.into_iter()));
|
||||
let index = MultiValueIndex { start_index_column: column };
|
||||
let mut select_cursor = index.select_cursor();
|
||||
for query in queries.iter_mut() {
|
||||
let mut query_clone = query.clone();
|
||||
select_cursor.select_batch_in_place(query);
|
||||
simple_select_cursor.select_batch_in_place(&mut query_clone);
|
||||
assert_eq!(&query[..], &query_clone[..]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub struct OptionalIndexSelectCursor<'a> {
|
||||
current_block_cursor: BlockSelectCursor<'a>,
|
||||
pub struct OptionalIndexSelectCursor {
|
||||
current_block_cursor: BlockSelectCursor<'static>,
|
||||
current_block_id: u16,
|
||||
// The current block is guaranteed to contain ranks < end_rank.
|
||||
current_block_end_rank: RowId,
|
||||
optional_index: &'a OptionalIndex,
|
||||
optional_index: OptionalIndex,
|
||||
block_doc_idx_start: RowId,
|
||||
num_null_rows_before_block: RowId,
|
||||
}
|
||||
|
||||
impl<'a> OptionalIndexSelectCursor<'a> {
|
||||
impl OptionalIndexSelectCursor {
|
||||
fn search_and_load_block(&mut self, rank: RowId) {
|
||||
if rank < self.current_block_end_rank {
|
||||
// we are already in the right block
|
||||
@@ -145,14 +145,23 @@ impl<'a> OptionalIndexSelectCursor<'a> {
|
||||
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
||||
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
||||
let block: Block<'_> = self.optional_index.block(block_meta);
|
||||
self.current_block_cursor = match block {
|
||||
let current_block_cursor = match block {
|
||||
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
||||
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
||||
};
|
||||
// We are building a self-owned `OptionalIndexSelectCursor`.
|
||||
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
|
||||
}
|
||||
|
||||
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
|
||||
// TODO see if we can batch at the block level as well for optimization purposes.
|
||||
for rank in ranks {
|
||||
*rank = self.select(*rank);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
|
||||
fn select(&mut self, rank: RowId) -> RowId {
|
||||
self.search_and_load_block(rank);
|
||||
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
||||
@@ -161,7 +170,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
||||
}
|
||||
|
||||
impl Set<RowId> for OptionalIndex {
|
||||
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
|
||||
type SelectCursor<'a> = OptionalIndexSelectCursor;
|
||||
// Check if value at position is not null.
|
||||
#[inline]
|
||||
fn contains(&self, row_id: RowId) -> bool {
|
||||
@@ -220,14 +229,14 @@ impl Set<RowId> for OptionalIndex {
|
||||
block_doc_idx_start + in_block_rank as u32
|
||||
}
|
||||
|
||||
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
|
||||
fn select_cursor(&self) -> OptionalIndexSelectCursor {
|
||||
OptionalIndexSelectCursor {
|
||||
current_block_cursor: BlockSelectCursor::Sparse(
|
||||
SparseBlockCodec::open(b"").select_cursor(),
|
||||
),
|
||||
current_block_id: 0u16,
|
||||
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
||||
optional_index: self,
|
||||
optional_index: self.clone(),
|
||||
block_doc_idx_start: 0u32,
|
||||
num_null_rows_before_block: 0u32,
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::sync::Arc;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
|
||||
use crate::column_index::MultiValueIndex;
|
||||
use crate::column_values::monotonic_mapping::{
|
||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||
};
|
||||
|
||||
@@ -10,7 +10,7 @@ extern crate test;
|
||||
use std::io;
|
||||
|
||||
mod column;
|
||||
mod column_index;
|
||||
pub mod column_index;
|
||||
pub mod column_values;
|
||||
mod columnar;
|
||||
mod dictionary;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use core::fmt::Debug;
|
||||
use std::ops::RangeInclusive;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::Column;
|
||||
use columnar::column_index::ColumnIndexSelectCursor;
|
||||
use columnar::{Column, ColumnValues};
|
||||
|
||||
use crate::fastfield::MakeZero;
|
||||
use crate::{DocId, DocSet, TERMINATED};
|
||||
@@ -43,7 +45,9 @@ impl VecCursor {
|
||||
pub(crate) struct RangeDocSet<T: MakeZero> {
|
||||
/// The range filter on the values.
|
||||
value_range: RangeInclusive<T>,
|
||||
column: Column<T>,
|
||||
column_index_select_cursor: ColumnIndexSelectCursor,
|
||||
column_values: Arc<dyn ColumnValues<T>>,
|
||||
|
||||
/// The next docid start range to fetch (inclusive).
|
||||
next_fetch_start: u32,
|
||||
/// Number of docs range checked in a batch.
|
||||
@@ -63,13 +67,15 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
|
||||
const DEFAULT_FETCH_HORIZON: u32 = 128;
|
||||
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
|
||||
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
|
||||
let column_index_select_cursor = column.select_cursor();
|
||||
let mut range_docset = Self {
|
||||
value_range,
|
||||
column,
|
||||
column_values: column.values,
|
||||
loaded_docs: VecCursor::new(),
|
||||
next_fetch_start: 0,
|
||||
fetch_horizon: DEFAULT_FETCH_HORIZON,
|
||||
last_seek_pos_opt: None,
|
||||
column_index_select_cursor,
|
||||
};
|
||||
range_docset.reset_fetch_range();
|
||||
range_docset.fetch_block();
|
||||
@@ -106,26 +112,21 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
|
||||
fn fetch_horizon(&mut self, horizon: u32) -> bool {
|
||||
let mut finished_to_end = false;
|
||||
|
||||
let limit = self.column.values.num_vals();
|
||||
let limit = self.column_values.num_vals();
|
||||
let mut end = self.next_fetch_start + horizon;
|
||||
if end >= limit {
|
||||
end = limit;
|
||||
finished_to_end = true;
|
||||
}
|
||||
|
||||
let last_value = self.loaded_docs.last_value();
|
||||
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
|
||||
self.column.values.get_docids_for_value_range(
|
||||
self.column_values.get_docids_for_value_range(
|
||||
self.value_range.clone(),
|
||||
self.next_fetch_start..end,
|
||||
doc_buffer,
|
||||
);
|
||||
self.column.idx.select_batch_in_place(doc_buffer);
|
||||
if let Some(last_value) = last_value {
|
||||
while self.loaded_docs.current() == Some(last_value) {
|
||||
self.loaded_docs.next();
|
||||
}
|
||||
}
|
||||
self.column_index_select_cursor
|
||||
.select_batch_in_place(doc_buffer);
|
||||
self.next_fetch_start = end;
|
||||
|
||||
finished_to_end
|
||||
@@ -138,7 +139,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
|
||||
if let Some(docid) = self.loaded_docs.next() {
|
||||
return docid;
|
||||
}
|
||||
if self.next_fetch_start >= self.column.values.num_vals() {
|
||||
if self.next_fetch_start >= self.column_values.num_vals() {
|
||||
return TERMINATED;
|
||||
}
|
||||
self.fetch_block();
|
||||
|
||||
Reference in New Issue
Block a user