Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
c6c1485abd Added a columnar cli 2023-02-09 18:17:45 +01:00
17 changed files with 136 additions and 536 deletions

View File

@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
aho-corasick = "0.7"
tantivy-fst = "0.4.0"
memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.12", optional = true, default-features = false }
snap = { version = "1.0.5", optional = true }

View File

@@ -22,11 +22,6 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8.5"
criterion = "0.4"
[features]
unstable = []
[[bench]]
name = "bench_index"
harness = false

View File

@@ -1,91 +0,0 @@
use std::ops::Range;
use criterion::*;
use rand::prelude::*;
use tantivy_columnar::column_index::MultiValueIndex;
use tantivy_columnar::RowId;
const WINDOW: usize = 40;
fn bench_multi_value_index_util(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut start_row = 0u32;
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index.select_batch_in_place(start_row, &mut buffer);
start_row = buffer.last().copied().unwrap();
len += buffer.len()
}
assert_eq!(len, 4303);
len
});
}
fn bench_multi_value_index_util2(
len_range: Range<u32>,
num_rows: RowId,
select_value_ratio: f64,
b: &mut criterion::Bencher,
) {
let mut start_index: Vec<RowId> = vec![0u32];
let mut cursor: u32 = 0u32;
let mut rng = StdRng::from_seed([16u8; 32]);
for i in 0..num_rows {
let num_vals = rng.gen_range(len_range.clone());
cursor += num_vals;
start_index.push(cursor);
}
let select_rows: Vec<RowId> = (0u32..cursor)
.filter(|i| rng.gen_bool(select_value_ratio))
.collect();
let mv_index = MultiValueIndex::for_test(&start_index);
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
let mut buffer = Vec::new();
b.iter(|| {
let mut mv_index_cursor = mv_index.select_cursor();
let mut len = 0;
for chunk in select_rows.chunks(WINDOW) {
buffer.clear();
buffer.extend_from_slice(chunk);
mv_index_cursor.select_batch_in_place(&mut buffer);
len += buffer.len();
}
assert_eq!(len, 4303);
len
});
}
fn select_benchmark(c: &mut criterion::Criterion) {
c.bench_function("bench_multi_value_index_10_100", |b| {
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
});
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
});
}
criterion_group!(benches, select_benchmark);
criterion_main!(benches);

View File

@@ -11,6 +11,7 @@
# Perf and Size
* remove alloc in `ord_to_term`
+ multivaued range queries restrat frm the beginning all of the time.
* re-add ZSTD compression for dictionaries
no systematic monotonic mapping
consider removing multilinear

View File

@@ -9,90 +9,9 @@ pub use merge::merge_column_index;
pub use optional_index::{OptionalIndex, Set};
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
use crate::column_index::optional_index::OptionalIndexSelectCursor;
use crate::column_index::multivalued_index::MultiValueIndex;
use crate::{Cardinality, RowId};
pub struct ColumnIndexSelectCursor {
last_rank: Option<RowId>,
cardinality_specific_impl: CardinalitySpecificSelectCursor,
}
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
ColumnIndexSelectCursor {
last_rank: None,
cardinality_specific_impl,
}
}
}
enum CardinalitySpecificSelectCursor {
Full,
Optional(OptionalIndexSelectCursor),
Multivalued(MultiValueIndexCursor),
}
/// This cursor object point is to compute batches of `select` operations.
///
/// Regardless of cardinality, a column index can always be seen as a mapping
/// from row_id -> start_value_row_id. By definition, it is increasing.
/// If `left <= right, column_index[left] <= column_index[right]`.
///
/// The select operation then identifies, given a value row id, which row it
/// belong to: it is the inverse mapping.
///
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
/// mapping[i] <= rank and mapping[i+1] < rank.
/// Another way to define it is to say that it is the last i such that
/// mapping[i] <= rank.
/// Finally it can be defined as the number of `row_id` such that
/// mapping[i] <= rank.
///
/// `select_batch_in_place` is a complex function that copmutes
/// select operation in batches and in place.
///
/// For optimization reasons, it only supports supplying ever striclty increasing
/// values of `rank_ids`, even cross calls.
///
/// It is also required from the caller, to only supply rank_ids lower than max(mapping).
/// Within those condition, the returned `row_ids` are guaranteed to be unique.
///
/// # Panics
///
/// Panics if the supplied rank_ids are not increasing from one call to another.
/// We only check that the `rank_ids` Vec is increasing in debug mode for
/// performance reason.
impl ColumnIndexSelectCursor {
/// Returns a list of
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
// `rank_ids` has to be sorted.
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
// Two consecutive calls must pass strictly increasing `rank_ids`.
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
// rank_ids is empty, there is nothing to do.
return;
};
if let Some(last_rank) = self.last_rank {
assert!(last_rank < first_rank);
}
self.last_rank = Some(new_last_rank);
match &mut self.cardinality_specific_impl {
CardinalitySpecificSelectCursor::Full => {
// No need to do anything:
// `value_idx` and `row_idx` are the same.
}
CardinalitySpecificSelectCursor::Optional(optional_index) => {
optional_index.select_batch_in_place(&mut rank_ids[..]);
}
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
multivalued_index.select_batch_in_place(rank_ids)
}
}
}
}
#[derive(Clone)]
pub enum ColumnIndex {
Full,
@@ -148,15 +67,18 @@ impl ColumnIndex {
}
}
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>) {
match self {
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
ColumnIndex::Full => {
// No need to do anything:
// value_idx and row_idx are the same.
}
ColumnIndex::Optional(optional_index) => {
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
optional_index.select_batch(&mut rank_ids[..]);
}
ColumnIndex::Multivalued(multivalued_index) => {
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
.into()
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
multivalued_index.select_batch_in_place(0u32, rank_ids)
}
}
}

View File

@@ -35,6 +35,12 @@ pub struct MultiValueIndex {
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }
}
}
impl MultiValueIndex {
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
let mut buffer = Vec::new();
@@ -58,302 +64,78 @@ impl MultiValueIndex {
self.start_index_column.num_vals() - 1
}
pub fn select_cursor(&self) -> MultiValueIndexCursor {
MultiValueIndexCursor {
multivalued_index: self.clone(),
row_cursor: 0u32,
}
}
}
pub struct MultiValueIndexCursor {
multivalued_index: MultiValueIndex,
row_cursor: RowId,
}
impl MultiValueIndexCursor {
/// See contract in `ColumnIndexSelectCursor`.
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
/// row_ids. Positions are converted inplace to docids.
///
/// Multi valued cardinality is special for two different
/// ranks `rank_left` and `rank_right`, we can end up with
/// the same `select(rank_left)` and `select(rank_right)`.
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
/// index.
///
/// For this reason, this function includes extra complexity
/// to prevent the cursor from emitting the same row_id.
/// - From a last call, by skipping ranks mapping to
/// the same row_id
/// - With the batch, by simply deduplicating the output.
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
/// increasing positions.
///
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
/// match a docid to its value position.
#[allow(clippy::bool_to_int_with_if)]
pub(crate) fn select_batch_in_place(&self, row_start: RowId, ranks: &mut Vec<u32>) {
if ranks.is_empty() {
return;
}
let mut row_cursor = self.row_cursor;
let mut cur_doc = row_start;
let mut last_doc = None;
let mut write_cursor_id = usize::MAX;
let mut last_written_row_id = u32::MAX;
assert!(self.start_index_column.get_val(row_start) as u32 <= ranks[0]);
// We skip all of the ranks that we already passed.
//
// It is possible in the case of multivalued, for a the first
// few rank to belong to the same row_id as the last rank
// of the previous call.
let start_bound = self
.multivalued_index
.start_index_column
.get_val(row_cursor);
let mut skip = 0;
while ranks[skip] < start_bound {
skip += 1;
if skip == ranks.len() {
ranks.clear();
return;
let mut write_doc_pos = 0;
for i in 0..ranks.len() {
let pos = ranks[i];
loop {
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
if end > pos {
ranks[write_doc_pos] = cur_doc;
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
last_doc = Some(cur_doc);
break;
}
cur_doc += 1;
}
}
for i in skip..ranks.len() {
let rank = ranks[i];
let row_id = loop {
// TODO See if we can find a way to introduce a function in
// ColumnValue to remove dynamic dispatch.
// This is tricky however... because it only applies to T=u32.
//
// TODO consider using exponential search.
let end = self
.multivalued_index
.start_index_column
.get_val(row_cursor + 1) as u32;
if end > rank {
break row_cursor;
}
row_cursor += 1;
};
// We remove duplicates in a branchless fashion: we only advance
// the write cursor when we are writing a value different from
// the last written value.
write_cursor_id =
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
ranks[write_cursor_id] = row_id;
last_written_row_id = row_id;
}
self.row_cursor = row_cursor + 1;
ranks.truncate(write_cursor_id + 1);
ranks.truncate(write_doc_pos);
}
}
#[cfg(test)]
mod tests {
use std::ops::Range;
use std::sync::Arc;
use super::MultiValueIndex;
use crate::column_values::IterColumn;
use crate::{ColumnValues, RowId};
use proptest::prelude::*;
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
fn index_to_pos_helper(
index: &MultiValueIndex,
doc_id_range: Range<u32>,
positions: &[u32],
) -> Vec<u32> {
let mut positions = positions.to_vec();
let mut cursor = index.select_cursor();
cursor.select_batch_in_place(&mut positions);
index.select_batch_in_place(doc_id_range.start, &mut positions);
positions
}
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
#[track_caller]
fn test_multivalue_select_cursor_aux(
start_offsets: &'static [RowId],
ranks: &[RowId],
expected: &[RowId],
) {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(start_offsets.iter().copied()));
let index = MultiValueIndex {
start_index_column: column,
};
assert_eq!(&index_to_pos_helper(&index, &ranks), expected);
}
#[test]
fn test_multivalue_select_cursor_empty() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
}
#[test]
fn test_multivalue_select_cursor_single() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
}
#[test]
fn test_multivalue_select_cursor_duplicates() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
}
#[test]
fn test_multivalue_select_cursor_complex() {
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
}
#[test]
fn test_multivalue_select_corner_case_skip_all() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
let mut ranks = vec![5];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
#[test]
fn test_multi_value_index_cursor_bug() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from([0, 10].into_iter()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![0];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0]);
}
{
let mut ranks = vec![4];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
{
let mut ranks = vec![9];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[]);
}
}
#[test]
fn test_multivalue_select_cursor_skip_already_emitted() {
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
let index = MultiValueIndex {
start_index_column: column,
};
let mut cursor = index.select_cursor();
{
let mut ranks = vec![1, 10];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[0, 1]);
}
{
// Here we skip row_id = 1.
let mut ranks = vec![11, 12];
cursor.select_batch_in_place(&mut ranks);
assert_eq!(ranks, &[2]);
}
}
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
proptest::collection::vec(0u32..3u32, 1..6)
.prop_map(|deltas: Vec<u32>| {
let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
let mut cumul = 0u32;
start_offsets.push(cumul);
for delta in deltas {
cumul += delta;
if cumul >= 10 {
break;
}
start_offsets.push(cumul);
}
start_offsets.push(10);
start_offsets
})
}
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
proptest::collection::btree_set(0u32..10u32, 1..=10)
.prop_flat_map(|els| {
let els: Vec<RowId> = els.into_iter().collect();
proptest::collection::btree_set(0..els.len(), 0..els.len())
.prop_map(move |mut split_positions| {
split_positions.insert(els.len());
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
let mut cursor = 0;
for split_position in split_positions {
queries.push(els[cursor..split_position].to_vec());
cursor = split_position;
}
queries
})
})
}
/// Simple inefficient implementation used for reference.
struct SimpleSelectCursor {
start_indexes: Vec<RowId>,
last_emitted_row_id: Option<RowId>,
}
impl SimpleSelectCursor {
fn select(&self, rank: u32) -> RowId {
for i in 0..self.start_indexes.len() - 1 {
if self.start_indexes[i] <= rank && self.start_indexes[i + 1] > rank{
return i as u32;
}
}
panic!();
}
fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
if ranks.is_empty() {
return;
}
for rank in ranks.iter_mut() {
*rank = self.select(*rank);
}
ranks.dedup();
if ranks.first().copied() == self.last_emitted_row_id {
ranks.remove(0);
}
if let Some(last_emitted) = ranks.last().copied() {
self.last_emitted_row_id = Some(last_emitted);
}
}
}
proptest! {
#[test]
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
let mut simple_select_cursor = SimpleSelectCursor {
start_indexes: start_indexes.clone(),
last_emitted_row_id: None
};
let column: Arc<dyn ColumnValues<RowId>> =
Arc::new(IterColumn::from(start_indexes.into_iter()));
let index = MultiValueIndex { start_index_column: column };
let mut select_cursor = index.select_cursor();
for query in queries.iter_mut() {
let mut query_clone = query.clone();
select_cursor.select_batch_in_place(query);
simple_select_cursor.select_batch_in_place(&mut query_clone);
assert_eq!(&query[..], &query_clone[..]);
}
}
fn test_positions_to_docid() {
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
let index = MultiValueIndex::from(column);
assert_eq!(index.num_rows(), 5);
let positions = &[10u32, 11, 15, 20, 21, 22];
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
}
}

View File

@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
}
}
}
pub struct OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor<'static>,
pub struct OptionalIndexSelectCursor<'a> {
current_block_cursor: BlockSelectCursor<'a>,
current_block_id: u16,
// The current block is guaranteed to contain ranks < end_rank.
current_block_end_rank: RowId,
optional_index: OptionalIndex,
optional_index: &'a OptionalIndex,
block_doc_idx_start: RowId,
num_null_rows_before_block: RowId,
}
impl OptionalIndexSelectCursor {
impl<'a> OptionalIndexSelectCursor<'a> {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
@@ -145,23 +145,14 @@ impl OptionalIndexSelectCursor {
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
let block: Block<'_> = self.optional_index.block(block_meta);
let current_block_cursor = match block {
self.current_block_cursor = match block {
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
};
// We are building a self-owned `OptionalIndexSelectCursor`.
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
}
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
// TODO see if we can batch at the block level as well for optimization purposes.
for rank in ranks {
*rank = self.select(*rank);
}
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
@@ -170,7 +161,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
}
impl Set<RowId> for OptionalIndex {
type SelectCursor<'a> = OptionalIndexSelectCursor;
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
// Check if value at position is not null.
#[inline]
fn contains(&self, row_id: RowId) -> bool {
@@ -229,14 +220,14 @@ impl Set<RowId> for OptionalIndex {
block_doc_idx_start + in_block_rank as u32
}
fn select_cursor(&self) -> OptionalIndexSelectCursor {
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
OptionalIndexSelectCursor {
current_block_cursor: BlockSelectCursor::Sparse(
SparseBlockCodec::open(b"").select_cursor(),
),
current_block_id: 0u16,
current_block_end_rank: 0u32, //< this is sufficient to force the first load
optional_index: self.clone(),
optional_index: self,
block_doc_idx_start: 0u32,
num_null_rows_before_block: 0u32,
}

View File

@@ -10,7 +10,6 @@ use std::sync::Arc;
use common::{BinarySerializable, OwnedBytes};
use crate::column_index::MultiValueIndex;
use crate::column_values::monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
};

View File

@@ -10,7 +10,7 @@ extern crate test;
use std::io;
mod column;
pub mod column_index;
mod column_index;
pub mod column_values;
mod columnar;
mod dictionary;

View File

@@ -322,6 +322,7 @@ impl SegmentTermCollector {
let mut entries: Vec<(u32, TermBucketEntry)> =
self.term_buckets.entries.into_iter().collect();
let order_by_key = self.req.order.target == OrderTarget::Key;
let order_by_sub_aggregation =
matches!(self.req.order.target, OrderTarget::SubAggregation(_));
@@ -350,7 +351,7 @@ impl SegmentTermCollector {
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
let (term_doc_count_before_cutoff, mut sum_other_doc_count) = if order_by_sub_aggregation {
(0, 0)
} else {
cut_off_buckets(&mut entries, self.req.segment_size as usize)
@@ -392,6 +393,20 @@ impl SegmentTermCollector {
}
}
if order_by_key {
let mut dict_entries = dict.into_iter().collect_vec();
if self.req.order.order == Order::Desc {
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
} else {
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
}
let (_, sum_other_docs) =
cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
sum_other_doc_count += sum_other_docs;
dict = dict_entries.into_iter().collect();
}
Ok(IntermediateBucketResult::Terms(
IntermediateTermBucketResult {
entries: dict,
@@ -842,14 +857,14 @@ mod tests {
];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
// key asc
// key desc
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
..Default::default()
@@ -876,7 +891,7 @@ mod tests {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -900,14 +915,14 @@ mod tests {
assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
// key asc and segment_size cut_off
// key desc and segment_size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Asc,
order: Order::Desc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -930,14 +945,14 @@ mod tests {
serde_json::Value::Null
);
// key desc
// key asc
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
..Default::default()
@@ -957,14 +972,14 @@ mod tests {
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
// key desc, size cut_off
// key asc, size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
size: Some(2),
@@ -987,14 +1002,14 @@ mod tests {
);
assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
// key desc, segment_size cut_off
// key asc, segment_size cut_off
let agg_req: Aggregations = vec![(
"my_texts".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
field: "string_id".to_string(),
order: Some(CustomOrder {
order: Order::Desc,
order: Order::Asc,
target: OrderTarget::Key,
}),
size: Some(2),

View File

@@ -498,7 +498,7 @@ impl IntermediateTermBucketResult {
match req.order.target {
OrderTarget::Key => {
buckets.sort_by(|left, right| {
if req.order.order == Order::Asc {
if req.order.order == Order::Desc {
left.key.partial_cmp(&right.key)
} else {
right.key.partial_cmp(&left.key)

View File

@@ -1152,6 +1152,12 @@ mod tests {
r#"FieldNotFound("not_exist_field")"#
);
let agg_res = avg_on_field("scores_i64");
assert_eq!(
format!("{:?}", agg_res),
r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
);
Ok(())
}

View File

@@ -135,8 +135,6 @@ impl InvertedIndexReader {
term_info: &TermInfo,
option: IndexRecordOption,
) -> io::Result<SegmentPostings> {
let option = option.downgrade(self.record_option);
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
let position_reader = {
if option.has_positions() {

View File

@@ -819,23 +819,20 @@ mod tests {
// This is a bit of a contrived example.
let tokens = PreTokenizedString {
text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
tokens: vec![
Token {
// Not the last token, yet ends after the last token.
offset_from: 0,
offset_to: 14,
position: 0,
text: "long_token".to_string(),
position_length: 3,
},
Token {
offset_from: 0,
offset_to: 14,
position: 1,
text: "short".to_string(),
position_length: 1,
},
],
tokens: vec![Token { // Not the last token, yet ends after the last token.
offset_from: 0,
offset_to: 14,
position: 0,
text: "long_token".to_string(),
position_length: 3,
},
Token {
offset_from: 0,
offset_to: 14,
position: 1,
text: "short".to_string(),
position_length: 1,
}],
};
doc.add_pre_tokenized_text(text, tokens);
doc.add_text(text, "hello");

View File

@@ -1,9 +1,7 @@
use core::fmt::Debug;
use std::ops::RangeInclusive;
use std::sync::Arc;
use columnar::column_index::ColumnIndexSelectCursor;
use columnar::{Column, ColumnValues};
use columnar::Column;
use crate::fastfield::MakeZero;
use crate::{DocId, DocSet, TERMINATED};
@@ -45,9 +43,7 @@ impl VecCursor {
pub(crate) struct RangeDocSet<T: MakeZero> {
/// The range filter on the values.
value_range: RangeInclusive<T>,
column_index_select_cursor: ColumnIndexSelectCursor,
column_values: Arc<dyn ColumnValues<T>>,
column: Column<T>,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
@@ -67,15 +63,13 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
const DEFAULT_FETCH_HORIZON: u32 = 128;
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
let column_index_select_cursor = column.select_cursor();
let mut range_docset = Self {
value_range,
column_values: column.values,
column,
loaded_docs: VecCursor::new(),
next_fetch_start: 0,
fetch_horizon: DEFAULT_FETCH_HORIZON,
last_seek_pos_opt: None,
column_index_select_cursor,
};
range_docset.reset_fetch_range();
range_docset.fetch_block();
@@ -112,21 +106,26 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut finished_to_end = false;
let limit = self.column_values.num_vals();
let limit = self.column.values.num_vals();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
}
let last_value = self.loaded_docs.last_value();
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
self.column_values.get_docids_for_value_range(
self.column.values.get_docids_for_value_range(
self.value_range.clone(),
self.next_fetch_start..end,
doc_buffer,
);
self.column_index_select_cursor
.select_batch_in_place(doc_buffer);
self.column.idx.select_batch_in_place(doc_buffer);
if let Some(last_value) = last_value {
while self.loaded_docs.current() == Some(last_value) {
self.loaded_docs.next();
}
}
self.next_fetch_start = end;
finished_to_end
@@ -139,7 +138,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
if let Some(docid) = self.loaded_docs.next() {
return docid;
}
if self.next_fetch_start >= self.column_values.num_vals() {
if self.next_fetch_start >= self.column.values.num_vals() {
return TERMINATED;
}
self.fetch_block();

View File

@@ -109,7 +109,6 @@ impl TermQuery {
} else {
IndexRecordOption::Basic
};
Ok(TermWeight::new(
self.term.clone(),
index_record_option,

View File

@@ -49,17 +49,4 @@ impl IndexRecordOption {
IndexRecordOption::WithFreqsAndPositions => true,
}
}
/// Downgrades to the next level if provided `IndexRecordOption` is unavailable.
pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
use IndexRecordOption::*;
match (other, self) {
(WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
(WithFreqs, WithFreqs) => WithFreqs,
(WithFreqsAndPositions, WithFreqs) => WithFreqs,
(WithFreqs, WithFreqsAndPositions) => WithFreqs,
_ => Basic,
}
}
}