mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-07 09:32:54 +00:00
Compare commits
5 Commits
columnar-c
...
fast-u64-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d30cafa80a | ||
|
|
36c6138e7f | ||
|
|
7a9befd18d | ||
|
|
62c811df2b | ||
|
|
03345f0aa2 |
@@ -23,7 +23,7 @@ regex = { version = "1.5.5", default-features = false, features = ["std", "unico
|
|||||||
aho-corasick = "0.7"
|
aho-corasick = "0.7"
|
||||||
tantivy-fst = "0.4.0"
|
tantivy-fst = "0.4.0"
|
||||||
memmap2 = { version = "0.5.3", optional = true }
|
memmap2 = { version = "0.5.3", optional = true }
|
||||||
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
|
lz4_flex = { version = "0.10", default-features = false, features = ["checked-decode"], optional = true }
|
||||||
brotli = { version = "3.3.4", optional = true }
|
brotli = { version = "3.3.4", optional = true }
|
||||||
zstd = { version = "0.12", optional = true, default-features = false }
|
zstd = { version = "0.12", optional = true, default-features = false }
|
||||||
snap = { version = "1.0.5", optional = true }
|
snap = { version = "1.0.5", optional = true }
|
||||||
|
|||||||
@@ -22,6 +22,11 @@ tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
|
|||||||
proptest = "1"
|
proptest = "1"
|
||||||
more-asserts = "0.3.1"
|
more-asserts = "0.3.1"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
|
criterion = "0.4"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
unstable = []
|
unstable = []
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "bench_index"
|
||||||
|
harness = false
|
||||||
|
|||||||
91
columnar/benches/bench_index.rs
Normal file
91
columnar/benches/bench_index.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use std::ops::Range;
|
||||||
|
|
||||||
|
use criterion::*;
|
||||||
|
use rand::prelude::*;
|
||||||
|
use tantivy_columnar::column_index::MultiValueIndex;
|
||||||
|
use tantivy_columnar::RowId;
|
||||||
|
|
||||||
|
const WINDOW: usize = 40;
|
||||||
|
|
||||||
|
fn bench_multi_value_index_util(
|
||||||
|
len_range: Range<u32>,
|
||||||
|
num_rows: RowId,
|
||||||
|
select_value_ratio: f64,
|
||||||
|
b: &mut criterion::Bencher,
|
||||||
|
) {
|
||||||
|
let mut start_index: Vec<RowId> = vec![0u32];
|
||||||
|
let mut cursor: u32 = 0u32;
|
||||||
|
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||||
|
for i in 0..num_rows {
|
||||||
|
let num_vals = rng.gen_range(len_range.clone());
|
||||||
|
cursor += num_vals;
|
||||||
|
start_index.push(cursor);
|
||||||
|
}
|
||||||
|
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||||
|
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||||
|
.collect();
|
||||||
|
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||||
|
|
||||||
|
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
b.iter(|| {
|
||||||
|
let mut start_row = 0u32;
|
||||||
|
let mut len = 0;
|
||||||
|
for chunk in select_rows.chunks(WINDOW) {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend_from_slice(chunk);
|
||||||
|
mv_index.select_batch_in_place(start_row, &mut buffer);
|
||||||
|
start_row = buffer.last().copied().unwrap();
|
||||||
|
len += buffer.len()
|
||||||
|
}
|
||||||
|
assert_eq!(len, 4303);
|
||||||
|
len
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bench_multi_value_index_util2(
|
||||||
|
len_range: Range<u32>,
|
||||||
|
num_rows: RowId,
|
||||||
|
select_value_ratio: f64,
|
||||||
|
b: &mut criterion::Bencher,
|
||||||
|
) {
|
||||||
|
let mut start_index: Vec<RowId> = vec![0u32];
|
||||||
|
let mut cursor: u32 = 0u32;
|
||||||
|
let mut rng = StdRng::from_seed([16u8; 32]);
|
||||||
|
for i in 0..num_rows {
|
||||||
|
let num_vals = rng.gen_range(len_range.clone());
|
||||||
|
cursor += num_vals;
|
||||||
|
start_index.push(cursor);
|
||||||
|
}
|
||||||
|
let select_rows: Vec<RowId> = (0u32..cursor)
|
||||||
|
.filter(|i| rng.gen_bool(select_value_ratio))
|
||||||
|
.collect();
|
||||||
|
let mv_index = MultiValueIndex::for_test(&start_index);
|
||||||
|
|
||||||
|
// mv_index.select_batch_in_place(0, &mut select_rows[..]);
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
b.iter(|| {
|
||||||
|
let mut mv_index_cursor = mv_index.select_cursor();
|
||||||
|
let mut len = 0;
|
||||||
|
for chunk in select_rows.chunks(WINDOW) {
|
||||||
|
buffer.clear();
|
||||||
|
buffer.extend_from_slice(chunk);
|
||||||
|
mv_index_cursor.select_batch_in_place(&mut buffer);
|
||||||
|
len += buffer.len();
|
||||||
|
}
|
||||||
|
assert_eq!(len, 4303);
|
||||||
|
len
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_benchmark(c: &mut criterion::Criterion) {
|
||||||
|
c.bench_function("bench_multi_value_index_10_100", |b| {
|
||||||
|
bench_multi_value_index_util(0..10, 100_000, 0.01f64, b)
|
||||||
|
});
|
||||||
|
c.bench_function("bench_multi_value_cursor_index_10_100", |b| {
|
||||||
|
bench_multi_value_index_util2(0..10, 100_000, 0.01f64, b)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
criterion_group!(benches, select_benchmark);
|
||||||
|
criterion_main!(benches);
|
||||||
17
columnar/columnar-cli/Cargo.toml
Normal file
17
columnar/columnar-cli/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
name = "tantivy-columnar-cli"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
license = "MIT"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
columnar = {path="../", package="tantivy-columnar"}
|
||||||
|
serde_json = "1"
|
||||||
|
serde_json_borrow = {git="https://github.com/PSeitz/serde_json_borrow/"}
|
||||||
|
serde = "1"
|
||||||
|
|
||||||
|
[workspace]
|
||||||
|
members = []
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
debug = true
|
||||||
134
columnar/columnar-cli/src/main.rs
Normal file
134
columnar/columnar-cli/src/main.rs
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
use columnar::ColumnarWriter;
|
||||||
|
use columnar::NumericalValue;
|
||||||
|
use serde_json_borrow;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io;
|
||||||
|
use std::io::BufRead;
|
||||||
|
use std::io::BufReader;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
/// Tracks the dotted path of the JSON field currently being visited.
///
/// `path` stores every segment prefixed by a `.` (for instance `.a.b`), and `stack`
/// remembers the length `path` had before each `push`, so that `pop` can restore it.
#[derive(Default)]
struct JsonStack {
    path: String,
    stack: Vec<usize>,
}

impl JsonStack {
    /// Enters the child field `seg`, appending `.seg` to the current path.
    fn push(&mut self, seg: &str) {
        self.stack.push(self.path.len());
        self.path.push('.');
        self.path.push_str(seg);
    }

    /// Leaves the field entered by the latest `push`.
    ///
    /// Does nothing if there is no field to leave.
    fn pop(&mut self) {
        if let Some(len) = self.stack.pop() {
            self.path.truncate(len);
        }
    }

    /// Returns the current path, without its leading `.`.
    ///
    /// At the root (nothing pushed yet) this returns the empty string. Slicing with
    /// `[1..]` would panic there, which happens as soon as a document is a top-level
    /// scalar or an array of scalars.
    fn path(&self) -> &str {
        self.path.strip_prefix('.').unwrap_or("")
    }
}
|
||||||
|
|
||||||
|
fn append_json_to_columnar(
|
||||||
|
doc: u32,
|
||||||
|
json_value: &serde_json_borrow::Value,
|
||||||
|
columnar: &mut ColumnarWriter,
|
||||||
|
stack: &mut JsonStack,
|
||||||
|
) -> usize {
|
||||||
|
let mut count = 0;
|
||||||
|
match json_value {
|
||||||
|
serde_json_borrow::Value::Null => {}
|
||||||
|
serde_json_borrow::Value::Bool(val) => {
|
||||||
|
columnar.record_numerical(
|
||||||
|
doc,
|
||||||
|
stack.path(),
|
||||||
|
NumericalValue::from(if *val { 1u64 } else { 0u64 }),
|
||||||
|
);
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
serde_json_borrow::Value::Number(num) => {
|
||||||
|
let numerical_value: NumericalValue = if let Some(num_i64) = num.as_i64() {
|
||||||
|
num_i64.into()
|
||||||
|
} else if let Some(num_u64) = num.as_u64() {
|
||||||
|
num_u64.into()
|
||||||
|
} else if let Some(num_f64) = num.as_f64() {
|
||||||
|
num_f64.into()
|
||||||
|
} else {
|
||||||
|
panic!();
|
||||||
|
};
|
||||||
|
count += 1;
|
||||||
|
columnar.record_numerical(
|
||||||
|
doc,
|
||||||
|
stack.path(),
|
||||||
|
numerical_value,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
serde_json_borrow::Value::Str(msg) => {
|
||||||
|
columnar.record_str(
|
||||||
|
doc,
|
||||||
|
stack.path(),
|
||||||
|
msg,
|
||||||
|
);
|
||||||
|
count += 1;
|
||||||
|
},
|
||||||
|
serde_json_borrow::Value::Array(vals) => {
|
||||||
|
for val in vals {
|
||||||
|
count += append_json_to_columnar(doc, val, columnar, stack);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
serde_json_borrow::Value::Object(json_map) => {
|
||||||
|
for (child_key, child_val) in json_map {
|
||||||
|
stack.push(child_key);
|
||||||
|
count += append_json_to_columnar(doc, child_val, columnar, stack);
|
||||||
|
stack.pop();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> io::Result<()> {
|
||||||
|
let file = File::open("gh_small.json")?;
|
||||||
|
let mut reader = BufReader::new(file);
|
||||||
|
let mut line = String::with_capacity(100);
|
||||||
|
let mut columnar = columnar::ColumnarWriter::default();
|
||||||
|
let mut doc = 0;
|
||||||
|
let start = Instant::now();
|
||||||
|
let mut stack = JsonStack::default();
|
||||||
|
let mut total_count = 0;
|
||||||
|
|
||||||
|
let start_build = Instant::now();
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
let len = reader.read_line(&mut line)?;
|
||||||
|
if len == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let Ok(json_value) = serde_json::from_str::<serde_json_borrow::Value>(&line) else { continue; };
|
||||||
|
total_count += append_json_to_columnar(doc, &json_value, &mut columnar, &mut stack);
|
||||||
|
doc += 1;
|
||||||
|
}
|
||||||
|
println!("Build in {:?}", start_build.elapsed());
|
||||||
|
|
||||||
|
println!("value count {total_count}");
|
||||||
|
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
let start_serialize = Instant::now();
|
||||||
|
columnar.serialize(doc, None, &mut buffer)?;
|
||||||
|
println!("Serialized in {:?}", start_serialize.elapsed());
|
||||||
|
println!("num docs: {doc}, {:?}", start.elapsed());
|
||||||
|
println!("buffer len {} MB", buffer.len() / 1_000_000);
|
||||||
|
let columnar = columnar::ColumnarReader::open(buffer)?;
|
||||||
|
for (column_name, dynamic_column) in columnar.list_columns()? {
|
||||||
|
let num_bytes = dynamic_column.num_bytes();
|
||||||
|
let typ = dynamic_column.column_type();
|
||||||
|
if num_bytes > 1_000_000 {
|
||||||
|
println!("{column_name} {typ:?} {} KB", num_bytes / 1_000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
println!("{} columns", columnar.num_columns());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -11,7 +11,6 @@
|
|||||||
|
|
||||||
# Perf and Size
|
# Perf and Size
|
||||||
* remove alloc in `ord_to_term`
|
* remove alloc in `ord_to_term`
|
||||||
+ multivaued range queries restrat frm the beginning all of the time.
|
|
||||||
* re-add ZSTD compression for dictionaries
|
* re-add ZSTD compression for dictionaries
|
||||||
no systematic monotonic mapping
|
no systematic monotonic mapping
|
||||||
consider removing multilinear
|
consider removing multilinear
|
||||||
|
|||||||
@@ -9,9 +9,90 @@ pub use merge::merge_column_index;
|
|||||||
pub use optional_index::{OptionalIndex, Set};
|
pub use optional_index::{OptionalIndex, Set};
|
||||||
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
|
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
|
||||||
|
|
||||||
use crate::column_index::multivalued_index::MultiValueIndex;
|
pub use crate::column_index::multivalued_index::{MultiValueIndex, MultiValueIndexCursor};
|
||||||
|
use crate::column_index::optional_index::OptionalIndexSelectCursor;
|
||||||
use crate::{Cardinality, RowId};
|
use crate::{Cardinality, RowId};
|
||||||
|
|
||||||
|
pub struct ColumnIndexSelectCursor {
|
||||||
|
last_rank: Option<RowId>,
|
||||||
|
cardinality_specific_impl: CardinalitySpecificSelectCursor,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CardinalitySpecificSelectCursor> for ColumnIndexSelectCursor {
|
||||||
|
fn from(cardinality_specific_impl: CardinalitySpecificSelectCursor) -> Self {
|
||||||
|
ColumnIndexSelectCursor {
|
||||||
|
last_rank: None,
|
||||||
|
cardinality_specific_impl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum CardinalitySpecificSelectCursor {
|
||||||
|
Full,
|
||||||
|
Optional(OptionalIndexSelectCursor),
|
||||||
|
Multivalued(MultiValueIndexCursor),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The point of this cursor object is to compute batches of `select` operations.
|
||||||
|
///
|
||||||
|
/// Regardless of cardinality, a column index can always be seen as a mapping
|
||||||
|
/// from row_id -> start_value_row_id. By definition, it is increasing.
|
||||||
|
/// If `left <= right, column_index[left] <= column_index[right]`.
|
||||||
|
///
|
||||||
|
/// The select operation then identifies, given a value row id, which row it
|
||||||
|
/// belongs to: it is the inverse mapping.
|
||||||
|
///
|
||||||
|
/// As a more formal definition, `select(rank)` is defined as the only `i` such that
|
||||||
|
/// `mapping[i] <= rank` and `mapping[i+1] > rank`.
|
||||||
|
/// Another way to define it is to say that it is the last i such that
|
||||||
|
/// mapping[i] <= rank.
|
||||||
|
/// Finally it can be defined as the number of `row_id` such that
|
||||||
|
/// mapping[i] <= rank, minus one.
|
||||||
|
///
|
||||||
|
/// `select_batch_in_place` is a complex function that computes
|
||||||
|
/// select operation in batches and in place.
|
||||||
|
///
|
||||||
|
/// For optimization reasons, it only supports supplying strictly increasing
|
||||||
|
/// values of `rank_ids`, even across calls.
|
||||||
|
///
|
||||||
|
/// It is also required from the caller, to only supply rank_ids lower than max(mapping).
|
||||||
|
/// Within those conditions, the returned `row_ids` are guaranteed to be unique.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics if the supplied rank_ids are not increasing from one call to another.
|
||||||
|
/// We only check that the `rank_ids` Vec is increasing in debug mode for
|
||||||
|
/// performance reason.
|
||||||
|
impl ColumnIndexSelectCursor {
|
||||||
|
/// Converts the value row ids in `rank_ids` into the matching row ids, in place.
|
||||||
|
pub fn select_batch_in_place(&mut self, rank_ids: &mut Vec<RowId>) {
|
||||||
|
// `rank_ids` has to be sorted.
|
||||||
|
debug_assert!(rank_ids.windows(2).all(|window| window[0] < window[1]));
|
||||||
|
// Two consecutive calls must pass strictly increasing `rank_ids`.
|
||||||
|
let (Some(first_rank), Some(new_last_rank)) = (rank_ids.first().copied(), rank_ids.last().copied()) else {
|
||||||
|
// rank_ids is empty, there is nothing to do.
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if let Some(last_rank) = self.last_rank {
|
||||||
|
assert!(last_rank < first_rank);
|
||||||
|
}
|
||||||
|
self.last_rank = Some(new_last_rank);
|
||||||
|
match &mut self.cardinality_specific_impl {
|
||||||
|
CardinalitySpecificSelectCursor::Full => {
|
||||||
|
// No need to do anything:
|
||||||
|
// `value_idx` and `row_idx` are the same.
|
||||||
|
}
|
||||||
|
CardinalitySpecificSelectCursor::Optional(optional_index) => {
|
||||||
|
optional_index.select_batch_in_place(&mut rank_ids[..]);
|
||||||
|
}
|
||||||
|
CardinalitySpecificSelectCursor::Multivalued(multivalued_index) => {
|
||||||
|
// The cursor keeps track of its current row, so the scan does not restart from row 0.
|
||||||
|
multivalued_index.select_batch_in_place(rank_ids)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum ColumnIndex {
|
pub enum ColumnIndex {
|
||||||
Full,
|
Full,
|
||||||
@@ -67,18 +148,15 @@ impl ColumnIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn select_batch_in_place(&self, rank_ids: &mut Vec<RowId>) {
|
pub fn select_cursor(&self) -> ColumnIndexSelectCursor {
|
||||||
match self {
|
match self {
|
||||||
ColumnIndex::Full => {
|
ColumnIndex::Full => CardinalitySpecificSelectCursor::Full.into(),
|
||||||
// No need to do anything:
|
|
||||||
// value_idx and row_idx are the same.
|
|
||||||
}
|
|
||||||
ColumnIndex::Optional(optional_index) => {
|
ColumnIndex::Optional(optional_index) => {
|
||||||
optional_index.select_batch(&mut rank_ids[..]);
|
CardinalitySpecificSelectCursor::Optional(optional_index.select_cursor()).into()
|
||||||
}
|
}
|
||||||
ColumnIndex::Multivalued(multivalued_index) => {
|
ColumnIndex::Multivalued(multivalued_index) => {
|
||||||
// TODO important: avoid using 0u32, and restart from the beginning all of the time.
|
CardinalitySpecificSelectCursor::Multivalued(multivalued_index.select_cursor())
|
||||||
multivalued_index.select_batch_in_place(0u32, rank_ids)
|
.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,12 +35,6 @@ pub struct MultiValueIndex {
|
|||||||
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
|
|
||||||
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
|
|
||||||
MultiValueIndex { start_index_column }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MultiValueIndex {
|
impl MultiValueIndex {
|
||||||
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
|
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
@@ -64,78 +58,302 @@ impl MultiValueIndex {
|
|||||||
self.start_index_column.num_vals() - 1
|
self.start_index_column.num_vals() - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
|
pub fn select_cursor(&self) -> MultiValueIndexCursor {
|
||||||
/// row_ids. Positions are converted inplace to docids.
|
MultiValueIndexCursor {
|
||||||
|
multivalued_index: self.clone(),
|
||||||
|
row_cursor: 0u32,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MultiValueIndexCursor {
|
||||||
|
multivalued_index: MultiValueIndex,
|
||||||
|
row_cursor: RowId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MultiValueIndexCursor {
|
||||||
|
/// See contract in `ColumnIndexSelectCursor`.
|
||||||
///
|
///
|
||||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
|
/// Multi valued cardinality is special: for two different
|
||||||
/// index.
|
/// ranks `rank_left` and `rank_right`, we can end up with
|
||||||
|
/// the same `select(rank_left)` and `select(rank_right)`.
|
||||||
///
|
///
|
||||||
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
|
/// For this reason, this function includes extra complexity
|
||||||
/// increasing positions.
|
/// to prevent the cursor from emitting the same row_id.
|
||||||
///
|
/// - From a last call, by skipping ranks mapping to
|
||||||
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
|
/// the same row_id
|
||||||
/// match a docid to its value position.
|
/// - Within the batch, by simply deduplicating the output.
|
||||||
#[allow(clippy::bool_to_int_with_if)]
|
pub fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||||
pub(crate) fn select_batch_in_place(&self, row_start: RowId, ranks: &mut Vec<u32>) {
|
|
||||||
if ranks.is_empty() {
|
if ranks.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let mut cur_doc = row_start;
|
let mut row_cursor = self.row_cursor;
|
||||||
let mut last_doc = None;
|
|
||||||
|
|
||||||
assert!(self.start_index_column.get_val(row_start) as u32 <= ranks[0]);
|
let mut write_cursor_id = usize::MAX;
|
||||||
|
let mut last_written_row_id = u32::MAX;
|
||||||
|
|
||||||
let mut write_doc_pos = 0;
|
// We skip all of the ranks that we already passed.
|
||||||
for i in 0..ranks.len() {
|
//
|
||||||
let pos = ranks[i];
|
// It is possible, in the multivalued case, for the first
|
||||||
loop {
|
// few ranks to belong to the same row_id as the last rank
|
||||||
let end = self.start_index_column.get_val(cur_doc + 1) as u32;
|
// of the previous call.
|
||||||
if end > pos {
|
let start_bound = self
|
||||||
ranks[write_doc_pos] = cur_doc;
|
.multivalued_index
|
||||||
write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 };
|
.start_index_column
|
||||||
last_doc = Some(cur_doc);
|
.get_val(row_cursor);
|
||||||
break;
|
|
||||||
}
|
let mut skip = 0;
|
||||||
cur_doc += 1;
|
while ranks[skip] < start_bound {
|
||||||
|
skip += 1;
|
||||||
|
if skip == ranks.len() {
|
||||||
|
ranks.clear();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ranks.truncate(write_doc_pos);
|
|
||||||
|
for i in skip..ranks.len() {
|
||||||
|
let rank = ranks[i];
|
||||||
|
let row_id = loop {
|
||||||
|
// TODO See if we can find a way to introduce a function in
|
||||||
|
// ColumnValue to remove dynamic dispatch.
|
||||||
|
// This is tricky however... because it only applies to T=u32.
|
||||||
|
//
|
||||||
|
// TODO consider using exponential search.
|
||||||
|
let end = self
|
||||||
|
.multivalued_index
|
||||||
|
.start_index_column
|
||||||
|
.get_val(row_cursor + 1) as u32;
|
||||||
|
if end > rank {
|
||||||
|
break row_cursor;
|
||||||
|
}
|
||||||
|
row_cursor += 1;
|
||||||
|
};
|
||||||
|
// We remove duplicates in a branchless fashion: we only advance
|
||||||
|
// the write cursor when we are writing a value different from
|
||||||
|
// the last written value.
|
||||||
|
write_cursor_id =
|
||||||
|
write_cursor_id.wrapping_add(if row_id == last_written_row_id { 0 } else { 1 });
|
||||||
|
ranks[write_cursor_id] = row_id;
|
||||||
|
last_written_row_id = row_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.row_cursor = row_cursor + 1;
|
||||||
|
ranks.truncate(write_cursor_id + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::ops::Range;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::MultiValueIndex;
|
use super::MultiValueIndex;
|
||||||
use crate::column_values::IterColumn;
|
use crate::column_values::IterColumn;
|
||||||
use crate::{ColumnValues, RowId};
|
use crate::{ColumnValues, RowId};
|
||||||
|
use proptest::prelude::*;
|
||||||
|
|
||||||
fn index_to_pos_helper(
|
fn index_to_pos_helper(index: &MultiValueIndex, positions: &[u32]) -> Vec<u32> {
|
||||||
index: &MultiValueIndex,
|
|
||||||
doc_id_range: Range<u32>,
|
|
||||||
positions: &[u32],
|
|
||||||
) -> Vec<u32> {
|
|
||||||
let mut positions = positions.to_vec();
|
let mut positions = positions.to_vec();
|
||||||
index.select_batch_in_place(doc_id_range.start, &mut positions);
|
let mut cursor = index.select_cursor();
|
||||||
|
cursor.select_batch_in_place(&mut positions);
|
||||||
positions
|
positions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Value row id ranges are [0..10, 10..12, 12..15, etc.]
|
||||||
|
const START_OFFSETS: &[RowId] = &[0, 10, 12, 15, 22, 23];
|
||||||
|
|
||||||
|
#[track_caller]
|
||||||
|
fn test_multivalue_select_cursor_aux(
|
||||||
|
start_offsets: &'static [RowId],
|
||||||
|
ranks: &[RowId],
|
||||||
|
expected: &[RowId],
|
||||||
|
) {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(start_offsets.iter().copied()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
assert_eq!(&index_to_pos_helper(&index, &ranks), expected);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_positions_to_docid() {
|
fn test_multivalue_select_cursor_empty() {
|
||||||
let offsets: Vec<RowId> = vec![0, 10, 12, 15, 22, 23]; // docid values are [0..10, 10..12, 12..15, etc.]
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[], &[]);
|
||||||
let column: Arc<dyn ColumnValues<RowId>> = Arc::new(IterColumn::from(offsets.into_iter()));
|
}
|
||||||
let index = MultiValueIndex::from(column);
|
|
||||||
assert_eq!(index.num_rows(), 5);
|
#[test]
|
||||||
let positions = &[10u32, 11, 15, 20, 21, 22];
|
fn test_multivalue_select_cursor_single() {
|
||||||
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[9], &[0]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[10], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[11], &[1]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[12], &[2]);
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12]), vec![2]);
|
}
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]);
|
|
||||||
assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]);
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_duplicates() {
|
||||||
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[12, 14], &[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_complex() {
|
||||||
|
test_multivalue_select_cursor_aux(START_OFFSETS, &[10, 11, 15, 20, 21, 22], &[1, 3, 4])
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_corner_case_skip_all() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![0];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![5];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multi_value_index_cursor_bug() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from([0, 10].into_iter()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![0];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![4];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut ranks = vec![9];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_multivalue_select_cursor_skip_already_emitted() {
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(START_OFFSETS.iter().copied()));
|
||||||
|
let index = MultiValueIndex {
|
||||||
|
start_index_column: column,
|
||||||
|
};
|
||||||
|
let mut cursor = index.select_cursor();
|
||||||
|
{
|
||||||
|
let mut ranks = vec![1, 10];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[0, 1]);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
// Here we skip row_id = 1.
|
||||||
|
let mut ranks = vec![11, 12];
|
||||||
|
cursor.select_batch_in_place(&mut ranks);
|
||||||
|
assert_eq!(ranks, &[2]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_index_strategy() -> impl Strategy<Value = Vec<RowId>> {
|
||||||
|
proptest::collection::vec(0u32..3u32, 1..6)
|
||||||
|
.prop_map(|deltas: Vec<u32>| {
|
||||||
|
let mut start_offsets: Vec<RowId> = Vec::with_capacity(deltas.len() + 1);
|
||||||
|
let mut cumul = 0u32;
|
||||||
|
start_offsets.push(cumul);
|
||||||
|
for delta in deltas {
|
||||||
|
cumul += delta;
|
||||||
|
if cumul >= 10 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
start_offsets.push(cumul);
|
||||||
|
}
|
||||||
|
start_offsets.push(10);
|
||||||
|
start_offsets
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_strategy() -> impl Strategy<Value = Vec<Vec<RowId>> > {
|
||||||
|
proptest::collection::btree_set(0u32..10u32, 1..=10)
|
||||||
|
.prop_flat_map(|els| {
|
||||||
|
let els: Vec<RowId> = els.into_iter().collect();
|
||||||
|
proptest::collection::btree_set(0..els.len(), 0..els.len())
|
||||||
|
.prop_map(move |mut split_positions| {
|
||||||
|
split_positions.insert(els.len());
|
||||||
|
let mut queries: Vec<Vec<RowId>> = Vec::with_capacity(split_positions.len() + 1);
|
||||||
|
let mut cursor = 0;
|
||||||
|
for split_position in split_positions {
|
||||||
|
queries.push(els[cursor..split_position].to_vec());
|
||||||
|
cursor = split_position;
|
||||||
|
}
|
||||||
|
queries
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Simple inefficient implementation used for reference.
|
||||||
|
struct SimpleSelectCursor {
|
||||||
|
start_indexes: Vec<RowId>,
|
||||||
|
last_emitted_row_id: Option<RowId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SimpleSelectCursor {
|
||||||
|
fn select(&self, rank: u32) -> RowId {
|
||||||
|
for i in 0..self.start_indexes.len() - 1 {
|
||||||
|
if self.start_indexes[i] <= rank && self.start_indexes[i + 1] > rank{
|
||||||
|
return i as u32;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_batch_in_place(&mut self, ranks: &mut Vec<RowId>) {
|
||||||
|
if ranks.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for rank in ranks.iter_mut() {
|
||||||
|
*rank = self.select(*rank);
|
||||||
|
}
|
||||||
|
ranks.dedup();
|
||||||
|
if ranks.first().copied() == self.last_emitted_row_id {
|
||||||
|
ranks.remove(0);
|
||||||
|
}
|
||||||
|
if let Some(last_emitted) = ranks.last().copied() {
|
||||||
|
self.last_emitted_row_id = Some(last_emitted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
proptest! {
|
||||||
|
#[test]
|
||||||
|
fn test_multi_value_index_cursor_proptest(start_indexes in start_index_strategy(), mut queries in query_strategy()) {
|
||||||
|
let mut simple_select_cursor = SimpleSelectCursor {
|
||||||
|
start_indexes: start_indexes.clone(),
|
||||||
|
last_emitted_row_id: None
|
||||||
|
};
|
||||||
|
let column: Arc<dyn ColumnValues<RowId>> =
|
||||||
|
Arc::new(IterColumn::from(start_indexes.into_iter()));
|
||||||
|
let index = MultiValueIndex { start_index_column: column };
|
||||||
|
let mut select_cursor = index.select_cursor();
|
||||||
|
for query in queries.iter_mut() {
|
||||||
|
let mut query_clone = query.clone();
|
||||||
|
select_cursor.select_batch_in_place(query);
|
||||||
|
simple_select_cursor.select_batch_in_place(&mut query_clone);
|
||||||
|
assert_eq!(&query[..], &query_clone[..]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -118,17 +118,17 @@ impl<'a> BlockSelectCursor<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub struct OptionalIndexSelectCursor<'a> {
|
pub struct OptionalIndexSelectCursor {
|
||||||
current_block_cursor: BlockSelectCursor<'a>,
|
current_block_cursor: BlockSelectCursor<'static>,
|
||||||
current_block_id: u16,
|
current_block_id: u16,
|
||||||
// The current block is guaranteed to contain ranks < end_rank.
|
// The current block is guaranteed to contain ranks < end_rank.
|
||||||
current_block_end_rank: RowId,
|
current_block_end_rank: RowId,
|
||||||
optional_index: &'a OptionalIndex,
|
optional_index: OptionalIndex,
|
||||||
block_doc_idx_start: RowId,
|
block_doc_idx_start: RowId,
|
||||||
num_null_rows_before_block: RowId,
|
num_null_rows_before_block: RowId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> OptionalIndexSelectCursor<'a> {
|
impl OptionalIndexSelectCursor {
|
||||||
fn search_and_load_block(&mut self, rank: RowId) {
|
fn search_and_load_block(&mut self, rank: RowId) {
|
||||||
if rank < self.current_block_end_rank {
|
if rank < self.current_block_end_rank {
|
||||||
// we are already in the right block
|
// we are already in the right block
|
||||||
@@ -145,14 +145,23 @@ impl<'a> OptionalIndexSelectCursor<'a> {
|
|||||||
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
let block_meta = self.optional_index.block_metas[self.current_block_id as usize];
|
||||||
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
self.num_null_rows_before_block = block_meta.non_null_rows_before_block;
|
||||||
let block: Block<'_> = self.optional_index.block(block_meta);
|
let block: Block<'_> = self.optional_index.block(block_meta);
|
||||||
self.current_block_cursor = match block {
|
let current_block_cursor = match block {
|
||||||
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()),
|
||||||
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()),
|
||||||
};
|
};
|
||||||
|
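// We are building a self-owned `OptionalIndexSelectCursor`: the block cursor borrows from
// `self.optional_index`, and the transmute below erases that borrow's lifetime so the cursor
// can be stored in the `BlockSelectCursor<'static>` field.
// NOTE(review): this is only sound while the borrowed block data stays valid for as long as
// the cursor is kept (including when this struct is moved) — confirm against `OptionalIndex`.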
|
||||||
|
self.current_block_cursor = unsafe { std::mem::transmute(current_block_cursor) };
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replaces every entry of `ranks`, in slice order, with `self.select(rank)`.
///
/// The slice is rewritten in place; each entry goes through a separate `select` call on
/// this same cursor.
pub fn select_batch_in_place(&mut self, ranks: &mut [RowId]) {
|
||||||
|
// TODO see if we can batch at the block level as well for optimization purposes.
|
||||||
|
for rank in ranks {
|
||||||
|
// Overwrite the rank with the value `select` returns for it.
*rank = self.select(*rank);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor {
|
||||||
fn select(&mut self, rank: RowId) -> RowId {
|
fn select(&mut self, rank: RowId) -> RowId {
|
||||||
self.search_and_load_block(rank);
|
self.search_and_load_block(rank);
|
||||||
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
||||||
@@ -161,7 +170,7 @@ impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Set<RowId> for OptionalIndex {
|
impl Set<RowId> for OptionalIndex {
|
||||||
type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b;
|
type SelectCursor<'a> = OptionalIndexSelectCursor;
|
||||||
// Check if value at position is not null.
|
// Check if value at position is not null.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn contains(&self, row_id: RowId) -> bool {
|
fn contains(&self, row_id: RowId) -> bool {
|
||||||
@@ -220,14 +229,14 @@ impl Set<RowId> for OptionalIndex {
|
|||||||
block_doc_idx_start + in_block_rank as u32
|
block_doc_idx_start + in_block_rank as u32
|
||||||
}
|
}
|
||||||
|
|
||||||
fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> {
|
fn select_cursor(&self) -> OptionalIndexSelectCursor {
|
||||||
OptionalIndexSelectCursor {
|
OptionalIndexSelectCursor {
|
||||||
current_block_cursor: BlockSelectCursor::Sparse(
|
current_block_cursor: BlockSelectCursor::Sparse(
|
||||||
SparseBlockCodec::open(b"").select_cursor(),
|
SparseBlockCodec::open(b"").select_cursor(),
|
||||||
),
|
),
|
||||||
current_block_id: 0u16,
|
current_block_id: 0u16,
|
||||||
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
current_block_end_rank: 0u32, //< this is sufficient to force the first load
|
||||||
optional_index: self,
|
optional_index: self.clone(),
|
||||||
block_doc_idx_start: 0u32,
|
block_doc_idx_start: 0u32,
|
||||||
num_null_rows_before_block: 0u32,
|
num_null_rows_before_block: 0u32,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use common::{BinarySerializable, OwnedBytes};
|
use common::{BinarySerializable, OwnedBytes};
|
||||||
|
|
||||||
|
use crate::column_index::MultiValueIndex;
|
||||||
use crate::column_values::monotonic_mapping::{
|
use crate::column_values::monotonic_mapping::{
|
||||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ extern crate test;
|
|||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
mod column;
|
mod column;
|
||||||
mod column_index;
|
pub mod column_index;
|
||||||
pub mod column_values;
|
pub mod column_values;
|
||||||
mod columnar;
|
mod columnar;
|
||||||
mod dictionary;
|
mod dictionary;
|
||||||
|
|||||||
@@ -322,7 +322,6 @@ impl SegmentTermCollector {
|
|||||||
let mut entries: Vec<(u32, TermBucketEntry)> =
|
let mut entries: Vec<(u32, TermBucketEntry)> =
|
||||||
self.term_buckets.entries.into_iter().collect();
|
self.term_buckets.entries.into_iter().collect();
|
||||||
|
|
||||||
let order_by_key = self.req.order.target == OrderTarget::Key;
|
|
||||||
let order_by_sub_aggregation =
|
let order_by_sub_aggregation =
|
||||||
matches!(self.req.order.target, OrderTarget::SubAggregation(_));
|
matches!(self.req.order.target, OrderTarget::SubAggregation(_));
|
||||||
|
|
||||||
@@ -351,7 +350,7 @@ impl SegmentTermCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (term_doc_count_before_cutoff, mut sum_other_doc_count) = if order_by_sub_aggregation {
|
let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
|
||||||
(0, 0)
|
(0, 0)
|
||||||
} else {
|
} else {
|
||||||
cut_off_buckets(&mut entries, self.req.segment_size as usize)
|
cut_off_buckets(&mut entries, self.req.segment_size as usize)
|
||||||
@@ -393,20 +392,6 @@ impl SegmentTermCollector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if order_by_key {
|
|
||||||
let mut dict_entries = dict.into_iter().collect_vec();
|
|
||||||
if self.req.order.order == Order::Desc {
|
|
||||||
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key1.cmp(key2));
|
|
||||||
} else {
|
|
||||||
dict_entries.sort_unstable_by(|(key1, _), (key2, _)| key2.cmp(key1));
|
|
||||||
}
|
|
||||||
let (_, sum_other_docs) =
|
|
||||||
cut_off_buckets(&mut dict_entries, self.req.segment_size as usize);
|
|
||||||
|
|
||||||
sum_other_doc_count += sum_other_docs;
|
|
||||||
dict = dict_entries.into_iter().collect();
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(IntermediateBucketResult::Terms(
|
Ok(IntermediateBucketResult::Terms(
|
||||||
IntermediateTermBucketResult {
|
IntermediateTermBucketResult {
|
||||||
entries: dict,
|
entries: dict,
|
||||||
@@ -857,14 +842,14 @@ mod tests {
|
|||||||
];
|
];
|
||||||
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
|
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
|
||||||
|
|
||||||
// key desc
|
// key asc
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Desc,
|
order: Order::Asc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
@@ -891,7 +876,7 @@ mod tests {
|
|||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Desc,
|
order: Order::Asc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
size: Some(2),
|
size: Some(2),
|
||||||
@@ -915,14 +900,14 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
|
assert_eq!(res["my_texts"]["sum_other_doc_count"], 3);
|
||||||
|
|
||||||
// key desc and segment_size cut_off
|
// key asc and segment_size cut_off
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Desc,
|
order: Order::Asc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
size: Some(2),
|
size: Some(2),
|
||||||
@@ -945,14 +930,14 @@ mod tests {
|
|||||||
serde_json::Value::Null
|
serde_json::Value::Null
|
||||||
);
|
);
|
||||||
|
|
||||||
// key asc
|
// key desc
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Asc,
|
order: Order::Desc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
@@ -972,14 +957,14 @@ mod tests {
|
|||||||
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
|
assert_eq!(res["my_texts"]["buckets"][2]["doc_count"], 5);
|
||||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
assert_eq!(res["my_texts"]["sum_other_doc_count"], 0);
|
||||||
|
|
||||||
// key asc, size cut_off
|
// key desc, size cut_off
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Asc,
|
order: Order::Desc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
size: Some(2),
|
size: Some(2),
|
||||||
@@ -1002,14 +987,14 @@ mod tests {
|
|||||||
);
|
);
|
||||||
assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
|
assert_eq!(res["my_texts"]["sum_other_doc_count"], 5);
|
||||||
|
|
||||||
// key asc, segment_size cut_off
|
// key desc, segment_size cut_off
|
||||||
let agg_req: Aggregations = vec![(
|
let agg_req: Aggregations = vec![(
|
||||||
"my_texts".to_string(),
|
"my_texts".to_string(),
|
||||||
Aggregation::Bucket(BucketAggregation {
|
Aggregation::Bucket(BucketAggregation {
|
||||||
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
bucket_agg: BucketAggregationType::Terms(TermsAggregation {
|
||||||
field: "string_id".to_string(),
|
field: "string_id".to_string(),
|
||||||
order: Some(CustomOrder {
|
order: Some(CustomOrder {
|
||||||
order: Order::Asc,
|
order: Order::Desc,
|
||||||
target: OrderTarget::Key,
|
target: OrderTarget::Key,
|
||||||
}),
|
}),
|
||||||
size: Some(2),
|
size: Some(2),
|
||||||
|
|||||||
@@ -498,7 +498,7 @@ impl IntermediateTermBucketResult {
|
|||||||
match req.order.target {
|
match req.order.target {
|
||||||
OrderTarget::Key => {
|
OrderTarget::Key => {
|
||||||
buckets.sort_by(|left, right| {
|
buckets.sort_by(|left, right| {
|
||||||
if req.order.order == Order::Desc {
|
if req.order.order == Order::Asc {
|
||||||
left.key.partial_cmp(&right.key)
|
left.key.partial_cmp(&right.key)
|
||||||
} else {
|
} else {
|
||||||
right.key.partial_cmp(&left.key)
|
right.key.partial_cmp(&left.key)
|
||||||
|
|||||||
@@ -1152,12 +1152,6 @@ mod tests {
|
|||||||
r#"FieldNotFound("not_exist_field")"#
|
r#"FieldNotFound("not_exist_field")"#
|
||||||
);
|
);
|
||||||
|
|
||||||
let agg_res = avg_on_field("scores_i64");
|
|
||||||
assert_eq!(
|
|
||||||
format!("{:?}", agg_res),
|
|
||||||
r#"InvalidArgument("Invalid field cardinality on field scores_i64 expected SingleValue, but got MultiValues")"#
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -135,6 +135,8 @@ impl InvertedIndexReader {
|
|||||||
term_info: &TermInfo,
|
term_info: &TermInfo,
|
||||||
option: IndexRecordOption,
|
option: IndexRecordOption,
|
||||||
) -> io::Result<SegmentPostings> {
|
) -> io::Result<SegmentPostings> {
|
||||||
|
let option = option.downgrade(self.record_option);
|
||||||
|
|
||||||
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
|
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
|
||||||
let position_reader = {
|
let position_reader = {
|
||||||
if option.has_positions() {
|
if option.has_positions() {
|
||||||
|
|||||||
@@ -819,20 +819,23 @@ mod tests {
|
|||||||
// This is a bit of a contrived example.
|
// This is a bit of a contrived example.
|
||||||
let tokens = PreTokenizedString {
|
let tokens = PreTokenizedString {
|
||||||
text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
|
text: "contrived-example".to_string(), //< I can't think of a use case where this corner case happens in real life.
|
||||||
tokens: vec![Token { // Not the last token, yet ends after the last token.
|
tokens: vec![
|
||||||
offset_from: 0,
|
Token {
|
||||||
offset_to: 14,
|
// Not the last token, yet ends after the last token.
|
||||||
position: 0,
|
offset_from: 0,
|
||||||
text: "long_token".to_string(),
|
offset_to: 14,
|
||||||
position_length: 3,
|
position: 0,
|
||||||
},
|
text: "long_token".to_string(),
|
||||||
Token {
|
position_length: 3,
|
||||||
offset_from: 0,
|
},
|
||||||
offset_to: 14,
|
Token {
|
||||||
position: 1,
|
offset_from: 0,
|
||||||
text: "short".to_string(),
|
offset_to: 14,
|
||||||
position_length: 1,
|
position: 1,
|
||||||
}],
|
text: "short".to_string(),
|
||||||
|
position_length: 1,
|
||||||
|
},
|
||||||
|
],
|
||||||
};
|
};
|
||||||
doc.add_pre_tokenized_text(text, tokens);
|
doc.add_pre_tokenized_text(text, tokens);
|
||||||
doc.add_text(text, "hello");
|
doc.add_text(text, "hello");
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
use std::ops::RangeInclusive;
|
use std::ops::RangeInclusive;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use columnar::Column;
|
use columnar::column_index::ColumnIndexSelectCursor;
|
||||||
|
use columnar::{Column, ColumnValues};
|
||||||
|
|
||||||
use crate::fastfield::MakeZero;
|
use crate::fastfield::MakeZero;
|
||||||
use crate::{DocId, DocSet, TERMINATED};
|
use crate::{DocId, DocSet, TERMINATED};
|
||||||
@@ -43,7 +45,9 @@ impl VecCursor {
|
|||||||
pub(crate) struct RangeDocSet<T: MakeZero> {
|
pub(crate) struct RangeDocSet<T: MakeZero> {
|
||||||
/// The range filter on the values.
|
/// The range filter on the values.
|
||||||
value_range: RangeInclusive<T>,
|
value_range: RangeInclusive<T>,
|
||||||
column: Column<T>,
|
column_index_select_cursor: ColumnIndexSelectCursor,
|
||||||
|
column_values: Arc<dyn ColumnValues<T>>,
|
||||||
|
|
||||||
/// The next docid start range to fetch (inclusive).
|
/// The next docid start range to fetch (inclusive).
|
||||||
next_fetch_start: u32,
|
next_fetch_start: u32,
|
||||||
/// Number of docs range checked in a batch.
|
/// Number of docs range checked in a batch.
|
||||||
@@ -63,13 +67,15 @@ pub(crate) struct RangeDocSet<T: MakeZero> {
|
|||||||
const DEFAULT_FETCH_HORIZON: u32 = 128;
|
const DEFAULT_FETCH_HORIZON: u32 = 128;
|
||||||
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
|
impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSet<T> {
|
||||||
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
|
pub(crate) fn new(value_range: RangeInclusive<T>, column: Column<T>) -> Self {
|
||||||
|
let column_index_select_cursor = column.select_cursor();
|
||||||
let mut range_docset = Self {
|
let mut range_docset = Self {
|
||||||
value_range,
|
value_range,
|
||||||
column,
|
column_values: column.values,
|
||||||
loaded_docs: VecCursor::new(),
|
loaded_docs: VecCursor::new(),
|
||||||
next_fetch_start: 0,
|
next_fetch_start: 0,
|
||||||
fetch_horizon: DEFAULT_FETCH_HORIZON,
|
fetch_horizon: DEFAULT_FETCH_HORIZON,
|
||||||
last_seek_pos_opt: None,
|
last_seek_pos_opt: None,
|
||||||
|
column_index_select_cursor,
|
||||||
};
|
};
|
||||||
range_docset.reset_fetch_range();
|
range_docset.reset_fetch_range();
|
||||||
range_docset.fetch_block();
|
range_docset.fetch_block();
|
||||||
@@ -106,26 +112,21 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> RangeDocSe
|
|||||||
fn fetch_horizon(&mut self, horizon: u32) -> bool {
|
fn fetch_horizon(&mut self, horizon: u32) -> bool {
|
||||||
let mut finished_to_end = false;
|
let mut finished_to_end = false;
|
||||||
|
|
||||||
let limit = self.column.values.num_vals();
|
let limit = self.column_values.num_vals();
|
||||||
let mut end = self.next_fetch_start + horizon;
|
let mut end = self.next_fetch_start + horizon;
|
||||||
if end >= limit {
|
if end >= limit {
|
||||||
end = limit;
|
end = limit;
|
||||||
finished_to_end = true;
|
finished_to_end = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
let last_value = self.loaded_docs.last_value();
|
|
||||||
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
|
let doc_buffer: &mut Vec<DocId> = self.loaded_docs.get_cleared_data();
|
||||||
self.column.values.get_docids_for_value_range(
|
self.column_values.get_docids_for_value_range(
|
||||||
self.value_range.clone(),
|
self.value_range.clone(),
|
||||||
self.next_fetch_start..end,
|
self.next_fetch_start..end,
|
||||||
doc_buffer,
|
doc_buffer,
|
||||||
);
|
);
|
||||||
self.column.idx.select_batch_in_place(doc_buffer);
|
self.column_index_select_cursor
|
||||||
if let Some(last_value) = last_value {
|
.select_batch_in_place(doc_buffer);
|
||||||
while self.loaded_docs.current() == Some(last_value) {
|
|
||||||
self.loaded_docs.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.next_fetch_start = end;
|
self.next_fetch_start = end;
|
||||||
|
|
||||||
finished_to_end
|
finished_to_end
|
||||||
@@ -138,7 +139,7 @@ impl<T: MakeZero + Send + Sync + PartialOrd + Copy + Debug + 'static> DocSet for
|
|||||||
if let Some(docid) = self.loaded_docs.next() {
|
if let Some(docid) = self.loaded_docs.next() {
|
||||||
return docid;
|
return docid;
|
||||||
}
|
}
|
||||||
if self.next_fetch_start >= self.column.values.num_vals() {
|
if self.next_fetch_start >= self.column_values.num_vals() {
|
||||||
return TERMINATED;
|
return TERMINATED;
|
||||||
}
|
}
|
||||||
self.fetch_block();
|
self.fetch_block();
|
||||||
|
|||||||
@@ -109,6 +109,7 @@ impl TermQuery {
|
|||||||
} else {
|
} else {
|
||||||
IndexRecordOption::Basic
|
IndexRecordOption::Basic
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(TermWeight::new(
|
Ok(TermWeight::new(
|
||||||
self.term.clone(),
|
self.term.clone(),
|
||||||
index_record_option,
|
index_record_option,
|
||||||
|
|||||||
@@ -49,4 +49,17 @@ impl IndexRecordOption {
|
|||||||
IndexRecordOption::WithFreqsAndPositions => true,
|
IndexRecordOption::WithFreqsAndPositions => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Combines `self` with `other` and returns the level that both of them reach.
///
/// - `WithFreqsAndPositions` only when both `self` and `other` are `WithFreqsAndPositions`.
/// - `WithFreqs` when both are `WithFreqs`, or when one is `WithFreqs` and the other is
///   `WithFreqsAndPositions`.
/// - `Basic` for every other combination.
///
/// The result does not depend on which of the two values is `self` and which is `other`.
|
||||||
|
pub fn downgrade(&self, other: IndexRecordOption) -> IndexRecordOption {
|
||||||
|
use IndexRecordOption::*;
|
||||||
|
|
||||||
|
// The tuple is `(other, self)`: the first pattern of each arm matches `other`.
match (other, self) {
|
||||||
|
(WithFreqsAndPositions, WithFreqsAndPositions) => WithFreqsAndPositions,
|
||||||
|
(WithFreqs, WithFreqs) => WithFreqs,
|
||||||
|
(WithFreqsAndPositions, WithFreqs) => WithFreqs,
|
||||||
|
(WithFreqs, WithFreqsAndPositions) => WithFreqs,
|
||||||
|
// Any pair not listed above falls back to `Basic`.
_ => Basic,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user