Compare commits

...

35 Commits

Author SHA1 Message Date
dependabot[bot]
d32d392691 Update rand_distr requirement from 0.4.3 to 0.5.1
Updates the requirements on [rand_distr](https://github.com/rust-random/rand_distr) to permit the latest version.
- [Release notes](https://github.com/rust-random/rand_distr/releases)
- [Changelog](https://github.com/rust-random/rand_distr/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-random/rand_distr/compare/0.4.3...0.5.1)

---
updated-dependencies:
- dependency-name: rand_distr
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-02-20 21:00:13 +00:00
trinity-1686a
f060e86bc6 Merge pull request #2578 from quickwit-oss/1686a/buildable-histo-agg
make DateHistogramAggregationReq buildable
2025-02-18 15:30:54 +01:00
trinity Pointard
0368162ef0 make DateHistogramAggregationReq buildable 2025-02-18 11:45:24 +01:00
trinity-1686a
e843c71015 Merge pull request #2568 from quickwit-oss/trinity/wildcard-query-parser
allow term starting with wildcard in query parser
2025-02-12 16:47:25 +01:00
trinity Pointard
5cea16ef9f improve handling of spcial char after exist query 2025-01-22 16:04:31 +01:00
dependabot[bot]
4aa8cd2470 Update downcast-rs requirement from 1.2.1 to 2.0.1 (#2566)
Updates the requirements on [downcast-rs](https://github.com/marcianx/downcast-rs) to permit the latest version.
- [Changelog](https://github.com/marcianx/downcast-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/marcianx/downcast-rs/compare/v1.2.1...v2.0.1)

---
updated-dependencies:
- dependency-name: downcast-rs
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-01-22 10:32:24 +01:00
trinity Pointard
4d4ee1b0ac allow term starting with wildcard in query parser 2025-01-15 10:27:48 +01:00
dependabot[bot]
43c89b4360 Update itertools requirement from 0.13.0 to 0.14.0 (#2563)
Updates the requirements on [itertools](https://github.com/rust-itertools/itertools) to permit the latest version.
- [Changelog](https://github.com/rust-itertools/itertools/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-itertools/itertools/compare/v0.13.0...v0.14.0)

---
updated-dependencies:
- dependency-name: itertools
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-01-08 17:11:46 +01:00
trinity-1686a
d281ca3e65 Merge pull request #2559 from quickwit-oss/trinity/sstable-partial-automaton
allow warming partially an sstable for an automaton
2025-01-08 16:35:35 +01:00
trinity Pointard
be17daf658 split iterator 2025-01-08 16:24:34 +01:00
trinity Pointard
6ca84a61fa make termdict always clone 2025-01-08 16:19:54 +01:00
trinity Pointard
037d12c9c9 fix deadlocking on automaton warmup 2025-01-06 11:58:58 +01:00
Remi Dettai
71cf19870b Exist queries match subpath fields (#2558)
* Exist queries match subpath fields

* Make subpath check optional

* Add async subpath listing
2025-01-06 10:17:39 +01:00
trinity Pointard
175a529c41 use executor for cpu-heavy sstable decompression for automaton 2025-01-03 19:14:07 +01:00
trinity Pointard
fe0c7c5408 change rangebound style 2025-01-02 11:56:05 +01:00
Harrison Burt
148594f0f9 Improve IndexWriter customisation via builder (#2562)
* Improve `IndexWriter` customisation via builder

* Remove change noise from PR

* Correct documentation

* Resolve comments and add test
2025-01-02 09:43:22 +01:00
dependabot[bot]
8edb439440 Update rustc-hash requirement from 1.1.0 to 2.1.0 (#2551)
---
updated-dependencies:
- dependency-name: rustc-hash
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-26 10:25:05 +01:00
trinity Pointard
dfff5f3bcb rename merge_holes_under => merge_holes_under_bytes 2024-12-23 16:17:44 +01:00
trinity-1686a
ebf4d84553 add comment about cpu-intensive operation in async context 2024-12-20 12:23:49 +01:00
trinity-1686a
42efc7f7c8 clippy 2024-12-20 11:00:11 +01:00
trinity-1686a
192395c311 attempt at simplifying can_block_match_automaton 2024-12-20 10:25:38 +01:00
trinity-1686a
a1447cc9c2 remove breaking change in sstable public api 2024-12-19 17:30:05 +01:00
trinity-1686a
c39d91f827 Merge pull request #2547 from quickwit-oss/trinity/count-str
add support for counting non integer in aggregation
2024-12-17 15:27:30 +01:00
trinity Pointard
32b6e9711b add tests 2024-12-13 16:06:24 +01:00
trinity-1686a
24c5dc2398 allow warming up automaton 2024-12-10 13:32:12 +01:00
trinity-1686a
9e2ddec4b3 merge adjacent block when building delta for automaton 2024-12-10 13:32:12 +01:00
trinity-1686a
1f6a8e74bb support iterating over partially loaded sstable 2024-12-10 13:32:12 +01:00
trinity-1686a
7e901f523b get iter for blocks of sstable matching automaton 2024-12-10 13:32:12 +01:00
trinity-1686a
3c30a41c14 add helper to figure if block can match automaton 2024-12-10 13:32:12 +01:00
dependabot[bot]
0f99d4f420 Update measure_time requirement from 0.8.2 to 0.9.0 (#2557)
---
updated-dependencies:
- dependency-name: measure_time
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-09 21:39:01 +01:00
Pierre Barre
6e02c5cb25 Make NUM_MERGE_THREADS configurable (#2535)
* Make `NUM_MERGE_THREADS` configurable

* Remove unused import

* Reword comment src/index/index.rs

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>

---------

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2024-12-09 16:53:11 +08:00
PSeitz
876a579e5d queryparser: add field respecification test (#2550) 2024-12-02 14:17:12 +01:00
PSeitz
4c52499622 clippy (#2549) 2024-11-29 16:08:21 +08:00
trinity-1686a
0bac391291 add support for counting non integer in aggregation 2024-11-28 19:52:47 +01:00
PSeitz
52d4e81e70 update CHANGELOG (#2546) 2024-11-27 20:49:35 +08:00
48 changed files with 1519 additions and 233 deletions

View File

@@ -1,11 +1,12 @@
Tantivy 0.23 - Unreleased
================================
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21.
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75.
#### Bugfixes
- fix potential endless loop in merge [#2457](https://github.com/quickwit-oss/tantivy/pull/2457)(@PSeitz)
- fix bug that causes out-of-order sstable key. [#2445](https://github.com/quickwit-oss/tantivy/pull/2445)(@fulmicoton)
- fix ReferenceValue API flaw [#2372](https://github.com/quickwit-oss/tantivy/pull/2372)(@PSeitz)
- fix `OwnedBytes` debug panic [#2512](https://github.com/quickwit-oss/tantivy/pull/2512)(@b41sh)
#### Breaking API Changes
- remove index sorting [#2434](https://github.com/quickwit-oss/tantivy/pull/2434)(@PSeitz)
@@ -35,7 +36,15 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
- make find_field_with_default return json fields without path [#2476](https://github.com/quickwit-oss/tantivy/pull/2476)(@trinity-1686a)
- feat(query): Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
- **Optional Index in Multivalue Columnar Index** For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case). This is alleviated by placing an optional index in the multivalued index to mark documents that have values. This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
- **RegexPhraseQuery**
`RegexPhraseQuery` supports phrase queries with regex. E.g. query "b.* b.* wolf" matches "big bad wolf". Slop is supported as well: "b.* wolf"~2 matches "big bad wolf" [#2516](https://github.com/quickwit-oss/tantivy/pull/2516)(@PSeitz)
- **Optional Index in Multivalue Columnar Index**
For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case).
This is alleviated by placing an optional index in the multivalued index to mark documents that have values.
This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
- **Store DateTime as nanoseconds in doc store** DateTime in the doc store was truncated to microseconds previously. This removes this truncation, while still keeping backwards compatibility. [#2486](https://github.com/quickwit-oss/tantivy/pull/2486)(@PSeitz)
- **Performace/Memory**
- lift clauses in LogicalAst for optimized ast during execution [#2449](https://github.com/quickwit-oss/tantivy/pull/2449)(@PSeitz)
@@ -57,12 +66,13 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
- add bench & test for columnar merging [#2428](https://github.com/quickwit-oss/tantivy/pull/2428)(@PSeitz)
- Change in Executor API [#2391](https://github.com/quickwit-oss/tantivy/pull/2391)(@fulmicoton)
- Removed usage of num_cpus [#2387](https://github.com/quickwit-oss/tantivy/pull/2387)(@fulmicoton)
- use bingang for agg benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)(@PSeitz)
- use bingang for agg and stacker benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)[#2492](https://github.com/quickwit-oss/tantivy/pull/2492)(@PSeitz)
- cleanup top level exports [#2382](https://github.com/quickwit-oss/tantivy/pull/2382)(@PSeitz)
- make convert_to_fast_value_and_append_to_json_term pub [#2370](https://github.com/quickwit-oss/tantivy/pull/2370)(@PSeitz)
- remove JsonTermWriter [#2238](https://github.com/quickwit-oss/tantivy/pull/2238)(@PSeitz)
- validate sort by field type [#2336](https://github.com/quickwit-oss/tantivy/pull/2336)(@PSeitz)
- Fix trait bound of StoreReader::iter [#2360](https://github.com/quickwit-oss/tantivy/pull/2360)(@adamreichold)
- remove read_postings_no_deletes [#2526](https://github.com/quickwit-oss/tantivy/pull/2526)(@PSeitz)
Tantivy 0.22
================================
@@ -717,7 +727,7 @@ Tantivy 0.4.0
- Raise the limit of number of fields (previously 256 fields) (@fulmicoton)
- Removed u32 fields. They are replaced by u64 and i64 fields (#65) (@fulmicoton)
- Optimized skip in SegmentPostings (#130) (@lnicola)
- Replacing rustc_serialize by serde. Kudos to @KodrAus and @lnicola
- Replacing rustc_serialize by serde. Kudos to benchmark@KodrAus and @lnicola
- Using error-chain (@KodrAus)
- QueryParser: (@fulmicoton)
- Explicit error returned when searched for a term that is not indexed

View File

@@ -38,7 +38,7 @@ levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.1"
downcast-rs = "2.0.1"
bitpacking = { version = "0.9.2", default-features = false, features = [
"bitpacker4x",
] }
@@ -52,9 +52,10 @@ smallvec = "1.8.0"
rayon = "1.5.2"
lru = "0.12.0"
fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.8.2"
itertools = "0.14.0"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
@@ -66,6 +67,7 @@ tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
[target.'cfg(windows)'.dependencies]
@@ -82,7 +84,7 @@ test-log = "0.2.10"
futures = "0.3.21"
paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
rand_distr = "0.5.1"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",
@@ -120,7 +122,7 @@ zstd-compression = ["zstd"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.
quickwit = ["sstable", "futures-util"]
quickwit = ["sstable", "futures-util", "futures-channel"]
# Compares only the hash of a string when indexing data.
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.

View File

@@ -9,7 +9,7 @@ description = "column oriented storage for tantivy"
categories = ["database-implementations", "data-structures", "compression"]
[dependencies]
itertools = "0.13.0"
itertools = "0.14.0"
fastdivide = "0.4.0"
stacker = { version= "0.3", path = "../stacker", package="tantivy-stacker"}
@@ -17,7 +17,7 @@ sstable = { version= "0.3", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.7", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.6", path = "../bitpacker/" }
serde = "1.0.152"
downcast-rs = "1.2.0"
downcast-rs = "2.0.1"
[dev-dependencies]
proptest = "1"

View File

@@ -58,7 +58,7 @@ struct ShuffledIndex<'a> {
merge_order: &'a ShuffleMergeOrder,
}
impl<'a> Iterable<u32> for ShuffledIndex<'a> {
impl Iterable<u32> for ShuffledIndex<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(
self.merge_order
@@ -127,7 +127,7 @@ fn integrate_num_vals(num_vals: impl Iterator<Item = u32>) -> impl Iterator<Item
)
}
impl<'a> Iterable<u32> for ShuffledMultivaluedIndex<'a> {
impl Iterable<u32> for ShuffledMultivaluedIndex<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
let num_vals_per_row = iter_num_values(self.column_indexes, self.merge_order);
Box::new(integrate_num_vals(num_vals_per_row))

View File

@@ -123,7 +123,7 @@ fn get_num_values_iterator<'a>(
}
}
impl<'a> Iterable<u32> for StackedStartOffsets<'a> {
impl Iterable<u32> for StackedStartOffsets<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| {
let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32;

View File

@@ -86,7 +86,7 @@ pub struct OptionalIndex {
block_metas: Arc<[BlockMeta]>,
}
impl<'a> Iterable<u32> for &'a OptionalIndex {
impl Iterable<u32> for &OptionalIndex {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
Box::new(self.iter_rows())
}
@@ -123,7 +123,7 @@ enum BlockSelectCursor<'a> {
Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
}
impl<'a> BlockSelectCursor<'a> {
impl BlockSelectCursor<'_> {
fn select(&mut self, rank: u16) -> u16 {
match self {
BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
@@ -141,7 +141,7 @@ pub struct OptionalIndexSelectCursor<'a> {
num_null_rows_before_block: RowId,
}
impl<'a> OptionalIndexSelectCursor<'a> {
impl OptionalIndexSelectCursor<'_> {
fn search_and_load_block(&mut self, rank: RowId) {
if rank < self.current_block_end_rank {
// we are already in the right block
@@ -165,7 +165,7 @@ impl<'a> OptionalIndexSelectCursor<'a> {
}
}
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
impl SelectCursor<RowId> for OptionalIndexSelectCursor<'_> {
fn select(&mut self, rank: RowId) -> RowId {
self.search_and_load_block(rank);
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
@@ -505,7 +505,7 @@ fn deserialize_optional_index_block_metadatas(
non_null_rows_before_block += num_non_null_rows;
}
block_metas.resize(
((num_rows + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK) as usize,
num_rows.div_ceil(ELEMENTS_PER_BLOCK) as usize,
BlockMeta {
non_null_rows_before_block,
start_byte_offset,

View File

@@ -23,7 +23,6 @@ fn set_bit_at(input: &mut u64, n: u16) {
///
/// When translating a dense index to the original index, we can use the offset to find the correct
/// block. Direct computation is not possible, but we can employ a linear or binary search.
const ELEMENTS_PER_MINI_BLOCK: u16 = 64;
const MINI_BLOCK_BITVEC_NUM_BYTES: usize = 8;
const MINI_BLOCK_OFFSET_NUM_BYTES: usize = 2;
@@ -109,7 +108,7 @@ pub struct DenseBlockSelectCursor<'a> {
dense_block: DenseBlock<'a>,
}
impl<'a> SelectCursor<u16> for DenseBlockSelectCursor<'a> {
impl SelectCursor<u16> for DenseBlockSelectCursor<'_> {
#[inline]
fn select(&mut self, rank: u16) -> u16 {
self.block_id = self
@@ -175,7 +174,7 @@ impl<'a> Set<u16> for DenseBlock<'a> {
}
}
impl<'a> DenseBlock<'a> {
impl DenseBlock<'_> {
#[inline]
fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock {
let data_start_pos = mini_block_id as usize * MINI_BLOCK_NUM_BYTES;

View File

@@ -31,7 +31,7 @@ impl<'a> SelectCursor<u16> for SparseBlock<'a> {
}
}
impl<'a> Set<u16> for SparseBlock<'a> {
impl Set<u16> for SparseBlock<'_> {
type SelectCursor<'b>
= Self
where Self: 'b;
@@ -69,7 +69,7 @@ fn get_u16(data: &[u8], byte_position: usize) -> u16 {
u16::from_le_bytes(bytes)
}
impl<'a> SparseBlock<'a> {
impl SparseBlock<'_> {
#[inline(always)]
fn value_at_idx(&self, data: &[u8], idx: u16) -> u16 {
let start_offset: usize = idx as usize * 2;

View File

@@ -31,7 +31,7 @@ pub enum SerializableColumnIndex<'a> {
Multivalued(SerializableMultivalueIndex<'a>),
}
impl<'a> SerializableColumnIndex<'a> {
impl SerializableColumnIndex<'_> {
pub fn get_cardinality(&self) -> Cardinality {
match self {
SerializableColumnIndex::Full => Cardinality::Full,

View File

@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
pub(crate) merge_row_order: &'a MergeRowOrder,
}
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
impl<T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'_, T> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
match self.merge_row_order {
MergeRowOrder::Stack(_) => Box::new(

View File

@@ -39,7 +39,7 @@ impl BinarySerializable for Block {
}
fn compute_num_blocks(num_vals: u32) -> u32 {
(num_vals + BLOCK_SIZE - 1) / BLOCK_SIZE
num_vals.div_ceil(BLOCK_SIZE)
}
pub struct BlockwiseLinearEstimator {

View File

@@ -39,7 +39,7 @@ struct RemappedTermOrdinalsValues<'a> {
merge_row_order: &'a MergeRowOrder,
}
impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
impl Iterable for RemappedTermOrdinalsValues<'_> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
match self.merge_row_order {
MergeRowOrder::Stack(_) => self.boxed_iter_stacked(),
@@ -50,7 +50,7 @@ impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
}
}
impl<'a> RemappedTermOrdinalsValues<'a> {
impl RemappedTermOrdinalsValues<'_> {
fn boxed_iter_stacked(&self) -> Box<dyn Iterator<Item = u64> + '_> {
let iter = self
.bytes_columns

View File

@@ -10,13 +10,13 @@ pub struct HeapItem<'a> {
pub segment_ord: usize,
}
impl<'a> PartialEq for HeapItem<'a> {
impl PartialEq for HeapItem<'_> {
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl<'a> Eq for HeapItem<'a> {}
impl Eq for HeapItem<'_> {}
impl<'a> PartialOrd for HeapItem<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {

View File

@@ -1,6 +1,7 @@
use std::{fmt, io, mem};
use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
Ok(results)
}
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}
fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}
impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
@@ -144,32 +158,14 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}
fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
let mut start_key = column_name.to_string();
start_key.push('\0');
let mut end_key = column_name.to_string();
end_key.push(1u8 as char);
self.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
}
pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.stream_for_column_range(column_name)
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
@@ -180,7 +176,35 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self.stream_for_column_range(column_name).into_stream()?;
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
///
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
.into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
@@ -192,6 +216,8 @@ impl ColumnarReader {
#[cfg(test)]
mod tests {
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use crate::{ColumnType, ColumnarReader, ColumnarWriter};
#[test]
@@ -224,6 +250,64 @@ mod tests {
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
}
#[test]
fn test_read_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].column_type(), ColumnType::U64);
}
{
let columns = columnar.read_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
fn test_read_subpath_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(
0,
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
"hello",
);
columnar_writer.record_numerical(
0,
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
1i64,
);
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_subpath_columns("col1").unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].column_type(), ColumnType::Str);
assert_eq!(columns[1].column_type(), ColumnType::I64);
}
{
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("col2").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
#[should_panic(expected = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {

View File

@@ -285,7 +285,6 @@ impl ColumnarWriter {
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {

View File

@@ -67,7 +67,7 @@ pub struct ColumnSerializer<'a, W: io::Write> {
start_offset: u64,
}
impl<'a, W: io::Write> ColumnSerializer<'a, W> {
impl<W: io::Write> ColumnSerializer<'_, W> {
pub fn finalize(self) -> io::Result<()> {
let end_offset: u64 = self.columnar_serializer.wrt.written_bytes();
let byte_range = self.start_offset..end_offset;
@@ -80,7 +80,7 @@ impl<'a, W: io::Write> ColumnSerializer<'a, W> {
}
}
impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
impl<W: io::Write> io::Write for ColumnSerializer<'_, W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.columnar_serializer.wrt.write(buf)
}

View File

@@ -7,7 +7,7 @@ pub trait Iterable<T = u64> {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
}
impl<'a, T: Copy> Iterable<T> for &'a [T] {
impl<T: Copy> Iterable<T> for &[T] {
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.iter().copied())
}

View File

@@ -87,7 +87,7 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
}
}
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
impl TerminatingWrite for &mut Vec<u8> {
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
self.flush()
}

View File

@@ -321,7 +321,17 @@ fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
UserInputLeaf::Exists {
field: String::new(),
},
tuple((multispace0, char('*'))),
tuple((
multispace0,
char('*'),
peek(alt((
value(
"",
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
),
eof,
))),
)),
)(inp)
}
@@ -331,7 +341,14 @@ fn exists_precond(inp: &str) -> IResult<&str, (), ()> {
peek(tuple((
field_name,
multispace0,
char('*'), // when we are here, we know it can't be anything but a exists
char('*'),
peek(alt((
value(
"",
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
),
eof,
))), // we need to check this isn't a wildcard query
))),
)(inp)
.map_err(|e| e.map(|_| ()))
@@ -1497,6 +1514,11 @@ mod test {
test_is_parse_err(r#"field:(+a -"b c""#, r#"(+"field":a -"field":"b c")"#);
}
#[test]
fn field_re_specification() {
test_parse_query_to_ast_helper(r#"field:(abc AND b:cde)"#, r#"(+"field":abc +"b":cde)"#);
}
#[test]
fn test_parse_query_single_term() {
test_parse_query_to_ast_helper("abc", "abc");
@@ -1619,13 +1641,19 @@ mod test {
#[test]
fn test_exist_query() {
test_parse_query_to_ast_helper("a:*", "\"a\":*");
test_parse_query_to_ast_helper("a: *", "\"a\":*");
// an exist followed by default term being b
test_is_parse_err("a:*b", "(*\"a\":* *b)");
test_parse_query_to_ast_helper("a:*", "$exists(\"a\")");
test_parse_query_to_ast_helper("a: *", "$exists(\"a\")");
// this is a term query (not a phrase prefix)
test_parse_query_to_ast_helper(
"(hello AND toto:*) OR happy",
"(?(+hello +$exists(\"toto\")) ?happy)",
);
test_parse_query_to_ast_helper("(a:*)", "$exists(\"a\")");
// these are term/wildcard query (not a phrase prefix)
test_parse_query_to_ast_helper("a:b*", "\"a\":b*");
test_parse_query_to_ast_helper("a:*b", "\"a\":*b");
test_parse_query_to_ast_helper(r#"a:*def*"#, "\"a\":*def*");
}
#[test]

View File

@@ -101,7 +101,7 @@ impl Debug for UserInputLeaf {
}
UserInputLeaf::All => write!(formatter, "*"),
UserInputLeaf::Exists { field } => {
write!(formatter, "\"{field}\":*")
write!(formatter, "$exists(\"{field}\")")
}
}
}

View File

@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
field: ref field_name,
..
})
| Count(CountAggregation {
field: ref field_name,
..
})
| Max(MaxAggregation {
field: ref field_name,
..
@@ -299,6 +295,24 @@ impl AggregationWithAccessor {
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Count(CountAggregation {
field: ref field_name,
..
}) => {
let allowed_column_types = [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::Str,
ColumnType::DateTime,
ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
];
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Percentiles(ref percentiles) => {
let (accessor, column_type) = get_ff_reader(
reader,

View File

@@ -34,10 +34,10 @@ use crate::aggregation::*;
pub struct DateHistogramAggregationReq {
#[doc(hidden)]
/// Only for validation
interval: Option<String>,
pub interval: Option<String>,
#[doc(hidden)]
/// Only for validation
calendar_interval: Option<String>,
pub calendar_interval: Option<String>,
/// The field to aggregate on.
pub field: String,
/// The format to format dates. Unsupported currently.

View File

@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
}
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
if [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::DateTime,
]
.contains(&self.field_type)
{
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
} else {
for _val in agg_accessor.column_block_accessor.iter_vals() {
// we ignore the value and simply record that we got something
self.stats.collect(0.0);
}
}
}
}
@@ -435,6 +449,11 @@ mod tests {
"field": "score",
},
},
"count_str": {
"value_count": {
"field": "text",
},
},
"range": range_agg
}))
.unwrap();
@@ -500,6 +519,13 @@ mod tests {
})
);
assert_eq!(
res["count_str"],
json!({
"value": 7.0,
})
);
Ok(())
}

View File

@@ -578,7 +578,7 @@ mod tests {
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_fast(None)
.set_fast(Some("raw"))
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", FAST);

View File

@@ -217,7 +217,7 @@ impl FastFieldReaders {
Ok(dynamic_column.into())
}
/// Returning a `dynamic_column_handle`.
/// Returns a `dynamic_column_handle`.
pub fn dynamic_column_handle(
&self,
field_name: &str,
@@ -234,7 +234,7 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}
/// Returning all `dynamic_column_handle`.
/// Returns all `dynamic_column_handle` that match the given field name.
pub fn dynamic_column_handles(
&self,
field_name: &str,
@@ -250,6 +250,22 @@ impl FastFieldReaders {
Ok(dynamic_column_handles)
}
/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
pub fn dynamic_subpath_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_subpath_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}
#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
@@ -265,6 +281,21 @@ impl FastFieldReaders {
Ok(columns)
}
#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}
/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
@@ -476,6 +507,15 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));
println!("*** {:?}", fast_fields.columnar().list_columns());
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);
let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);
let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
}
}

View File

@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -519,6 +521,43 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
IndexWriter::new(self, options, directory_lock)
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -543,27 +582,12 @@ impl Index {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
)
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
}
/// Helper to create an index writer for tests.

View File

@@ -3,6 +3,12 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use common::BinarySerializable;
use fnv::FnvHashSet;
#[cfg(feature = "quickwit")]
use futures_util::{FutureExt, StreamExt, TryStreamExt};
#[cfg(feature = "quickwit")]
use itertools::Itertools;
#[cfg(feature = "quickwit")]
use tantivy_fst::automaton::{AlwaysMatch, Automaton};
use crate::directory::FileSlice;
use crate::positions::PositionReader;
@@ -219,13 +225,18 @@ impl InvertedIndexReader {
self.termdict.get_async(term.serialized_value_bytes()).await
}
async fn get_term_range_async(
&self,
async fn get_term_range_async<'a, A: Automaton + 'a>(
&'a self,
terms: impl std::ops::RangeBounds<Term>,
automaton: A,
limit: Option<u64>,
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
merge_holes_under_bytes: usize,
) -> io::Result<impl Iterator<Item = TermInfo> + 'a>
where
A::State: Clone,
{
use std::ops::Bound;
let range_builder = self.termdict.range();
let range_builder = self.termdict.search(automaton);
let range_builder = match terms.start_bound() {
Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
@@ -242,7 +253,9 @@ impl InvertedIndexReader {
range_builder
};
let mut stream = range_builder.into_stream_async().await?;
let mut stream = range_builder
.into_stream_async_merging_holes(merge_holes_under_bytes)
.await?;
let iter = std::iter::from_fn(move || stream.next().map(|(_k, v)| v.clone()));
@@ -288,7 +301,9 @@ impl InvertedIndexReader {
limit: Option<u64>,
with_positions: bool,
) -> io::Result<bool> {
let mut term_info = self.get_term_range_async(terms, limit).await?;
let mut term_info = self
.get_term_range_async(terms, AlwaysMatch, limit, 0)
.await?;
let Some(first_terminfo) = term_info.next() else {
// no key matches, nothing more to load
@@ -315,6 +330,84 @@ impl InvertedIndexReader {
Ok(true)
}
/// Warmup a block postings given a range of `Term`s.
/// This method is for an advanced usage only.
///
/// returns a boolean, whether a term matching the range was found in the dictionary
pub async fn warm_postings_automaton<
A: Automaton + Clone + Send + 'static,
E: FnOnce(Box<dyn FnOnce() -> io::Result<()> + Send>) -> F,
F: std::future::Future<Output = io::Result<()>>,
>(
&self,
automaton: A,
// with_positions: bool, at the moment we have no use for it, and supporting it would add
// complexity to the coalesce
executor: E,
) -> io::Result<bool>
where
A::State: Clone,
{
// merge holes under 4MiB, that's how many bytes we can hope to receive during a TTFB from
// S3 (~80MiB/s, and 50ms latency)
const MERGE_HOLES_UNDER_BYTES: usize = (80 * 1024 * 1024 * 50) / 1000;
// we build a first iterator to download everything. Simply calling the function already
// download everything we need from the sstable, but doesn't start iterating over it.
let _term_info_iter = self
.get_term_range_async(.., automaton.clone(), None, MERGE_HOLES_UNDER_BYTES)
.await?;
let (sender, posting_ranges_to_load_stream) = futures_channel::mpsc::unbounded();
let termdict = self.termdict.clone();
let cpu_bound_task = move || {
// then we build a 2nd iterator, this one with no holes, so we don't go through blocks
// we can't match.
// This makes the assumption there is a caching layer below us, which gives sync read
// for free after the initial async access. This might not always be true, but is in
// Quickwit.
// We build things from this closure otherwise we get into lifetime issues that can only
// be solved with self referential strucs. Returning an io::Result from here is a bit
// more leaky abstraction-wise, but a lot better than the alternative
let mut stream = termdict.search(automaton).into_stream()?;
// we could do without an iterator, but this allows us access to coalesce which simplify
// things
let posting_ranges_iter =
std::iter::from_fn(move || stream.next().map(|(_k, v)| v.postings_range.clone()));
let merged_posting_ranges_iter = posting_ranges_iter.coalesce(|range1, range2| {
if range1.end + MERGE_HOLES_UNDER_BYTES >= range2.start {
Ok(range1.start..range2.end)
} else {
Err((range1, range2))
}
});
for posting_range in merged_posting_ranges_iter {
if let Err(_) = sender.unbounded_send(posting_range) {
// this should happen only when search is cancelled
return Err(io::Error::other("failed to send posting range back"));
}
}
Ok(())
};
let task_handle = executor(Box::new(cpu_bound_task));
let posting_downloader = posting_ranges_to_load_stream
.map(|posting_slice| {
self.postings_file_slice
.read_bytes_slice_async(posting_slice)
.map(|result| result.map(|_slice| ()))
})
.buffer_unordered(5)
.try_collect::<Vec<()>>();
let (_, slices_downloaded) =
futures_util::future::try_join(task_handle, posting_downloader).await?;
Ok(!slices_downloaded.is_empty())
}
/// Warmup the block postings for all terms.
/// This method is for an advanced usage only.
///

View File

@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}
#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
index: Index,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -70,8 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
worker_id: usize,
num_threads: usize,
delete_queue: DeleteQueue,
stamper: Stamper,
@@ -265,23 +279,27 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
@@ -291,13 +309,17 @@ impl<D: Document> IndexWriter<D> {
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let segment_updater = SegmentUpdater::create(
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
options.num_merge_threads,
)?;
let mut index_writer = Self {
_directory_lock: Some(directory_lock),
memory_budget_in_bytes_per_thread,
options: options.clone(),
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -305,7 +327,6 @@ impl<D: Document> IndexWriter<D> {
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
@@ -398,7 +419,7 @@ impl<D: Document> IndexWriter<D> {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.memory_budget_in_bytes_per_thread;
let mem_budget = self.options.memory_budget_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -451,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
}
fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
for _ in 0..self.options.num_worker_threads {
self.add_indexing_worker()?;
}
Ok(())
@@ -553,12 +574,7 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
// the current `self` is dropped right away because of this call.
//
@@ -812,7 +828,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2533,4 +2549,36 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}
#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);
let index = Index::create_in_ram(schema_builder.build());
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}

View File

@@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

View File

@@ -25,8 +25,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};
const NUM_MERGE_THREADS: usize = 4;
/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -273,6 +271,7 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -287,7 +286,7 @@ impl SegmentUpdater {
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(NUM_MERGE_THREADS)
.num_threads(num_merge_threads)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(

View File

@@ -422,6 +422,7 @@ mod tests {
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use columnar::ColumnType;
use tempfile::TempDir;
use crate::collector::{Count, TopDocs};
@@ -431,15 +432,15 @@ mod tests {
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
};
use crate::store::{Compressor, StoreReader, StoreWriter};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime;
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
TERMINATED,
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, Term, TERMINATED,
};
#[test]
@@ -841,6 +842,75 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_json_fast() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let json_val: serde_json::Value = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"bool": true,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 1, "{}", field);
assert_eq!(cols[0].column_type(), typ, "{}", field);
}
assert_type(segment_reader, "json.toto", ColumnType::Str);
assert_type(segment_reader, "json.float", ColumnType::F64);
assert_type(segment_reader, "json.bool", ColumnType::Bool);
assert_type(segment_reader, "json.unsigned", ColumnType::I64);
assert_type(segment_reader, "json.signed", ColumnType::I64);
assert_type(
segment_reader,
"json.complexobject.field\\.with\\.dot",
ColumnType::I64,
);
assert_type(segment_reader, "json.date", ColumnType::DateTime);
assert_type(segment_reader, "json.my_arr", ColumnType::I64);
assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);
fn assert_empty(reader: &SegmentReader, field: &str) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 0);
}
assert_empty(segment_reader, "unknown");
assert_empty(segment_reader, "json");
assert_empty(segment_reader, "json.toto.titi");
let sub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json")
.unwrap();
assert_eq!(sub_columns.len(), 9);
let subsub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json.complexobject")
.unwrap();
assert_eq!(subsub_columns.len(), 1);
}
#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283

View File

@@ -7,14 +7,32 @@ use crate::docset::{DocSet, TERMINATED};
use crate::index::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::Type;
use crate::{DocId, Score, TantivyError};
/// Query that matches all documents with a non-null value in the specified field.
/// Query that matches all documents with a non-null value in the specified
/// field.
///
/// When querying inside a JSON field, "exists" queries can be executed strictly
/// on the field name or check all the subpaths. In that second case a document
/// will be matched if a non-null value exists in any subpath. For example,
/// assuming the following document where `myfield` is a JSON fast field:
/// ```json
/// {
/// "myfield": {
/// "mysubfield": "hello"
/// }
/// }
/// ```
/// With `json_subpaths` enabled queries on either `myfield` or
/// `myfield.mysubfield` will match the document. If it is set to false, only
/// `myfield.mysubfield` will match it.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
field_name: String,
json_subpaths: bool,
}
impl ExistsQuery {
@@ -23,8 +41,28 @@ impl ExistsQuery {
/// This query matches all documents with at least one non-null value in the specified field.
/// This constructor never fails, but executing the search with this query will return an
/// error if the specified field doesn't exists or is not a fast field.
#[deprecated]
pub fn new_exists_query(field: String) -> ExistsQuery {
ExistsQuery { field_name: field }
ExistsQuery {
field_name: field,
json_subpaths: false,
}
}
/// Creates a new `ExistQuery` from the given field.
///
/// This query matches all documents with at least one non-null value in the
/// specified field. If `json_subpaths` is set to true, documents with
/// non-null values in any JSON subpath will also be matched.
///
/// This constructor never fails, but executing the search with this query will
/// return an error if the specified field doesn't exists or is not a fast
/// field.
pub fn new(field: String, json_subpaths: bool) -> Self {
Self {
field_name: field,
json_subpaths,
}
}
}
@@ -43,6 +81,8 @@ impl Query for ExistsQuery {
}
Ok(Box::new(ExistsWeight {
field_name: self.field_name.clone(),
field_type: field_type.value_type(),
json_subpaths: self.json_subpaths,
}))
}
}
@@ -50,13 +90,20 @@ impl Query for ExistsQuery {
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
field_name: String,
field_type: Type,
json_subpaths: bool,
}
impl Weight for ExistsWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let fast_field_reader = reader.fast_fields();
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
.dynamic_column_handles(&self.field_name)?
let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
if self.field_type == Type::Json && self.json_subpaths {
let mut sub_columns =
fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
column_handles.append(&mut sub_columns);
}
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
.into_iter()
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
.collect();
@@ -180,11 +227,12 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "all")?, 100);
assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "even")?, 50);
assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
assert_eq!(count_existing_fields(&searcher, "never")?, 0);
assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);
// exercise seek
let query = BooleanQuery::intersection(vec![
@@ -192,7 +240,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 50)),
Bound::Unbounded,
)),
Box::new(ExistsQuery::new_exists_query("even".to_string())),
Box::new(ExistsQuery::new("even".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -201,7 +249,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 0)),
Bound::Included(Term::from_field_u64(all_field, 50)),
)),
Box::new(ExistsQuery::new_exists_query("odd".to_string())),
Box::new(ExistsQuery::new("odd".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -230,22 +278,18 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);
// Handling of non-existing fields:
assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists.absent'"
);
assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
assert_does_not_exist(&searcher, "does_not_exists.absent", true);
assert_does_not_exist(&searcher, "does_not_exists.absent", false);
Ok(())
}
@@ -284,12 +328,13 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
assert_eq!(count_existing_fields(&searcher, "date")?, 50);
assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);
Ok(())
}
@@ -313,31 +358,33 @@ mod tests {
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("not_fast".to_string()),
&Count
)
.search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
.unwrap_err()
.to_string(),
"Schema error: 'Field not_fast is not a fast field.'"
);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists'"
);
assert_does_not_exist(&searcher, "does_not_exists", false);
Ok(())
}
fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
let query = ExistsQuery::new_exists_query(field.to_string());
fn count_existing_fields(
searcher: &Searcher,
field: &str,
json_subpaths: bool,
) -> crate::Result<usize> {
let query = ExistsQuery::new(field.to_string(), json_subpaths);
searcher.search(&query, &Count)
}
fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
assert_eq!(
searcher
.search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
.unwrap_err()
.to_string(),
format!("The field does not exist: '{}'", field)
);
}
}

View File

@@ -93,6 +93,7 @@ impl TermInfoBlockMeta {
}
}
#[derive(Clone)]
pub struct TermInfoStore {
num_terms: usize,
block_meta_bytes: OwnedBytes,

View File

@@ -1,4 +1,5 @@
use std::io::{self, Write};
use std::sync::Arc;
use common::{BinarySerializable, CountingWriter};
use once_cell::sync::Lazy;
@@ -113,8 +114,9 @@ static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
/// The `Fst` crate is used to associate terms to their
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
#[derive(Clone)]
pub struct TermDictionary {
fst_index: tantivy_fst::Map<OwnedBytes>,
fst_index: Arc<tantivy_fst::Map<OwnedBytes>>,
term_info_store: TermInfoStore,
}
@@ -136,7 +138,7 @@ impl TermDictionary {
let fst_index = open_fst_index(fst_file_slice)?;
let term_info_store = TermInfoStore::open(values_file_slice)?;
Ok(TermDictionary {
fst_index,
fst_index: Arc::new(fst_index),
term_info_store,
})
}

View File

@@ -74,6 +74,7 @@ const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
// TODO in the future this should become an enum of supported dictionaries
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
#[derive(Clone)]
pub struct TermDictionary(InnerTermDict);
impl TermDictionary {

View File

@@ -28,6 +28,7 @@ pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoValueWriter>;
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;
/// SSTable used to store TermInfo objects.
#[derive(Clone)]
pub struct TermSSTable;
pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;

View File

@@ -11,6 +11,8 @@ description = "sstables for tantivy"
[dependencies]
common = {version= "0.7", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
itertools = "0.14.0"
tantivy-bitpacker = { version= "0.6", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound

View File

@@ -0,0 +1,271 @@
use tantivy_fst::Automaton;
/// Returns whether a block can match an automaton based on its bounds.
///
/// start key is exclusive, and optional to account for the first block. end key is inclusive and
/// mandatory.
pub(crate) fn can_block_match_automaton(
start_key_opt: Option<&[u8]>,
end_key: &[u8],
automaton: &impl Automaton,
) -> bool {
let start_key = if let Some(start_key) = start_key_opt {
start_key
} else {
// if start_key_opt is None, we would allow an automaton matching the empty string to match
if automaton.is_match(&automaton.start()) {
return true;
}
&[]
};
can_block_match_automaton_with_start(start_key, end_key, automaton)
}
// similar to can_block_match_automaton, ignoring the edge case of the initial block
fn can_block_match_automaton_with_start(
start_key: &[u8],
end_key: &[u8],
automaton: &impl Automaton,
) -> bool {
// notation: in loops, we use `kb` to denotate a key byte (a byte taken from the start/end key),
// and `rb`, a range byte (usually all values higher than a `kb` when comparing with
// start_key, or all values lower than a `kb` when comparing with end_key)
if start_key >= end_key {
return false;
}
let common_prefix_len = crate::common_prefix_len(start_key, end_key);
let mut base_state = automaton.start();
for kb in &start_key[0..common_prefix_len] {
base_state = automaton.accept(&base_state, *kb);
}
// this is not required for correctness, but allows dodging more expensive checks
if !automaton.can_match(&base_state) {
return false;
}
// we have 3 distinct case:
// - keys are `abc` and `abcd` => we test for abc[\0-d].*
// - keys are `abcd` and `abce` => we test for abc[d-e].*
// - keys are `abcd` and `abc` => contradiction with start_key < end_key.
//
// ideally for (abc, abcde] we could test for abc([\0-c].*|d([\0-d].*|e)?)
// but let's start simple (and correct), and tighten our bounds latter
//
// and for (abcde, abcfg] we could test for abc(d(e.+|[f-\xff].*)|e.*|f([\0-f].*|g)?)
// abc (
// d(e.+|[f-\xff].*) |
// e.* |
// f([\0-f].*|g)?
// )
//
// these are all written as regex, but can be converted to operations we can do:
// - [x-y] is a for c in x..=y
// - .* is a can_match()
// - .+ is a for c in 0..=255 { accept(c).can_match() }
// - ? is a the thing before can_match(), or current state.is_match()
// - | means test both side
// we have two cases, either start_key is a prefix of end_key (e.g. (abc, abcjp]),
// or it is not (e.g. (abcdg, abcjp]). It is not possible however that end_key be a prefix of
// start_key (or that both are equal) because we already handled start_key >= end_key.
//
// if we are in the first case, we want to visit the following states:
// abc (
// [\0-i].* |
// j (
// [\0-o].* |
// p
// )?
// )
// Everything after `abc` is handled by `match_range_end`
//
// if we are in the 2nd case, we want to visit the following states:
// abc (
// d(g.+|[h-\xff].*) | // this is handled by match_range_start
//
// [e-i].* | // this is handled here
//
// j ( // this is handled by match_range_end (but countrary to the other
// [\0-o].* | // case, j is already consumed so to not check [\0-i].* )
// p
// )?
// )
let Some(start_range) = start_key.get(common_prefix_len) else {
return match_range_end(&end_key[common_prefix_len..], &automaton, base_state);
};
let end_range = end_key[common_prefix_len];
// things starting with start_range were handled in match_range_start
// this starting with end_range are handled bellow.
// this can run for 0 iteration in cases such as (abc, abd]
for rb in (start_range + 1)..end_range {
let new_state = automaton.accept(&base_state, rb);
if automaton.can_match(&new_state) {
return true;
}
}
let state_for_start = automaton.accept(&base_state, *start_range);
if match_range_start(
&start_key[common_prefix_len + 1..],
&automaton,
state_for_start,
) {
return true;
}
let state_for_end = automaton.accept(&base_state, end_range);
if automaton.is_match(&state_for_end) {
return true;
}
match_range_end(&end_key[common_prefix_len + 1..], &automaton, state_for_end)
}
fn match_range_start<S, A: Automaton<State = S>>(
start_key: &[u8],
automaton: &A,
mut state: S,
) -> bool {
// case (abcdgj, abcpqr], `abcd` is already consumed, we need to handle:
// - [h-\xff].*
// - g[k-\xff].*
// - gj.+ == gf[\0-\xff].*
for kb in start_key {
// this is an optimisation, and is not needed for correctness
if !automaton.can_match(&state) {
return false;
}
// does the [h-\xff].* part. we skip if kb==255 as [\{0100}-\xff] is an empty range, and
// this would overflow in our u8 world
if *kb < u8::MAX {
for rb in (kb + 1)..=u8::MAX {
let temp_state = automaton.accept(&state, rb);
if automaton.can_match(&temp_state) {
return true;
}
}
}
// push g
state = automaton.accept(&state, *kb);
}
// this isn't required for correctness, but can save us from looping 256 below
if !automaton.can_match(&state) {
return false;
}
// does the final `.+`, which is the same as `[\0-\xff].*`
for rb in 0..=u8::MAX {
let temp_state = automaton.accept(&state, rb);
if automaton.can_match(&temp_state) {
return true;
}
}
false
}
fn match_range_end<S, A: Automaton<State = S>>(
end_key: &[u8],
automaton: &A,
mut state: S,
) -> bool {
// for (abcdef, abcmps]. the prefix `abcm` has been consumed, `[d-l].*` was handled elsewhere,
// we just need to handle
// - [\0-o].*
// - p
// - p[\0-r].*
// - ps
for kb in end_key {
// this is an optimisation, and is not needed for correctness
if !automaton.can_match(&state) {
return false;
}
// does the `[\0-o].*`
for rb in 0..*kb {
let temp_state = automaton.accept(&state, rb);
if automaton.can_match(&temp_state) {
return true;
}
}
// push p
state = automaton.accept(&state, *kb);
// verify the `p` case
if automaton.is_match(&state) {
return true;
}
}
false
}
#[cfg(test)]
pub(crate) mod tests {
use proptest::prelude::*;
use tantivy_fst::Automaton;
use super::*;
pub(crate) struct EqBuffer(pub Vec<u8>);
impl Automaton for EqBuffer {
type State = Option<usize>;
fn start(&self) -> Self::State {
Some(0)
}
fn is_match(&self, state: &Self::State) -> bool {
*state == Some(self.0.len())
}
fn accept(&self, state: &Self::State, byte: u8) -> Self::State {
state
.filter(|pos| self.0.get(*pos) == Some(&byte))
.map(|pos| pos + 1)
}
fn can_match(&self, state: &Self::State) -> bool {
state.is_some()
}
fn will_always_match(&self, _state: &Self::State) -> bool {
false
}
}
fn gen_key_strategy() -> impl Strategy<Value = Vec<u8>> {
// we only generate bytes in [0, 1, 2, 254, 255] to reduce the search space without
// ignoring edge cases that might ocure with integer over/underflow
proptest::collection::vec(prop_oneof![0u8..=2, 254u8..=255], 0..5)
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 10000, .. ProptestConfig::default()
})]
#[test]
fn test_proptest_automaton_match_block(start in gen_key_strategy(), end in gen_key_strategy(), key in gen_key_strategy()) {
let expected = start < key && end >= key;
let automaton = EqBuffer(key);
assert_eq!(can_block_match_automaton(Some(&start), &end, &automaton), expected);
}
#[test]
fn test_proptest_automaton_match_first_block(end in gen_key_strategy(), key in gen_key_strategy()) {
let expected = end >= key;
let automaton = EqBuffer(key);
assert_eq!(can_block_match_automaton(None, &end, &automaton), expected);
}
}
}

View File

@@ -7,6 +7,7 @@ use zstd::bulk::Decompressor;
pub struct BlockReader {
buffer: Vec<u8>,
reader: OwnedBytes,
next_readers: std::vec::IntoIter<OwnedBytes>,
offset: usize,
}
@@ -15,6 +16,18 @@ impl BlockReader {
BlockReader {
buffer: Vec::new(),
reader,
next_readers: Vec::new().into_iter(),
offset: 0,
}
}
pub fn from_multiple_blocks(readers: Vec<OwnedBytes>) -> BlockReader {
let mut next_readers = readers.into_iter();
let reader = next_readers.next().unwrap_or_else(OwnedBytes::empty);
BlockReader {
buffer: Vec::new(),
reader,
next_readers,
offset: 0,
}
}
@@ -34,42 +47,52 @@ impl BlockReader {
self.offset = 0;
self.buffer.clear();
let block_len = match self.reader.len() {
0 => return Ok(false),
1..=3 => {
loop {
let block_len = match self.reader.len() {
0 => {
// we are out of data for this block. Check if we have another block after
if let Some(new_reader) = self.next_readers.next() {
self.reader = new_reader;
continue;
} else {
return Ok(false);
}
}
1..=3 => {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block_len",
))
}
_ => self.reader.read_u32() as usize,
};
if block_len <= 1 {
return Ok(false);
}
let compress = self.reader.read_u8();
let block_len = block_len - 1;
if self.reader.len() < block_len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block_len",
))
"failed to read block content",
));
}
_ => self.reader.read_u32() as usize,
};
if block_len <= 1 {
return Ok(false);
}
let compress = self.reader.read_u8();
let block_len = block_len - 1;
if compress == 1 {
let required_capacity =
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
self.buffer.reserve(required_capacity);
Decompressor::new()?
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
if self.reader.len() < block_len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to read block content",
));
}
if compress == 1 {
let required_capacity =
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
self.buffer.reserve(required_capacity);
Decompressor::new()?
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
self.reader.advance(block_len);
} else {
self.buffer.resize(block_len, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
}
self.reader.advance(block_len);
} else {
self.buffer.resize(block_len, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
return Ok(true);
}
Ok(true)
}
#[inline(always)]

View File

@@ -143,6 +143,16 @@ where TValueReader: value::ValueReader
}
}
pub fn from_multiple_blocks(reader: Vec<OwnedBytes>) -> Self {
DeltaReader {
idx: 0,
common_prefix_len: 0,
suffix_range: 0..0,
value_reader: TValueReader::default(),
block_reader: BlockReader::from_multiple_blocks(reader),
}
}
pub fn empty() -> Self {
DeltaReader::new(OwnedBytes::empty())
}

View File

@@ -7,6 +7,8 @@ use std::sync::Arc;
use common::bounds::{transform_bound_inner_res, TransformBound};
use common::file_slice::FileSlice;
use common::{BinarySerializable, OwnedBytes};
use futures_util::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
@@ -98,20 +100,52 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
&self,
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
merge_holes_under_bytes: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes_async().await?;
Ok(TSSTable::delta_reader(data))
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes_async().await?;
Ok(TSSTable::delta_reader(data))
} else {
let blocks = stream::iter(self.get_block_iterator_for_range_and_automaton(
key_range,
automaton,
merge_holes_under_bytes,
));
let data = blocks
.map(|block_addr| {
self.sstable_slice
.read_bytes_slice_async(block_addr.byte_range)
})
.buffered(5)
.try_collect::<Vec<_>>()
.await?;
Ok(DeltaReader::from_multiple_blocks(data))
}
}
pub(crate) fn sstable_delta_reader_for_key_range(
&self,
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
automaton: &impl Automaton,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes()?;
Ok(TSSTable::delta_reader(data))
let match_all = automaton.will_always_match(&automaton.start());
if match_all {
let slice = self.file_slice_for_range(key_range, limit);
let data = slice.read_bytes()?;
Ok(TSSTable::delta_reader(data))
} else {
// if operations are sync, we assume latency is almost null, and there is no point in
// merging accross holes
let blocks = self.get_block_iterator_for_range_and_automaton(key_range, automaton, 0);
let data = blocks
.map(|block_addr| self.sstable_slice.read_bytes_slice(block_addr.byte_range))
.collect::<Result<Vec<_>, _>>()?;
Ok(DeltaReader::from_multiple_blocks(data))
}
}
pub(crate) fn sstable_delta_reader_block(
@@ -204,6 +238,42 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.sstable_slice.slice((start_bound, end_bound))
}
fn get_block_iterator_for_range_and_automaton<'a>(
&'a self,
key_range: impl RangeBounds<[u8]>,
automaton: &'a impl Automaton,
merge_holes_under_bytes: usize,
) -> impl Iterator<Item = BlockAddr> + 'a {
let lower_bound = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
}
Bound::Unbounded => 0,
};
let upper_bound = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
self.sstable_index.locate_with_key(key).unwrap_or(u64::MAX)
}
Bound::Unbounded => u64::MAX,
};
let block_range = lower_bound..=upper_bound;
self.sstable_index
.get_block_for_automaton(automaton)
.filter(move |(block_id, _)| block_range.contains(block_id))
.map(|(_, block_addr)| block_addr)
.coalesce(move |first, second| {
if first.byte_range.end + merge_holes_under_bytes >= second.byte_range.start {
Ok(BlockAddr {
first_ordinal: first.first_ordinal,
byte_range: first.byte_range.start..second.byte_range.end,
})
} else {
Err((first, second))
}
})
}
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
@@ -521,6 +591,25 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
StreamerBuilder::new(self, AlwaysMatch)
}
/// Returns a range builder filtered with a prefix.
pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
let lower_bound = prefix.as_ref();
let mut upper_bound = lower_bound.to_vec();
for idx in (0..upper_bound.len()).rev() {
if upper_bound[idx] == 255 {
upper_bound.pop();
} else {
upper_bound[idx] += 1;
break;
}
}
let mut builder = self.range().ge(lower_bound);
if !upper_bound.is_empty() {
builder = builder.lt(upper_bound);
}
builder
}
/// A stream of all the sorted terms.
pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
self.range().into_stream()
@@ -928,4 +1017,62 @@ mod tests {
}
assert!(!stream.advance());
}
#[test]
fn test_prefix() {
let (dic, _slice) = make_test_sstable();
{
let mut stream = dic.prefix_range("1").into_stream().unwrap();
for i in 0x10000..0x20000 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("").into_stream().unwrap();
for i in 0..0x3ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
for i in 0x0ff00..=0x0ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
}
#[test]
fn test_prefix_edge() {
let dict = {
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
builder.insert(&[0, 254], &0).unwrap();
builder.insert(&[0, 255], &1).unwrap();
builder.insert(&[0, 255, 12], &2).unwrap();
builder.insert(&[1], &2).unwrap();
builder.insert(&[1, 0], &2).unwrap();
let table = builder.finish().unwrap();
let table = Arc::new(PermissionedHandle::new(table));
let slice = common::file_slice::FileSlice::new(table.clone());
Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
};
let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255]);
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255, 12]);
assert!(!stream.advance());
}
}

View File

@@ -3,6 +3,7 @@ use std::ops::Range;
use merge::ValueMerger;
mod block_match_automaton;
mod delta;
mod dictionary;
pub mod merge;

View File

@@ -1,10 +1,12 @@
use common::OwnedBytes;
use tantivy_fst::Automaton;
use crate::block_match_automaton::can_block_match_automaton;
use crate::{BlockAddr, SSTable, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
pub(crate) blocks: Vec<BlockMeta>,
}
impl SSTableIndex {
@@ -74,6 +76,31 @@ impl SSTableIndex {
// locate_with_ord always returns an index within range
self.get_block(self.locate_with_ord(ord)).unwrap()
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
std::iter::once((None, &self.blocks[0]))
.chain(self.blocks.windows(2).map(|window| {
let [prev, curr] = window else {
unreachable!();
};
(Some(&*prev.last_key_or_greater), curr)
}))
.enumerate()
.filter_map(move |(pos, (prev_key, current_block))| {
if can_block_match_automaton(
prev_key,
&current_block.last_key_or_greater,
automaton,
) {
Some((pos as u64, current_block.block_addr.clone()))
} else {
None
}
})
}
}
#[derive(Debug, Clone)]
@@ -99,3 +126,106 @@ impl SSTable for IndexSSTable {
type ValueWriter = crate::value::index::IndexValueWriter;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::block_match_automaton::tests::EqBuffer;
#[test]
fn test_get_block_for_automaton() {
let sstable = SSTableIndex {
blocks: vec![
BlockMeta {
last_key_or_greater: vec![0, 1, 2],
block_addr: BlockAddr {
first_ordinal: 0,
byte_range: 0..10,
},
},
BlockMeta {
last_key_or_greater: vec![0, 2, 2],
block_addr: BlockAddr {
first_ordinal: 5,
byte_range: 10..20,
},
},
BlockMeta {
last_key_or_greater: vec![0, 3, 2],
block_addr: BlockAddr {
first_ordinal: 10,
byte_range: 20..30,
},
},
],
};
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
1,
BlockAddr {
first_ordinal: 5,
byte_range: 10..20
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
.collect::<Vec<_>>();
assert!(res.is_empty());
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
let res = sstable
.get_block_for_automaton(&complex_automaton)
.collect::<Vec<_>>();
assert_eq!(
res,
vec![
(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
),
(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)
]
);
}
}

View File

@@ -5,8 +5,9 @@ use std::sync::Arc;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_bitpacker::{compute_num_bits, BitPacker};
use tantivy_fst::raw::Fst;
use tantivy_fst::{IntoStreamer, Map, MapBuilder, Streamer};
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
use crate::block_match_automaton::can_block_match_automaton;
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Debug, Clone)]
@@ -64,6 +65,41 @@ impl SSTableIndex {
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
#[derive(Debug, Clone)]
@@ -123,6 +159,59 @@ impl SSTableIndexV3 {
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
self.block_addr_store.binary_search_ord(ord).1
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
// this is more complicated than other index formats: we don't have a ready made list of
// blocks, and instead need to stream-decode the sstable.
GetBlockForAutomaton {
streamer: self.fst_index.stream(),
block_addr_store: &self.block_addr_store,
prev_key: None,
automaton,
}
}
}
// TODO we iterate over the entire Map to find matching blocks,
// we could manually iterate on the underlying Fst and skip whole branches if our Automaton says
// cannot match. this isn't as bad as it sounds given the fst is a lot smaller than the rest of the
// sstable.
// To do that, we can't use tantivy_fst's Stream with an automaton, as we need to know 2 consecutive
// fst keys to form a proper opinion on whether this is a match, which we wan't translate into a
// single automaton
struct GetBlockForAutomaton<'a, A: Automaton> {
streamer: tantivy_fst::map::Stream<'a>,
block_addr_store: &'a BlockAddrStore,
prev_key: Option<Vec<u8>>,
automaton: &'a A,
}
impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
type Item = (u64, BlockAddr);
fn next(&mut self) -> Option<Self::Item> {
while let Some((new_key, block_id)) = self.streamer.next() {
if let Some(prev_key) = self.prev_key.as_mut() {
if can_block_match_automaton(Some(prev_key), new_key, self.automaton) {
prev_key.clear();
prev_key.extend_from_slice(new_key);
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
}
prev_key.clear();
prev_key.extend_from_slice(new_key);
} else {
self.prev_key = Some(new_key.to_owned());
if can_block_match_automaton(None, new_key, self.automaton) {
return Some((block_id, self.block_addr_store.get(block_id).unwrap()));
}
}
}
None
}
}
#[derive(Debug, Clone)]
@@ -734,7 +823,8 @@ fn find_best_slope(elements: impl Iterator<Item = (usize, u64)> + Clone) -> (u32
mod tests {
use common::OwnedBytes;
use super::{BlockAddr, SSTableIndexBuilder, SSTableIndexV3};
use super::*;
use crate::block_match_automaton::tests::EqBuffer;
use crate::SSTableDataCorruption;
#[test]
@@ -823,4 +913,108 @@ mod tests {
(12345, 1)
);
}
#[test]
fn test_get_block_for_automaton() {
let sstable_index_builder = SSTableIndexBuilder {
blocks: vec![
BlockMeta {
last_key_or_greater: vec![0, 1, 2],
block_addr: BlockAddr {
first_ordinal: 0,
byte_range: 0..10,
},
},
BlockMeta {
last_key_or_greater: vec![0, 2, 2],
block_addr: BlockAddr {
first_ordinal: 5,
byte_range: 10..20,
},
},
BlockMeta {
last_key_or_greater: vec![0, 3, 2],
block_addr: BlockAddr {
first_ordinal: 10,
byte_range: 20..30,
},
},
],
};
let mut sstable_index_bytes = Vec::new();
let fst_len = sstable_index_builder
.serialize(&mut sstable_index_bytes)
.unwrap();
let sstable = SSTableIndexV3::load(OwnedBytes::new(sstable_index_bytes), fst_len).unwrap();
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 1, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 2, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
1,
BlockAddr {
first_ordinal: 5,
byte_range: 10..20
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 3, 1]))
.collect::<Vec<_>>();
assert_eq!(
res,
vec![(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)]
);
let res = sstable
.get_block_for_automaton(&EqBuffer(vec![0, 4, 1]))
.collect::<Vec<_>>();
assert!(res.is_empty());
let complex_automaton = EqBuffer(vec![0, 1, 1]).union(EqBuffer(vec![0, 3, 1]));
let res = sstable
.get_block_for_automaton(&complex_automaton)
.collect::<Vec<_>>();
assert_eq!(
res,
vec![
(
0,
BlockAddr {
first_ordinal: 0,
byte_range: 0..10
}
),
(
2,
BlockAddr {
first_ordinal: 10,
byte_range: 20..30
}
)
]
);
}
}

View File

@@ -86,16 +86,24 @@ where
bound_as_byte_slice(&self.upper),
);
self.term_dict
.sstable_delta_reader_for_key_range(key_range, self.limit)
.sstable_delta_reader_for_key_range(key_range, self.limit, &self.automaton)
}
async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
async fn delta_reader_async(
&self,
merge_holes_under_bytes: usize,
) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
let key_range = (
bound_as_byte_slice(&self.lower),
bound_as_byte_slice(&self.upper),
);
self.term_dict
.sstable_delta_reader_for_key_range_async(key_range, self.limit)
.sstable_delta_reader_for_key_range_async(
key_range,
self.limit,
&self.automaton,
merge_holes_under_bytes,
)
.await
}
@@ -130,7 +138,16 @@ where
/// See `into_stream(..)`
pub async fn into_stream_async(self) -> io::Result<Streamer<'a, TSSTable, A>> {
let delta_reader = self.delta_reader_async().await?;
self.into_stream_async_merging_holes(0).await
}
/// Same as `into_stream_async`, but tries to issue a single io operation when requesting
/// blocks that are not consecutive, but also less than `merge_holes_under_bytes` bytes appart.
pub async fn into_stream_async_merging_holes(
self,
merge_holes_under_bytes: usize,
) -> io::Result<Streamer<'a, TSSTable, A>> {
let delta_reader = self.delta_reader_async(merge_holes_under_bytes).await?;
self.into_stream_given_delta_reader(delta_reader)
}
@@ -161,7 +178,7 @@ where
_lifetime: std::marker::PhantomData<&'a ()>,
}
impl<'a, TSSTable> Streamer<'a, TSSTable, AlwaysMatch>
impl<TSSTable> Streamer<'_, TSSTable, AlwaysMatch>
where TSSTable: SSTable
{
pub fn empty() -> Self {
@@ -178,7 +195,7 @@ where TSSTable: SSTable
}
}
impl<'a, TSSTable, A> Streamer<'a, TSSTable, A>
impl<TSSTable, A> Streamer<'_, TSSTable, A>
where
A: Automaton,
A::State: Clone,
@@ -327,4 +344,7 @@ mod tests {
assert!(!term_streamer.advance());
Ok(())
}
// TODO add test for sparse search with a block of poison (starts with 0xffffffff) => such a
// block instantly causes an unexpected EOF error
}

View File

@@ -11,7 +11,7 @@ description = "term hashmap used for indexing"
murmurhash32 = "0.3"
common = { version = "0.7", path = "../common/", package = "tantivy-common" }
ahash = { version = "0.8.11", default-features = false, optional = true }
rand_distr = "0.4.3"
rand_distr = "0.5.1"
[[bench]]
@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
zipf = "7.0.0"
rustc-hash = "1.1.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }

View File

@@ -74,7 +74,7 @@ fn ensure_capacity<'a>(
eull.remaining_cap = allocate as u16;
}
impl<'a> ExpUnrolledLinkedListWriter<'a> {
impl ExpUnrolledLinkedListWriter<'_> {
#[inline]
pub fn write_u32_vint(&mut self, val: u32) {
let mut buf = [0u8; 8];

View File

@@ -63,7 +63,7 @@ pub trait Tokenizer: 'static + Clone + Send + Sync {
/// Simple wrapper of `Box<dyn TokenStream + 'a>`.
pub struct BoxTokenStream<'a>(Box<dyn TokenStream + 'a>);
impl<'a> TokenStream for BoxTokenStream<'a> {
impl TokenStream for BoxTokenStream<'_> {
fn advance(&mut self) -> bool {
self.0.advance()
}
@@ -90,7 +90,7 @@ impl<'a> Deref for BoxTokenStream<'a> {
&*self.0
}
}
impl<'a> DerefMut for BoxTokenStream<'a> {
impl DerefMut for BoxTokenStream<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.0
}