Compare commits


14 Commits

Author SHA1 Message Date
Paul Masurel
5e550beeb6 follow up on the fix of multiply with overflow 2025-02-24 12:02:02 +09:00
Pascal Seitz
fe93eded83 use usize in bitpacker
use usize in bitpacker to enable larger columns in the columnar store

Godbolt comparison with u32 vs u64 for get access: https://godbolt.org/z/cjf7nenYP

Add a mini-tool to inspect columnar files created by tantivy. (very basic functionality which can be extended later)
2025-02-24 11:25:53 +09:00
Paul Masurel
82b510b88b Adding panic handler for the rayon merge thread pool 2025-02-19 17:19:28 +09:00
Remi Dettai
71cf19870b Exist queries match subpath fields (#2558)
* Exist queries match subpath fields

* Make subpath check optional

* Add async subpath listing
2025-01-06 10:17:39 +01:00
Harrison Burt
148594f0f9 Improve IndexWriter customisation via builder (#2562)
* Improve `IndexWriter` customisation via builder

* Remove change noise from PR

* Correct documentation

* Resolve comments and add test
2025-01-02 09:43:22 +01:00
dependabot[bot]
8edb439440 Update rustc-hash requirement from 1.1.0 to 2.1.0 (#2551)
---
updated-dependencies:
- dependency-name: rustc-hash
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-26 10:25:05 +01:00
trinity-1686a
c39d91f827 Merge pull request #2547 from quickwit-oss/trinity/count-str
add support for counting non integer in aggregation
2024-12-17 15:27:30 +01:00
trinity Pointard
32b6e9711b add tests 2024-12-13 16:06:24 +01:00
dependabot[bot]
0f99d4f420 Update measure_time requirement from 0.8.2 to 0.9.0 (#2557)
---
updated-dependencies:
- dependency-name: measure_time
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-09 21:39:01 +01:00
Pierre Barre
6e02c5cb25 Make NUM_MERGE_THREADS configurable (#2535)
* Make `NUM_MERGE_THREADS` configurable

* Remove unused import

* Reword comment src/index/index.rs

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>

---------

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2024-12-09 16:53:11 +08:00
PSeitz
876a579e5d queryparser: add field respecification test (#2550) 2024-12-02 14:17:12 +01:00
PSeitz
4c52499622 clippy (#2549) 2024-11-29 16:08:21 +08:00
trinity-1686a
0bac391291 add support for counting non integer in aggregation 2024-11-28 19:52:47 +01:00
PSeitz
52d4e81e70 update CHANGELOG (#2546) 2024-11-27 20:49:35 +08:00
40 changed files with 729 additions and 186 deletions

View File

@@ -21,7 +21,7 @@ jobs:
- name: Generate code coverage
run: cargo +nightly-2024-07-01 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
uses: codecov/codecov-action@v3
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos

View File

@@ -1,11 +1,12 @@
Tantivy 0.23 - Unreleased
================================
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21.
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75.
#### Bugfixes
- fix potential endless loop in merge [#2457](https://github.com/quickwit-oss/tantivy/pull/2457)(@PSeitz)
- fix bug that causes out-of-order sstable key. [#2445](https://github.com/quickwit-oss/tantivy/pull/2445)(@fulmicoton)
- fix ReferenceValue API flaw [#2372](https://github.com/quickwit-oss/tantivy/pull/2372)(@PSeitz)
- fix `OwnedBytes` debug panic [#2512](https://github.com/quickwit-oss/tantivy/pull/2512)(@b41sh)
#### Breaking API Changes
- remove index sorting [#2434](https://github.com/quickwit-oss/tantivy/pull/2434)(@PSeitz)
@@ -35,7 +36,15 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
- make find_field_with_default return json fields without path [#2476](https://github.com/quickwit-oss/tantivy/pull/2476)(@trinity-1686a)
- feat(query): Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
- **Optional Index in Multivalue Columnar Index** For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case). This is alleviated by placing an optional index in the multivalued index to mark documents that have values. This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
- **RegexPhraseQuery**
`RegexPhraseQuery` supports phrase queries with regex. E.g. query "b.* b.* wolf" matches "big bad wolf". Slop is supported as well: "b.* wolf"~2 matches "big bad wolf" [#2516](https://github.com/quickwit-oss/tantivy/pull/2516)(@PSeitz)
- **Optional Index in Multivalue Columnar Index**
For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case).
This is alleviated by placing an optional index in the multivalued index to mark documents that have values.
This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
- **Store DateTime as nanoseconds in doc store** DateTime in the doc store was truncated to microseconds previously. This removes this truncation, while still keeping backwards compatibility. [#2486](https://github.com/quickwit-oss/tantivy/pull/2486)(@PSeitz)
- **Performace/Memory**
- lift clauses in LogicalAst for optimized ast during execution [#2449](https://github.com/quickwit-oss/tantivy/pull/2449)(@PSeitz)
@@ -57,12 +66,13 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
- add bench & test for columnar merging [#2428](https://github.com/quickwit-oss/tantivy/pull/2428)(@PSeitz)
- Change in Executor API [#2391](https://github.com/quickwit-oss/tantivy/pull/2391)(@fulmicoton)
- Removed usage of num_cpus [#2387](https://github.com/quickwit-oss/tantivy/pull/2387)(@fulmicoton)
- use bingang for agg benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)(@PSeitz)
- use bingang for agg and stacker benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)[#2492](https://github.com/quickwit-oss/tantivy/pull/2492)(@PSeitz)
- cleanup top level exports [#2382](https://github.com/quickwit-oss/tantivy/pull/2382)(@PSeitz)
- make convert_to_fast_value_and_append_to_json_term pub [#2370](https://github.com/quickwit-oss/tantivy/pull/2370)(@PSeitz)
- remove JsonTermWriter [#2238](https://github.com/quickwit-oss/tantivy/pull/2238)(@PSeitz)
- validate sort by field type [#2336](https://github.com/quickwit-oss/tantivy/pull/2336)(@PSeitz)
- Fix trait bound of StoreReader::iter [#2360](https://github.com/quickwit-oss/tantivy/pull/2360)(@adamreichold)
- remove read_postings_no_deletes [#2526](https://github.com/quickwit-oss/tantivy/pull/2526)(@PSeitz)
Tantivy 0.22
================================
@@ -717,7 +727,7 @@ Tantivy 0.4.0
- Raise the limit of number of fields (previously 256 fields) (@fulmicoton)
- Removed u32 fields. They are replaced by u64 and i64 fields (#65) (@fulmicoton)
- Optimized skip in SegmentPostings (#130) (@lnicola)
- Replacing rustc_serialize by serde. Kudos to @KodrAus and @lnicola
- Replacing rustc_serialize by serde. Kudos to benchmark@KodrAus and @lnicola
- Using error-chain (@KodrAus)
- QueryParser: (@fulmicoton)
- Explicit error returned when searched for a term that is not indexed

View File

@@ -53,8 +53,9 @@ rayon = "1.5.2"
lru = "0.12.0"
fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.8.2"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }

View File

@@ -65,7 +65,7 @@ impl BitPacker {
#[derive(Clone, Debug, Default, Copy)]
pub struct BitUnpacker {
num_bits: u32,
num_bits: usize,
mask: u64,
}
@@ -83,7 +83,7 @@ impl BitUnpacker {
(1u64 << num_bits) - 1u64
};
BitUnpacker {
num_bits: u32::from(num_bits),
num_bits: usize::from(num_bits),
mask,
}
}
@@ -94,14 +94,14 @@ impl BitUnpacker {
#[inline]
pub fn get(&self, idx: u32, data: &[u8]) -> u64 {
let addr_in_bits = idx * self.num_bits;
let addr = (addr_in_bits >> 3) as usize;
let addr_in_bits = idx as usize * self.num_bits;
let addr = addr_in_bits >> 3;
if addr + 8 > data.len() {
if self.num_bits == 0 {
return 0;
}
let bit_shift = addr_in_bits & 7;
return self.get_slow_path(addr, bit_shift, data);
return self.get_slow_path(addr, bit_shift as u32, data);
}
let bit_shift = addr_in_bits & 7;
let bytes: [u8; 8] = (&data[addr..addr + 8]).try_into().unwrap();
@@ -129,24 +129,25 @@ impl BitUnpacker {
//
// This methods panics if `num_bits` is > 32.
fn get_batch_u32s(&self, start_idx: u32, data: &[u8], output: &mut [u32]) {
let start_idx = start_idx as usize;
assert!(
self.bit_width() <= 32,
"Bitwidth must be <= 32 to use this method."
);
let end_idx = start_idx + output.len() as u32;
let end_idx = start_idx + output.len();
let end_bit_read = end_idx * self.num_bits;
let end_byte_read = (end_bit_read + 7) / 8;
assert!(
end_byte_read as usize <= data.len(),
end_byte_read <= data.len(),
"Requested index is out of bounds."
);
// Simple slow implementation of get_batch_u32s, to deal with our ramps.
let get_batch_ramp = |start_idx: u32, output: &mut [u32]| {
let get_batch_ramp = |start_idx: usize, output: &mut [u32]| {
for (out, idx) in output.iter_mut().zip(start_idx..) {
*out = self.get(idx, data) as u32;
*out = self.get(idx as u32, data) as u32;
}
};
@@ -161,23 +162,23 @@ impl BitUnpacker {
// so highway start is the closest multiple of 8 that is >= start_idx.
let entrance_ramp_len = 8 - (start_idx % 8) % 8;
let highway_start: u32 = start_idx + entrance_ramp_len;
let highway_start: usize = start_idx + entrance_ramp_len;
if highway_start + BitPacker1x::BLOCK_LEN as u32 > end_idx {
if highway_start + BitPacker1x::BLOCK_LEN > end_idx {
// We don't have enough values to have even a single block of highway.
// Let's just supply the values the simple way.
get_batch_ramp(start_idx, output);
return;
}
let num_blocks: u32 = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN as u32;
let num_blocks: usize = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN;
// Entrance ramp
get_batch_ramp(start_idx, &mut output[..entrance_ramp_len as usize]);
get_batch_ramp(start_idx, &mut output[..entrance_ramp_len]);
// Highway
let mut offset = (highway_start * self.num_bits) as usize / 8;
let mut output_cursor = (highway_start - start_idx) as usize;
let mut offset = (highway_start * self.num_bits) / 8;
let mut output_cursor = highway_start - start_idx;
for _ in 0..num_blocks {
offset += BitPacker1x.decompress(
&data[offset..],
@@ -188,7 +189,7 @@ impl BitUnpacker {
}
// Exit ramp
let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN as u32;
let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN;
get_batch_ramp(highway_end, &mut output[output_cursor..]);
}
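The hunk above widens the index arithmetic in `BitUnpacker::get` from u32 to usize so that very large columns no longer overflow when computing the bit address (the follow-up commit "follow up on the fix of multiply with overflow" touches the same spot). A minimal standalone sketch of that arithmetic, as an illustration only and not the tantivy API, with made-up numbers:

fn bit_address(idx: u32, num_bits: usize) -> (usize, usize) {
    // Widen to usize first: in u32, `idx * num_bits` can overflow for large columns.
    let addr_in_bits = idx as usize * num_bits;
    let addr = addr_in_bits >> 3; // byte offset into the packed data
    let bit_shift = addr_in_bits & 7; // bit offset within that byte
    (addr, bit_shift)
}

fn main() {
    // ~300M rows at 17 bits per value: 300_000_000 * 17 = 5_100_000_000,
    // which does not fit in a u32 (max 4_294_967_295) but fits in usize.
    let (addr, bit_shift) = bit_address(300_000_000, 17);
    println!("byte addr = {addr}, bit shift = {bit_shift}");
}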

View File

@@ -0,0 +1,18 @@
[package]
name = "tantivy-columnar-inspect"
version = "0.1.0"
edition = "2021"
license = "MIT"
[dependencies]
tantivy = {path="../..", package="tantivy"}
columnar = {path="../", package="tantivy-columnar"}
common = {path="../../common", package="tantivy-common"}
[workspace]
members = []
[profile.release]
debug = true
#debug-assertions = true
#overflow-checks = true

View File

@@ -0,0 +1,54 @@
use columnar::ColumnarReader;
use common::file_slice::{FileSlice, WrapFile};
use std::io;
use std::path::Path;
use tantivy::directory::footer::Footer;
fn main() -> io::Result<()> {
println!("Opens a columnar file written by tantivy and validates it.");
let path = std::env::args().nth(1).unwrap();
let path = Path::new(&path);
println!("Reading {:?}", path);
let _reader = open_and_validate_columnar(path.to_str().unwrap())?;
Ok(())
}
pub fn validate_columnar_reader(reader: &ColumnarReader) {
let num_rows = reader.num_rows();
println!("num_rows: {}", num_rows);
let columns = reader.list_columns().unwrap();
println!("num columns: {:?}", columns.len());
for (col_name, dynamic_column_handle) in columns {
let col = dynamic_column_handle.open().unwrap();
match col {
columnar::DynamicColumn::Bool(_)
| columnar::DynamicColumn::I64(_)
| columnar::DynamicColumn::U64(_)
| columnar::DynamicColumn::F64(_)
| columnar::DynamicColumn::IpAddr(_)
| columnar::DynamicColumn::DateTime(_)
| columnar::DynamicColumn::Bytes(_) => {}
columnar::DynamicColumn::Str(str_column) => {
let num_vals = str_column.ords().values.num_vals();
let num_terms_dict = str_column.num_terms() as u64;
let max_ord = str_column.ords().values.iter().max().unwrap_or_default();
println!("{col_name:35} num_vals {num_vals:10} \t num_terms_dict {num_terms_dict:8} max_ord: {max_ord:8}",);
for ord in str_column.ords().values.iter() {
assert!(ord < num_terms_dict);
}
}
}
}
}
/// Opens a columnar file that was written by tantivy and validates it.
pub fn open_and_validate_columnar(path: &str) -> io::Result<ColumnarReader> {
let wrap_file = WrapFile::new(std::fs::File::open(path)?)?;
let slice = FileSlice::new(std::sync::Arc::new(wrap_file));
let (_footer, slice) = Footer::extract_footer(slice.clone()).unwrap();
let reader = ColumnarReader::open(slice).unwrap();
validate_columnar_reader(&reader);
Ok(reader)
}
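A usage note on the inspector above: it takes the columnar file path as its only CLI argument (read via `std::env::args().nth(1)`), prints per-column statistics, and asserts that every string ordinal is below the dictionary size. A hypothetical invocation from the new crate's directory would be `cargo run --release -- /path/to/columnar-file`; the path layout is an assumption, only the single-argument interface comes from the code shown.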

View File

@@ -58,7 +58,7 @@ struct ShuffledIndex<'a> {
merge_order: &'a ShuffleMergeOrder,
}
impl<'a> Iterable<u32> for ShuffledIndex<'a> {
impl Iterable<u32> for ShuffledIndex<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(
self.merge_order
@@ -127,7 +127,7 @@ fn integrate_num_vals(num_vals: impl Iterator<Item = u32>) -> impl Iterator<Item
)
}
impl<'a> Iterable<u32> for ShuffledMultivaluedIndex<'a> {
impl Iterable<u32> for ShuffledMultivaluedIndex<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
let num_vals_per_row = iter_num_values(self.column_indexes, self.merge_order);
Box::new(integrate_num_vals(num_vals_per_row))

View File

@@ -123,7 +123,7 @@ fn get_num_values_iterator<'a>(
}
}
impl<'a> Iterable<u32> for StackedStartOffsets<'a> {
impl Iterable<u32> for StackedStartOffsets<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| {
let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32;

View File

@@ -86,7 +86,7 @@ pub struct OptionalIndex {
block_metas: Arc<[BlockMeta]>,
}
impl<'a> Iterable<u32> for &'a OptionalIndex {
impl Iterable<u32> for &OptionalIndex {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.iter_rows())
}
@@ -123,7 +123,7 @@ enum BlockSelectCursor<'a> {
Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
}
impl<'a> BlockSelectCursor<'a> {
impl BlockSelectCursor<'_> {
fn select(&mut self, rank: u16) -> u16 {
match self {
BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
@@ -141,7 +141,7 @@ pub struct OptionalIndexSelectCursor<'a> {
num_null_rows_before_block: RowId,
}
impl<'a> OptionalIndexSelectCursor<'a> {
impl OptionalIndexSelectCursor<'_> {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
@@ -165,7 +165,7 @@ impl<'a> OptionalIndexSelectCursor<'a> {
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
impl SelectCursor<RowId> for OptionalIndexSelectCursor<'_> {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
@@ -505,7 +505,7 @@ fn deserialize_optional_index_block_metadatas(
non_null_rows_before_block += num_non_null_rows;
}
block_metas.resize(
((num_rows + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK) as usize,
num_rows.div_ceil(ELEMENTS_PER_BLOCK) as usize,
BlockMeta {
non_null_rows_before_block,
start_byte_offset,

View File

@@ -23,7 +23,6 @@ fn set_bit_at(input: &mut u64, n: u16) {
///
/// When translating a dense index to the original index, we can use the offset to find the correct
/// block. Direct computation is not possible, but we can employ a linear or binary search.
const ELEMENTS_PER_MINI_BLOCK: u16 = 64;
const MINI_BLOCK_BITVEC_NUM_BYTES: usize = 8;
const MINI_BLOCK_OFFSET_NUM_BYTES: usize = 2;
@@ -109,7 +108,7 @@ pub struct DenseBlockSelectCursor<'a> {
dense_block: DenseBlock<'a>,
}
impl<'a> SelectCursor<u16> for DenseBlockSelectCursor<'a> {
impl SelectCursor<u16> for DenseBlockSelectCursor<'_> {
#[inline]
fn select(&mut self, rank: u16) -> u16 {
self.block_id = self
@@ -175,7 +174,7 @@ impl<'a> Set<u16> for DenseBlock<'a> {
}
}
impl<'a> DenseBlock<'a> {
impl DenseBlock<'_> {
#[inline]
fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock {
let data_start_pos = mini_block_id as usize * MINI_BLOCK_NUM_BYTES;

View File

@@ -31,7 +31,7 @@ impl<'a> SelectCursor<u16> for SparseBlock<'a> {
}
}
impl<'a> Set<u16> for SparseBlock<'a> {
impl Set<u16> for SparseBlock<'_> {
type SelectCursor<'b>
= Self
where Self: 'b;
@@ -69,7 +69,7 @@ fn get_u16(data: &[u8], byte_position: usize) -> u16 {
u16::from_le_bytes(bytes)
}
impl<'a> SparseBlock<'a> {
impl SparseBlock<'_> {
#[inline(always)]
fn value_at_idx(&self, data: &[u8], idx: u16) -> u16 {
let start_offset: usize = idx as usize * 2;

View File

@@ -31,7 +31,7 @@ pub enum SerializableColumnIndex<'a> {
Multivalued(SerializableMultivalueIndex<'a>),
}
impl<'a> SerializableColumnIndex<'a> {
impl SerializableColumnIndex<'_> {
pub fn get_cardinality(&self) -> Cardinality {
match self {
SerializableColumnIndex::Full => Cardinality::Full,

View File

@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
pub(crate) merge_row_order: &'a MergeRowOrder,
}
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
impl<T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'_, T> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
match self.merge_row_order {
MergeRowOrder::Stack(_) => Box::new(

View File

@@ -39,7 +39,7 @@ impl BinarySerializable for Block {
}
fn compute_num_blocks(num_vals: u32) -> u32 {
(num_vals + BLOCK_SIZE - 1) / BLOCK_SIZE
num_vals.div_ceil(BLOCK_SIZE)
}
pub struct BlockwiseLinearEstimator {

View File

@@ -39,7 +39,7 @@ struct RemappedTermOrdinalsValues<'a> {
merge_row_order: &'a MergeRowOrder,
}
impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
impl Iterable for RemappedTermOrdinalsValues<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
match self.merge_row_order {
MergeRowOrder::Stack(_) => self.boxed_iter_stacked(),
@@ -50,7 +50,7 @@ impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
}
}
impl<'a> RemappedTermOrdinalsValues<'a> {
impl RemappedTermOrdinalsValues<'_> {
fn boxed_iter_stacked(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let iter = self
.bytes_columns

View File

@@ -10,13 +10,13 @@ pub struct HeapItem<'a> {
pub segment_ord: usize,
}
impl<'a> PartialEq for HeapItem<'a> {
impl PartialEq for HeapItem<'_> {
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl<'a> Eq for HeapItem<'a> {}
impl Eq for HeapItem<'_> {}
impl<'a> PartialOrd for HeapItem<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {

View File

@@ -1,6 +1,7 @@
use std::{fmt, io, mem};
use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
Ok(results)
}
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}
fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}
impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
@@ -144,32 +158,14 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}
fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
let mut start_key = column_name.to_string();
start_key.push('\0');
let mut end_key = column_name.to_string();
end_key.push(1u8 as char);
self.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
}
pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.stream_for_column_range(column_name)
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
@@ -180,7 +176,35 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self.stream_for_column_range(column_name).into_stream()?;
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
///
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
.into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
@@ -192,6 +216,8 @@ impl ColumnarReader {
#[cfg(test)]
mod tests {
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use crate::{ColumnType, ColumnarReader, ColumnarWriter};
#[test]
@@ -224,6 +250,64 @@ mod tests {
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
}
#[test]
fn test_read_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].column_type(), ColumnType::U64);
}
{
let columns = columnar.read_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
fn test_read_subpath_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(
0,
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
"hello",
);
columnar_writer.record_numerical(
0,
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
1i64,
);
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_subpath_columns("col1").unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].column_type(), ColumnType::Str);
assert_eq!(columns[1].column_type(), ColumnType::I64);
}
{
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("col2").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
#[should_panic(expected = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {

View File

@@ -285,7 +285,6 @@ impl ColumnarWriter {
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {

View File

@@ -67,7 +67,7 @@ pub struct ColumnSerializer<'a, W: io::Write> {
start_offset: u64,
}
impl<'a, W: io::Write> ColumnSerializer<'a, W> {
impl<W: io::Write> ColumnSerializer<'_, W> {
pub fn finalize(self) -> io::Result<()> {
let end_offset: u64 = self.columnar_serializer.wrt.written_bytes();
let byte_range = self.start_offset..end_offset;
@@ -80,7 +80,7 @@ impl<'a, W: io::Write> ColumnSerializer<'a, W> {
}
}
impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
impl<W: io::Write> io::Write for ColumnSerializer<'_, W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.columnar_serializer.wrt.write(buf)
}

View File

@@ -7,7 +7,7 @@ pub trait Iterable<T = u64> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}
impl<'a, T: Copy> Iterable<T> for &'a [T] {
impl<T: Copy> Iterable<T> for &[T] {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.iter().copied())
}

View File

@@ -1,5 +1,6 @@
use std::fs::File;
use std::ops::{Deref, Range, RangeBounds};
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
@@ -177,6 +178,12 @@ fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R)
}
impl FileSlice {
/// Creates a FileSlice from a path.
pub fn open(path: &Path) -> io::Result<FileSlice> {
let wrap_file = WrapFile::new(File::open(path)?)?;
Ok(FileSlice::new(Arc::new(wrap_file)))
}
/// Wraps a FileHandle.
pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
let num_bytes = file_handle.len();

View File

@@ -87,7 +87,7 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
}
}
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
impl TerminatingWrite for &mut Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
self.flush()
}

View File

@@ -1497,6 +1497,11 @@ mod test {
test_is_parse_err(r#"field:(+a -"b c""#, r#"(+"field":a -"field":"b c")"#);
}
#[test]
fn field_re_specification() {
test_parse_query_to_ast_helper(r#"field:(abc AND b:cde)"#, r#"(+"field":abc +"b":cde)"#);
}
#[test]
fn test_parse_query_single_term() {
test_parse_query_to_ast_helper("abc", "abc");

View File

@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
field: ref field_name,
..
})
| Count(CountAggregation {
field: ref field_name,
..
})
| Max(MaxAggregation {
field: ref field_name,
..
@@ -299,6 +295,24 @@ impl AggregationWithAccessor {
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Count(CountAggregation {
field: ref field_name,
..
}) => {
let allowed_column_types = [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::Str,
ColumnType::DateTime,
ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
];
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Percentiles(ref percentiles) => {
let (accessor, column_type) = get_ff_reader(
reader,

View File

@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
}
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
if [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::DateTime,
]
.contains(&self.field_type)
{
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
} else {
for _val in agg_accessor.column_block_accessor.iter_vals() {
// we ignore the value and simply record that we got something
self.stats.collect(0.0);
}
}
}
}
@@ -435,6 +449,11 @@ mod tests {
"field": "score",
},
},
"count_str": {
"value_count": {
"field": "text",
},
},
"range": range_agg
}))
.unwrap();
@@ -500,6 +519,13 @@ mod tests {
})
);
assert_eq!(
res["count_str"],
json!({
"value": 7.0,
})
);
Ok(())
}

View File

@@ -578,7 +578,7 @@ mod tests {
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_fast(None)
.set_fast(Some("raw"))
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", FAST);

View File

@@ -1,3 +1,9 @@
//! The footer is a small metadata structure that is appended at the end of every file.
//!
//! The footer is used to store a checksum of the file content.
//! The footer also stores the version of the index format.
//! This version is used to detect incompatibility between the index and the library version.
use std::io;
use std::io::Write;
@@ -20,20 +26,22 @@ type CrcHashU32 = u32;
/// A Footer is appended to every file
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Footer {
/// The version of the index format
pub version: Version,
/// The crc32 hash of the body
pub crc: CrcHashU32,
}
impl Footer {
pub fn new(crc: CrcHashU32) -> Self {
pub(crate) fn new(crc: CrcHashU32) -> Self {
let version = crate::VERSION.clone();
Footer { version, crc }
}
pub fn crc(&self) -> CrcHashU32 {
pub(crate) fn crc(&self) -> CrcHashU32 {
self.crc
}
pub fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
pub(crate) fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
let mut counting_write = CountingWriter::wrap(&mut write);
counting_write.write_all(serde_json::to_string(&self)?.as_ref())?;
let footer_payload_len = counting_write.written_bytes();
@@ -42,6 +50,7 @@ impl Footer {
Ok(())
}
/// Extracts the tantivy Footer from the file and returns the footer and the rest of the file
pub fn extract_footer(file: FileSlice) -> io::Result<(Footer, FileSlice)> {
if file.len() < 4 {
return Err(io::Error::new(

View File

@@ -6,7 +6,7 @@ mod mmap_directory;
mod directory;
mod directory_lock;
mod file_watcher;
mod footer;
pub mod footer;
mod managed_directory;
mod ram_directory;
mod watch_event_router;

View File

@@ -217,7 +217,7 @@ impl FastFieldReaders {
Ok(dynamic_column.into())
}
/// Returning a `dynamic_column_handle`.
/// Returns a `dynamic_column_handle`.
pub fn dynamic_column_handle(
&self,
field_name: &str,
@@ -234,7 +234,7 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}
/// Returning all `dynamic_column_handle`.
/// Returns all `dynamic_column_handle` that match the given field name.
pub fn dynamic_column_handles(
&self,
field_name: &str,
@@ -250,6 +250,22 @@ impl FastFieldReaders {
Ok(dynamic_column_handles)
}
/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
pub fn dynamic_subpath_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_subpath_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}
#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
@@ -265,6 +281,21 @@ impl FastFieldReaders {
Ok(columns)
}
#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}
/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
@@ -476,6 +507,15 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));
println!("*** {:?}", fast_fields.columnar().list_columns());
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);
let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);
let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
}
}

View File

@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -519,6 +521,43 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
IndexWriter::new(self, options, directory_lock)
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -543,27 +582,12 @@ impl Index {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
)
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
}
/// Helper to create an index writer for tests.

View File

@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}
#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
index: Index,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -70,8 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
worker_id: usize,
num_threads: usize,
delete_queue: DeleteQueue,
stamper: Stamper,
@@ -265,23 +279,27 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
@@ -291,13 +309,17 @@ impl<D: Document> IndexWriter<D> {
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let segment_updater = SegmentUpdater::create(
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
options.num_merge_threads,
)?;
let mut index_writer = Self {
_directory_lock: Some(directory_lock),
memory_budget_in_bytes_per_thread,
options: options.clone(),
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -305,7 +327,6 @@ impl<D: Document> IndexWriter<D> {
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
@@ -398,7 +419,7 @@ impl<D: Document> IndexWriter<D> {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.memory_budget_in_bytes_per_thread;
let mem_budget = self.options.memory_budget_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -451,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
}
fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
for _ in 0..self.options.num_worker_threads {
self.add_indexing_worker()?;
}
Ok(())
@@ -553,12 +574,7 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
// the current `self` is dropped right away because of this call.
//
@@ -812,7 +828,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2533,4 +2549,36 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}
#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);
let index = Index::create_in_ram(schema_builder.build());
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}
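Taken together with the `Index::writer_with_options` addition earlier in this diff, the new builder gives a usage path like the sketch below. It is a hedged illustration, not a recommendation: the field setup, memory budget, and thread counts are made-up values, while the setter names come from the `bon::Builder` derive shown above.

use tantivy::indexer::IndexWriterOptions;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::{Index, IndexWriter, TantivyDocument};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let index = Index::create_in_ram(schema_builder.build());

    // Build the options explicitly instead of going through `Index::writer(...)`.
    let options = IndexWriterOptions::builder()
        .num_worker_threads(2) // default is 1
        .memory_budget_per_thread(200 * 1024 * 1024) // must stay within the MIN/MAX bounds
        .num_merge_threads(2) // default is 4
        .build();

    let writer: IndexWriter<TantivyDocument> = index.writer_with_options(options)?;
    drop(writer); // nothing is indexed in this sketch
    Ok(())
}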

View File

@@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

View File

@@ -25,8 +25,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};
const NUM_MERGE_THREADS: usize = 4;
/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -273,6 +271,7 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -287,7 +286,18 @@ impl SegmentUpdater {
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(NUM_MERGE_THREADS)
.num_threads(num_merge_threads)
.panic_handler(move |panic| {
let message = if let Some(msg) = panic.downcast_ref::<&str>() {
*msg
} else if let Some(msg) = panic.downcast_ref::<String>() {
msg.as_str()
} else {
"UNKNOWN"
};
eprintln!("merge thread panicked with: {message}")
})
.build()
.map_err(|_| {
crate::TantivyError::SystemError(

View File

@@ -422,6 +422,7 @@ mod tests {
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use columnar::ColumnType;
use tempfile::TempDir;
use crate::collector::{Count, TopDocs};
@@ -431,15 +432,15 @@ mod tests {
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
};
use crate::store::{Compressor, StoreReader, StoreWriter};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime;
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
TERMINATED,
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, Term, TERMINATED,
};
#[test]
@@ -841,6 +842,75 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_json_fast() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let json_val: serde_json::Value = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"bool": true,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 1, "{}", field);
assert_eq!(cols[0].column_type(), typ, "{}", field);
}
assert_type(segment_reader, "json.toto", ColumnType::Str);
assert_type(segment_reader, "json.float", ColumnType::F64);
assert_type(segment_reader, "json.bool", ColumnType::Bool);
assert_type(segment_reader, "json.unsigned", ColumnType::I64);
assert_type(segment_reader, "json.signed", ColumnType::I64);
assert_type(
segment_reader,
"json.complexobject.field\\.with\\.dot",
ColumnType::I64,
);
assert_type(segment_reader, "json.date", ColumnType::DateTime);
assert_type(segment_reader, "json.my_arr", ColumnType::I64);
assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);
fn assert_empty(reader: &SegmentReader, field: &str) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 0);
}
assert_empty(segment_reader, "unknown");
assert_empty(segment_reader, "json");
assert_empty(segment_reader, "json.toto.titi");
let sub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json")
.unwrap();
assert_eq!(sub_columns.len(), 9);
let subsub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json.complexobject")
.unwrap();
assert_eq!(subsub_columns.len(), 1);
}
#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283

View File

@@ -7,14 +7,32 @@ use crate::docset::{DocSet, TERMINATED};
use crate::index::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::Type;
use crate::{DocId, Score, TantivyError};
/// Query that matches all documents with a non-null value in the specified field.
/// Query that matches all documents with a non-null value in the specified
/// field.
///
/// When querying inside a JSON field, "exists" queries can be executed strictly
/// on the field name or check all the subpaths. In that second case a document
/// will be matched if a non-null value exists in any subpath. For example,
/// assuming the following document where `myfield` is a JSON fast field:
/// ```json
/// {
/// "myfield": {
/// "mysubfield": "hello"
/// }
/// }
/// ```
/// With `json_subpaths` enabled queries on either `myfield` or
/// `myfield.mysubfield` will match the document. If it is set to false, only
/// `myfield.mysubfield` will match it.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
field_name: String,
json_subpaths: bool,
}
impl ExistsQuery {
@@ -23,8 +41,28 @@ impl ExistsQuery {
/// This query matches all documents with at least one non-null value in the specified field.
/// This constructor never fails, but executing the search with this query will return an
/// error if the specified field doesn't exists or is not a fast field.
#[deprecated]
pub fn new_exists_query(field: String) -> ExistsQuery {
ExistsQuery { field_name: field }
ExistsQuery {
field_name: field,
json_subpaths: false,
}
}
/// Creates a new `ExistQuery` from the given field.
///
/// This query matches all documents with at least one non-null value in the
/// specified field. If `json_subpaths` is set to true, documents with
/// non-null values in any JSON subpath will also be matched.
///
/// This constructor never fails, but executing the search with this query will
/// return an error if the specified field doesn't exists or is not a fast
/// field.
pub fn new(field: String, json_subpaths: bool) -> Self {
Self {
field_name: field,
json_subpaths,
}
}
}
@@ -43,6 +81,8 @@ impl Query for ExistsQuery {
}
Ok(Box::new(ExistsWeight {
field_name: self.field_name.clone(),
field_type: field_type.value_type(),
json_subpaths: self.json_subpaths,
}))
}
}
@@ -50,13 +90,20 @@ impl Query for ExistsQuery {
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
field_name: String,
field_type: Type,
json_subpaths: bool,
}
impl Weight for ExistsWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let fast_field_reader = reader.fast_fields();
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
.dynamic_column_handles(&self.field_name)?
let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
if self.field_type == Type::Json && self.json_subpaths {
let mut sub_columns =
fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
column_handles.append(&mut sub_columns);
}
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
.into_iter()
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
.collect();
@@ -180,11 +227,12 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "all")?, 100);
assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "even")?, 50);
assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
assert_eq!(count_existing_fields(&searcher, "never")?, 0);
assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);
// exercise seek
let query = BooleanQuery::intersection(vec![
@@ -192,7 +240,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 50)),
Bound::Unbounded,
)),
Box::new(ExistsQuery::new_exists_query("even".to_string())),
Box::new(ExistsQuery::new("even".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -201,7 +249,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 0)),
Bound::Included(Term::from_field_u64(all_field, 50)),
)),
Box::new(ExistsQuery::new_exists_query("odd".to_string())),
Box::new(ExistsQuery::new("odd".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -230,22 +278,18 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);
// Handling of non-existing fields:
assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists.absent'"
);
assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
assert_does_not_exist(&searcher, "does_not_exists.absent", true);
assert_does_not_exist(&searcher, "does_not_exists.absent", false);
Ok(())
}
@@ -284,12 +328,13 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
assert_eq!(count_existing_fields(&searcher, "date")?, 50);
assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);
Ok(())
}
@@ -313,31 +358,33 @@ mod tests {
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("not_fast".to_string()),
&Count
)
.search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
.unwrap_err()
.to_string(),
"Schema error: 'Field not_fast is not a fast field.'"
);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists'"
);
assert_does_not_exist(&searcher, "does_not_exists", false);
Ok(())
}
fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
let query = ExistsQuery::new_exists_query(field.to_string());
fn count_existing_fields(
searcher: &Searcher,
field: &str,
json_subpaths: bool,
) -> crate::Result<usize> {
let query = ExistsQuery::new(field.to_string(), json_subpaths);
searcher.search(&query, &Count)
}
fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
assert_eq!(
searcher
.search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
.unwrap_err()
.to_string(),
format!("The field does not exist: '{}'", field)
);
}
}
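A minimal sketch of the two `ExistsQuery` modes described in the doc comment above, assuming a searcher over an index where `myfield` is a JSON fast field containing `{"mysubfield": "hello"}`; the field name is taken from that doc comment and the index setup is omitted.

use tantivy::collector::Count;
use tantivy::query::ExistsQuery;
use tantivy::Searcher;

fn compare_exists_modes(searcher: &Searcher) -> tantivy::Result<()> {
    // Strict mode: only matches documents with a value directly at "myfield".
    let strict = ExistsQuery::new("myfield".to_string(), false);
    // Subpath mode: also matches documents with a value at e.g. "myfield.mysubfield".
    let with_subpaths = ExistsQuery::new("myfield".to_string(), true);
    let strict_count = searcher.search(&strict, &Count)?;
    let subpath_count = searcher.search(&with_subpaths, &Count)?;
    assert!(subpath_count >= strict_count);
    Ok(())
}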

View File

@@ -521,6 +521,25 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
StreamerBuilder::new(self, AlwaysMatch)
}
/// Returns a range builder filtered with a prefix.
pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
let lower_bound = prefix.as_ref();
let mut upper_bound = lower_bound.to_vec();
for idx in (0..upper_bound.len()).rev() {
if upper_bound[idx] == 255 {
upper_bound.pop();
} else {
upper_bound[idx] += 1;
break;
}
}
let mut builder = self.range().ge(lower_bound);
if !upper_bound.is_empty() {
builder = builder.lt(upper_bound);
}
builder
}
/// A stream of all the sorted terms.
pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
self.range().into_stream()
@@ -928,4 +947,62 @@ mod tests {
}
assert!(!stream.advance());
}
#[test]
fn test_prefix() {
let (dic, _slice) = make_test_sstable();
{
let mut stream = dic.prefix_range("1").into_stream().unwrap();
for i in 0x10000..0x20000 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("").into_stream().unwrap();
for i in 0..0x3ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
for i in 0x0ff00..=0x0ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
}
#[test]
fn test_prefix_edge() {
let dict = {
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
builder.insert(&[0, 254], &0).unwrap();
builder.insert(&[0, 255], &1).unwrap();
builder.insert(&[0, 255, 12], &2).unwrap();
builder.insert(&[1], &2).unwrap();
builder.insert(&[1, 0], &2).unwrap();
let table = builder.finish().unwrap();
let table = Arc::new(PermissionedHandle::new(table));
let slice = common::file_slice::FileSlice::new(table.clone());
Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
};
let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255]);
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255, 12]);
assert!(!stream.advance());
}
}
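To make the upper-bound construction in `prefix_range` concrete, here is a standalone re-derivation of just that step, mirroring the hunk above rather than importing from the crate. It also shows how `column_dictionary_prefix_for_column_name` (which appends `\0` to the column name) turns a column lookup into the range `[name\0 .. name\x01)` mentioned in the removed comment.

// Standalone illustration of the exclusive upper bound built by Dictionary::prefix_range.
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut upper = prefix.to_vec();
    while let Some(&last) = upper.last() {
        if last == 255 {
            // 0xFF cannot be incremented: drop it and carry into the previous byte.
            upper.pop();
        } else {
            *upper.last_mut().unwrap() += 1;
            return Some(upper);
        }
    }
    // The prefix was all 0xFF bytes, so the range has no upper bound.
    None
}

fn main() {
    assert_eq!(prefix_upper_bound(b"col1\0"), Some(b"col1\x01".to_vec()));
    assert_eq!(prefix_upper_bound(&[0x00, 0xFF]), Some(vec![0x01u8]));
    assert_eq!(prefix_upper_bound(&[0xFF, 0xFF]), None);
    println!("prefix upper bounds computed as in Dictionary::prefix_range");
}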

View File

@@ -161,7 +161,7 @@ where
_lifetime: std::marker::PhantomData<&'a ()>,
}
impl<'a, TSSTable> Streamer<'a, TSSTable, AlwaysMatch>
impl<TSSTable> Streamer<'_, TSSTable, AlwaysMatch>
where TSSTable: SSTable
{
pub fn empty() -> Self {
@@ -178,7 +178,7 @@ where TSSTable: SSTable
}
}
impl<'a, TSSTable, A> Streamer<'a, TSSTable, A>
impl<TSSTable, A> Streamer<'_, TSSTable, A>
where
A: Automaton,
A::State: Clone,

View File

@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
zipf = "7.0.0"
rustc-hash = "1.1.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }

View File

@@ -74,7 +74,7 @@ fn ensure_capacity<'a>(
eull.remaining_cap = allocate as u16;
}
impl<'a> ExpUnrolledLinkedListWriter<'a> {
impl ExpUnrolledLinkedListWriter<'_> {
#[inline]
pub fn write_u32_vint(&mut self, val: u32) {
let mut buf = [0u8; 8];

View File

@@ -63,7 +63,7 @@ pub trait Tokenizer: 'static + Clone + Send + Sync {
/// Simple wrapper of `Box<dyn TokenStream + 'a>`.
pub struct BoxTokenStream<'a>(Box<dyn TokenStream + 'a>);
impl<'a> TokenStream for BoxTokenStream<'a> {
impl TokenStream for BoxTokenStream<'_> {
fn advance(&mut self) -> bool {
self.0.advance()
}
@@ -90,7 +90,7 @@ impl<'a> Deref for BoxTokenStream<'a> {
&*self.0
}
}
impl<'a> DerefMut for BoxTokenStream<'a> {
impl DerefMut for BoxTokenStream<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.0
}