mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 16:22:55 +00:00
Compare commits
1 Commits
remove_dyn
...
fast-u64-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d30cafa80a |
@@ -22,6 +22,11 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
|||||||
proptest = "1"
|
proptest = "1"
|
||||||
more-asserts = "0.3.1"
|
more-asserts = "0.3.1"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
|
criterion = "0.4"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
unstable = []
|
unstable = []
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "bench_index"
|
||||||
|
harness = false
|
||||||
|
|||||||
91
columnar/benches/bench_index.rs
Normal file
91
columnar/benches/bench_index.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use std::ops::Range;
|
||||||
|
|
||||||
|
use criterion::*;
|
||||||
|
use rand::prelude::*;
|
||||||
|
use tantivy_columnar::column_index::MultiValueIndex;
|
||||||
|
use tantivy_columnar::RowId;
|
||||||
|
|
||||||
|
const WINDOW: usize = 40;
|
||||||
|
|
||||||
|
fn bench_multi_value_index_util(
|
||||||
|
len_range: Range<u32>,
|
||||||
|
num_rows: RowId,
|
||||||
|
select_value_ratio: f64,
|
||||||
|
b: &mut criterion::Bencher,
|
||||||
|
) {
|
||||||
|
let mut start_index: Vec<RowId> = vec![0u32];
|
||||||
|
let mut cursor: u32 = 0u32;
|
||||||
|
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||||
|
for i in 0..num_rows {
|
||||||
|
let num_vals = rng.gen_range(len_range.clone());
|
||||||
|
cursor += num_vals;
|
||||||
|
start_index.push(cursor);
|
||||||
|
}
|
||||||
|
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||||
|
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||||
|
.collect();
|
||||||
|
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||||
|
|
||||||
|
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
b.iter(|| {
|
||||||
|
let mut start_row = 0u32;
|
||||||
|
let mut len = 0;
|
||||||
|
for chunk in select_rows.chunks(WINDOW) {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend_from_slice(chunk);
|
||||||
|
mv_index.select_batch_in_place(start_row, &mut buffer);
|
||||||
|
start_row = buffer.last().copied().unwrap();
|
||||||
|
len += buffer.len()
|
||||||
|
}
|
||||||
|
assert_eq!(len, 4303);
|
||||||
|
len
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bench_multi_value_index_util2(
|
||||||
|
len_range: Range<u32>,
|
||||||
|
num_rows: RowId,
|
||||||
|
select_value_ratio: f64,
|
||||||
|
b: &mut criterion::Bencher,
|
||||||
|
) {
|
||||||
|
let mut start_index: Vec<RowId> = vec![0u32];
|
||||||
|
let mut cursor: u32 = 0u32;
|
||||||
|
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||||
|
for i in 0..num_rows {
|
||||||
|
let num_vals = rng.gen_range(len_range.clone());
|
||||||
|
cursor += num_vals;
|
||||||
|
start_index.push(cursor);
|
||||||
|
}
|
||||||
|
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||||
|
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||||
|
.collect();
|
||||||
|
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||||
|
|
||||||
|
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
b.iter(|| {
|
||||||
|
let mut mv_index_cursor = mv_index.select_cursor();
|
||||||
|
let mut len = 0;
|
||||||
|
for chunk in select_rows.chunks(WINDOW) {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend_from_slice(chunk);
|
||||||
|
mv_index_cursor.select_batch_in_place(&mut buffer);
|
||||||
|
len += buffer.len();
|
||||||
|
}
|
||||||
|
assert_eq!(len, 4303);
|
||||||
|
len
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_benchmark(c: &mut criterion::Criterion) {
|
||||||
|
c.bench_function("bench_multi_value_index_10_100", |b| {
|
||||||
|
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
|
||||||
|
});
|
||||||
|
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
|
||||||
|
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
criterion_group!(benches, select_benchmark);
|
||||||
|
criterion_main!(benches);
|
||||||
@@ -11,7 +11,6 @@
|
|||||||
|
|
||||||
# Perf and Size
|
# Perf and Size
|
||||||
* remove alloc in `ord_to_term`
|
* remove alloc in `ord_to_term`
|
||||||
+ multivaued range queries restrat frm the beginning all of the time.
|
|
||||||
* re-add ZSTD compression for dictionaries
|
* re-add ZSTD compression for dictionaries
|
||||||
no systematic monotonic mapping
|
no systematic monotonic mapping
|
||||||
consider removing multilinear
|
consider removing multilinear
|
||||||
|
|||||||
@@ -9,9 +9,90 @@ pub use merge::merge_column_index;
|
|||||||
pub use optional_index::{OptionalIndex, Set};
|
pub use optional_index::{OptionalIndex, Set};
|
||||||
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
|
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
|
||||||
|
|
||||||
use crate::column_index::multivalued_index::MultiValueIndex;
|
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
|
||||||
|
use crate::column_index::optional_index::OptionalIndexSelectCursor;
|
||||||
use crate::{Cardinality, RowId};
|
use crate::{Cardinality, RowId};
|
||||||
|
|
||||||
|
pub struct ColumnIndexSelectCursor {
|
||||||
|
last_rank: Option<RowId>,
|
||||||
|
cardinality_specific_impl: CardinalitySpecificSelectCursor,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
|
||||||
|
fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
|
||||||
|
ColumnIndexSelectCursor {
|
||||||
|
last_rank: None,
|
||||||
|
cardinality_specific_impl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum CardinalitySpecificSelectCursor {
|
||||||
|
Full,
|
||||||
|
Optional(OptionalIndexSelectCursor),
|
||||||
|
Multivalued(MultiValueIndexCursor),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This cursor object point is to compute batches of `select` operations.
|
||||||
|
///
|
||||||
|
/// Regardless of cardinality, a column index can always be seen as a mapping
|
||||||
|
/// from row_id -> start_value_row_id. By definition, it is increasing.
|
||||||
|
/// If `left <= right, column_index[left] <= column_index[right]`.
|
||||||
|
///
|
||||||
|
/// The select operation then identifies, given a value row id, which row it
|
||||||
|
/// belong to: it is the inverse mapping.
|
||||||
|
///
|
||||||
|
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
|
||||||
|
/// mapping[i] <= rank and mapping[i+1] < rank.
|
||||||
|
/// Another way to define it is to say that it is the last i such that
|
||||||
|
/// mapping[i] <= rank.
|
||||||
|
/// Finally it can be defined as the number of `row_id` such that
|
||||||
|
/// mapping[i] <= rank.
|
||||||
|
///
|
||||||
|
/// `select_batch_in_place` is a complex function that copmutes
|
||||||
|
/// select operation in batches and in place.
|
||||||
|
///
|
||||||
|
/// For optimization reasons, it only supports supplying ever striclty increasing
|
||||||
|
/// values of `rank_ids`, even cross calls.
|
||||||
|
///
|
||||||
|
/// It is also required from the caller, to only supply rank_ids lower than max(mapping).
|
||||||
|
/// Within those condition, the returned `row_ids` are guaranteed to be unique.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics if the supplied rank_ids are not increasing from one call to another.
|
||||||
|
/// We only check that the `rank_ids` Vec is increasing in debug mode for
|
||||||
|
/// performance reason.
|
||||||
|
impl ColumnIndexSelectCursor {
|
||||||
|
/// Returns a list of
|
||||||
|
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
|
||||||
|
// `rank_ids` has to be sorted.
|
||||||
|
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
|
||||||
|
// Two consecutive calls must pass strictly increasing `rank_ids`.
|
||||||
|
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
|
||||||
|
// rank_ids is empty, there is nothing to do.
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if let Some(last_rank) = self.last_rank {
|
||||||
|
assert!(last_rank < first_rank);
|
||||||
|
}
|
||||||
|
self.last_rank = Some(new_last_rank);
|
||||||
|
match &mut self.cardinality_specific_impl {
|
||||||
|
CardinalitySpecificSelectCursor::Full => {
|
||||||
|
// No need to do anything:
|
||||||
|
// `value_idx` and `row_idx` are the same.
|
||||||
|
}
|
||||||
|
CardinalitySpecificSelectCursor::Optional(optional_index) => {
|
||||||
|
optional_index.select_batch_in_place(&mut rank_ids[..]);
|
||||||
|
}
|
||||||
|
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
|
||||||
|
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
|
||||||
|
multivalued_index.select_batch_in_place(rank_ids)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum ColumnIndex {
|
pub enum ColumnIndex {
|
||||||
Full,
|
Full,
|
||||||
@@ -67,18 +148,15 @@ impl ColumnIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>) {
|
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
|
||||||
match self {
|
match self {
|
||||||
ColumnIndex::Full => {
|
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
|
||||||
// No need to do anything:
|
|
||||||
// value_idx and row_idx are the same.
|
|
||||||
}
|
|
||||||
ColumnIndex::Optional(optional_index) => {
|
ColumnIndex::Optional(optional_index) => {
|
||||||
optional_index.select_batch(&mut rank_ids[..]);
|
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
|
||||||
}
|
}
|
||||||
ColumnIndex::Multivalued(multivalued_index) => {
|
ColumnIndex::Multivalued(multivalued_index) => {
|
||||||
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
|
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
|
||||||
multivalued_index.select_batch_in_place(0u32, rank_ids)
|
.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,12 +35,6 @@ pub struct MultiValueIndex {
|
|||||||
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
|
|
||||||
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
|
|
||||||
MultiValueIndex { start_index_column }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MultiValueIndex {
|
impl MultiValueIndex {
|
||||||
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
|
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
@@ -64,78 +58,302 @@ impl MultiValueIndex {
|
|||||||
self.start_index_column.num_vals() - 1
|
self.start_index_column.num_vals() - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
|
pub fn select_cursor(&self) -> MultiValueIndexCursor {
|
||||||
/// row_ids. Positions are converted inplace to docids.
|
MultiValueIndexCursor {
|
||||||
|
multivalued_index: self.clone(),
|
||||||
|
row_cursor: 0u32,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MultiValueIndexCursor {
|
||||||
|
multivalued_index: MultiValueIndex,
|
||||||
|
row_cursor: RowId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MultiValueIndexCursor {
|
||||||
|
/// See contract in `ColumnIndexSelectCursor`.
|
||||||
///
|
///
|
||||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
|
/// Multi valued cardinality is special for two different
|
||||||
/// index.
|
/// ranks `rank_left` and `rank_right`, we can end up with
|
||||||
|
/// the same `select(rank_left)` and `select(rank_right)`.
|
||||||
///
|
///
|
||||||
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
|
/// For this reason, this function includes extra complexity
|
||||||
/// increasing positions.
|
/// to prevent the cursor from emitting the same row_id.
|
||||||
///
|
/// - From a last call, by skipping ranks mapping to
|
||||||
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
|
/// the same row_id
|
||||||
/// match a docid to its value position.
|
/// - With the batch, by simply deduplicating the output.
|
||||||
#[allow(clippy::bool_to_int_with_if)]
|
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||||
pub(crate) fn select_batch_in_place(&self, row_start: RowId, ranks: &mut Vec<u32>) {
|
|
||||||
if ranks.is_empty() {
|
if ranks.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let mut cur_doc = row_start;
|
let mut row_cursor = self.row_cursor;
|
||||||
let mut last_doc = None;
|
|
||||||
|
|
||||||
assert!(self.start_index_column.get_val(row_start) as u32 <= ranks[0]);
|
let mut write_cursor_id = usize::MAX;
|
||||||
|
let mut last_written_row_id = u32::MAX;
|
||||||
|
|
||||||
let mut write_doc_pos = 0;
|
// We skip all of the ranks that we already passed.
|
||||||
for i in 0..ranks.len() {
|
//
|
||||||
let pos = ranks[i];
|
// It is possible in the case of multivalued, for a the first
|
||||||
loop {
|
// few rank to belong to the same row_id as the last rank
|
||||||
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
|
// of the previous call.
|
||||||
if end > pos {
|
let start_bound = self
|
||||||
ranks[write_doc_pos] = cur_doc;
|
.multivalued_index
|
||||||
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
|
.start_index_column
|
||||||
last_doc = Some(cur_doc);
|
.get_val(row_cursor);
|
||||||
break;
|
|
||||||
}
|
let mut skip = 0;
|
||||||
cur_doc += 1;
|
while ranks[skip] < start_bound {
|
||||||
|
skip += 1;
|
||||||
|
if skip == ranks.len() {
|
||||||
|
ranks.clear();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ranks.truncate(write_doc_pos);
|
|
||||||
|
for i in skip..ranks.len() {
|
||||||
|
let rank = ranks[i];
|
||||||
|
let row_id = loop {
|
||||||
|
// TODO See if we can find a way to introduce a function in
|
||||||
|
// ColumnValue to remove dynamic dispatch.
|
||||||
|
// This is tricky however... because it only applies to T=u32.
|
||||||
|
//
|
||||||
|
// TODO consider using exponential search.
|
||||||
|
let end = self
|
||||||
|
.multivalued_index
|
||||||
|
.start_index_column
|
||||||
|
.get_val(row_cursor + 1) as u32;
|
||||||
|
if end > rank {
|
||||||
|
break row_cursor;
|
||||||
|
}
|
||||||
|
row_cursor += 1;
|
||||||
|
};
|
||||||
|
// We remove duplicates in a branchless fashion: we only advance
|
||||||
|
// the write cursor when we are writing a value different from
|
||||||
|
// the last written value.
|
||||||
|
write_cursor_id =
|
||||||
|
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
|
||||||
|
ranks[write_cursor_id] = row_id;
|
||||||
|
last_written_row_id = row_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.row_cursor = row_cursor + 1;
|
||||||
|
ranks.truncate(write_cursor_id + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::ops::Range;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::MultiValueIndex;
|
use super::MultiValueIndex;
|
||||||
use crate::column_values::IterColumn;
|
use crate::column_values::IterColumn;
|
||||||
use crate::{ColumnValues, RowId};
|
use crate::{ColumnValues, RowId};
|
||||||
|
use proptest::prelude::*;
|
||||||
|
|
||||||
fn index_to_pos_helper(
|
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
|
||||||
index: &MultiValueIndex,
|
|
||||||
doc_id_range: Range<u32>,
|
|
||||||
positions: &[u32],
|
|
||||||
) -> Vec<u32> {
|
|
||||||
let mut positions = positions.to_vec();
|
let mut positions = positions.to_vec();
|
||||||
index.select_batch_in_place(doc_id_range.start, &mut positions);
|
let mut cursor = index.select_cursor();
|
||||||
|
cursor.select_batch_in_place(&mut positions);
|
||||||
positions
|
positions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
|
||||||
|
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
|
||||||
|
|
||||||
|
#[track_caller]
|
||||||
|
fn test_multivalue_select_cursor_aux(
|
||||||
|
start_offsets: &'static [RowId],
|
||||||
|
ranks: &[RowId],
|
||||||
|
expected: &[RowId],
|
||||||
|
) {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(start_offsets.iter().copied()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
assert_eq!(&index_to_pos_helper(&index, &ranks), expected);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_positions_to_docid() {
|
fn test_multivalue_select_cursor_empty() {
|
||||||
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
|
||||||
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
|
}
|
||||||
let index = MultiValueIndex::from(column);
|
|
||||||
assert_eq!(index.num_rows(), 5);
|
#[test]
|
||||||
let positions = &[10u32, 11, 15, 20, 21, 22];
|
fn test_multivalue_select_cursor_single() {
|
||||||
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
|
}
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
|
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_duplicates() {
|
||||||
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_complex() {
|
||||||
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_corner_case_skip_all() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![0];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![5];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multi_value_index_cursor_bug() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![0];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![4];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![9];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_skip_already_emitted() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![1, 10];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0, 1]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
// Here we skip row_id = 1.
|
||||||
|
let mut ranks = vec![11, 12];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[2]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
|
||||||
|
proptest::collection::vec(0u32..3u32, 1..6)
|
||||||
|
.prop_map(|deltas: Vec<u32>| {
|
||||||
|
let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
|
||||||
|
let mut cumul = 0u32;
|
||||||
|
start_offsets.push(cumul);
|
||||||
|
for delta in deltas {
|
||||||
|
cumul += delta;
|
||||||
|
if cumul >= 10 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
start_offsets.push(cumul);
|
||||||
|
}
|
||||||
|
start_offsets.push(10);
|
||||||
|
start_offsets
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
|
||||||
|
proptest::collection::btree_set(0u32..10u32, 1..=10)
|
||||||
|
.prop_flat_map(|els| {
|
||||||
|
let els: Vec<RowId> = els.into_iter().collect();
|
||||||
|
proptest::collection::btree_set(0..els.len(), 0..els.len())
|
||||||
|
.prop_map(move |mut split_positions| {
|
||||||
|
split_positions.insert(els.len());
|
||||||
|
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
|
||||||
|
let mut cursor = 0;
|
||||||
|
for split_position in split_positions {
|
||||||
|
queries.push(els[cursor..split_position].to_vec());
|
||||||
|
cursor = split_position;
|
||||||
|
}
|
||||||
|
queries
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Simple inefficient implementation used for reference.
|
||||||
|
struct SimpleSelectCursor {
|
||||||
|
start_indexes: Vec<RowId>,
|
||||||
|
last_emitted_row_id: Option<RowId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimpleSelectCursor {
|
||||||
|
fn select(&self, rank: u32) -> RowId {
|
||||||
|
for i in 0..self.start_indexes.len() - 1 {
|
||||||
|
if self.start_indexes[i] <= rank && self.start_indexes[i + 1] > rank{
|
||||||
|
return i as u32;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||||
|
if ranks.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for rank in ranks.iter_mut() {
|
||||||
|
*rank = self.select(*rank);
|
||||||
|
}
|
||||||
|
ranks.dedup();
|
||||||
|
if ranks.first().copied() == self.last_emitted_row_id {
|
||||||
|
ranks.remove(0);
|
||||||
|
}
|
||||||
|
if let Some(last_emitted) = ranks.last().copied() {
|
||||||
|
self.last_emitted_row_id = Some(last_emitted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
proptest! {
|
||||||
|
#[test]
|
||||||
|
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
|
||||||
|
let mut simple_select_cursor = SimpleSelectCursor {
|
||||||
|
start_indexes: start_indexes.clone(),
|
||||||
|
last_emitted_row_id: None
|
||||||
|
};
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(start_indexes.into_iter()));
|
||||||
|
let index = MultiValueIndex { start_index_column: column };
|
||||||
|
let mut select_cursor = index.select_cursor();
|
||||||
|
for query in queries.iter_mut() {
|
||||||
|
let mut query_clone = query.clone();
|
||||||
|
select_cursor.select_batch_in_place(query);
|
||||||
|
simple_select_cursor.select_batch_in_place(&mut query_clone);
|
||||||
|
assert_eq!(&query[..], &query_clone[..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub struct OptionalIndexSelectCursor<'a> {
|
pub struct OptionalIndexSelectCursor {
|
||||||
current_block_cursor: BlockSelectCursor<'a>,
|
current_block_cursor: BlockSelectCursor<'static>,
|
||||||
current_block_id: u16,
|
current_block_id: u16,
|
||||||
// The current block is guaranteed to contain ranks < end_rank.
|
// The current block is guaranteed to contain ranks < end_rank.
|
||||||
current_block_end_rank: RowId,
|
current_block_end_rank: RowId,
|
||||||
optional_index: &'a OptionalIndex,
|
optional_index: OptionalIndex,
|
||||||
block_doc_idx_start: RowId,
|
block_doc_idx_start: RowId,
|
||||||
num_null_rows_before_block: RowId,
|
num_null_rows_before_block: RowId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> OptionalIndexSelectCursor<'a> {
|
impl OptionalIndexSelectCursor {
|
||||||
fn search_and_load_block(&mut self, rank: RowId) {
|
fn search_and_load_block(&mut self, rank: RowId) {
|
||||||
if rank < self.current_block_end_rank {
|
if rank < self.current_block_end_rank {
|
||||||
// we are already in the right block
|
// we are already in the right block
|
||||||
@@ -145,14 +145,23 @@ impl<'a> OptionalIndexSelectCursor<'a> {
|
|||||||
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
||||||
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
||||||
let block: Block<'_> = self.optional_index.block(block_meta);
|
let block: Block<'_> = self.optional_index.block(block_meta);
|
||||||
self.current_block_cursor = match block {
|
let current_block_cursor = match block {
|
||||||
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
||||||
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
||||||
};
|
};
|
||||||
|
// We are building a self-owned `OptionalIndexSelectCursor`.
|
||||||
|
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
|
||||||
|
// TODO see if we can batch at the block level as well for optimization purposes.
|
||||||
|
for rank in ranks {
|
||||||
|
*rank = self.select(*rank);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
|
||||||
fn select(&mut self, rank: RowId) -> RowId {
|
fn select(&mut self, rank: RowId) -> RowId {
|
||||||
self.search_and_load_block(rank);
|
self.search_and_load_block(rank);
|
||||||
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
||||||
@@ -161,7 +170,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Set<RowId> for OptionalIndex {
|
impl Set<RowId> for OptionalIndex {
|
||||||
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
|
type SelectCursor<'a> = OptionalIndexSelectCursor;
|
||||||
// Check if value at position is not null.
|
// Check if value at position is not null.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn contains(&self, row_id: RowId) -> bool {
|
fn contains(&self, row_id: RowId) -> bool {
|
||||||
@@ -220,14 +229,14 @@ impl Set<RowId> for OptionalIndex {
|
|||||||
block_doc_idx_start + in_block_rank as u32
|
block_doc_idx_start + in_block_rank as u32
|
||||||
}
|
}
|
||||||
|
|
||||||
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
|
fn select_cursor(&self) -> OptionalIndexSelectCursor {
|
||||||
OptionalIndexSelectCursor {
|
OptionalIndexSelectCursor {
|
||||||
current_block_cursor: BlockSelectCursor::Sparse(
|
current_block_cursor: BlockSelectCursor::Sparse(
|
||||||
SparseBlockCodec::open(b"").select_cursor(),
|
SparseBlockCodec::open(b"").select_cursor(),
|
||||||
),
|
),
|
||||||
current_block_id: 0u16,
|
current_block_id: 0u16,
|
||||||
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
||||||
optional_index: self,
|
optional_index: self.clone(),
|
||||||
block_doc_idx_start: 0u32,
|
block_doc_idx_start: 0u32,
|
||||||
num_null_rows_before_block: 0u32,
|
num_null_rows_before_block: 0u32,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use common::{BinarySerializable, OwnedBytes};
|
use common::{BinarySerializable, OwnedBytes};
|
||||||
|
|
||||||
|
use crate::column_index::MultiValueIndex;
|
||||||
use crate::column_values::monotonic_mapping::{
|
use crate::column_values::monotonic_mapping::{
|
||||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ extern crate test;
|
|||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
mod column;
|
mod column;
|
||||||
mod column_index;
|
pub mod column_index;
|
||||||
pub mod column_values;
|
pub mod column_values;
|
||||||
mod columnar;
|
mod columnar;
|
||||||
mod dictionary;
|
mod dictionary;
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
use std::ops::RangeInclusive;
|
use std::ops::RangeInclusive;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use columnar::Column;
|
use columnar::column_index::ColumnIndexSelectCursor;
|
||||||
|
use columnar::{Column, ColumnValues};
|
||||||
|
|
||||||
use crate::fastfield::MakeZero;
|
use crate::fastfield::MakeZero;
|
||||||
use crate::{DocId, DocSet, TERMINATED};
|
use crate::{DocId, DocSet, TERMINATED};
|
||||||
@@ -43,7 +45,9 @@ impl VecCursor {
|
|||||||
pub(crate) struct RangeDocSet<T: MakeZero> {
|
pub(crate) struct RangeDocSet<T: MakeZero> {
|
||||||
/// The range filter on the values.
|
/// The range filter on the values.
|
||||||
value_range: RangeInclusive<T>,
|
value_range: RangeInclusive<T>,
|
||||||
column: Column<T>,
|
column_index_select_cursor: ColumnIndexSelectCursor,
|
||||||
|
column_values: Arc<dyn ColumnValues<T>>,
|
||||||
|
|
||||||
/// The next docid start range to fetch (inclusive).
|
/// The next docid start range to fetch (inclusive).
|
||||||
next_fetch_start: u32,
|
next_fetch_start: u32,
|
||||||
/// Number of docs range checked in a batch.
|
/// Number of docs range checked in a batch.
|
||||||
@@ -63,13 +67,15 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
|
|||||||
const DEFAULT_FETCH_HORIZON: u32 = 128;
|
const DEFAULT_FETCH_HORIZON: u32 = 128;
|
||||||
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
|
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
|
||||||
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
|
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
|
||||||
|
let column_index_select_cursor = column.select_cursor();
|
||||||
let mut range_docset = Self {
|
let mut range_docset = Self {
|
||||||
value_range,
|
value_range,
|
||||||
column,
|
column_values: column.values,
|
||||||
loaded_docs: VecCursor::new(),
|
loaded_docs: VecCursor::new(),
|
||||||
next_fetch_start: 0,
|
next_fetch_start: 0,
|
||||||
fetch_horizon: DEFAULT_FETCH_HORIZON,
|
fetch_horizon: DEFAULT_FETCH_HORIZON,
|
||||||
last_seek_pos_opt: None,
|
last_seek_pos_opt: None,
|
||||||
|
column_index_select_cursor,
|
||||||
};
|
};
|
||||||
range_docset.reset_fetch_range();
|
range_docset.reset_fetch_range();
|
||||||
range_docset.fetch_block();
|
range_docset.fetch_block();
|
||||||
@@ -106,26 +112,21 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
|
|||||||
fn fetch_horizon(&mut self, horizon: u32) -> bool {
|
fn fetch_horizon(&mut self, horizon: u32) -> bool {
|
||||||
let mut finished_to_end = false;
|
let mut finished_to_end = false;
|
||||||
|
|
||||||
let limit = self.column.values.num_vals();
|
let limit = self.column_values.num_vals();
|
||||||
let mut end = self.next_fetch_start + horizon;
|
let mut end = self.next_fetch_start + horizon;
|
||||||
if end >= limit {
|
if end >= limit {
|
||||||
end = limit;
|
end = limit;
|
||||||
finished_to_end = true;
|
finished_to_end = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
let last_value = self.loaded_docs.last_value();
|
|
||||||
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
|
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
|
||||||
self.column.values.get_docids_for_value_range(
|
self.column_values.get_docids_for_value_range(
|
||||||
self.value_range.clone(),
|
self.value_range.clone(),
|
||||||
self.next_fetch_start..end,
|
self.next_fetch_start..end,
|
||||||
doc_buffer,
|
doc_buffer,
|
||||||
);
|
);
|
||||||
self.column.idx.select_batch_in_place(doc_buffer);
|
self.column_index_select_cursor
|
||||||
if let Some(last_value) = last_value {
|
.select_batch_in_place(doc_buffer);
|
||||||
while self.loaded_docs.current() == Some(last_value) {
|
|
||||||
self.loaded_docs.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.next_fetch_start = end;
|
self.next_fetch_start = end;
|
||||||
|
|
||||||
finished_to_end
|
finished_to_end
|
||||||
@@ -138,7 +139,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
|
|||||||
if let Some(docid) = self.loaded_docs.next() {
|
if let Some(docid) = self.loaded_docs.next() {
|
||||||
return docid;
|
return docid;
|
||||||
}
|
}
|
||||||
if self.next_fetch_start >= self.column.values.num_vals() {
|
if self.next_fetch_start >= self.column_values.num_vals() {
|
||||||
return TERMINATED;
|
return TERMINATED;
|
||||||
}
|
}
|
||||||
self.fetch_block();
|
self.fetch_block();
|
||||||
|
|||||||
Reference in New Issue
Block a user