Compare commits


3 Commits

Pascal Seitz
2ce485b8cc skip estimate phase for merge multivalue index
precompute stats for the multivalue index merge + disable Line encoding for
the multivalue index. That combination allows skipping the first estimation
pass, giving up to 2x merge performance on multivalue indices.

This change may decrease compression, as Line compresses very well for
documents that have a fixed number of values in each doc. The Line codec
should be replaced.
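
For illustration: a minimal sketch (with simplified stand-in types, not the
actual tantivy-columnar code) of why precomputed stats let the writer skip
the estimation pass. A codec like bitpacking only needs min/max/row-count to
write its header, so with those known up front it can stream the values once:

```rust
/// Simplified stand-in for the crate's `ColumnStats`.
struct ColumnStats {
    min_value: u64,
    max_value: u64,
    num_rows: u32,
}

/// With precomputed stats, the header can be written immediately and the
/// values streamed in a single pass; no first pass over the column is needed
/// to estimate codec sizes.
fn serialize_one_pass(vals: impl Iterator<Item = u64>, stats: &ColumnStats, out: &mut Vec<u8>) {
    out.extend_from_slice(&stats.min_value.to_le_bytes());
    out.extend_from_slice(&stats.num_rows.to_le_bytes());
    // Bit width derived from the stats; a real bitpacker would pack each value
    // into exactly this many bits, we store plain deltas for simplicity.
    let bit_width = 64 - (stats.max_value - stats.min_value).leading_zeros();
    out.push(bit_width as u8);
    for v in vals {
        out.extend_from_slice(&(v - stats.min_value).to_le_bytes());
    }
}
```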

```
merge_multi_and_multi          Avg: 22.7880ms (-47.15%)    Median: 22.5469ms (-47.38%)    [22.3691ms .. 25.8392ms]
merge_dense_and_dense          Avg: 14.4398ms (+2.18%)     Median: 14.2465ms (+0.74%)     [14.1620ms .. 16.1270ms]
merge_sparse_and_sparse        Avg: 10.6559ms (+1.10%)     Median: 10.6318ms (+0.91%)     [10.5527ms .. 11.2848ms]
merge_sparse_and_dense         Avg: 12.4886ms (+1.52%)     Median: 12.4044ms (+0.84%)     [12.3261ms .. 13.9439ms]
merge_multi_and_dense          Avg: 25.6686ms (-45.56%)    Median: 25.4851ms (-45.84%)    [25.1618ms .. 27.6226ms]
merge_multi_and_sparse         Avg: 24.3278ms (-47.00%)    Median: 24.1917ms (-47.34%)    [23.7159ms .. 27.0513ms]
```
2024-06-11 20:22:00 +08:00
PSeitz
c3b92a5412 fix compiler warning, cleanup (#2393)
fix compiler warning for missing feature flag
remove unused variables
clean up unused methods
2024-06-11 16:03:50 +08:00
PSeitz
2f55511064 extend indexwriter proptests (#2342)
* index random values in proptest

* add proptest with multiple docs
2024-06-11 16:02:57 +08:00
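
As a rough sketch of the shape of these proptests (the helper below is
hypothetical; the real tests in #2342 drive tantivy's IndexWriter through a
full write/commit/read cycle):

```rust
use proptest::prelude::*;

// Hypothetical round-trip helper standing in for indexing the docs and
// reading the values back from the index.
fn index_and_read_back(vals: &[u64]) -> Vec<u64> {
    vals.to_vec()
}

proptest! {
    // Generate a random number of docs with random values, index them, and
    // assert the values survive the round trip.
    #[test]
    fn proptest_index_random_values(docs in proptest::collection::vec(any::<u64>(), 1..64)) {
        prop_assert_eq!(index_and_read_back(&docs), docs);
    }
}
```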
31 changed files with 998 additions and 1058 deletions

View File

@@ -22,7 +22,7 @@ impl Display for Card {
}
}
const NUM_DOCS: u32 = 100_000;
const NUM_DOCS: u32 = 1_000_000;
fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader {
use tantivy_columnar::ColumnarWriter;
@@ -88,12 +88,8 @@ fn main() {
let columnar_readers = columnar_readers.iter().collect::<Vec<_>>();
let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]);
let _ = black_box(merge_columnar(
&columnar_readers,
&[],
merge_row_order.into(),
&mut out,
));
merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap();
black_box(out);
},
);
}

View File

@@ -73,14 +73,18 @@ fn detect_cardinality(
pub fn merge_column_index<'a>(
columns: &'a [ColumnIndex],
merge_row_order: &'a MergeRowOrder,
num_values: u32,
) -> SerializableColumnIndex<'a> {
// For simplicity, we do not try to detect whether the cardinality could be
// downgraded thanks to deletes.
let cardinality_after_merge = detect_cardinality(columns, merge_row_order);
match merge_row_order {
MergeRowOrder::Stack(stack_merge_order) => {
merge_column_index_stacked(columns, cardinality_after_merge, stack_merge_order)
}
MergeRowOrder::Stack(stack_merge_order) => merge_column_index_stacked(
columns,
cardinality_after_merge,
stack_merge_order,
num_values,
),
MergeRowOrder::Shuffled(complex_merge_order) => {
merge_column_index_shuffled(columns, cardinality_after_merge, complex_merge_order)
}
@@ -150,62 +154,69 @@ mod tests {
);
}
// #[test]
// fn test_merge_index_multivalued_sorted() {
// let column_indexes: Vec<ColumnIndex> = vec![MultiValueIndex::for_test(&[0, 2,
// 5]).into()]; let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
// &[2],
// vec![
// RowAddr {
// segment_ord: 0u32,
// row_id: 1u32,
// },
// RowAddr {
// segment_ord: 0u32,
// row_id: 0u32,
// },
// ],
// )
// .into();
// let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order);
// let SerializableColumnIndex::Multivalued(serializable_multivalue_index) =
// merged_column_index else { panic!("Expected a multivalued index")
// };
// serializable_multivalue_index.doc_ids_with_values_opt.
// let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();
// assert_eq!(&start_indexes, &[0, 3, 5]);
// }
#[test]
fn test_merge_index_multivalued_sorted() {
let column_indexes: Vec<ColumnIndex> = vec![MultiValueIndex::for_test(&[0, 2, 5]).into()];
let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
&[2],
vec![
RowAddr {
segment_ord: 0u32,
row_id: 1u32,
},
RowAddr {
segment_ord: 0u32,
row_id: 0u32,
},
],
)
.into();
let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 3);
let SerializableColumnIndex::Multivalued {
indices: start_index_iterable,
..
} = merged_column_index
else {
panic!("Excpected a multivalued index")
};
let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();
assert_eq!(&start_indexes, &[0, 3, 5]);
}
// #[test]
// fn test_merge_index_multivalued_sorted_several_segment() {
// let column_indexes: Vec<ColumnIndex> = vec![
// MultiValueIndex::for_test(&[0, 2, 5]).into(),
// ColumnIndex::Empty { num_docs: 0 },
// MultiValueIndex::for_test(&[0, 1, 4]).into(),
// ];
// let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
// &[2, 0, 2],
// vec![
// RowAddr {
// segment_ord: 2u32,
// row_id: 1u32,
// },
// RowAddr {
// segment_ord: 0u32,
// row_id: 0u32,
// },
// RowAddr {
// segment_ord: 2u32,
// row_id: 0u32,
// },
// ],
// )
// .into();
// let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order);
// let SerializableColumnIndex::Multivalued(serializable_multivalue_index) =
// merged_column_index else { panic!("Expected a multivalued index")
// };
// let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();
// assert_eq!(&start_indexes, &[0, 3, 5, 6]);
// }
#[test]
fn test_merge_index_multivalued_sorted_several_segment() {
let column_indexes: Vec<ColumnIndex> = vec![
MultiValueIndex::for_test(&[0, 2, 5]).into(),
ColumnIndex::Empty { num_docs: 0 },
MultiValueIndex::for_test(&[0, 1, 4]).into(),
];
let merge_row_order: MergeRowOrder = ShuffleMergeOrder::for_test(
&[2, 0, 2],
vec![
RowAddr {
segment_ord: 2u32,
row_id: 1u32,
},
RowAddr {
segment_ord: 0u32,
row_id: 0u32,
},
RowAddr {
segment_ord: 2u32,
row_id: 0u32,
},
],
)
.into();
let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order, 6);
let SerializableColumnIndex::Multivalued {
indices: start_index_iterable,
..
} = merged_column_index
else {
panic!("Excpected a multivalued index")
};
let start_indexes: Vec<RowId> = start_index_iterable.boxed_iter().collect();
assert_eq!(&start_indexes, &[0, 3, 5, 6]);
}
}

View File

@@ -9,23 +9,25 @@ pub fn merge_column_index_shuffled<'a>(
cardinality_after_merge: Cardinality,
shuffle_merge_order: &'a ShuffleMergeOrder,
) -> SerializableColumnIndex<'a> {
todo!();
// match cardinality_after_merge {
// Cardinality::Full => SerializableColumnIndex::Full,
// Cardinality::Optional => {
// let non_null_row_ids =
// merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order);
// SerializableColumnIndex::Optional {
// non_null_row_ids,
// num_rows: shuffle_merge_order.num_rows(),
// }
// }
// Cardinality::Multivalued => {
// let multivalue_start_index =
// merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order);
// SerializableColumnIndex::Multivalued(multivalue_start_index)
// }
// }
match cardinality_after_merge {
Cardinality::Full => SerializableColumnIndex::Full,
Cardinality::Optional => {
let non_null_row_ids =
merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order);
SerializableColumnIndex::Optional {
non_null_row_ids,
num_rows: shuffle_merge_order.num_rows(),
}
}
Cardinality::Multivalued => {
let multivalue_start_index =
merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order);
SerializableColumnIndex::Multivalued {
indices: multivalue_start_index,
stats: None,
}
}
}
}
/// Merge several column indexes into one, ordering rows according to the merge_order passed as
@@ -138,35 +140,35 @@ mod tests {
assert!(integrate_num_vals([3, 0, 10, 20].into_iter()).eq([0, 3, 3, 13, 33].into_iter()));
}
// #[test]
// fn test_merge_column_index_optional_shuffle() {
// let optional_index: ColumnIndex = OptionalIndex::for_test(2, &[0]).into();
// let column_indexes = [optional_index, ColumnIndex::Full];
// let row_addrs = vec![
// RowAddr {
// segment_ord: 0u32,
// row_id: 1u32,
// },
// RowAddr {
// segment_ord: 1u32,
// row_id: 0u32,
// },
// ];
// let shuffle_merge_order = ShuffleMergeOrder::for_test(&[2, 1], row_addrs);
// let serializable_index = merge_column_index_shuffled(
// &column_indexes[..],
// Cardinality::Optional,
// &shuffle_merge_order,
// );
// let SerializableColumnIndex::Optional {
// non_null_row_ids,
// num_rows,
// } = serializable_index
// else {
// panic!()
// };
// assert_eq!(num_rows, 2);
// let non_null_rows: Vec<RowId> = non_null_row_ids.boxed_iter().collect();
// assert_eq!(&non_null_rows, &[1]);
// }
#[test]
fn test_merge_column_index_optional_shuffle() {
let optional_index: ColumnIndex = OptionalIndex::for_test(2, &[0]).into();
let column_indexes = [optional_index, ColumnIndex::Full];
let row_addrs = vec![
RowAddr {
segment_ord: 0u32,
row_id: 1u32,
},
RowAddr {
segment_ord: 1u32,
row_id: 0u32,
},
];
let shuffle_merge_order = ShuffleMergeOrder::for_test(&[2, 1], row_addrs);
let serializable_index = merge_column_index_shuffled(
&column_indexes[..],
Cardinality::Optional,
&shuffle_merge_order,
);
let SerializableColumnIndex::Optional {
non_null_row_ids,
num_rows,
} = serializable_index
else {
panic!()
};
assert_eq!(num_rows, 2);
let non_null_rows: Vec<RowId> = non_null_row_ids.boxed_iter().collect();
assert_eq!(&non_null_rows, &[1]);
}
}

View File

@@ -1,8 +1,8 @@
use std::ops::Range;
use std::iter;
use std::num::NonZeroU64;
use crate::column_index::multivalued_index::SerializableMultivalueIndex;
use crate::column_index::serialize::SerializableOptionalIndex;
use crate::column_index::SerializableColumnIndex;
use crate::column_index::{SerializableColumnIndex, Set};
use crate::column_values::ColumnStats;
use crate::iterable::Iterable;
use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
@@ -14,143 +14,37 @@ pub fn merge_column_index_stacked<'a>(
columns: &'a [ColumnIndex],
cardinality_after_merge: Cardinality,
stack_merge_order: &'a StackMergeOrder,
num_values: u32,
) -> SerializableColumnIndex<'a> {
match cardinality_after_merge {
Cardinality::Full => SerializableColumnIndex::Full,
Cardinality::Optional => SerializableColumnIndex::Optional(SerializableOptionalIndex {
Cardinality::Optional => SerializableColumnIndex::Optional {
non_null_row_ids: Box::new(StackedOptionalIndex {
columns,
stack_merge_order,
}),
num_rows: stack_merge_order.num_rows(),
}),
},
Cardinality::Multivalued => {
let serializable_multivalue_index =
make_serializable_multivalued_index(columns, stack_merge_order);
SerializableColumnIndex::Multivalued(serializable_multivalue_index)
let stacked_multivalued_index = StackedMultivaluedIndex {
columns,
stack_merge_order,
};
SerializableColumnIndex::Multivalued {
indices: Box::new(stacked_multivalued_index),
stats: Some(ColumnStats {
gcd: NonZeroU64::new(1).unwrap(),
// The values in the multivalue index are start offsets into the value
// column, so they range from 0 to the total number of values.
min_value: 0,
max_value: num_values as u64,
// The offsets column has num_docs + 1 entries (a leading 0 plus one end
// offset per doc), hence the +1.
num_rows: stack_merge_order.num_rows() + 1,
}),
}
}
}
}
struct StackedDocIdsWithValues<'a> {
column_indexes: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
}
impl Iterable<u32> for StackedDocIdsWithValues<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new((0..self.column_indexes.len()).flat_map(|i| {
let column_index = &self.column_indexes[i];
let doc_range = self.stack_merge_order.columnar_range(i);
get_doc_ids_with_values(column_index, doc_range)
}))
}
}
fn get_doc_ids_with_values<'a>(
column_index: &'a ColumnIndex,
doc_range: Range<u32>,
) -> Box<dyn Iterator<Item = u32> + 'a> {
match column_index {
ColumnIndex::Empty { .. } => Box::new(0..0),
ColumnIndex::Full => Box::new(doc_range),
ColumnIndex::Optional(optional_index) => Box::new(
optional_index
.iter_rows()
.map(move |row| row + doc_range.start),
),
ColumnIndex::Multivalued(multivalued_index) => Box::new(
multivalued_index
.optional_index
.iter_rows()
.map(move |row| row + doc_range.start),
),
}
}
fn stack_doc_ids_with_values<'a>(
column_indexes: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
) -> SerializableOptionalIndex<'a> {
let num_rows = stack_merge_order.num_rows();
SerializableOptionalIndex {
non_null_row_ids: Box::new(StackedDocIdsWithValues {
column_indexes,
stack_merge_order,
}),
num_rows,
}
}
struct StackedStartOffsets<'a> {
column_indexes: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
}
fn get_num_values_iterator<'a>(
column_index: &'a ColumnIndex,
num_docs: u32,
) -> Box<dyn Iterator<Item = u32> + 'a> {
match column_index {
ColumnIndex::Empty { .. } => Box::new(std::iter::empty()),
ColumnIndex::Full => Box::new(std::iter::repeat(1u32).take(num_docs as usize)),
ColumnIndex::Optional(optional_index) => {
Box::new(std::iter::repeat(1u32).take(optional_index.num_non_nulls() as usize))
}
ColumnIndex::Multivalued(multivalued_index) => {
let vals: Vec<u32> = multivalued_index.start_index_column.iter().collect();
Box::new(
multivalued_index
.start_index_column
.iter()
.scan(0u32, |previous_start_offset, current_start_offset| {
let num_vals = current_start_offset - *previous_start_offset;
*previous_start_offset = current_start_offset;
Some(num_vals)
})
.skip(1),
)
}
}
}
impl<'a> Iterable for StackedStartOffsets<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| {
let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32;
let column_index = &self.column_indexes[columnar_id];
get_num_values_iterator(column_index, num_docs)
});
Box::new(std::iter::once(0u64).chain(num_values_it.into_iter().scan(
0u64,
|cumulated, el| {
*cumulated += el as u64;
Some(*cumulated)
},
)))
}
}
fn stack_start_offsets<'a>(
column_indexes: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
) -> Box<dyn Iterable + 'a> {
Box::new(StackedStartOffsets {
column_indexes,
stack_merge_order,
})
}
fn make_serializable_multivalued_index<'a>(
columns: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
) -> SerializableMultivalueIndex<'a> {
SerializableMultivalueIndex {
doc_ids_with_values: stack_doc_ids_with_values(columns, stack_merge_order),
start_offsets: stack_start_offsets(columns, stack_merge_order),
}
}
struct StackedOptionalIndex<'a> {
columns: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
@@ -181,3 +75,87 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
)
}
}
#[derive(Clone, Copy)]
struct StackedMultivaluedIndex<'a> {
columns: &'a [ColumnIndex],
stack_merge_order: &'a StackMergeOrder,
}
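// Converts any column index into the equivalent start-offset iterator over
// `num_rows + 1` entries: Empty yields all zeros, Full yields 0..=num_rows
// (one value per row), Optional yields the rank of each row, and Multivalued
// passes its start offsets through unchanged.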
fn convert_column_opt_to_multivalued_index<'a>(
column_index_opt: &'a ColumnIndex,
num_rows: RowId,
) -> Box<dyn Iterator<Item = RowId> + 'a> {
match column_index_opt {
ColumnIndex::Empty { .. } => Box::new(iter::repeat(0u32).take(num_rows as usize + 1)),
ColumnIndex::Full => Box::new(0..num_rows + 1),
ColumnIndex::Optional(optional_index) => {
Box::new(
(0..num_rows)
// TODO optimize
.map(|row_id| optional_index.rank(row_id))
.chain(std::iter::once(optional_index.num_non_nulls())),
)
}
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.start_index_column.iter(),
}
}
impl<'a> Iterable<RowId> for StackedMultivaluedIndex<'a> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = RowId> + '_> {
let multivalued_indexes =
self.columns
.iter()
.enumerate()
.map(|(columnar_id, column_opt)| {
let num_rows =
self.stack_merge_order.columnar_range(columnar_id).len() as RowId;
convert_column_opt_to_multivalued_index(column_opt, num_rows)
});
stack_multivalued_indexes(multivalued_indexes)
}
}
// Refactor me
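// Stacks several per-segment start-offset iterators into one: each segment's
// offsets are shifted by the running total of values emitted so far, and the
// redundant leading 0 of every segment after the first is skipped.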
fn stack_multivalued_indexes<'a>(
mut multivalued_indexes: impl Iterator<Item = Box<dyn Iterator<Item = RowId> + 'a>> + 'a,
) -> Box<dyn Iterator<Item = RowId> + 'a> {
let mut offset = 0;
let mut last_row_id = 0;
let mut current_it = multivalued_indexes.next();
Box::new(std::iter::from_fn(move || loop {
if let Some(row_id) = current_it.as_mut()?.next() {
last_row_id = offset + row_id;
return Some(last_row_id);
}
offset = last_row_id;
loop {
current_it = multivalued_indexes.next();
if current_it.as_mut()?.next().is_some() {
break;
}
}
}))
}
#[cfg(test)]
mod tests {
use crate::RowId;
fn it<'a>(row_ids: &'a [RowId]) -> Box<dyn Iterator<Item = RowId> + 'a> {
Box::new(row_ids.iter().copied())
}
#[test]
fn test_stack() {
let columns = [
it(&[0u32, 0u32]),
it(&[0u32, 1u32, 1u32, 4u32]),
it(&[0u32, 3u32, 5u32]),
it(&[0u32, 4u32]),
]
.into_iter();
let start_offsets: Vec<RowId> = super::stack_multivalued_indexes(columns).collect();
assert_eq!(start_offsets, &[0, 0, 1, 1, 4, 7, 9, 13]);
}
}

View File

@@ -11,11 +11,8 @@ mod serialize;
use std::ops::Range;
pub use merge::merge_column_index;
pub(crate) use multivalued_index::SerializableMultivalueIndex;
pub use optional_index::{OptionalIndex, Set};
pub use serialize::{
open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex,
};
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};
use crate::column_index::multivalued_index::MultiValueIndex;
use crate::{Cardinality, DocId, RowId};

View File

@@ -3,64 +3,44 @@ use std::io::Write;
use std::ops::Range;
use std::sync::Arc;
use common::{CountingWriter, OwnedBytes};
use common::OwnedBytes;
use super::optional_index::{open_optional_index, serialize_optional_index};
use super::{OptionalIndex, SerializableOptionalIndex, Set};
use crate::column_values::{
load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues,
load_u64_based_column_values, serialize_u64_based_column_values,
serialize_u64_with_codec_and_stats, CodecType, ColumnStats, ColumnValues,
};
use crate::iterable::Iterable;
use crate::{DocId, RowId};
pub struct SerializableMultivalueIndex<'a> {
pub doc_ids_with_values: SerializableOptionalIndex<'a>,
pub start_offsets: Box<dyn Iterable<u64> + 'a>,
}
pub fn serialize_multivalued_index<'a>(
multivalued_index: &SerializableMultivalueIndex<'a>,
pub fn serialize_multivalued_index(
multivalued_index: &dyn Iterable<RowId>,
stats: Option<ColumnStats>,
output: &mut impl Write,
) -> io::Result<()> {
let SerializableMultivalueIndex {
doc_ids_with_values,
start_offsets,
} = multivalued_index;
let mut count_writer = CountingWriter::wrap(output);
let SerializableOptionalIndex {
non_null_row_ids,
num_rows,
} = doc_ids_with_values;
serialize_optional_index(&**non_null_row_ids, *num_rows, &mut count_writer)?;
let optional_len = count_writer.written_bytes() as u32;
let output = count_writer.finish();
serialize_u64_based_column_values(
&**start_offsets,
&[CodecType::Bitpacked, CodecType::Linear],
output,
)?;
output.write_all(&optional_len.to_le_bytes())?;
if let Some(stats) = stats {
// TODO: Add something with higher compression that doesn't require a full scan upfront
let estimator = CodecType::Bitpacked.estimator();
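// Bitpacked can serialize from the precomputed stats alone, so the usual
// estimation pass over all values is skipped here.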
assert!(!estimator.requires_full_scan());
serialize_u64_with_codec_and_stats(multivalued_index, estimator, stats, output)?;
} else {
serialize_u64_based_column_values(
multivalued_index,
&[CodecType::Bitpacked, CodecType::Linear],
output,
)?;
}
Ok(())
}
pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<MultiValueIndex> {
let (body_bytes, optional_index_len) = bytes.rsplit(4);
let optional_index_len = u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap());
let (optional_index_bytes, start_index_bytes) = body_bytes.split(optional_index_len as usize);
let optional_index = open_optional_index(optional_index_bytes)?;
let start_index_column: Arc<dyn ColumnValues<RowId>> =
load_u64_based_column_values(start_index_bytes)?;
Ok(MultiValueIndex {
optional_index,
start_index_column,
})
let start_index_column: Arc<dyn ColumnValues<RowId>> = load_u64_based_column_values(bytes)?;
Ok(MultiValueIndex { start_index_column })
}
#[derive(Clone)]
/// Index to resolve the value range for a given doc_id.
/// The first start offset is always 0.
pub struct MultiValueIndex {
pub optional_index: OptionalIndex,
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
}
@@ -72,27 +52,16 @@ impl std::fmt::Debug for MultiValueIndex {
}
}
impl From<Arc<dyn ColumnValues<RowId>>> for MultiValueIndex {
fn from(start_index_column: Arc<dyn ColumnValues<RowId>>) -> Self {
MultiValueIndex { start_index_column }
}
}
impl MultiValueIndex {
pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex {
assert!(start_offsets.len() > 0);
assert_eq!(start_offsets[0], 0);
let mut doc_with_values = Vec::new();
let mut compact_start_offsets: Vec<u64> = vec![0];
for doc in 0..start_offsets.len() - 1 {
if start_offsets[doc] < start_offsets[doc + 1] {
doc_with_values.push(doc as RowId);
compact_start_offsets.push(start_offsets[doc + 1] as u64);
}
}
let serializable_multivalued_index = SerializableMultivalueIndex {
doc_ids_with_values: SerializableOptionalIndex {
non_null_row_ids: Box::new(&doc_with_values[..]),
num_rows: start_offsets.len() as u32 - 1,
},
start_offsets: Box::new(&compact_start_offsets[..]),
};
let mut buffer = Vec::new();
serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap();
serialize_multivalued_index(&start_offsets, None, &mut buffer).unwrap();
let bytes = OwnedBytes::new(buffer);
open_multivalued_index(bytes).unwrap()
}
@@ -101,19 +70,15 @@ impl MultiValueIndex {
/// the given document are `start..end`.
#[inline]
pub(crate) fn range(&self, doc_id: DocId) -> Range<RowId> {
let Some(rank) = self.optional_index.rank_if_exists(doc_id) else {
return 0..0;
};
let start = self.start_index_column.get_val(rank);
let end = self.start_index_column.get_val(rank + 1);
let start = self.start_index_column.get_val(doc_id);
let end = self.start_index_column.get_val(doc_id + 1);
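// The offsets column now has one entry per doc plus a trailing total, so no
// optional-index rank lookup is needed.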
start..end
}
/// Returns the number of documents in the index.
#[inline]
pub fn num_docs(&self) -> u32 {
self.optional_index.num_docs()
// self.start_index_column.num_vals() - 1
self.start_index_column.num_vals() - 1
}
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
@@ -152,10 +117,6 @@ impl MultiValueIndex {
}
}
ranks.truncate(write_doc_pos);
for rank in ranks.iter_mut() {
*rank = self.optional_index.select(*rank);
}
}
}
@@ -182,7 +143,6 @@ mod tests {
let positions = &[10u32, 11, 15, 20, 21, 22];
assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]);
assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]);
assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]);

View File

@@ -86,14 +86,8 @@ pub struct OptionalIndex {
block_metas: Arc<[BlockMeta]>,
}
impl<'a> Iterable<u32> for &'a OptionalIndex {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.iter_rows())
}
}
impl std::fmt::Debug for OptionalIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OptionalIndex")
.field("num_rows", &self.num_rows)
.field("num_non_null_rows", &self.num_non_null_rows)
@@ -256,10 +250,6 @@ impl Set<RowId> for OptionalIndex {
}
impl OptionalIndex {
pub fn new_empty(num_rows: RowId) -> OptionalIndex {
Self::for_test(num_rows, &[])
}
pub fn for_test(num_rows: RowId, row_ids: &[RowId]) -> OptionalIndex {
assert!(row_ids
.last()

View File

@@ -3,42 +3,33 @@ use std::io::Write;
use common::{CountingWriter, OwnedBytes};
use super::multivalued_index::SerializableMultivalueIndex;
use super::OptionalIndex;
use crate::column_index::multivalued_index::serialize_multivalued_index;
use crate::column_index::optional_index::serialize_optional_index;
use crate::column_index::ColumnIndex;
use crate::column_values::ColumnStats;
use crate::iterable::Iterable;
use crate::{Cardinality, RowId};
pub struct SerializableOptionalIndex<'a> {
pub non_null_row_ids: Box<dyn Iterable<RowId> + 'a>,
pub num_rows: RowId,
}
impl<'a> From<&'a OptionalIndex> for SerializableOptionalIndex<'a> {
fn from(optional_index: &'a OptionalIndex) -> Self {
SerializableOptionalIndex {
non_null_row_ids: Box::new(optional_index),
num_rows: optional_index.num_docs(),
}
}
}
pub enum SerializableColumnIndex<'a> {
Full,
Optional(SerializableOptionalIndex<'a>),
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
Multivalued(SerializableMultivalueIndex<'a>),
Optional {
non_null_row_ids: Box<dyn Iterable<RowId> + 'a>,
num_rows: RowId,
},
Multivalued {
/// Iterator emitting the indices for the index
indices: Box<dyn Iterable<RowId> + 'a>,
/// In the merge case we can precompute the column stats
stats: Option<ColumnStats>,
},
}
impl<'a> SerializableColumnIndex<'a> {
pub fn get_cardinality(&self) -> Cardinality {
match self {
SerializableColumnIndex::Full => Cardinality::Full,
SerializableColumnIndex::Optional(_) => Cardinality::Optional,
SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued,
SerializableColumnIndex::Optional { .. } => Cardinality::Optional,
SerializableColumnIndex::Multivalued { .. } => Cardinality::Multivalued,
}
}
}
@@ -53,13 +44,14 @@ pub fn serialize_column_index(
output.write_all(&[cardinality])?;
match column_index {
SerializableColumnIndex::Full => {}
SerializableColumnIndex::Optional(SerializableOptionalIndex {
SerializableColumnIndex::Optional {
non_null_row_ids,
num_rows,
}) => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?,
SerializableColumnIndex::Multivalued(multivalued_index) => {
serialize_multivalued_index(&multivalued_index, &mut output)?
}
} => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?,
SerializableColumnIndex::Multivalued {
indices: multivalued_index,
stats,
} => serialize_multivalued_index(&*multivalued_index, stats, &mut output)?,
}
let column_index_num_bytes = output.written_bytes() as u32;
Ok(column_index_num_bytes)

View File

@@ -32,7 +32,8 @@ pub use u128_based::{
};
pub use u64_based::{
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
serialize_u64_based_column_values, serialize_u64_with_codec_and_stats, CodecType,
ALL_U64_CODEC_TYPES,
};
pub use vec_column::VecColumn;

View File

@@ -128,6 +128,9 @@ impl ColumnCodecEstimator for BitpackedCodecEstimator {
bit_packer.close(wrt)?;
Ok(())
}
fn codec_type(&self) -> super::CodecType {
super::CodecType::Bitpacked
}
}
pub struct BitpackedCodec;

View File

@@ -163,6 +163,10 @@ impl ColumnCodecEstimator for BlockwiseLinearEstimator {
Ok(())
}
fn codec_type(&self) -> super::CodecType {
super::CodecType::BlockwiseLinear
}
}
pub struct BlockwiseLinearCodec;

View File

@@ -153,6 +153,12 @@ impl ColumnCodecEstimator for LinearCodecEstimator {
self.collect_before_line_estimation(value);
}
}
fn requires_full_scan(&self) -> bool {
true
}
fn codec_type(&self) -> super::CodecType {
super::CodecType::Linear
}
}
impl LinearCodecEstimator {

View File

@@ -37,7 +37,11 @@ pub trait ColumnCodecEstimator<T = u64>: 'static {
/// This method will be called for each element of the column during
/// `estimation`.
fn collect(&mut self, value: u64);
/// Finalizes the first pass phase.
/// Returns true if the estimator needs a full pass over the column before serialization.
fn requires_full_scan(&self) -> bool {
false
}
fn codec_type(&self) -> CodecType;
fn finalize(&mut self) {}
/// Returns an accurate estimation of the number of bytes that will
/// be used to represent this column.
@@ -150,34 +154,45 @@ pub fn serialize_u64_based_column_values<T: MonotonicallyMappableToU64>(
wrt: &mut dyn Write,
) -> io::Result<()> {
let mut stats_collector = StatsCollector::default();
let mut estimators: Vec<(CodecType, Box<dyn ColumnCodecEstimator>)> =
Vec::with_capacity(codec_types.len());
let mut estimators: Vec<Box<dyn ColumnCodecEstimator>> = Vec::with_capacity(codec_types.len());
for &codec_type in codec_types {
estimators.push((codec_type, codec_type.estimator()));
estimators.push(codec_type.estimator());
}
for val in vals.boxed_iter() {
let val_u64 = val.to_u64();
stats_collector.collect(val_u64);
for (_, estimator) in &mut estimators {
for estimator in &mut estimators {
estimator.collect(val_u64);
}
}
for (_, estimator) in &mut estimators {
for estimator in &mut estimators {
estimator.finalize();
}
let stats = stats_collector.stats();
let (_, best_codec, best_codec_estimator) = estimators
let (_, best_codec) = estimators
.into_iter()
.flat_map(|(codec_type, estimator)| {
.flat_map(|estimator| {
let num_bytes = estimator.estimate(&stats)?;
Some((num_bytes, codec_type, estimator))
Some((num_bytes, estimator))
})
.min_by_key(|(num_bytes, _, _)| *num_bytes)
.min_by_key(|(num_bytes, _)| *num_bytes)
.ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "No available applicable codec.")
})?;
best_codec.to_code().serialize(wrt)?;
best_codec_estimator.serialize(
serialize_u64_with_codec_and_stats(vals, best_codec, stats, wrt)?;
Ok(())
}
/// Serializes a given column of u64-mapped values.
/// For codecs that require a full scan (such as Line), the estimator must
/// have collected all values before this is called.
pub fn serialize_u64_with_codec_and_stats<T: MonotonicallyMappableToU64>(
vals: &dyn Iterable<T>,
codec: Box<dyn ColumnCodecEstimator>,
stats: ColumnStats,
wrt: &mut dyn Write,
) -> io::Result<()> {
codec.codec_type().to_code().serialize(wrt)?;
codec.serialize(
&stats,
&mut vals.boxed_iter().map(MonotonicallyMappableToU64::to_u64),
wrt,

View File

@@ -8,7 +8,7 @@ const MAGIC_BYTES: [u8; 4] = [2, 113, 119, 66];
pub fn footer() -> [u8; VERSION_FOOTER_NUM_BYTES] {
let mut footer_bytes = [0u8; VERSION_FOOTER_NUM_BYTES];
footer_bytes[0..4].copy_from_slice(&Version::V2.to_bytes());
footer_bytes[0..4].copy_from_slice(&Version::V1.to_bytes());
footer_bytes[4..8].copy_from_slice(&MAGIC_BYTES[..]);
footer_bytes
}
@@ -24,7 +24,6 @@ pub fn parse_footer(footer_bytes: [u8; VERSION_FOOTER_NUM_BYTES]) -> Result<Vers
#[repr(u32)]
pub enum Version {
V1 = 1u32,
V2 = 2u32,
}
impl Version {
@@ -35,7 +34,7 @@ impl Version {
fn try_from_bytes(bytes: [u8; 4]) -> Result<Version, InvalidData> {
let code = u32::from_le_bytes(bytes);
match code {
2u32 => Ok(Version::V2),
1u32 => Ok(Version::V1),
_ => Err(InvalidData),
}
}
@@ -50,7 +49,7 @@ mod tests {
#[test]
fn test_footer_deserialization() {
let parsed_version: Version = parse_footer(footer()).unwrap();
assert_eq!(Version::V2, parsed_version);
assert_eq!(Version::V1, parsed_version);
}
#[test]
@@ -64,7 +63,7 @@ mod tests {
for &i in &version_to_tests {
let version_res = Version::try_from_bytes(i.to_le_bytes());
if let Ok(version) = version_res {
assert_eq!(version, Version::V2);
assert_eq!(version, Version::V1);
assert_eq!(version.to_bytes(), i.to_le_bytes());
valid_versions.insert(i);
}

View File

@@ -3,7 +3,7 @@ mod merge_mapping;
mod term_merger;
use std::collections::{BTreeMap, HashSet};
use std::io;
use std::io::{self};
use std::net::Ipv6Addr;
use std::sync::Arc;
@@ -156,8 +156,15 @@ fn merge_column(
column_values.push(None);
}
}
let merged_column_index =
crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
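// The summed value count feeds the precomputed ColumnStats: the largest start
// offset in the merged multivalue index equals the total number of values.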
let num_values: u32 = column_values
.iter()
.map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0))
.sum();
let merged_column_index = crate::column_index::merge_column_index(
&column_indexes[..],
merge_row_order,
num_values,
);
let merge_column_values = MergedColumnValues {
column_indexes: &column_indexes[..],
column_values: &column_values[..],
@@ -183,8 +190,15 @@ fn merge_column(
}
}
let merged_column_index =
crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
let num_values: u32 = column_values
.iter()
.map(|vals| vals.as_ref().map(|idx| idx.num_vals()).unwrap_or(0))
.sum();
let merged_column_index = crate::column_index::merge_column_index(
&column_indexes[..],
merge_row_order,
num_values,
);
let merge_column_values = MergedColumnValues {
column_indexes: &column_indexes[..],
column_values: &column_values,
@@ -214,8 +228,19 @@ fn merge_column(
}
}
}
let merged_column_index =
crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
let num_values: u32 = bytes_columns
.iter()
.map(|vals| {
vals.as_ref()
.map(|idx| idx.term_ord_column.values.num_vals())
.unwrap_or(0)
})
.sum();
let merged_column_index = crate::column_index::merge_column_index(
&column_indexes[..],
merge_row_order,
num_values,
);
merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?;
}
}

View File

@@ -12,7 +12,7 @@ use common::CountingWriter;
pub(crate) use serializer::ColumnarSerializer;
use stacker::{Addr, ArenaHashMap, MemoryArena};
use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex};
use crate::column_index::SerializableColumnIndex;
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
use crate::columnar::column_type::ColumnType;
use crate::columnar::writer::column_writers::{
@@ -20,7 +20,6 @@ use crate::columnar::writer::column_writers::{
};
use crate::columnar::writer::value_index::{IndexBuilder, PreallocatedIndexBuilders};
use crate::dictionary::{DictionaryBuilder, TermIdMapping, UnorderedId};
use crate::iterable::Iterable;
use crate::value::{Coerce, NumericalType, NumericalValue};
use crate::{Cardinality, RowId};
@@ -636,16 +635,19 @@ fn send_to_serialize_column_mappable_to_u128<
let optional_index_builder = value_index_builders.borrow_optional_index_builder();
consume_operation_iterator(op_iterator, optional_index_builder, values);
let optional_index = optional_index_builder.finish(num_rows);
SerializableColumnIndex::Optional(SerializableOptionalIndex {
SerializableColumnIndex::Optional {
num_rows,
non_null_row_ids: Box::new(optional_index),
})
}
}
Cardinality::Multivalued => {
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
let serializable_multivalued_index = multivalued_index_builder.finish(num_rows);
SerializableColumnIndex::Multivalued(serializable_multivalued_index)
let multivalued_index = multivalued_index_builder.finish(num_rows);
SerializableColumnIndex::Multivalued {
indices: Box::new(multivalued_index),
stats: Default::default(), // TODO: implement stats for u128
}
}
};
crate::column::serialize_column_mappable_to_u128(
@@ -688,21 +690,22 @@ fn send_to_serialize_column_mappable_to_u64(
let optional_index_builder = value_index_builders.borrow_optional_index_builder();
consume_operation_iterator(op_iterator, optional_index_builder, values);
let optional_index = optional_index_builder.finish(num_rows);
SerializableColumnIndex::Optional(SerializableOptionalIndex {
SerializableColumnIndex::Optional {
non_null_row_ids: Box::new(optional_index),
num_rows,
})
}
}
Cardinality::Multivalued => {
let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder();
consume_operation_iterator(op_iterator, multivalued_index_builder, values);
let multivalued_index = multivalued_index_builder.finish(num_rows);
if sort_values_within_row {
// not supported in this hack
todo!()
// sort_values_within_row_in_place(multivalued_index, values);
sort_values_within_row_in_place(multivalued_index, values);
}
SerializableColumnIndex::Multivalued {
indices: Box::new(multivalued_index),
stats: None,
}
let serializable_multivalued_index = multivalued_index_builder.finish(num_rows);
SerializableColumnIndex::Multivalued(serializable_multivalued_index)
}
};
crate::column::serialize_column_mappable_to_u64(

View File

@@ -1,4 +1,3 @@
use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex};
use crate::iterable::Iterable;
use crate::RowId;
@@ -60,50 +59,32 @@ impl IndexBuilder for OptionalIndexBuilder {
#[derive(Default)]
pub struct MultivaluedIndexBuilder {
doc_with_values: Vec<RowId>,
start_offsets: Vec<u64>,
total_num_vals_seen: u64,
current_row: RowId,
current_row_has_value: bool,
start_offsets: Vec<RowId>,
total_num_vals_seen: u32,
}
impl MultivaluedIndexBuilder {
pub fn finish(&mut self, num_docs: RowId) -> SerializableMultivalueIndex<'_> {
self.start_offsets.push(self.total_num_vals_seen as u64);
let non_null_row_ids: Box<dyn Iterable<RowId>> = Box::new(&self.doc_with_values[..]);
SerializableMultivalueIndex {
doc_ids_with_values: SerializableOptionalIndex {
non_null_row_ids,
num_rows: num_docs,
},
start_offsets: Box::new(&self.start_offsets[..]),
}
pub fn finish(&mut self, num_docs: RowId) -> &[u32] {
self.start_offsets
.resize(num_docs as usize + 1, self.total_num_vals_seen);
&self.start_offsets[..]
}
fn reset(&mut self) {
self.doc_with_values.clear();
self.start_offsets.clear();
self.start_offsets.push(0u32);
self.total_num_vals_seen = 0;
self.current_row = 0;
self.current_row_has_value = false;
}
}
impl IndexBuilder for MultivaluedIndexBuilder {
fn record_row(&mut self, row_id: RowId) {
self.current_row = row_id;
self.current_row_has_value = false;
// self.start_offsets
// .resize(row_id as usize + 1, self.total_num_vals_seen);
self.start_offsets
.resize(row_id as usize + 1, self.total_num_vals_seen);
}
fn record_value(&mut self) {
if !self.current_row_has_value {
self.current_row_has_value = true;
self.doc_with_values.push(self.current_row);
self.start_offsets.push(self.total_num_vals_seen as u64);
}
self.total_num_vals_seen += 1u64;
self.total_num_vals_seen += 1;
}
}
@@ -160,32 +141,6 @@ mod tests {
);
}
#[test]
fn test_multivalued_value_index_builder_simple() {
let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default();
{
multivalued_value_index_builder.record_row(0u32);
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_value();
let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32);
let start_offsets: Vec<u64> = serialized_multivalue_index
.start_offsets
.boxed_iter()
.collect();
assert_eq!(&start_offsets, &[0, 2]);
}
multivalued_value_index_builder.reset();
multivalued_value_index_builder.record_row(0u32);
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_value();
let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32);
let start_offsets: Vec<u64> = serialized_multivalue_index
.start_offsets
.boxed_iter()
.collect();
assert_eq!(&start_offsets, &[0, 2]);
}
#[test]
fn test_multivalued_value_index_builder() {
let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default();
@@ -194,30 +149,17 @@ mod tests {
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_row(2u32);
multivalued_value_index_builder.record_value();
let SerializableMultivalueIndex {
doc_ids_with_values,
start_offsets,
} = multivalued_value_index_builder.finish(4u32);
assert_eq!(doc_ids_with_values.num_rows, 4u32);
let doc_ids_with_values: Vec<u32> =
doc_ids_with_values.non_null_row_ids.boxed_iter().collect();
assert_eq!(&doc_ids_with_values, &[1u32, 2u32]);
let start_offsets: Vec<u64> = start_offsets.boxed_iter().collect::<Vec<u64>>();
assert_eq!(&start_offsets[..], &[0, 2, 3]);
// assert!(doc_ids_with_values_opt.is_some());
// assert_eq!(
// multivalued_value_index_builder.finish(4u32).to_vec(),
// vec![0, 0, 2, 3, 3]
// );
// multivalued_value_index_builder.reset();
// multivalued_value_index_builder.record_row(2u32);
// multivalued_value_index_builder.record_value();
// multivalued_value_index_builder.record_value();
// assert_eq!(
// multivalued_value_index_builder.finish(4u32).to_vec(),
// vec![0, 0, 0, 2, 2]
// );
assert_eq!(
multivalued_value_index_builder.finish(4u32).to_vec(),
vec![0, 0, 2, 3, 3]
);
multivalued_value_index_builder.reset();
multivalued_value_index_builder.record_row(2u32);
multivalued_value_index_builder.record_value();
multivalued_value_index_builder.record_value();
assert_eq!(
multivalued_value_index_builder.finish(4u32).to_vec(),
vec![0, 0, 0, 2, 2]
);
}
}

View File

@@ -1,7 +1,4 @@
use std::ops::Range;
use std::sync::Arc;
use crate::{ColumnValues, RowId};
pub trait Iterable<T = u64> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
@@ -20,9 +17,3 @@ where Range<T>: Iterator<Item = T>
Box::new(self.clone())
}
}
impl Iterable for Arc<dyn crate::ColumnValues<RowId>> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
Box::new(self.iter().map(|row_id| row_id as u64))
}
}

View File

@@ -11,7 +11,7 @@ use crate::columnar::{ColumnType, ColumnTypeCategory};
use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle};
use crate::value::{Coerce, NumericalValue};
use crate::{
BytesColumn, Cardinality, Column, ColumnIndex, ColumnarReader, ColumnarWriter, RowAddr, RowId,
BytesColumn, Cardinality, Column, ColumnarReader, ColumnarWriter, RowAddr, RowId,
ShuffleMergeOrder, StackMergeOrder,
};
@@ -79,7 +79,7 @@ fn test_dataframe_writer_u64_multivalued() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("divisor").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 50);
assert_eq!(cols[0].num_bytes(), 29);
let dyn_i64_col = cols[0].open().unwrap();
let DynamicColumn::I64(divisor_col) = dyn_i64_col else {
panic!();
@@ -448,7 +448,6 @@ fn assert_columnar_eq(
}
}
#[track_caller]
fn assert_column_eq<T: Copy + PartialOrd + Debug + Send + Sync + 'static>(
left: &Column<T>,
right: &Column<T>,
@@ -739,35 +738,22 @@ proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]
#[test]
fn test_columnar_merge_proptest(columnar_docs in proptest::collection::vec(columnar_docs_strategy(), 2..=3)) {
let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().flatten().cloned().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
test_columnar_docs(columnar_docs);
}
}
#[test]
fn test_columnar_merging_empty_columnar() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
fn test_columnar_docs(columnar_docs: Vec<Vec<Vec<(&'static str, ColumnValue)>>>) {
let columnar_readers: Vec<ColumnarReader> = columnar_docs
.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
crate::merge_columnar(
&columnar_readers_arr[..],
&[],
crate::MergeRowOrder::Stack(stack_merge_order),
stack_merge_order,
&mut output,
)
.unwrap();
@@ -778,6 +764,24 @@ fn test_columnar_merging_empty_columnar() {
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
}
#[test]
fn test_columnar_merging_empty_columnar() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> =
vec![vec![], vec![vec![("c1", ColumnValue::Str("a"))]]];
test_columnar_docs(columnar_docs);
}
#[test]
fn test_columnar_merging_simple() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> = vec![
vec![],
vec![vec![
("c1", ColumnValue::Numerical(0u64.into())),
("c1", ColumnValue::Numerical(0u64.into())),
]],
];
test_columnar_docs(columnar_docs);
}
#[test]
fn test_columnar_merging_number_columns() {
let columnar_docs: Vec<Vec<Vec<(&str, ColumnValue)>>> = vec![
@@ -794,25 +798,7 @@ fn test_columnar_merging_number_columns() {
vec![("c2", ColumnValue::Numerical(u64::MAX.into()))],
],
];
let columnar_readers: Vec<ColumnarReader> = columnar_docs
.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]);
crate::merge_columnar(
&columnar_readers_arr[..],
&[],
crate::MergeRowOrder::Stack(stack_merge_order),
&mut output,
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> =
columnar_docs.iter().flatten().cloned().collect();
let expected_merged_columnar = build_columnar(&concat_rows[..]);
assert_columnar_eq_strict(&merged_columnar, &expected_merged_columnar);
test_columnar_docs(columnar_docs);
}
// TODO add non trivial remap and merge
@@ -842,27 +828,26 @@ fn columnar_docs_and_remap(
)
}
// proptest! {
// #![proptest_config(ProptestConfig::with_cases(1000))]
// #[test]
// fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in
// columnar_docs_and_remap()) { let shuffled_rows: Vec<Vec<(&'static str, ColumnValue)>> =
// shuffle_merge_order.iter() .map(|row_addr| columnar_docs[row_addr.segment_ord as
// usize][row_addr.row_id as usize].clone()) .collect();
// let expected_merged_columnar = build_columnar(&shuffled_rows[..]);
// let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
// .map(|docs| build_columnar(&docs[..]))
// .collect::<Vec<_>>();
// let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
// let mut output: Vec<u8> = Vec::new();
// let segment_num_rows: Vec<RowId> = columnar_docs.iter().map(|docs| docs.len() as
// RowId).collect(); let shuffle_merge_order =
// ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order);
// crate::merge_columnar(&columnar_readers_arr[..], &[], shuffle_merge_order.into(), &mut
// output).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap();
// assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true);
// }
// }
proptest! {
#![proptest_config(ProptestConfig::with_cases(1000))]
#[test]
fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in columnar_docs_and_remap()) {
let shuffled_rows: Vec<Vec<(&'static str, ColumnValue)>> = shuffle_merge_order.iter()
.map(|row_addr| columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone())
.collect();
let expected_merged_columnar = build_columnar(&shuffled_rows[..]);
let columnar_readers: Vec<ColumnarReader> = columnar_docs.iter()
.map(|docs| build_columnar(&docs[..]))
.collect::<Vec<_>>();
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
let mut output: Vec<u8> = Vec::new();
let segment_num_rows: Vec<RowId> = columnar_docs.iter().map(|docs| docs.len() as RowId).collect();
let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order);
crate::merge_columnar(&columnar_readers_arr[..], &[], shuffle_merge_order.into(), &mut output).unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true);
}
}
#[test]
fn test_columnar_merge_empty() {
@@ -884,64 +869,64 @@ fn test_columnar_merge_empty() {
assert_eq!(merged_columnar.num_columns(), 0);
}
// #[test]
// fn test_columnar_merge_single_str_column() {
// let columnar_reader_1 = build_columnar(&[]);
// let rows: &[Vec<_>] = &[vec![("c1", ColumnValue::Str("a"))]][..];
// let columnar_reader_2 = build_columnar(rows);
// let mut output: Vec<u8> = Vec::new();
// let segment_num_rows: Vec<RowId> = vec![0, 1];
// let shuffle_merge_order = ShuffleMergeOrder::for_test(
// &segment_num_rows,
// vec![RowAddr {
// segment_ord: 1u32,
// row_id: 0u32,
// }],
// );
// crate::merge_columnar(
// &[&columnar_reader_1, &columnar_reader_2],
// &[],
// shuffle_merge_order.into(),
// &mut output,
// )
// .unwrap();
// let merged_columnar = ColumnarReader::open(output).unwrap();
// assert_eq!(merged_columnar.num_rows(), 1);
// assert_eq!(merged_columnar.num_columns(), 1);
// }
#[test]
fn test_columnar_merge_single_str_column() {
let columnar_reader_1 = build_columnar(&[]);
let rows: &[Vec<_>] = &[vec![("c1", ColumnValue::Str("a"))]][..];
let columnar_reader_2 = build_columnar(rows);
let mut output: Vec<u8> = Vec::new();
let segment_num_rows: Vec<RowId> = vec![0, 1];
let shuffle_merge_order = ShuffleMergeOrder::for_test(
&segment_num_rows,
vec![RowAddr {
segment_ord: 1u32,
row_id: 0u32,
}],
);
crate::merge_columnar(
&[&columnar_reader_1, &columnar_reader_2],
&[],
shuffle_merge_order.into(),
&mut output,
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_rows(), 1);
assert_eq!(merged_columnar.num_columns(), 1);
}
// #[test]
// fn test_delete_decrease_cardinality() {
// let columnar_reader_1 = build_columnar(&[]);
// let rows: &[Vec<_>] = &[
// vec![
// ("c", ColumnValue::from(0i64)),
// ("c", ColumnValue::from(0i64)),
// ],
// vec![("c", ColumnValue::from(0i64))],
// ][..];
// // c is multivalued here
// let columnar_reader_2 = build_columnar(rows);
// let mut output: Vec<u8> = Vec::new();
// let shuffle_merge_order = ShuffleMergeOrder::for_test(
// &[0, 2],
// vec![RowAddr {
// segment_ord: 1u32,
// row_id: 1u32,
// }],
// );
// crate::merge_columnar(
// &[&columnar_reader_1, &columnar_reader_2],
// &[],
// shuffle_merge_order.into(),
// &mut output,
// )
// .unwrap();
// let merged_columnar = ColumnarReader::open(output).unwrap();
// assert_eq!(merged_columnar.num_rows(), 1);
// assert_eq!(merged_columnar.num_columns(), 1);
// let cols = merged_columnar.read_columns("c").unwrap();
// assert_eq!(cols.len(), 1);
// assert_eq!(cols[0].column_type(), ColumnType::I64);
// assert_eq!(cols[0].open().unwrap().get_cardinality(), Cardinality::Full);
// }
#[test]
fn test_delete_decrease_cardinality() {
let columnar_reader_1 = build_columnar(&[]);
let rows: &[Vec<_>] = &[
vec![
("c", ColumnValue::from(0i64)),
("c", ColumnValue::from(0i64)),
],
vec![("c", ColumnValue::from(0i64))],
][..];
// c is multivalued here
let columnar_reader_2 = build_columnar(rows);
let mut output: Vec<u8> = Vec::new();
let shuffle_merge_order = ShuffleMergeOrder::for_test(
&[0, 2],
vec![RowAddr {
segment_ord: 1u32,
row_id: 1u32,
}],
);
crate::merge_columnar(
&[&columnar_reader_1, &columnar_reader_2],
&[],
shuffle_merge_order.into(),
&mut output,
)
.unwrap();
let merged_columnar = ColumnarReader::open(output).unwrap();
assert_eq!(merged_columnar.num_rows(), 1);
assert_eq!(merged_columnar.num_columns(), 1);
let cols = merged_columnar.read_columns("c").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].column_type(), ColumnType::I64);
assert_eq!(cols[0].open().unwrap().get_cardinality(), Cardinality::Full);
}

View File

@@ -22,3 +22,6 @@ serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
proptest = "1.0.0"
rand = "0.8.4"
[features]
unstable = [] # useful for benches.

View File

@@ -643,30 +643,30 @@ mod tests {
facet_collector.add_facet(Facet::from("/country/europe"));
}
// #[test]
// fn test_doc_unsorted_multifacet() -> crate::Result<()> {
// let mut schema_builder = Schema::builder();
// let facet_field = schema_builder.add_facet_field("facets", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
// let mut index_writer = index.writer_for_tests()?;
// index_writer.add_document(doc!(
// facet_field => Facet::from_text(&"/subjects/A/a").unwrap(),
// facet_field => Facet::from_text(&"/subjects/B/a").unwrap(),
// facet_field => Facet::from_text(&"/subjects/A/b").unwrap(),
// facet_field => Facet::from_text(&"/subjects/B/b").unwrap(),
// ))?;
// index_writer.commit()?;
// let reader = index.reader()?;
// let searcher = reader.searcher();
// assert_eq!(searcher.num_docs(), 1);
// let mut facet_collector = FacetCollector::for_field("facets");
// facet_collector.add_facet("/subjects");
// let counts = searcher.search(&AllQuery, &facet_collector)?;
// let facets: Vec<(&Facet, u64)> = counts.get("/subjects").collect();
// assert_eq!(facets[0].1, 1);
// Ok(())
// }
#[test]
fn test_doc_unsorted_multifacet() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facets", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(
facet_field => Facet::from_text(&"/subjects/A/a").unwrap(),
facet_field => Facet::from_text(&"/subjects/B/a").unwrap(),
facet_field => Facet::from_text(&"/subjects/A/b").unwrap(),
facet_field => Facet::from_text(&"/subjects/B/b").unwrap(),
))?;
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 1);
let mut facet_collector = FacetCollector::for_field("facets");
facet_collector.add_facet("/subjects");
let counts = searcher.search(&AllQuery, &facet_collector)?;
let facets: Vec<(&Facet, u64)> = counts.get("/subjects").collect();
assert_eq!(facets[0].1, 1);
Ok(())
}
#[test]
fn test_doc_search_by_facet() -> crate::Result<()> {
@@ -725,99 +725,99 @@ mod tests {
facet_collector.add_facet(Facet::from("/countryeurope"));
}
// #[test]
// fn test_facet_collector_topk() {
// let mut schema_builder = Schema::builder();
// let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
#[test]
fn test_facet_collector_topk() {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
// let uniform = Uniform::new_inclusive(1, 100_000);
// let mut docs: Vec<TantivyDocument> =
// vec![("a", 10), ("b", 100), ("c", 7), ("d", 12), ("e", 21)]
// .into_iter()
// .flat_map(|(c, count)| {
// let facet = Facet::from(&format!("/facet/{c}"));
// let doc = doc!(facet_field => facet);
// iter::repeat(doc).take(count)
// })
// .map(|mut doc| {
// doc.add_facet(
// facet_field,
// &format!("/facet/{}", thread_rng().sample(uniform)),
// );
// doc
// })
// .collect();
// docs[..].shuffle(&mut thread_rng());
let uniform = Uniform::new_inclusive(1, 100_000);
let mut docs: Vec<TantivyDocument> =
vec![("a", 10), ("b", 100), ("c", 7), ("d", 12), ("e", 21)]
.into_iter()
.flat_map(|(c, count)| {
let facet = Facet::from(&format!("/facet/{c}"));
let doc = doc!(facet_field => facet);
iter::repeat(doc).take(count)
})
.map(|mut doc| {
doc.add_facet(
facet_field,
&format!("/facet/{}", thread_rng().sample(uniform)),
);
doc
})
.collect();
docs[..].shuffle(&mut thread_rng());
// let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// for doc in docs {
// index_writer.add_document(doc).unwrap();
// }
// index_writer.commit().unwrap();
// let searcher = index.reader().unwrap().searcher();
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for doc in docs {
index_writer.add_document(doc).unwrap();
}
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
// let mut facet_collector = FacetCollector::for_field("facet");
// facet_collector.add_facet("/facet");
// let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector).unwrap();
let mut facet_collector = FacetCollector::for_field("facet");
facet_collector.add_facet("/facet");
let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector).unwrap();
// {
// let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 3);
// assert_eq!(
// facets,
// vec![
// (&Facet::from("/facet/b"), 100),
// (&Facet::from("/facet/e"), 21),
// (&Facet::from("/facet/d"), 12),
// ]
// );
// }
// }
{
let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 3);
assert_eq!(
facets,
vec![
(&Facet::from("/facet/b"), 100),
(&Facet::from("/facet/e"), 21),
(&Facet::from("/facet/d"), 12),
]
);
}
}
// #[test]
// fn test_facet_collector_topk_tie_break() -> crate::Result<()> {
// let mut schema_builder = Schema::builder();
// let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
#[test]
fn test_facet_collector_topk_tie_break() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
// let docs: Vec<TantivyDocument> = vec![("b", 2), ("a", 2), ("c", 4)]
// .into_iter()
// .flat_map(|(c, count)| {
// let facet = Facet::from(&format!("/facet/{c}"));
// let doc = doc!(facet_field => facet);
// iter::repeat(doc).take(count)
// })
// .collect();
let docs: Vec<TantivyDocument> = vec![("b", 2), ("a", 2), ("c", 4)]
.into_iter()
.flat_map(|(c, count)| {
let facet = Facet::from(&format!("/facet/{c}"));
let doc = doc!(facet_field => facet);
iter::repeat(doc).take(count)
})
.collect();
// let mut index_writer = index.writer_for_tests()?;
// for doc in docs {
// index_writer.add_document(doc)?;
// }
// index_writer.commit()?;
let mut index_writer = index.writer_for_tests()?;
for doc in docs {
index_writer.add_document(doc)?;
}
index_writer.commit()?;
// let searcher = index.reader()?.searcher();
// let mut facet_collector = FacetCollector::for_field("facet");
// facet_collector.add_facet("/facet");
// let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector)?;
let searcher = index.reader()?.searcher();
let mut facet_collector = FacetCollector::for_field("facet");
facet_collector.add_facet("/facet");
let counts: FacetCounts = searcher.search(&AllQuery, &facet_collector)?;
// let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 2);
// assert_eq!(
// facets,
// vec![(&Facet::from("/facet/c"), 4), (&Facet::from("/facet/a"), 2)]
// );
// Ok(())
// }
let facets: Vec<(&Facet, u64)> = counts.top_k("/facet", 2);
assert_eq!(
facets,
vec![(&Facet::from("/facet/c"), 4), (&Facet::from("/facet/a"), 2)]
);
Ok(())
}
// #[test]
// fn is_child_facet() {
// assert!(super::is_child_facet(&b"foo"[..], &b"foo\0bar"[..]));
// assert!(super::is_child_facet(&b""[..], &b"foo\0bar"[..]));
// assert!(super::is_child_facet(&b""[..], &b"foo"[..]));
// assert!(!super::is_child_facet(&b"foo\0bar"[..], &b"foo"[..]));
// assert!(!super::is_child_facet(&b"foo"[..], &b"foobar\0baz"[..]));
// }
#[test]
fn is_child_facet() {
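// Facet paths are encoded with `\0` separating segments, so `/foo/bar` is
// stored as `b"foo\0bar"`; a child must extend its parent at a segment
// boundary, which is why `/foo` is not a parent of `/foobar/baz` below.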
assert!(super::is_child_facet(&b"foo"[..], &b"foo\0bar"[..]));
assert!(super::is_child_facet(&b""[..], &b"foo\0bar"[..]));
assert!(super::is_child_facet(&b""[..], &b"foo"[..]));
assert!(!super::is_child_facet(&b"foo\0bar"[..], &b"foo"[..]));
assert!(!super::is_child_facet(&b"foo"[..], &b"foobar\0baz"[..]));
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -94,102 +94,102 @@ mod tests {
assert_eq!(value, None);
}
// #[test]
// fn test_facet_several_facets_sorted() {
// let mut schema_builder = SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
// let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// index_writer
// .add_document(doc!(facet_field=>Facet::from_text("/parent/child1").unwrap()))
// .unwrap();
// index_writer
// .add_document(doc!(
// facet_field=>Facet::from_text("/parent/child2").unwrap(),
// facet_field=>Facet::from_text("/parent/child1/blop").unwrap(),
// ))
// .unwrap();
// index_writer.commit().unwrap();
// let searcher = index.reader().unwrap().searcher();
// let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
// let mut facet_ords = Vec::new();
#[test]
fn test_facet_several_facets_sorted() {
let mut schema_builder = SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
index_writer
.add_document(doc!(facet_field=>Facet::from_text("/parent/child1").unwrap()))
.unwrap();
index_writer
.add_document(doc!(
facet_field=>Facet::from_text("/parent/child2").unwrap(),
facet_field=>Facet::from_text("/parent/child1/blop").unwrap(),
))
.unwrap();
index_writer.commit().unwrap();
let searcher = index.reader().unwrap().searcher();
let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
let mut facet_ords = Vec::new();
// facet_ords.extend(facet_reader.facet_ords(0u32));
// assert_eq!(&facet_ords, &[0u64]);
facet_ords.extend(facet_reader.facet_ords(0u32));
assert_eq!(&facet_ords, &[0u64]);
// facet_ords.clear();
// facet_ords.extend(facet_reader.facet_ords(1u32));
// assert_eq!(&facet_ords, &[1u64, 2u64]);
facet_ords.clear();
facet_ords.extend(facet_reader.facet_ords(1u32));
assert_eq!(&facet_ords, &[1u64, 2u64]);
// assert_eq!(facet_reader.num_facets(), 3);
// let mut facet = Facet::default();
// facet_reader.facet_from_ord(0, &mut facet).unwrap();
// assert_eq!(facet.to_path_string(), "/parent/child1");
// facet_reader.facet_from_ord(1, &mut facet).unwrap();
// assert_eq!(facet.to_path_string(), "/parent/child1/blop");
// facet_reader.facet_from_ord(2, &mut facet).unwrap();
// assert_eq!(facet.to_path_string(), "/parent/child2");
// }
assert_eq!(facet_reader.num_facets(), 3);
let mut facet = Facet::default();
facet_reader.facet_from_ord(0, &mut facet).unwrap();
assert_eq!(facet.to_path_string(), "/parent/child1");
facet_reader.facet_from_ord(1, &mut facet).unwrap();
assert_eq!(facet.to_path_string(), "/parent/child1/blop");
facet_reader.facet_from_ord(2, &mut facet).unwrap();
assert_eq!(facet.to_path_string(), "/parent/child2");
}
// #[test]
// fn test_facet_stored_and_indexed() -> crate::Result<()> {
// let mut schema_builder = SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet", STORED);
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
// let mut index_writer = index.writer_for_tests()?;
// index_writer.add_document(doc!(facet_field=>Facet::from_text("/a/b").unwrap()))?;
// index_writer.commit()?;
// let searcher = index.reader()?.searcher();
// let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
// let mut facet_ords = Vec::new();
// facet_ords.extend(facet_reader.facet_ords(0u32));
// assert_eq!(&facet_ords, &[0u64]);
// let doc = searcher.doc::<TantivyDocument>(DocAddress::new(0u32, 0u32))?;
// let value: Option<Facet> = doc
// .get_first(facet_field)
// .and_then(|v| v.as_facet())
// .map(|facet| Facet::from_encoded_string(facet.to_string()));
// assert_eq!(value, Facet::from_text("/a/b").ok());
// Ok(())
// }
#[test]
fn test_facet_stored_and_indexed() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(facet_field=>Facet::from_text("/a/b").unwrap()))?;
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
let mut facet_ords = Vec::new();
facet_ords.extend(facet_reader.facet_ords(0u32));
assert_eq!(&facet_ords, &[0u64]);
let doc = searcher.doc::<TantivyDocument>(DocAddress::new(0u32, 0u32))?;
let value: Option<Facet> = doc
.get_first(facet_field)
.and_then(|v| v.as_facet())
.map(|facet| Facet::from_encoded_string(facet.to_string()));
assert_eq!(value, Facet::from_text("/a/b").ok());
Ok(())
}
// #[test]
// fn test_facet_not_populated_for_all_docs() -> crate::Result<()> {
// let mut schema_builder = SchemaBuilder::default();
// let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
// let mut index_writer = index.writer_for_tests()?;
// index_writer.add_document(doc!(facet_field=>Facet::from_text("/a/b").unwrap()))?;
// index_writer.add_document(TantivyDocument::default())?;
// index_writer.commit()?;
// let searcher = index.reader()?.searcher();
// let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
// let mut facet_ords = Vec::new();
// facet_ords.extend(facet_reader.facet_ords(0u32));
// assert_eq!(&facet_ords, &[0u64]);
// facet_ords.clear();
// facet_ords.extend(facet_reader.facet_ords(1u32));
// assert!(facet_ords.is_empty());
// Ok(())
// }
#[test]
fn test_facet_not_populated_for_all_docs() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(facet_field=>Facet::from_text("/a/b").unwrap()))?;
index_writer.add_document(TantivyDocument::default())?;
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
let mut facet_ords = Vec::new();
facet_ords.extend(facet_reader.facet_ords(0u32));
assert_eq!(&facet_ords, &[0u64]);
facet_ords.clear();
facet_ords.extend(facet_reader.facet_ords(1u32));
assert!(facet_ords.is_empty());
Ok(())
}
// #[test]
// fn test_facet_not_populated_for_any_docs() -> crate::Result<()> {
// let mut schema_builder = SchemaBuilder::default();
// schema_builder.add_facet_field("facet", FacetOptions::default());
// let schema = schema_builder.build();
// let index = Index::create_in_ram(schema);
// let mut index_writer = index.writer_for_tests()?;
// index_writer.add_document(TantivyDocument::default())?;
// index_writer.add_document(TantivyDocument::default())?;
// index_writer.commit()?;
// let searcher = index.reader()?.searcher();
// let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
// assert!(facet_reader.facet_ords(0u32).next().is_none());
// assert!(facet_reader.facet_ords(1u32).next().is_none());
// Ok(())
// }
#[test]
fn test_facet_not_populated_for_any_docs() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::default();
schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(TantivyDocument::default())?;
index_writer.add_document(TantivyDocument::default())?;
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let facet_reader = searcher.segment_reader(0u32).facet_reader("facet").unwrap();
assert!(facet_reader.facet_ords(0u32).next().is_none());
assert!(facet_reader.facet_ords(1u32).next().is_none());
Ok(())
}
}

View File

@@ -808,7 +808,7 @@ mod tests {
use proptest::prop_oneof;
use super::super::operation::UserOperation;
use crate::collector::TopDocs;
use crate::collector::{Count, TopDocs};
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
@@ -1572,20 +1572,74 @@ mod tests {
Ok(())
}
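/// A single randomized operation applied to the index under test;
/// `AddMultipleDoc` adds `num_docs` copies of the same document.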
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
enum IndexingOp {
AddDoc { id: u64 },
DeleteDoc { id: u64 },
DeleteDocQuery { id: u64 },
AddMultipleDoc {
id: u64,
num_docs: u64,
value: IndexValue,
},
AddDoc {
id: u64,
value: IndexValue,
},
DeleteDoc {
id: u64,
},
DeleteDocQuery {
id: u64,
},
Commit,
Merge,
}
impl IndexingOp {
fn add(id: u64) -> Self {
IndexingOp::AddDoc {
id,
value: IndexValue::F64(id as f64),
}
}
}
use serde::Serialize;
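// `#[serde(untagged)]` makes each variant serialize as its bare value, so an
// `IndexValue` can be embedded directly in the JSON payload built below.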
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
enum IndexValue {
Str(String),
F64(f64),
U64(u64),
I64(i64),
}
impl Default for IndexValue {
fn default() -> Self {
IndexValue::F64(0.0)
}
}
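/// proptest strategy yielding a value of a randomly chosen supported type.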
fn value_strategy() -> impl Strategy<Value = IndexValue> {
prop_oneof![
any::<f64>().prop_map(IndexValue::F64),
any::<u64>().prop_map(IndexValue::U64),
any::<i64>().prop_map(IndexValue::I64),
any::<String>().prop_map(IndexValue::Str),
]
}
fn balanced_operation_strategy() -> impl Strategy<Value = IndexingOp> {
prop_oneof![
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
(0u64..20u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
(0u64..20u64).prop_map(|id| IndexingOp::AddDoc { id }),
(0u64..20u64, value_strategy())
.prop_map(move |(id, value)| IndexingOp::AddDoc { id, value }),
((0u64..20u64), (1u64..100), value_strategy()).prop_map(
move |(id, num_docs, value)| {
IndexingOp::AddMultipleDoc {
id,
num_docs,
value,
}
}
),
(0u64..1u64).prop_map(|_| IndexingOp::Commit),
(0u64..1u64).prop_map(|_| IndexingOp::Merge),
]
@@ -1595,7 +1649,17 @@ mod tests {
prop_oneof![
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
5 => (0u64..100u64).prop_map(|id| IndexingOp::DeleteDocQuery { id }),
50 => (0u64..100u64).prop_map(|id| IndexingOp::AddDoc { id }),
50 => (0u64..100u64, value_strategy())
.prop_map(move |(id, value)| IndexingOp::AddDoc { id, value }),
50 => (0u64..100u64, (1u64..100), value_strategy()).prop_map(
move |(id, num_docs, value)| {
IndexingOp::AddMultipleDoc {
id,
num_docs,
value,
}
}
),
2 => (0u64..1u64).prop_map(|_| IndexingOp::Commit),
1 => (0u64..1u64).prop_map(|_| IndexingOp::Merge),
]
@@ -1604,19 +1668,27 @@ mod tests {
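/// Replays `ops` to compute the ids expected to survive (with per-id doc
/// counts) and the ids expected to end up deleted.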
fn expected_ids(ops: &[IndexingOp]) -> (HashMap<u64, u64>, HashSet<u64>) {
let mut existing_ids = HashMap::new();
let mut deleted_ids = HashSet::new();
for &op in ops {
for op in ops {
match op {
IndexingOp::AddDoc { id } => {
*existing_ids.entry(id).or_insert(0) += 1;
deleted_ids.remove(&id);
IndexingOp::AddDoc { id, value: _ } => {
*existing_ids.entry(*id).or_insert(0) += 1;
deleted_ids.remove(id);
}
IndexingOp::AddMultipleDoc {
id,
num_docs,
value: _,
} => {
*existing_ids.entry(*id).or_insert(0) += num_docs;
deleted_ids.remove(id);
}
IndexingOp::DeleteDoc { id } => {
existing_ids.remove(&id);
deleted_ids.insert(id);
deleted_ids.insert(*id);
}
IndexingOp::DeleteDocQuery { id } => {
existing_ids.remove(&id);
deleted_ids.insert(id);
deleted_ids.insert(*id);
}
_ => {}
}
@@ -1626,16 +1698,19 @@ mod tests {
fn get_id_list(ops: &[IndexingOp]) -> Vec<u64> {
let mut id_list = Vec::new();
for &op in ops {
for op in ops {
match op {
IndexingOp::AddDoc { id } => {
id_list.push(id);
IndexingOp::AddDoc { id, value: _ } => {
id_list.push(*id);
}
IndexingOp::AddMultipleDoc { id, .. } => {
id_list.push(*id);
}
IndexingOp::DeleteDoc { id } => {
id_list.retain(|el| *el != id);
id_list.retain(|el| el != id);
}
IndexingOp::DeleteDocQuery { id } => {
id_list.retain(|el| *el != id);
id_list.retain(|el| el != id);
}
_ => {}
}
@@ -1716,42 +1791,59 @@ mod tests {
let ip_from_id = |id| Ipv6Addr::from_u128(id as u128);
for &op in ops {
match op {
IndexingOp::AddDoc { id } => {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
let ip = ip_from_id(id);
if !id_is_full_doc(id) {
// every 3rd doc has no ip field
index_writer.add_document(doc!(
id_field=>id,
))?;
} else {
let json = json!({"date1": format!("2022-{id}-01T00:00:01Z"), "date2": format!("{id}-05-01T00:00:01Z"), "id": id, "ip": ip.to_string()});
index_writer.add_document(doc!(id_field=>id,
json_field=>json,
bytes_field => id.to_le_bytes().as_slice(),
id_opt_field => id,
ip_field => ip,
ips_field => ip,
ips_field => ip,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
i64_field => id as i64,
f64_field => id as f64,
date_field => DateTime::from_timestamp_secs(id as i64),
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field => LOREM,
multi_text_fields => multi_text_field_text1,
multi_text_fields => multi_text_field_text2,
multi_text_fields => multi_text_field_text3,
))?;
}
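// Shared helper: builds the document for `id` once and adds it `num` times.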
let add_docs = |index_writer: &mut IndexWriter,
id: u64,
value: IndexValue,
num: u64|
-> crate::Result<()> {
let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
let ip = ip_from_id(id);
let doc = if !id_is_full_doc(id) {
// every 3rd doc is minimal: only the id field is populated (no ip, json, etc.)
doc!(
id_field=>id,
)
} else {
let json = json!({"date1": format!("2022-{id}-01T00:00:01Z"), "date2": format!("{id}-05-01T00:00:01Z"), "id": id, "ip": ip.to_string(), "val": value});
doc!(id_field=>id,
json_field=>json,
bytes_field => id.to_le_bytes().as_slice(),
id_opt_field => id,
ip_field => ip,
ips_field => ip,
ips_field => ip,
multi_numbers=> id,
multi_numbers => id,
bool_field => (id % 2u64) != 0,
i64_field => id as i64,
f64_field => id as f64,
date_field => DateTime::from_timestamp_secs(id as i64),
multi_bools => (id % 2u64) != 0,
multi_bools => (id % 2u64) == 0,
text_field => id.to_string(),
facet_field => facet,
large_text_field => LOREM,
multi_text_fields => multi_text_field_text1,
multi_text_fields => multi_text_field_text2,
multi_text_fields => multi_text_field_text3,
)
};
for _ in 0..num {
index_writer.add_document(doc.clone())?;
}
Ok(())
};
for op in ops {
match op.clone() {
IndexingOp::AddMultipleDoc {
id,
num_docs,
value,
} => {
add_docs(&mut index_writer, id, value, num_docs)?;
}
IndexingOp::AddDoc { id, value } => {
add_docs(&mut index_writer, id, value, 1)?;
}
IndexingOp::DeleteDoc { id } => {
index_writer.delete_term(Term::from_field_u64(id_field, id));
@@ -2032,18 +2124,22 @@ mod tests {
top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
};
let count_search = |term: &str, field| {
let query = QueryParser::for_index(&index, vec![field])
.parse_query(term)
.unwrap();
searcher.search(&query, &Count).unwrap()
};
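// `Count` only tallies hits instead of materializing `TopDocs`, which keeps
// these assertions cheap for ids that occur many times.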
let do_search2 = |term: Term| {
let count_search2 = |term: Term| {
let query = TermQuery::new(term, IndexRecordOption::Basic);
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(1000)).unwrap();
top_docs.iter().map(|el| el.1).collect::<Vec<_>>()
searcher.search(&query, &Count).unwrap()
};
for (id, count) in &expected_ids_and_num_occurrences {
// skip expensive queries
let (existing_id, count) = (*id, *count);
let get_num_hits = |field| do_search(&existing_id.to_string(), field).len() as u64;
let get_num_hits = |field| count_search(&existing_id.to_string(), field) as u64;
assert_eq!(get_num_hits(id_field), count);
if !id_is_full_doc(existing_id) {
continue;
@@ -2053,29 +2149,31 @@ mod tests {
assert_eq!(get_num_hits(f64_field), count);
// Test multi text
assert_eq!(
do_search("\"test1 test2\"", multi_text_fields).len(),
num_docs_with_values
);
assert_eq!(
do_search("\"test2 test3\"", multi_text_fields).len(),
num_docs_with_values
);
if num_docs_with_values < 1000 {
assert_eq!(
do_search("\"test1 test2\"", multi_text_fields).len(),
num_docs_with_values
);
assert_eq!(
do_search("\"test2 test3\"", multi_text_fields).len(),
num_docs_with_values
);
}
// Test bytes
let term = Term::from_field_bytes(bytes_field, existing_id.to_le_bytes().as_slice());
assert_eq!(do_search2(term).len() as u64, count);
assert_eq!(count_search2(term) as u64, count);
// Test date
let term = Term::from_field_date(
date_field,
DateTime::from_timestamp_secs(existing_id as i64),
);
assert_eq!(do_search2(term).len() as u64, count);
assert_eq!(count_search2(term) as u64, count);
}
for deleted_id in deleted_ids {
let assert_field = |field| {
assert_eq!(do_search(&deleted_id.to_string(), field).len() as u64, 0);
assert_eq!(count_search(&deleted_id.to_string(), field) as u64, 0);
};
assert_field(text_field);
assert_field(f64_field);
@@ -2084,12 +2182,12 @@ mod tests {
// Test bytes
let term = Term::from_field_bytes(bytes_field, deleted_id.to_le_bytes().as_slice());
assert_eq!(do_search2(term).len() as u64, 0);
assert_eq!(count_search2(term), 0);
// Test date
let term =
Term::from_field_date(date_field, DateTime::from_timestamp_secs(deleted_id as i64));
assert_eq!(do_search2(term).len() as u64, 0);
assert_eq!(count_search2(term), 0);
}
// search ip address
//
@@ -2098,13 +2196,13 @@ mod tests {
if !id_is_full_doc(existing_id) {
continue;
}
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let do_search_ip_field = |term: &str| count_search(term, ip_field) as u64;
let ip_addr = Ipv6Addr::from_u128(existing_id as u128);
// Test incoming ip as ipv6
assert_eq!(do_search_ip_field(&format!("\"{ip_addr}\"")), count);
let term = Term::from_field_ip_addr(ip_field, ip_addr);
assert_eq!(do_search2(term).len() as u64, count);
assert_eq!(count_search2(term) as u64, count);
// Test incoming ip as ipv4
if let Some(ip_addr) = ip_addr.to_ipv4_mapped() {
@@ -2121,7 +2219,7 @@ mod tests {
if !sample.is_empty() {
let (left_sample, right_sample) = sample.split_at(sample.len() / 2);
let expected_count = |sample: &[(&u64, &u64)]| {
let calc_expected_count = |sample: &[(&u64, &u64)]| {
sample
.iter()
.filter(|(id, _)| id_is_full_doc(**id))
@@ -2137,18 +2235,17 @@ mod tests {
}
// Query first half
if !left_sample.is_empty() {
let expected_count = expected_count(left_sample);
let expected_count = calc_expected_count(left_sample);
if !left_sample.is_empty() && expected_count < 1000 {
let start_range = *left_sample[0].0;
let end_range = *left_sample.last().unwrap().0;
let query = gen_query_inclusive("id_opt", start_range, end_range);
assert_eq!(do_search(&query, id_opt_field).len() as u64, expected_count);
assert_eq!(count_search(&query, id_opt_field) as u64, expected_count);
// Range query on ip field
let ip1 = ip_from_id(start_range);
let ip2 = ip_from_id(end_range);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let do_search_ip_field = |term: &str| count_search(term, ip_field) as u64;
let query = gen_query_inclusive("ip", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ip", "*", ip2);
@@ -2160,19 +2257,19 @@ mod tests {
assert_eq!(do_search_ip_field(&query), expected_count);
}
// Query second half
if !right_sample.is_empty() {
let expected_count = expected_count(right_sample);
let expected_count = calc_expected_count(right_sample);
if !right_sample.is_empty() && expected_count < 1000 {
let start_range = *right_sample[0].0;
let end_range = *right_sample.last().unwrap().0;
// Range query on id opt field
let query =
gen_query_inclusive("id_opt", start_range.to_string(), end_range.to_string());
assert_eq!(do_search(&query, id_opt_field).len() as u64, expected_count);
assert_eq!(count_search(&query, id_opt_field) as u64, expected_count);
// Range query on ip field
let ip1 = ip_from_id(start_range);
let ip2 = ip_from_id(end_range);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let do_search_ip_field = |term: &str| count_search(term, ip_field) as u64;
let query = gen_query_inclusive("ip", ip1, ip2);
assert_eq!(do_search_ip_field(&query), expected_count);
let query = gen_query_inclusive("ip", ip1, "*");
@@ -2197,7 +2294,7 @@ mod tests {
};
let ip = ip_from_id(existing_id);
let do_search_ip_field = |term: &str| do_search(term, ip_field).len() as u64;
let do_search_ip_field = |term: &str| count_search(term, ip_field) as u64;
// Range query on single value field
let query = gen_query_inclusive("ip", ip, ip);
assert_eq!(do_search_ip_field(&query), count);
@@ -2257,7 +2354,7 @@ mod tests {
#[test]
fn test_fast_field_range() {
let ops: Vec<_> = (0..1000).map(|id| IndexingOp::AddDoc { id }).collect();
let ops: Vec<_> = (0..1000).map(|id| IndexingOp::add(id)).collect();
assert!(test_operation_strategy(&ops, false, true).is_ok());
}
@@ -2265,8 +2362,8 @@ mod tests {
fn test_sort_index_on_opt_field_regression() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 81 },
IndexingOp::AddDoc { id: 70 },
IndexingOp::add(81),
IndexingOp::add(70),
IndexingOp::DeleteDoc { id: 70 }
],
true,
@@ -2275,14 +2372,45 @@ mod tests {
.is_ok());
}
#[test]
fn test_simple_multiple_doc() {
assert!(test_operation_strategy(
&[
IndexingOp::AddMultipleDoc {
id: 7,
num_docs: 800,
value: IndexValue::U64(0),
},
IndexingOp::AddMultipleDoc {
id: 92,
num_docs: 800,
value: IndexValue::U64(0),
},
IndexingOp::AddMultipleDoc {
id: 30,
num_docs: 800,
value: IndexValue::U64(0),
},
IndexingOp::AddMultipleDoc {
id: 33,
num_docs: 800,
value: IndexValue::U64(0),
},
],
true,
false
)
.is_ok());
}
#[test]
fn test_ip_range_query_multivalue_bug() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 2 },
IndexingOp::add(2),
IndexingOp::Commit,
IndexingOp::AddDoc { id: 1 },
IndexingOp::AddDoc { id: 1 },
IndexingOp::add(1),
IndexingOp::add(1),
IndexingOp::Commit,
IndexingOp::Merge
],
@@ -2296,11 +2424,11 @@ mod tests {
fn test_ff_num_ips_regression() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 13 },
IndexingOp::AddDoc { id: 1 },
IndexingOp::add(13),
IndexingOp::add(1),
IndexingOp::Commit,
IndexingOp::DeleteDocQuery { id: 13 },
IndexingOp::AddDoc { id: 1 },
IndexingOp::add(1),
IndexingOp::Commit,
],
false,
@@ -2312,7 +2440,7 @@ mod tests {
#[test]
fn test_minimal_sort_force_end_merge() {
assert!(test_operation_strategy(
&[IndexingOp::AddDoc { id: 23 }, IndexingOp::AddDoc { id: 13 },],
&[IndexingOp::add(23), IndexingOp::add(13),],
false,
false
)
@@ -2373,8 +2501,8 @@ mod tests {
fn test_minimal_sort_force_end_merge_with_delete() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::add(23),
IndexingOp::add(13),
IndexingOp::DeleteDoc { id: 13 }
],
true,
@@ -2387,8 +2515,8 @@ mod tests {
fn test_minimal_no_sort_no_force_end_merge() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 23 },
IndexingOp::AddDoc { id: 13 },
IndexingOp::add(23),
IndexingOp::add(13),
IndexingOp::DeleteDoc { id: 13 }
],
false,
@@ -2399,7 +2527,7 @@ mod tests {
#[test]
fn test_minimal_sort_merge() {
assert!(test_operation_strategy(&[IndexingOp::AddDoc { id: 3 },], true, true).is_ok());
assert!(test_operation_strategy(&[IndexingOp::add(3),], true, true).is_ok());
}
use proptest::prelude::*;
@@ -2495,14 +2623,14 @@ mod tests {
fn test_delete_bug_reproduction_ip_addr() {
use IndexingOp::*;
let ops = &[
AddDoc { id: 1 },
AddDoc { id: 2 },
IndexingOp::add(1),
IndexingOp::add(2),
Commit,
AddDoc { id: 3 },
IndexingOp::add(3),
DeleteDoc { id: 1 },
Commit,
Merge,
AddDoc { id: 4 },
IndexingOp::add(4),
Commit,
];
test_operation_strategy(&ops[..], false, true).unwrap();
@@ -2511,7 +2639,13 @@ mod tests {
#[test]
fn test_merge_regression_1() {
use IndexingOp::*;
let ops = &[AddDoc { id: 15 }, Commit, AddDoc { id: 9 }, Commit, Merge];
let ops = &[
IndexingOp::add(15),
Commit,
IndexingOp::add(9),
Commit,
Merge,
];
test_operation_strategy(&ops[..], false, true).unwrap();
}
@@ -2519,9 +2653,9 @@ mod tests {
fn test_range_query_bug_1() {
use IndexingOp::*;
let ops = &[
AddDoc { id: 9 },
AddDoc { id: 0 },
AddDoc { id: 13 },
IndexingOp::add(9),
IndexingOp::add(0),
IndexingOp::add(13),
Commit,
];
test_operation_strategy(&ops[..], false, true).unwrap();
@@ -2529,12 +2663,11 @@ mod tests {
#[test]
fn test_range_query_bug_2() {
use IndexingOp::*;
let ops = &[
AddDoc { id: 3 },
AddDoc { id: 6 },
AddDoc { id: 9 },
AddDoc { id: 10 },
IndexingOp::add(3),
IndexingOp::add(6),
IndexingOp::add(9),
IndexingOp::add(10),
];
test_operation_strategy(&ops[..], false, false).unwrap();
}
@@ -2556,7 +2689,7 @@ mod tests {
assert!(test_operation_strategy(
&[
IndexingOp::DeleteDoc { id: 0 },
IndexingOp::AddDoc { id: 6 },
IndexingOp::add(6),
IndexingOp::DeleteDocQuery { id: 11 },
IndexingOp::Commit,
IndexingOp::Merge,
@@ -2573,10 +2706,13 @@ mod tests {
fn test_bug_1617_2() {
assert!(test_operation_strategy(
&[
IndexingOp::AddDoc { id: 13 },
IndexingOp::AddDoc {
id: 13,
value: Default::default()
},
IndexingOp::DeleteDoc { id: 13 },
IndexingOp::Commit,
IndexingOp::AddDoc { id: 30 },
IndexingOp::add(30),
IndexingOp::Commit,
IndexingOp::Merge,
],

View File

@@ -202,9 +202,8 @@ impl SegmentWriter {
match field_entry.field_type() {
FieldType::Facet(_) => {
let mut facet_tokenizer = FacetTokenizer::default(); // this can be global
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
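// `as_value()` exposes the borrowed `ReferenceValue` view so the typed
// accessors below (`as_facet`, `as_str`, ...) can be applied.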
let facet_str = value.as_facet().ok_or_else(make_schema_error)?;
let mut facet_tokenizer = facet_tokenizer.token_stream(facet_str);
@@ -220,15 +219,14 @@ impl SegmentWriter {
}
FieldType::Str(_) => {
let mut indexing_position = IndexingPosition::default();
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
let mut token_stream = if let Some(text) = value.as_str() {
let text_analyzer =
&mut self.per_field_text_analyzers[field.field_id() as usize];
text_analyzer.token_stream(text)
} else if let Some(tok_str) = value.as_pre_tokenized_text() {
} else if let Some(tok_str) = value.into_pre_tokenized_text() {
BoxTokenStream::new(PreTokenizedStream::from(*tok_str.clone()))
} else {
continue;
@@ -250,9 +248,8 @@ impl SegmentWriter {
}
FieldType::U64(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let u64_val = value.as_u64().ok_or_else(make_schema_error)?;
@@ -265,10 +262,8 @@ impl SegmentWriter {
}
FieldType::Date(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value_access = value_access as D::Value<'_>;
let value = value_access.as_value();
for value in values {
let value = value.as_value();
num_vals += 1;
let date_val = value.as_datetime().ok_or_else(make_schema_error)?;
@@ -282,9 +277,8 @@ impl SegmentWriter {
}
FieldType::I64(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let i64_val = value.as_i64().ok_or_else(make_schema_error)?;
@@ -297,10 +291,8 @@ impl SegmentWriter {
}
FieldType::F64(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let f64_val = value.as_f64().ok_or_else(make_schema_error)?;
term_buffer.set_f64(f64_val);
@@ -312,10 +304,8 @@ impl SegmentWriter {
}
FieldType::Bool(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let bool_val = value.as_bool().ok_or_else(make_schema_error)?;
term_buffer.set_bool(bool_val);
@@ -327,10 +317,8 @@ impl SegmentWriter {
}
FieldType::Bytes(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let bytes = value.as_bytes().ok_or_else(make_schema_error)?;
term_buffer.set_bytes(bytes);
@@ -364,9 +352,8 @@ impl SegmentWriter {
}
FieldType::IpAddr(_) => {
let mut num_vals = 0;
for value_access in values {
// Used to help with linting and type checking.
let value = value_access as D::Value<'_>;
for value in values {
let value = value.as_value();
num_vals += 1;
let ip_addr = value.as_ip_addr().ok_or_else(make_schema_error)?;

View File

@@ -2,7 +2,7 @@ use crate::docset::{DocSet, TERMINATED};
use crate::fieldnorm::FieldNormReader;
use crate::postings::Postings;
use crate::query::bm25::Bm25Weight;
use crate::query::phrase_query::{intersection_count, intersection_exists, PhraseScorer};
use crate::query::phrase_query::{intersection_count, PhraseScorer};
use crate::query::Scorer;
use crate::{DocId, Score};
@@ -92,17 +92,14 @@ impl<TPostings: Postings> Scorer for PhraseKind<TPostings> {
}
}
pub struct PhrasePrefixScorer<TPostings: Postings, const SCORING_ENABLED: bool> {
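/// Scorer for phrase-prefix queries: a `PhraseKind` scorer over the fixed
/// terms, plus the postings of the terms matching the prefix (`suffixes`).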
pub struct PhrasePrefixScorer<TPostings: Postings> {
phrase_scorer: PhraseKind<TPostings>,
suffixes: Vec<TPostings>,
suffix_offset: u32,
phrase_count: u32,
suffix_position_buffer: Vec<u32>,
}
impl<TPostings: Postings, const SCORING_ENABLED: bool>
PhrasePrefixScorer<TPostings, SCORING_ENABLED>
{
impl<TPostings: Postings> PhrasePrefixScorer<TPostings> {
// If `similarity_weight_opt` is `None`, then scoring is disabled.
pub fn new(
mut term_postings: Vec<(usize, TPostings)>,
@@ -110,7 +107,7 @@ impl<TPostings: Postings, const SCORING_ENABLED: bool>
fieldnorm_reader: FieldNormReader,
suffixes: Vec<TPostings>,
suffix_pos: usize,
) -> PhrasePrefixScorer<TPostings, SCORING_ENABLED> {
) -> PhrasePrefixScorer<TPostings> {
// correct the indices so we can merge with our suffix term, which the PhraseScorer doesn't know about
let max_offset = term_postings
.iter()
@@ -143,7 +140,6 @@ impl<TPostings: Postings, const SCORING_ENABLED: bool>
suffixes,
suffix_offset: (max_offset - suffix_pos) as u32,
phrase_count: 0,
suffix_position_buffer: Vec::with_capacity(100),
};
if phrase_prefix_scorer.doc() != TERMINATED && !phrase_prefix_scorer.matches_prefix() {
phrase_prefix_scorer.advance();
@@ -157,6 +153,7 @@ impl<TPostings: Postings, const SCORING_ENABLED: bool>
fn matches_prefix(&mut self) -> bool {
let mut count = 0;
let mut positions = Vec::new();
let current_doc = self.doc();
let pos_matching = self.phrase_scorer.get_intersection();
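// `pos_matching` holds the in-document positions where the phrase body
// matched; a suffix term confirms the prefix match if, after offset
// correction, one of its positions intersects `pos_matching`.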
for suffix in &mut self.suffixes {
@@ -165,27 +162,16 @@ impl<TPostings: Postings, const SCORING_ENABLED: bool>
}
let doc = suffix.seek(current_doc);
if doc == current_doc {
suffix.positions_with_offset(self.suffix_offset, &mut self.suffix_position_buffer);
if SCORING_ENABLED {
count += intersection_count(pos_matching, &self.suffix_position_buffer);
} else {
if intersection_exists(pos_matching, &self.suffix_position_buffer) {
return true;
}
}
suffix.positions_with_offset(self.suffix_offset, &mut positions);
count += intersection_count(pos_matching, &positions);
}
}
if !SCORING_ENABLED {
return false;
}
self.phrase_count = count as u32;
count != 0
}
}
impl<TPostings: Postings, const SCORING_ENABLED: bool> DocSet
for PhrasePrefixScorer<TPostings, SCORING_ENABLED>
{
impl<TPostings: Postings> DocSet for PhrasePrefixScorer<TPostings> {
fn advance(&mut self) -> DocId {
loop {
let doc = self.phrase_scorer.advance();
@@ -212,15 +198,9 @@ impl<TPostings: Postings, const SCORING_ENABLED: bool> DocSet
}
}
impl<TPostings: Postings, const SCORING_ENABLED: bool> Scorer
for PhrasePrefixScorer<TPostings, SCORING_ENABLED>
{
impl<TPostings: Postings> Scorer for PhrasePrefixScorer<TPostings> {
fn score(&mut self) -> Score {
if SCORING_ENABLED {
self.phrase_scorer.score()
} else {
1.0f32
}
// TODO: decide whether the score should be adjusted for prefix matches.
self.phrase_scorer.score()
}
}

View File

@@ -42,11 +42,11 @@ impl PhrasePrefixWeight {
Ok(FieldNormReader::constant(reader.max_doc(), 1))
}
pub(crate) fn phrase_prefix_scorer<const SCORING_ENABLED: bool>(
pub(crate) fn phrase_scorer(
&self,
reader: &SegmentReader,
boost: Score,
) -> crate::Result<Option<PhrasePrefixScorer<SegmentPostings, SCORING_ENABLED>>> {
) -> crate::Result<Option<PhrasePrefixScorer<SegmentPostings>>> {
let similarity_weight_opt = self
.similarity_weight_opt
.as_ref()
@@ -128,20 +128,15 @@ impl PhrasePrefixWeight {
impl Weight for PhrasePrefixWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
if self.similarity_weight_opt.is_some() {
if let Some(scorer) = self.phrase_prefix_scorer::<true>(reader, boost)? {
return Ok(Box::new(scorer));
}
if let Some(scorer) = self.phrase_scorer(reader, boost)? {
Ok(Box::new(scorer))
} else {
if let Some(scorer) = self.phrase_prefix_scorer::<false>(reader, boost)? {
return Ok(Box::new(scorer));
}
Ok(Box::new(EmptyScorer))
}
Ok(Box::new(EmptyScorer))
}
fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
let scorer_opt = self.phrase_prefix_scorer::<true>(reader, 1.0)?;
let scorer_opt = self.phrase_scorer(reader, 1.0)?;
if scorer_opt.is_none() {
return Err(does_not_match(doc));
}
@@ -205,7 +200,7 @@ mod tests {
.unwrap()
.unwrap();
let mut phrase_scorer = phrase_weight
.phrase_prefix_scorer::<true>(searcher.segment_reader(0u32), 1.0)?
.phrase_scorer(searcher.segment_reader(0u32), 1.0)?
.unwrap();
assert_eq!(phrase_scorer.doc(), 1);
assert_eq!(phrase_scorer.phrase_count(), 2);
@@ -216,38 +211,6 @@ mod tests {
Ok(())
}
#[test]
pub fn test_phrase_no_count() -> crate::Result<()> {
let index = create_index(&[
"aa bb dd cc",
"aa aa bb c dd aa bb cc aa bb dc",
" aa bb cd",
])?;
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader()?.searcher();
let phrase_query = PhrasePrefixQuery::new(vec![
Term::from_field_text(text_field, "aa"),
Term::from_field_text(text_field, "bb"),
Term::from_field_text(text_field, "c"),
]);
let enable_scoring = EnableScoring::enabled_from_searcher(&searcher);
let phrase_weight = phrase_query
.phrase_prefix_query_weight(enable_scoring)
.unwrap()
.unwrap();
let mut phrase_scorer = phrase_weight
.phrase_prefix_scorer::<false>(searcher.segment_reader(0u32), 1.0)?
.unwrap();
assert_eq!(phrase_scorer.doc(), 1);
assert_eq!(phrase_scorer.phrase_count(), 0);
assert_eq!(phrase_scorer.advance(), 2);
assert_eq!(phrase_scorer.doc(), 2);
assert_eq!(phrase_scorer.phrase_count(), 0);
assert_eq!(phrase_scorer.advance(), TERMINATED);
Ok(())
}
#[test]
pub fn test_phrase_count_mid() -> crate::Result<()> {
let index = create_index(&["aa dd cc", "aa aa bb c dd aa bb cc aa dc", " aa bb cd"])?;
@@ -264,7 +227,7 @@ mod tests {
.unwrap()
.unwrap();
let mut phrase_scorer = phrase_weight
.phrase_prefix_scorer::<true>(searcher.segment_reader(0u32), 1.0)?
.phrase_scorer(searcher.segment_reader(0u32), 1.0)?
.unwrap();
assert_eq!(phrase_scorer.doc(), 1);
assert_eq!(phrase_scorer.phrase_count(), 2);

View File

@@ -3,8 +3,8 @@ mod phrase_scorer;
mod phrase_weight;
pub use self::phrase_query::PhraseQuery;
pub(crate) use self::phrase_scorer::intersection_count;
pub use self::phrase_scorer::PhraseScorer;
pub(crate) use self::phrase_scorer::{intersection_count, intersection_exists};
pub use self::phrase_weight::PhraseWeight;
#[cfg(test)]

View File

@@ -58,7 +58,7 @@ pub struct PhraseScorer<TPostings: Postings> {
}
/// Returns true if and only if the two sorted arrays contain a common element
pub(crate) fn intersection_exists(left: &[u32], right: &[u32]) -> bool {
fn intersection_exists(left: &[u32], right: &[u32]) -> bool {
let mut left_index = 0;
let mut right_index = 0;
while left_index < left.len() && right_index < right.len() {

View File

@@ -2,7 +2,7 @@ use std::fmt;
use std::ops::Bound;
use crate::query::Occur;
use crate::schema::{Field, Term, Type};
use crate::schema::{Term, Type};
use crate::Score;
#[derive(Clone)]
@@ -20,8 +20,6 @@ pub enum LogicalLiteral {
upper: Bound<Term>,
},
Set {
field: Field,
value_type: Type,
elements: Vec<Term>,
},
All,

View File

@@ -832,17 +832,11 @@ impl QueryParser {
let (field, json_path) = try_tuple!(self
.split_full_path(&full_path)
.ok_or_else(|| QueryParserError::FieldDoesNotExist(full_path.clone())));
let field_entry = self.schema.get_field_entry(field);
let value_type = field_entry.field_type().value_type();
let (elements, errors) = elements
.into_iter()
.map(|element| self.compute_boundary_term(field, json_path, &element))
.partition_result();
let logical_ast = LogicalAst::Leaf(Box::new(LogicalLiteral::Set {
elements,
field,
value_type,
}));
let logical_ast = LogicalAst::Leaf(Box::new(LogicalLiteral::Set { elements }));
(Some(logical_ast), errors)
}
UserInputLeaf::Exists { .. } => (

View File

@@ -17,15 +17,6 @@ pub trait Value<'a>: Send + Sync + Debug {
/// Returns the field value represented by an enum which borrows its data.
fn as_value(&self) -> ReferenceValue<'a, Self>;
#[inline]
/// Returns if the value is `null` or not.
fn is_null(&self) -> bool {
matches!(
self.as_value(),
ReferenceValue::Leaf(ReferenceValueLeaf::Null)
)
}
#[inline]
/// If the Value is a leaf, returns the associated leaf. Returns None otherwise.
fn as_leaf(&self) -> Option<ReferenceValueLeaf<'a>> {
@@ -117,18 +108,6 @@ pub trait Value<'a>: Send + Sync + Debug {
None
}
}
#[inline]
/// Returns true if the Value is an array.
fn is_array(&self) -> bool {
matches!(self.as_value(), ReferenceValue::Object(_))
}
#[inline]
/// Returns true if the Value is an object.
fn is_object(&self) -> bool {
matches!(self.as_value(), ReferenceValue::Object(_))
}
}
/// A enum representing a leaf value for tantivy to index.