Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Masurel
d30cafa80a Fixes the behavior of the range doc set, especially on multivalued
cardinality.

We stop scanning from the beginning of the column which is a great
improvement.
This is done by introducing a stateful `SelectCursor`.
2023-02-13 04:02:14 +01:00
9 changed files with 489 additions and 87 deletions

View File

@@ -22,6 +22,11 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8.5"
criterion = "0.4"
[features]
unstable = []
[[bench]]
name = "bench_index"
harness = false

View File

@@ -0,0 +1,91 @@
use std::ops::Range;
use criterion::*;
use rand::prelude::*;
use tantivy_columnar::column_index::MultiValueIndex;
use tantivy_columnar::RowId;
const WINDOW: usize = 40;
fn bench_multi_value_index_util(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut start_row = 0u32;
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index.select_batch_in_place(start_row, &mut buffer);
start_row = buffer.last().copied().unwrap();
len += buffer.len()
}
assert_eq!(len, 4303);
len
});
}
fn bench_multi_value_index_util2(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut mv_index_cursor = mv_index.select_cursor();
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index_cursor.select_batch_in_place(&mut buffer);
len += buffer.len();
}
assert_eq!(len, 4303);
len
});
}
fn select_benchmark(c: &mut criterion::Criterion) {
c.bench_function("bench_multi_value_index_10_100", |b| {
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
});
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
});
}
criterion_group!(benches, select_benchmark);
criterion_main!(benches);

View File

@@ -11,7 +11,6 @@
# Perf and Size
* remove alloc in `ord_to_term`
+ multivaued range queries restrat frm the beginning all of the time.
* re-add ZSTD compression for dictionaries
no systematic monotonic mapping
consider removing multilinear

View File

@@ -9,9 +9,90 @@ pub use merge::merge_column_index;
pub use optional_index::{OptionalIndex, Set};
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
use crate::column_index::multivalued_index::MultiValueIndex;
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
use crate::column_index::optional_index::OptionalIndexSelectCursor;
use crate::{Cardinality, RowId};
pub struct ColumnIndexSelectCursor {
last_rank: Option<RowId>,
cardinality_specific_impl: CardinalitySpecificSelectCursor,
}
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
ColumnIndexSelectCursor {
last_rank: None,
cardinality_specific_impl,
}
}
}
enum CardinalitySpecificSelectCursor {
Full,
Optional(OptionalIndexSelectCursor),
Multivalued(MultiValueIndexCursor),
}
/// This cursor object point is to compute batches of `select` operations.
///
/// Regardless of cardinality, a column index can always be seen as a mapping
/// from row_id -> start_value_row_id. By definition, it is increasing.
/// If `left <= right, column_index[left] <= column_index[right]`.
///
/// The select operation then identifies, given a value row id, which row it
/// belong to: it is the inverse mapping.
///
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
/// mapping[i] <= rank and mapping[i+1] < rank.
/// Another way to define it is to say that it is the last i such that
/// mapping[i] <= rank.
/// Finally it can be defined as the number of `row_id` such that
/// mapping[i] <= rank.
///
/// `select_batch_in_place` is a complex function that copmutes
/// select operation in batches and in place.
///
/// For optimization reasons, it only supports supplying ever striclty increasing
/// values of `rank_ids`, even cross calls.
///
/// It is also required from the caller, to only supply rank_ids lower than max(mapping).
/// Within those condition, the returned `row_ids` are guaranteed to be unique.
///
/// # Panics
///
/// Panics if the supplied rank_ids are not increasing from one call to another.
/// We only check that the `rank_ids` Vec is increasing in debug mode for
/// performance reason.
impl ColumnIndexSelectCursor {
/// Returns a list of
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
// `rank_ids` has to be sorted.
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
// Two consecutive calls must pass strictly increasing `rank_ids`.
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
// rank_ids is empty, there is nothing to do.
return;
};
if let Some(last_rank) = self.last_rank {
assert!(last_rank < first_rank);
}
self.last_rank = Some(new_last_rank);
match &mut self.cardinality_specific_impl {
CardinalitySpecificSelectCursor::Full => {
// No need to do anything:
// `value_idx` and `row_idx` are the same.
}
CardinalitySpecificSelectCursor::Optional(optional_index) => {
optional_index.select_batch_in_place(&mut rank_ids[..]);
}
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
multivalued_index.select_batch_in_place(rank_ids)
}
}
}
}
#[derive(Clone)]
pub enum ColumnIndex {
Full,
@@ -67,18 +148,15 @@ impl ColumnIndex {
}
}
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>) {
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
match self {
ColumnIndex::Full => {
// No need to do anything:
// value_idx and row_idx are the same.
}
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
ColumnIndex::Optional(optional_index) => {
optional_index.select_batch(&mut rank_ids[..]);
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
}
ColumnIndex::Multivalued(multivalued_index) => {
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
multivalued_index.select_batch_in_place(0u32, rank_ids)
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
.into()
}
}
}

View File

@@ -35,12 +35,6 @@ pub struct MultiValueIndex {
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }
}
}
impl MultiValueIndex {
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
let mut buffer = Vec::new();
@@ -64,78 +58,302 @@ impl MultiValueIndex {
self.start_index_column.num_vals() - 1
}
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
/// row_ids. Positions are converted inplace to docids.
pub fn select_cursor(&self) -> MultiValueIndexCursor {
MultiValueIndexCursor {
multivalued_index: self.clone(),
row_cursor: 0u32,
}
}
}
pub struct MultiValueIndexCursor {
multivalued_index: MultiValueIndex,
row_cursor: RowId,
}
impl MultiValueIndexCursor {
/// See contract in `ColumnIndexSelectCursor`.
///
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
/// index.
/// Multi valued cardinality is special for two different
/// ranks `rank_left` and `rank_right`, we can end up with
/// the same `select(rank_left)` and `select(rank_right)`.
///
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
/// increasing positions.
///
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
/// match a docid to its value position.
#[allow(clippy::bool_to_int_with_if)]
pub(crate) fn select_batch_in_place(&self, row_start: RowId, ranks: &mut Vec<u32>) {
/// For this reason, this function includes extra complexity
/// to prevent the cursor from emitting the same row_id.
/// - From a last call, by skipping ranks mapping to
/// the same row_id
/// - With the batch, by simply deduplicating the output.
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
if ranks.is_empty() {
return;
}
let mut cur_doc = row_start;
let mut last_doc = None;
let mut row_cursor = self.row_cursor;
assert!(self.start_index_column.get_val(row_start) as u32 <= ranks[0]);
let mut write_cursor_id = usize::MAX;
let mut last_written_row_id = u32::MAX;
let mut write_doc_pos = 0;
for i in 0..ranks.len() {
let pos = ranks[i];
loop {
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
if end > pos {
ranks[write_doc_pos] = cur_doc;
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
// We skip all of the ranks that we already passed.
//
// It is possible in the case of multivalued, for a the first
// few rank to belong to the same row_id as the last rank
// of the previous call.
let start_bound = self
.multivalued_index
.start_index_column
.get_val(row_cursor);
let mut skip = 0;
while ranks[skip] < start_bound {
skip += 1;
if skip == ranks.len() {
ranks.clear();
return;
}
}
ranks.truncate(write_doc_pos);
for i in skip..ranks.len() {
let rank = ranks[i];
let row_id = loop {
// TODO See if we can find a way to introduce a function in
// ColumnValue to remove dynamic dispatch.
// This is tricky however... because it only applies to T=u32.
//
// TODO consider using exponential search.
let end = self
.multivalued_index
.start_index_column
.get_val(row_cursor + 1) as u32;
if end > rank {
break row_cursor;
}
row_cursor += 1;
};
// We remove duplicates in a branchless fashion: we only advance
// the write cursor when we are writing a value different from
// the last written value.
write_cursor_id =
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
ranks[write_cursor_id] = row_id;
last_written_row_id = row_id;
}
self.row_cursor = row_cursor + 1;
ranks.truncate(write_cursor_id + 1);
}
}
#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;
use super::MultiValueIndex;
use crate::column_values::IterColumn;
use crate::{ColumnValues, RowId};
use proptest::prelude::*;
fn index_to_pos_helper(
index: &MultiValueIndex,
doc_id_range: Range<u32>,
positions: &[u32],
) -> Vec<u32> {
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
let mut positions = positions.to_vec();
index.select_batch_in_place(doc_id_range.start, &mut positions);
let mut cursor = index.select_cursor();
cursor.select_batch_in_place(&mut positions);
positions
}
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
#[track_caller]
fn test_multivalue_select_cursor_aux(
start_offsets: &'static [RowId],
ranks: &[RowId],
expected: &[RowId],
) {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(start_offsets.iter().copied()));
let index = MultiValueIndex {
start_index_column: column,
};
assert_eq!(&index_to_pos_helper(&index, &ranks), expected);
}
#[test]
fn test_positions_to_docid() {
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
let index = MultiValueIndex::from(column);
assert_eq!(index.num_rows(), 5);
let positions = &[10u32, 11, 15, 20, 21, 22];
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
fn test_multivalue_select_cursor_empty() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
}
#[test]
fn test_multivalue_select_cursor_single() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
}
#[test]
fn test_multivalue_select_cursor_duplicates() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
}
#[test]
fn test_multivalue_select_cursor_complex() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
}
#[test]
fn test_multivalue_select_corner_case_skip_all() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
let mut ranks = vec![5];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
#[test]
fn test_multi_value_index_cursor_bug() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
let mut ranks = vec![4];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
{
let mut ranks = vec![9];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
#[test]
fn test_multivalue_select_cursor_skip_already_emitted() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![1, 10];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0, 1]);
}
{
// Here we skip row_id = 1.
let mut ranks = vec![11, 12];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[2]);
}
}
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
proptest::collection::vec(0u32..3u32, 1..6)
.prop_map(|deltas: Vec<u32>| {
let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
let mut cumul = 0u32;
start_offsets.push(cumul);
for delta in deltas {
cumul += delta;
if cumul >= 10 {
break;
}
start_offsets.push(cumul);
}
start_offsets.push(10);
start_offsets
})
}
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
proptest::collection::btree_set(0u32..10u32, 1..=10)
.prop_flat_map(|els| {
let els: Vec<RowId> = els.into_iter().collect();
proptest::collection::btree_set(0..els.len(), 0..els.len())
.prop_map(move |mut split_positions| {
split_positions.insert(els.len());
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
let mut cursor = 0;
for split_position in split_positions {
queries.push(els[cursor..split_position].to_vec());
cursor = split_position;
}
queries
})
})
}
/// Simple inefficient implementation used for reference.
struct SimpleSelectCursor {
start_indexes: Vec<RowId>,
last_emitted_row_id: Option<RowId>,
}
impl SimpleSelectCursor {
fn select(&self, rank: u32) -> RowId {
for i in 0..self.start_indexes.len() - 1 {
if self.start_indexes[i] <= rank && self.start_indexes[i + 1] > rank{
return i as u32;
}
}
panic!();
}
fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
if ranks.is_empty() {
return;
}
for rank in ranks.iter_mut() {
*rank = self.select(*rank);
}
ranks.dedup();
if ranks.first().copied() == self.last_emitted_row_id {
ranks.remove(0);
}
if let Some(last_emitted) = ranks.last().copied() {
self.last_emitted_row_id = Some(last_emitted);
}
}
}
proptest! {
#[test]
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
let mut simple_select_cursor = SimpleSelectCursor {
start_indexes: start_indexes.clone(),
last_emitted_row_id: None
};
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(start_indexes.into_iter()));
let index = MultiValueIndex { start_index_column: column };
let mut select_cursor = index.select_cursor();
for query in queries.iter_mut() {
let mut query_clone = query.clone();
select_cursor.select_batch_in_place(query);
simple_select_cursor.select_batch_in_place(&mut query_clone);
assert_eq!(&query[..], &query_clone[..]);
}
}
}
}

View File

@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
}
}
}
pub struct OptionalIndexSelectCursor<'a> {
current_block_cursor: BlockSelectCursor<'a>,
pub struct OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor<'static>,
current_block_id: u16,
// The current block is guaranteed to contain ranks < end_rank.
current_block_end_rank: RowId,
optional_index: &'a OptionalIndex,
optional_index: OptionalIndex,
block_doc_idx_start: RowId,
num_null_rows_before_block: RowId,
}
impl<'a> OptionalIndexSelectCursor<'a> {
impl OptionalIndexSelectCursor {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
@@ -145,14 +145,23 @@ impl<'a> OptionalIndexSelectCursor<'a> {
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
let block: Block<'_> = self.optional_index.block(block_meta);
self.current_block_cursor = match block {
let current_block_cursor = match block {
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
};
// We are building a self-owned `OptionalIndexSelectCursor`.
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
}
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
// TODO see if we can batch at the block level as well for optimization purposes.
for rank in ranks {
*rank = self.select(*rank);
}
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
@@ -161,7 +170,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
}
impl Set<RowId> for OptionalIndex {
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
type SelectCursor<'a> = OptionalIndexSelectCursor;
// Check if value at position is not null.
#[inline]
fn contains(&self, row_id: RowId) -> bool {
@@ -220,14 +229,14 @@ impl Set<RowId> for OptionalIndex {
block_doc_idx_start + in_block_rank as u32
}
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
fn select_cursor(&self) -> OptionalIndexSelectCursor {
OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor::Sparse(
SparseBlockCodec::open(b"").select_cursor(),
),
current_block_id: 0u16,
current_block_end_rank: 0u32, //< this is sufficient to force the first load
optional_index: self,
optional_index: self.clone(),
block_doc_idx_start: 0u32,
num_null_rows_before_block: 0u32,
}

View File

@@ -10,6 +10,7 @@ use std::sync::Arc;
use common::{BinarySerializable, OwnedBytes};
use crate::column_index::MultiValueIndex;
use crate::column_values::monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
};

View File

@@ -10,7 +10,7 @@ extern crate test;
use std::io;
mod column;
mod column_index;
pub mod column_index;
pub mod column_values;
mod columnar;
mod dictionary;

View File

@@ -1,7 +1,9 @@
use core::fmt::Debug;
use std::ops::RangeInclusive;
use std::sync::Arc;
use columnar::Column;
use columnar::column_index::ColumnIndexSelectCursor;
use columnar::{Column, ColumnValues};
use crate::fastfield::MakeZero;
use crate::{DocId, DocSet, TERMINATED};
@@ -43,7 +45,9 @@ impl VecCursor {
pub(crate) struct RangeDocSet<T: MakeZero> {
/// The range filter on the values.
value_range: RangeInclusive<T>,
column: Column<T>,
column_index_select_cursor: ColumnIndexSelectCursor,
column_values: Arc<dyn ColumnValues<T>>,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
@@ -63,13 +67,15 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
let column_index_select_cursor = column.select_cursor();
let mut range_docset = Self {
value_range,
column,
column_values: column.values,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
column_index_select_cursor,
};
range_docset.reset_fetch_range();
range_docset.fetch_block();
@@ -106,26 +112,21 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.column.values.num_vals();
let limit = self.column_values.num_vals();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
let last_value = self.loaded_docs.last_value();
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
self.column.values.get_docids_for_value_range(
self.column_values.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
doc_buffer,
);
self.column.idx.select_batch_in_place(doc_buffer);
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
self.column_index_select_cursor
.select_batch_in_place(doc_buffer);
self.next_fetch_start = end;
finished_to_end
@@ -138,7 +139,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
if let Some(docid) = self.loaded_docs.next() {
return docid;
}
if self.next_fetch_start >= self.column.values.num_vals() {
if self.next_fetch_start >= self.column_values.num_vals() {
return TERMINATED;
}
self.fetch_block();