mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-03 15:52:55 +00:00
Compare commits
114 Commits
dependabot
...
0.25.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4139bc353 | ||
|
|
8676a1f57b | ||
|
|
021ff2ad63 | ||
|
|
39e027667b | ||
|
|
a1d65c3df3 | ||
|
|
2e4615c2d3 | ||
|
|
610091e2c4 | ||
|
|
c301e7b1c4 | ||
|
|
d9eb093368 | ||
|
|
d4b090124c | ||
|
|
811c68cdb2 | ||
|
|
bc1c789897 | ||
|
|
e7c8c331bd | ||
|
|
2f01152a3c | ||
|
|
4e84c70387 | ||
|
|
f2c77f06c5 | ||
|
|
74334f9c9a | ||
|
|
cc4beb61ba | ||
|
|
6742e5981b | ||
|
|
b128299976 | ||
|
|
945af922d1 | ||
|
|
295d07e55c | ||
|
|
080fa4d1f4 | ||
|
|
988c2b35e7 | ||
|
|
bf3cc12610 | ||
|
|
a2400f4e73 | ||
|
|
436ec6caea | ||
|
|
4a6123d3ff | ||
|
|
5a2fe42c24 | ||
|
|
5379c99ea2 | ||
|
|
3fa90e70e2 | ||
|
|
6ab4102253 | ||
|
|
11c6329ca5 | ||
|
|
ab8bb93928 | ||
|
|
2b668bd2bf | ||
|
|
97a7137ef8 | ||
|
|
ffa7cdf397 | ||
|
|
caf1275e60 | ||
|
|
fb12b7be28 | ||
|
|
6f77083493 | ||
|
|
cd7745da7a | ||
|
|
eb8304dee9 | ||
|
|
e5638112a9 | ||
|
|
81110152fb | ||
|
|
ae88a7ece5 | ||
|
|
bdd5f80fd9 | ||
|
|
3f62ef22e5 | ||
|
|
8102e19e48 | ||
|
|
175c853ea7 | ||
|
|
c992cf3f37 | ||
|
|
83f6c2f265 | ||
|
|
17bf8aa092 | ||
|
|
6fc0e96ff8 | ||
|
|
06d2dcf469 | ||
|
|
b681ec9335 | ||
|
|
da2ff5712a | ||
|
|
18da402e27 | ||
|
|
18ae3ffe94 | ||
|
|
0a37b7acaa | ||
|
|
1a9fd885dd | ||
|
|
3e660905a7 | ||
|
|
0c2b984cb4 | ||
|
|
a69b1c609c | ||
|
|
8d4a6fcaba | ||
|
|
feced4762f | ||
|
|
0149317c5a | ||
|
|
3fcb6f9597 | ||
|
|
388fcd763b | ||
|
|
e488f9e6a2 | ||
|
|
9426d5be7b | ||
|
|
d5d2d41264 | ||
|
|
80f5f1ecd4 | ||
|
|
519e5d2ed1 | ||
|
|
df2d52a84e | ||
|
|
371dba9414 | ||
|
|
0afabad494 | ||
|
|
89b052cd42 | ||
|
|
c48c649436 | ||
|
|
58c0739953 | ||
|
|
e7daf69de9 | ||
|
|
f060e86bc6 | ||
|
|
0368162ef0 | ||
|
|
e843c71015 | ||
|
|
5cea16ef9f | ||
|
|
4aa8cd2470 | ||
|
|
4d4ee1b0ac | ||
|
|
43c89b4360 | ||
|
|
d281ca3e65 | ||
|
|
be17daf658 | ||
|
|
6ca84a61fa | ||
|
|
037d12c9c9 | ||
|
|
71cf19870b | ||
|
|
175a529c41 | ||
|
|
fe0c7c5408 | ||
|
|
148594f0f9 | ||
|
|
8edb439440 | ||
|
|
dfff5f3bcb | ||
|
|
ebf4d84553 | ||
|
|
42efc7f7c8 | ||
|
|
192395c311 | ||
|
|
a1447cc9c2 | ||
|
|
c39d91f827 | ||
|
|
32b6e9711b | ||
|
|
24c5dc2398 | ||
|
|
9e2ddec4b3 | ||
|
|
1f6a8e74bb | ||
|
|
7e901f523b | ||
|
|
3c30a41c14 | ||
|
|
0f99d4f420 | ||
|
|
6e02c5cb25 | ||
|
|
876a579e5d | ||
|
|
4c52499622 | ||
|
|
0bac391291 | ||
|
|
52d4e81e70 |
2
.github/workflows/coverage.yml
vendored
2
.github/workflows/coverage.yml
vendored
@@ -21,7 +21,7 @@ jobs:
|
||||
- name: Generate code coverage
|
||||
run: cargo +nightly-2024-07-01 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v5
|
||||
uses: codecov/codecov-action@v3
|
||||
continue-on-error: true
|
||||
with:
|
||||
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
|
||||
|
||||
44
CHANGELOG.md
44
CHANGELOG.md
@@ -1,11 +1,30 @@
|
||||
Tantivy 0.23 - Unreleased
|
||||
Tantivy 0.25
|
||||
================================
|
||||
Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0.21.
|
||||
|
||||
## Bugfixes
|
||||
- fix union performance regression in tantivy 0.24 [#2663](https://github.com/quickwit-oss/tantivy/pull/2663)(@PSeitz)
|
||||
- make zstd optional in sstable [#2633](https://github.com/quickwit-oss/tantivy/pull/2633)(@Parth)
|
||||
- Fix TopDocs::order_by_string_fast_field for asc order [#2672](https://github.com/quickwit-oss/tantivy/pull/2672)(@stuhood @PSeitz)
|
||||
|
||||
## Features/Improvements
|
||||
- add docs/example and Vec<u32> values to sstable [#2660](https://github.com/quickwit-oss/tantivy/pull/2660)(@PSeitz)
|
||||
- Add string fast field support to `TopDocs`. [#2642](https://github.com/quickwit-oss/tantivy/pull/2642)(@stuhood)
|
||||
- update edition to 2024 [#2620](https://github.com/quickwit-oss/tantivy/pull/2620)(@PSeitz)
|
||||
- Allow optional spaces between the field name and the value in the query parser [#2678](https://github.com/quickwit-oss/tantivy/pull/2678)(@Darkheir)
|
||||
- Support mixed field types in query parser [#2676](https://github.com/quickwit-oss/tantivy/pull/2676)(@trinity-1686a)
|
||||
- Add per-field size details [#2679](https://github.com/quickwit-oss/tantivy/pull/2679)(@fulmicoton)
|
||||
|
||||
Tantivy 0.24
|
||||
================================
|
||||
Tantivy 0.24 will be backwards compatible with indices created with v0.22 and v0.21. The new minimum rust version will be 1.75. Tantivy 0.23 will be skipped.
|
||||
|
||||
#### Bugfixes
|
||||
- fix potential endless loop in merge [#2457](https://github.com/quickwit-oss/tantivy/pull/2457)(@PSeitz)
|
||||
- fix bug that causes out-of-order sstable key. [#2445](https://github.com/quickwit-oss/tantivy/pull/2445)(@fulmicoton)
|
||||
- fix ReferenceValue API flaw [#2372](https://github.com/quickwit-oss/tantivy/pull/2372)(@PSeitz)
|
||||
- fix `OwnedBytes` debug panic [#2512](https://github.com/quickwit-oss/tantivy/pull/2512)(@b41sh)
|
||||
- catch panics during merges [#2582](https://github.com/quickwit-oss/tantivy/pull/2582)(@rdettai)
|
||||
- switch from u32 to usize in bitpacker. This enables multivalued columns larger than 4GB, which crashed during merge before. [#2581](https://github.com/quickwit-oss/tantivy/pull/2581) [#2586](https://github.com/quickwit-oss/tantivy/pull/2586)(@fulmicoton-dd @PSeitz)
|
||||
|
||||
#### Breaking API Changes
|
||||
- remove index sorting [#2434](https://github.com/quickwit-oss/tantivy/pull/2434)(@PSeitz)
|
||||
@@ -23,6 +42,7 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
|
||||
- reduce top hits memory consumption [#2426](https://github.com/quickwit-oss/tantivy/pull/2426)(@PSeitz)
|
||||
- check unsupported parameters top_hits [#2351](https://github.com/quickwit-oss/tantivy/pull/2351)(@PSeitz)
|
||||
- Change AggregationLimits to AggregationLimitsGuard [#2495](https://github.com/quickwit-oss/tantivy/pull/2495)(@PSeitz)
|
||||
- add support for counting non integer in aggregation [#2547](https://github.com/quickwit-oss/tantivy/pull/2547)(@trinity-1686a)
|
||||
- **Range Queries**
|
||||
- Support fast field range queries on json fields [#2456](https://github.com/quickwit-oss/tantivy/pull/2456)(@PSeitz)
|
||||
- Add support for str fast field range query [#2460](https://github.com/quickwit-oss/tantivy/pull/2460) [#2452](https://github.com/quickwit-oss/tantivy/pull/2452) [#2453](https://github.com/quickwit-oss/tantivy/pull/2453)(@PSeitz)
|
||||
@@ -33,9 +53,18 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
|
||||
- add columnar format compatibility tests [#2433](https://github.com/quickwit-oss/tantivy/pull/2433)(@PSeitz)
|
||||
- Improved snippet ranges algorithm [#2474](https://github.com/quickwit-oss/tantivy/pull/2474)(@gezihuzi)
|
||||
- make find_field_with_default return json fields without path [#2476](https://github.com/quickwit-oss/tantivy/pull/2476)(@trinity-1686a)
|
||||
- feat(query): Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
|
||||
- Make `BooleanQuery` support `minimum_number_should_match` [#2405](https://github.com/quickwit-oss/tantivy/pull/2405)(@LebranceBW)
|
||||
- Make `NUM_MERGE_THREADS` configurable [#2535](https://github.com/quickwit-oss/tantivy/pull/2535)(@Barre)
|
||||
|
||||
- **Optional Index in Multivalue Columnar Index** For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case). This is alleviated by placing an optional index in the multivalued index to mark documents that have values. This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
|
||||
- **RegexPhraseQuery**
|
||||
`RegexPhraseQuery` supports phrase queries with regex. E.g. query "b.* b.* wolf" matches "big bad wolf". Slop is supported as well: "b.* wolf"~2 matches "big bad wolf" [#2516](https://github.com/quickwit-oss/tantivy/pull/2516)(@PSeitz)
|
||||
|
||||
- **Optional Index in Multivalue Columnar Index**
|
||||
For mostly empty multivalued indices there was a large overhead during creation when iterating all docids (merge case).
|
||||
This is alleviated by placing an optional index in the multivalued index to mark documents that have values.
|
||||
This will slightly increase space and access time. [#2439](https://github.com/quickwit-oss/tantivy/pull/2439)(@PSeitz)
|
||||
|
||||
- **Store DateTime as nanoseconds in doc store** DateTime in the doc store was truncated to microseconds previously. This removes this truncation, while still keeping backwards compatibility. [#2486](https://github.com/quickwit-oss/tantivy/pull/2486)(@PSeitz)
|
||||
|
||||
- **Performace/Memory**
|
||||
- lift clauses in LogicalAst for optimized ast during execution [#2449](https://github.com/quickwit-oss/tantivy/pull/2449)(@PSeitz)
|
||||
@@ -51,18 +80,21 @@ Tantivy 0.23 will be backwards compatible with indices created with v0.22 and v0
|
||||
- fix de-escaping too much in query parser [#2427](https://github.com/quickwit-oss/tantivy/pull/2427)(@trinity-1686a)
|
||||
- improve query parser [#2416](https://github.com/quickwit-oss/tantivy/pull/2416)(@trinity-1686a)
|
||||
- Support field grouping `title:(return AND "pink panther")` [#2333](https://github.com/quickwit-oss/tantivy/pull/2333)(@trinity-1686a)
|
||||
- allow term starting with wildcard [#2568](https://github.com/quickwit-oss/tantivy/pull/2568)(@trinity-1686a)
|
||||
|
||||
- Exist queries match subpath fields [#2558](https://github.com/quickwit-oss/tantivy/pull/2558)(@rdettai)
|
||||
- add access benchmark for columnar [#2432](https://github.com/quickwit-oss/tantivy/pull/2432)(@PSeitz)
|
||||
- extend indexwriter proptests [#2342](https://github.com/quickwit-oss/tantivy/pull/2342)(@PSeitz)
|
||||
- add bench & test for columnar merging [#2428](https://github.com/quickwit-oss/tantivy/pull/2428)(@PSeitz)
|
||||
- Change in Executor API [#2391](https://github.com/quickwit-oss/tantivy/pull/2391)(@fulmicoton)
|
||||
- Removed usage of num_cpus [#2387](https://github.com/quickwit-oss/tantivy/pull/2387)(@fulmicoton)
|
||||
- use bingang for agg benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)(@PSeitz)
|
||||
- use bingang for agg and stacker benchmark [#2378](https://github.com/quickwit-oss/tantivy/pull/2378)[#2492](https://github.com/quickwit-oss/tantivy/pull/2492)(@PSeitz)
|
||||
- cleanup top level exports [#2382](https://github.com/quickwit-oss/tantivy/pull/2382)(@PSeitz)
|
||||
- make convert_to_fast_value_and_append_to_json_term pub [#2370](https://github.com/quickwit-oss/tantivy/pull/2370)(@PSeitz)
|
||||
- remove JsonTermWriter [#2238](https://github.com/quickwit-oss/tantivy/pull/2238)(@PSeitz)
|
||||
- validate sort by field type [#2336](https://github.com/quickwit-oss/tantivy/pull/2336)(@PSeitz)
|
||||
- Fix trait bound of StoreReader::iter [#2360](https://github.com/quickwit-oss/tantivy/pull/2360)(@adamreichold)
|
||||
- remove read_postings_no_deletes [#2526](https://github.com/quickwit-oss/tantivy/pull/2526)(@PSeitz)
|
||||
|
||||
Tantivy 0.22
|
||||
================================
|
||||
@@ -717,7 +749,7 @@ Tantivy 0.4.0
|
||||
- Raise the limit of number of fields (previously 256 fields) (@fulmicoton)
|
||||
- Removed u32 fields. They are replaced by u64 and i64 fields (#65) (@fulmicoton)
|
||||
- Optimized skip in SegmentPostings (#130) (@lnicola)
|
||||
- Replacing rustc_serialize by serde. Kudos to @KodrAus and @lnicola
|
||||
- Replacing rustc_serialize by serde. Kudos to benchmark@KodrAus and @lnicola
|
||||
- Using error-chain (@KodrAus)
|
||||
- QueryParser: (@fulmicoton)
|
||||
- Explicit error returned when searched for a term that is not indexed
|
||||
|
||||
39
Cargo.toml
39
Cargo.toml
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy"
|
||||
version = "0.23.0"
|
||||
version = "0.25.0"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.75"
|
||||
rust-version = "1.85"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
@@ -31,14 +31,14 @@ lz4_flex = { version = "0.11", default-features = false, optional = true }
|
||||
zstd = { version = "0.13", optional = true, default-features = false }
|
||||
tempfile = { version = "3.12.0", optional = true }
|
||||
log = "0.4.16"
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
serde_json = "1.0.79"
|
||||
fs4 = { version = "0.8.0", optional = true }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
fs4 = { version = "0.13.1", optional = true }
|
||||
levenshtein_automata = "0.2.1"
|
||||
uuid = { version = "1.0.0", features = ["v4", "serde"] }
|
||||
crossbeam-channel = "0.5.4"
|
||||
rust-stemmers = "1.2.0"
|
||||
downcast-rs = "1.2.1"
|
||||
downcast-rs = "2.0.1"
|
||||
bitpacking = { version = "0.9.2", default-features = false, features = [
|
||||
"bitpacker4x",
|
||||
] }
|
||||
@@ -52,20 +52,22 @@ smallvec = "1.8.0"
|
||||
rayon = "1.5.2"
|
||||
lru = "0.12.0"
|
||||
fastdivide = "0.4.0"
|
||||
itertools = "0.13.0"
|
||||
measure_time = "0.8.2"
|
||||
itertools = "0.14.0"
|
||||
measure_time = "0.9.0"
|
||||
arc-swap = "1.5.0"
|
||||
bon = "3.3.1"
|
||||
|
||||
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
|
||||
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
columnar = { version = "0.6", path = "./columnar", package = "tantivy-columnar" }
|
||||
sstable = { version = "0.6", path = "./sstable", package = "tantivy-sstable", optional = true }
|
||||
stacker = { version = "0.6", path = "./stacker", package = "tantivy-stacker" }
|
||||
query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tantivy-query-grammar" }
|
||||
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
|
||||
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
|
||||
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
|
||||
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
|
||||
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
|
||||
futures-util = { version = "0.3.28", optional = true }
|
||||
futures-channel = { version = "0.3.28", optional = true }
|
||||
fnv = "1.0.7"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
@@ -110,17 +112,20 @@ debug-assertions = true
|
||||
overflow-checks = true
|
||||
|
||||
[features]
|
||||
default = ["mmap", "stopwords", "lz4-compression"]
|
||||
default = ["mmap", "stopwords", "lz4-compression", "columnar-zstd-compression"]
|
||||
mmap = ["fs4", "tempfile", "memmap2"]
|
||||
stopwords = []
|
||||
|
||||
lz4-compression = ["lz4_flex"]
|
||||
zstd-compression = ["zstd"]
|
||||
|
||||
# enable zstd-compression in columnar (and sstable)
|
||||
columnar-zstd-compression = ["columnar/zstd-compression"]
|
||||
|
||||
failpoints = ["fail", "fail/failpoints"]
|
||||
unstable = [] # useful for benches.
|
||||
|
||||
quickwit = ["sstable", "futures-util"]
|
||||
quickwit = ["sstable", "futures-util", "futures-channel"]
|
||||
|
||||
# Compares only the hash of a string when indexing data.
|
||||
# Increases indexing speed, but may lead to extremely rare missing terms, when there's a hash collision.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-bitpacker"
|
||||
version = "0.6.0"
|
||||
edition = "2021"
|
||||
version = "0.9.0"
|
||||
edition = "2024"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = []
|
||||
|
||||
@@ -65,7 +65,7 @@ impl BitPacker {
|
||||
|
||||
#[derive(Clone, Debug, Default, Copy)]
|
||||
pub struct BitUnpacker {
|
||||
num_bits: u32,
|
||||
num_bits: usize,
|
||||
mask: u64,
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ impl BitUnpacker {
|
||||
(1u64 << num_bits) - 1u64
|
||||
};
|
||||
BitUnpacker {
|
||||
num_bits: u32::from(num_bits),
|
||||
num_bits: usize::from(num_bits),
|
||||
mask,
|
||||
}
|
||||
}
|
||||
@@ -94,14 +94,14 @@ impl BitUnpacker {
|
||||
|
||||
#[inline]
|
||||
pub fn get(&self, idx: u32, data: &[u8]) -> u64 {
|
||||
let addr_in_bits = idx * self.num_bits;
|
||||
let addr = (addr_in_bits >> 3) as usize;
|
||||
let addr_in_bits = idx as usize * self.num_bits;
|
||||
let addr = addr_in_bits >> 3;
|
||||
if addr + 8 > data.len() {
|
||||
if self.num_bits == 0 {
|
||||
return 0;
|
||||
}
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
return self.get_slow_path(addr, bit_shift, data);
|
||||
return self.get_slow_path(addr, bit_shift as u32, data);
|
||||
}
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
let bytes: [u8; 8] = (&data[addr..addr + 8]).try_into().unwrap();
|
||||
@@ -134,12 +134,13 @@ impl BitUnpacker {
|
||||
"Bitwidth must be <= 32 to use this method."
|
||||
);
|
||||
|
||||
let end_idx = start_idx + output.len() as u32;
|
||||
let end_idx: u32 = start_idx + output.len() as u32;
|
||||
|
||||
let end_bit_read = end_idx * self.num_bits;
|
||||
// We use `usize` here to avoid overflow issues.
|
||||
let end_bit_read = (end_idx as usize) * self.num_bits;
|
||||
let end_byte_read = (end_bit_read + 7) / 8;
|
||||
assert!(
|
||||
end_byte_read as usize <= data.len(),
|
||||
end_byte_read <= data.len(),
|
||||
"Requested index is out of bounds."
|
||||
);
|
||||
|
||||
@@ -159,24 +160,24 @@ impl BitUnpacker {
|
||||
// We want the start of the fast track to start align with bytes.
|
||||
// A sufficient condition is to start with an idx that is a multiple of 8,
|
||||
// so highway start is the closest multiple of 8 that is >= start_idx.
|
||||
let entrance_ramp_len = 8 - (start_idx % 8) % 8;
|
||||
let entrance_ramp_len: u32 = 8 - (start_idx % 8) % 8;
|
||||
|
||||
let highway_start: u32 = start_idx + entrance_ramp_len;
|
||||
|
||||
if highway_start + BitPacker1x::BLOCK_LEN as u32 > end_idx {
|
||||
if highway_start + (BitPacker1x::BLOCK_LEN as u32) > end_idx {
|
||||
// We don't have enough values to have even a single block of highway.
|
||||
// Let's just supply the values the simple way.
|
||||
get_batch_ramp(start_idx, output);
|
||||
return;
|
||||
}
|
||||
|
||||
let num_blocks: u32 = (end_idx - highway_start) / BitPacker1x::BLOCK_LEN as u32;
|
||||
let num_blocks: usize = (end_idx - highway_start) as usize / BitPacker1x::BLOCK_LEN;
|
||||
|
||||
// Entrance ramp
|
||||
get_batch_ramp(start_idx, &mut output[..entrance_ramp_len as usize]);
|
||||
|
||||
// Highway
|
||||
let mut offset = (highway_start * self.num_bits) as usize / 8;
|
||||
let mut offset = (highway_start as usize * self.num_bits) / 8;
|
||||
let mut output_cursor = (highway_start - start_idx) as usize;
|
||||
for _ in 0..num_blocks {
|
||||
offset += BitPacker1x.decompress(
|
||||
@@ -188,7 +189,7 @@ impl BitUnpacker {
|
||||
}
|
||||
|
||||
// Exit ramp
|
||||
let highway_end = highway_start + num_blocks * BitPacker1x::BLOCK_LEN as u32;
|
||||
let highway_end: u32 = highway_start + (num_blocks * BitPacker1x::BLOCK_LEN) as u32;
|
||||
get_batch_ramp(highway_end, &mut output[output_cursor..]);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::bitpacker::BitPacker;
|
||||
use super::compute_num_bits;
|
||||
use crate::{minmax, BitUnpacker};
|
||||
use crate::{BitUnpacker, minmax};
|
||||
|
||||
const BLOCK_SIZE: usize = 128;
|
||||
|
||||
@@ -34,7 +34,7 @@ struct BlockedBitpackerEntryMetaData {
|
||||
|
||||
impl BlockedBitpackerEntryMetaData {
|
||||
fn new(offset: u64, num_bits: u8, base_value: u64) -> Self {
|
||||
let encoded = offset | (num_bits as u64) << (64 - 8);
|
||||
let encoded = offset | (u64::from(num_bits) << (64 - 8));
|
||||
Self {
|
||||
encoded,
|
||||
base_value,
|
||||
|
||||
@@ -33,11 +33,7 @@ pub use crate::blocked_bitpacker::BlockedBitpacker;
|
||||
/// number of bits.
|
||||
pub fn compute_num_bits(n: u64) -> u8 {
|
||||
let amplitude = (64u32 - n.leading_zeros()) as u8;
|
||||
if amplitude <= 64 - 8 {
|
||||
amplitude
|
||||
} else {
|
||||
64
|
||||
}
|
||||
if amplitude <= 64 - 8 { amplitude } else { 64 }
|
||||
}
|
||||
|
||||
/// Computes the (min, max) of an iterator of `PartialOrd` values.
|
||||
|
||||
@@ -16,14 +16,14 @@ body = """
|
||||
|
||||
{%- if version %} in {{ version }}{%- endif -%}
|
||||
{% for commit in commits %}
|
||||
{% if commit.github.pr_title -%}
|
||||
{%- set commit_message = commit.github.pr_title -%}
|
||||
{% if commit.remote.pr_title -%}
|
||||
{%- set commit_message = commit.remote.pr_title -%}
|
||||
{%- else -%}
|
||||
{%- set commit_message = commit.message -%}
|
||||
{%- endif -%}
|
||||
- {{ commit_message | split(pat="\n") | first | trim }}\
|
||||
{% if commit.github.pr_number %} \
|
||||
[#{{ commit.github.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.github.pr_number }}){% if commit.github.username %}(@{{ commit.github.username }}){%- endif -%} \
|
||||
{% if commit.remote.pr_number %} \
|
||||
[#{{ commit.remote.pr_number }}]({{ self::remote_url() }}/pull/{{ commit.remote.pr_number }}){% if commit.remote.username %}(@{{ commit.remote.username }}){%- endif -%} \
|
||||
{%- endif %}
|
||||
{%- endfor -%}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "tantivy-columnar"
|
||||
version = "0.3.0"
|
||||
edition = "2021"
|
||||
version = "0.6.0"
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -9,15 +9,15 @@ description = "column oriented storage for tantivy"
|
||||
categories = ["database-implementations", "data-structures", "compression"]
|
||||
|
||||
[dependencies]
|
||||
itertools = "0.13.0"
|
||||
itertools = "0.14.0"
|
||||
fastdivide = "0.4.0"
|
||||
|
||||
stacker = { version= "0.3", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.3", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.7", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.6", path = "../bitpacker/" }
|
||||
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
|
||||
common = { version= "0.10", path = "../common", package = "tantivy-common" }
|
||||
tantivy-bitpacker = { version= "0.9", path = "../bitpacker/" }
|
||||
serde = "1.0.152"
|
||||
downcast-rs = "1.2.0"
|
||||
downcast-rs = "2.0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1"
|
||||
@@ -33,6 +33,29 @@ harness = false
|
||||
name = "bench_access"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_first_vals"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_values_u64"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_values_u128"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_create_column_values"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_column_values_get"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_optional_index"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
unstable = []
|
||||
zstd-compression = ["sstable/zstd-compression"]
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use binggan::{black_box, InputGroup};
|
||||
use binggan::{InputGroup, black_box};
|
||||
use common::*;
|
||||
use tantivy_columnar::Column;
|
||||
|
||||
@@ -19,7 +19,7 @@ fn main() {
|
||||
|
||||
let mut add_card = |card1: Card| {
|
||||
inputs.push((
|
||||
format!("{card1}"),
|
||||
card1.to_string(),
|
||||
generate_columnar_and_open(card1, NUM_DOCS),
|
||||
));
|
||||
};
|
||||
@@ -50,6 +50,7 @@ fn bench_group(mut runner: InputGroup<Column>) {
|
||||
let mut buffer = vec![None; BLOCK_SIZE];
|
||||
for i in (0..NUM_DOCS).step_by(BLOCK_SIZE) {
|
||||
// fill docs
|
||||
#[allow(clippy::needless_range_loop)]
|
||||
for idx in 0..BLOCK_SIZE {
|
||||
docs[idx] = idx as u32 + i;
|
||||
}
|
||||
|
||||
61
columnar/benches/bench_column_values_get.rs
Normal file
61
columnar/benches/bench_column_values_get.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use binggan::{InputGroup, black_box};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tantivy_columnar::ColumnValues;
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
|
||||
|
||||
fn get_data() -> Vec<u64> {
|
||||
let mut rng = StdRng::seed_from_u64(2u64);
|
||||
let mut data: Vec<_> = (100..55_000_u64)
|
||||
.map(|num| num + rng.r#gen::<u8>() as u64)
|
||||
.collect();
|
||||
data.push(99_000);
|
||||
data.insert(1000, 2000);
|
||||
data.insert(2000, 100);
|
||||
data.insert(3000, 4100);
|
||||
data.insert(4000, 100);
|
||||
data.insert(5000, 800);
|
||||
data
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
fn value_iter() -> impl Iterator<Item = u64> {
|
||||
0..20_000
|
||||
}
|
||||
|
||||
type Col = Arc<dyn ColumnValues<u64>>;
|
||||
|
||||
fn main() {
|
||||
let data = get_data();
|
||||
let inputs: Vec<(String, Col)> = vec![
|
||||
(
|
||||
"bitpacked".to_string(),
|
||||
serialize_and_load_u64_based_column_values(&data.as_slice(), &[CodecType::Bitpacked]),
|
||||
),
|
||||
(
|
||||
"linear".to_string(),
|
||||
serialize_and_load_u64_based_column_values(&data.as_slice(), &[CodecType::Linear]),
|
||||
),
|
||||
(
|
||||
"blockwise_linear".to_string(),
|
||||
serialize_and_load_u64_based_column_values(
|
||||
&data.as_slice(),
|
||||
&[CodecType::BlockwiseLinear],
|
||||
),
|
||||
),
|
||||
];
|
||||
|
||||
let mut group: InputGroup<Col> = InputGroup::new_with_inputs(inputs);
|
||||
|
||||
group.register("fastfield_get", |col: &Col| {
|
||||
let mut sum = 0u64;
|
||||
for pos in value_iter() {
|
||||
sum = sum.wrapping_add(col.get_val(pos as u32));
|
||||
}
|
||||
black_box(sum);
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
44
columnar/benches/bench_create_column_values.rs
Normal file
44
columnar/benches/bench_create_column_values.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use binggan::{InputGroup, black_box};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_u64_based_column_values};
|
||||
|
||||
fn get_data() -> Vec<u64> {
|
||||
let mut rng = StdRng::seed_from_u64(2u64);
|
||||
let mut data: Vec<_> = (100..55_000_u64)
|
||||
.map(|num| num + rng.r#gen::<u8>() as u64)
|
||||
.collect();
|
||||
data.push(99_000);
|
||||
data.insert(1000, 2000);
|
||||
data.insert(2000, 100);
|
||||
data.insert(3000, 4100);
|
||||
data.insert(4000, 100);
|
||||
data.insert(5000, 800);
|
||||
data
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let data = get_data();
|
||||
let mut group: InputGroup<(CodecType, Vec<u64>)> = InputGroup::new_with_inputs(vec![
|
||||
(
|
||||
"bitpacked codec".to_string(),
|
||||
(CodecType::Bitpacked, data.clone()),
|
||||
),
|
||||
(
|
||||
"linear codec".to_string(),
|
||||
(CodecType::Linear, data.clone()),
|
||||
),
|
||||
(
|
||||
"blockwise linear codec".to_string(),
|
||||
(CodecType::BlockwiseLinear, data.clone()),
|
||||
),
|
||||
]);
|
||||
|
||||
group.register("serialize column_values", |data| {
|
||||
let mut buffer = Vec::new();
|
||||
serialize_u64_based_column_values(&data.1.as_slice(), &[data.0], &mut buffer).unwrap();
|
||||
black_box(buffer.len());
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
@@ -1,12 +1,9 @@
|
||||
#![feature(test)]
|
||||
extern crate test;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use binggan::{InputGroup, black_box};
|
||||
use rand::prelude::*;
|
||||
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
|
||||
use tantivy_columnar::*;
|
||||
use test::{black_box, Bencher};
|
||||
|
||||
struct Columns {
|
||||
pub optional: Column,
|
||||
@@ -68,88 +65,45 @@ pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn Colu
|
||||
serialize_and_load_u64_based_column_values(&column, &[codec_type])
|
||||
}
|
||||
|
||||
fn run_bench_on_column_full_scan(b: &mut Bencher, column: Column) {
|
||||
let num_iter = black_box(NUM_VALUES);
|
||||
b.iter(|| {
|
||||
fn main() {
|
||||
let Columns {
|
||||
optional,
|
||||
full,
|
||||
multi,
|
||||
} = get_test_columns();
|
||||
|
||||
let inputs = vec![
|
||||
("full".to_string(), full),
|
||||
("optional".to_string(), optional),
|
||||
("multi".to_string(), multi),
|
||||
];
|
||||
|
||||
let mut group = InputGroup::new_with_inputs(inputs);
|
||||
|
||||
group.register("first_full_scan", |column| {
|
||||
let mut sum = 0u64;
|
||||
for i in 0..num_iter as u32 {
|
||||
for i in 0..NUM_VALUES as u32 {
|
||||
let val = column.first(i);
|
||||
sum += val.unwrap_or(0);
|
||||
}
|
||||
sum
|
||||
black_box(sum);
|
||||
});
|
||||
}
|
||||
fn run_bench_on_column_block_fetch(b: &mut Bencher, column: Column) {
|
||||
let mut block: Vec<Option<u64>> = vec![None; 64];
|
||||
let fetch_docids = (0..64).collect::<Vec<_>>();
|
||||
b.iter(move || {
|
||||
|
||||
group.register("first_block_fetch", |column| {
|
||||
let mut block: Vec<Option<u64>> = vec![None; 64];
|
||||
let fetch_docids = (0..64).collect::<Vec<_>>();
|
||||
column.first_vals(&fetch_docids, &mut block);
|
||||
block[0]
|
||||
black_box(block[0]);
|
||||
});
|
||||
}
|
||||
fn run_bench_on_column_block_single_calls(b: &mut Bencher, column: Column) {
|
||||
let mut block: Vec<Option<u64>> = vec![None; 64];
|
||||
let fetch_docids = (0..64).collect::<Vec<_>>();
|
||||
b.iter(move || {
|
||||
|
||||
group.register("first_block_single_calls", |column| {
|
||||
let mut block: Vec<Option<u64>> = vec![None; 64];
|
||||
let fetch_docids = (0..64).collect::<Vec<_>>();
|
||||
for i in 0..fetch_docids.len() {
|
||||
block[i] = column.first(fetch_docids[i]);
|
||||
}
|
||||
block[0]
|
||||
black_box(block[0]);
|
||||
});
|
||||
}
|
||||
|
||||
/// Column first method
|
||||
#[bench]
|
||||
fn bench_get_first_on_full_column_full_scan(b: &mut Bencher) {
|
||||
let column = get_test_columns().full;
|
||||
run_bench_on_column_full_scan(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_first_on_optional_column_full_scan(b: &mut Bencher) {
|
||||
let column = get_test_columns().optional;
|
||||
run_bench_on_column_full_scan(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_first_on_multi_column_full_scan(b: &mut Bencher) {
|
||||
let column = get_test_columns().multi;
|
||||
run_bench_on_column_full_scan(b, column);
|
||||
}
|
||||
|
||||
/// Block fetch column accessor
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_optional_column(b: &mut Bencher) {
|
||||
let column = get_test_columns().optional;
|
||||
run_bench_on_column_block_fetch(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_multi_column(b: &mut Bencher) {
|
||||
let column = get_test_columns().multi;
|
||||
run_bench_on_column_block_fetch(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_full_column(b: &mut Bencher) {
|
||||
let column = get_test_columns().full;
|
||||
run_bench_on_column_block_fetch(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_optional_column_single_calls(b: &mut Bencher) {
|
||||
let column = get_test_columns().optional;
|
||||
run_bench_on_column_block_single_calls(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_multi_column_single_calls(b: &mut Bencher) {
|
||||
let column = get_test_columns().multi;
|
||||
run_bench_on_column_block_single_calls(b, column);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_get_block_first_on_full_column_single_calls(b: &mut Bencher) {
|
||||
let column = get_test_columns().full;
|
||||
run_bench_on_column_block_single_calls(b, column);
|
||||
group.run();
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pub mod common;
|
||||
|
||||
use binggan::BenchRunner;
|
||||
use common::{generate_columnar_with_name, Card};
|
||||
use common::{Card, generate_columnar_with_name};
|
||||
use tantivy_columnar::*;
|
||||
|
||||
const NUM_DOCS: u32 = 100_000;
|
||||
|
||||
106
columnar/benches/bench_optional_index.rs
Normal file
106
columnar/benches/bench_optional_index.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use binggan::{InputGroup, black_box};
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tantivy_columnar::column_index::{OptionalIndex, Set};
|
||||
|
||||
const TOTAL_NUM_VALUES: u32 = 1_000_000;
|
||||
|
||||
fn gen_optional_index(fill_ratio: f64) -> OptionalIndex {
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
let vals: Vec<u32> = (0..TOTAL_NUM_VALUES)
|
||||
.map(|_| rng.gen_bool(fill_ratio))
|
||||
.enumerate()
|
||||
.filter(|(_pos, val)| *val)
|
||||
.map(|(pos, _)| pos as u32)
|
||||
.collect();
|
||||
OptionalIndex::for_test(TOTAL_NUM_VALUES, &vals)
|
||||
}
|
||||
|
||||
fn random_range_iterator(
|
||||
start: u32,
|
||||
end: u32,
|
||||
avg_step_size: u32,
|
||||
avg_deviation: u32,
|
||||
) -> impl Iterator<Item = u32> {
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
let mut current = start;
|
||||
std::iter::from_fn(move || {
|
||||
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
|
||||
if current >= end { None } else { Some(current) }
|
||||
})
|
||||
}
|
||||
|
||||
fn n_percent_step_iterator(percent: f32, num_values: u32) -> impl Iterator<Item = u32> {
|
||||
let ratio = percent / 100.0;
|
||||
let step_size = (1f32 / ratio) as u32;
|
||||
let deviation = step_size - 1;
|
||||
random_range_iterator(0, num_values, step_size, deviation)
|
||||
}
|
||||
|
||||
fn walk_over_data(codec: &OptionalIndex, avg_step_size: u32) -> Option<u32> {
|
||||
walk_over_data_from_positions(
|
||||
codec,
|
||||
random_range_iterator(0, TOTAL_NUM_VALUES, avg_step_size, 0),
|
||||
)
|
||||
}
|
||||
|
||||
fn walk_over_data_from_positions(
|
||||
codec: &OptionalIndex,
|
||||
positions: impl Iterator<Item = u32>,
|
||||
) -> Option<u32> {
|
||||
let mut dense_idx: Option<u32> = None;
|
||||
for idx in positions {
|
||||
dense_idx = dense_idx.or(codec.rank_if_exists(idx));
|
||||
}
|
||||
dense_idx
|
||||
}
|
||||
|
||||
fn main() {
|
||||
// Build separate inputs for each fill ratio.
|
||||
let inputs: Vec<(String, OptionalIndex)> = vec![
|
||||
("fill=1%".to_string(), gen_optional_index(0.01)),
|
||||
("fill=5%".to_string(), gen_optional_index(0.05)),
|
||||
("fill=10%".to_string(), gen_optional_index(0.10)),
|
||||
("fill=50%".to_string(), gen_optional_index(0.50)),
|
||||
("fill=90%".to_string(), gen_optional_index(0.90)),
|
||||
];
|
||||
|
||||
let mut group: InputGroup<OptionalIndex> = InputGroup::new_with_inputs(inputs);
|
||||
|
||||
// Translate orig->codec (rank_if_exists) with sampling
|
||||
group.register("orig_to_codec_10pct_hit", |codec: &OptionalIndex| {
|
||||
black_box(walk_over_data(codec, 100));
|
||||
});
|
||||
group.register("orig_to_codec_1pct_hit", |codec: &OptionalIndex| {
|
||||
black_box(walk_over_data(codec, 1000));
|
||||
});
|
||||
group.register("orig_to_codec_full_scan", |codec: &OptionalIndex| {
|
||||
black_box(walk_over_data_from_positions(codec, 0..TOTAL_NUM_VALUES));
|
||||
});
|
||||
|
||||
// Translate codec->orig (select/select_batch) on sampled ranks
|
||||
fn bench_translate_codec_to_orig_util(codec: &OptionalIndex, percent_hit: f32) {
|
||||
let num_non_nulls = codec.num_non_nulls();
|
||||
let idxs: Vec<u32> = if percent_hit == 100.0f32 {
|
||||
(0..num_non_nulls).collect()
|
||||
} else {
|
||||
n_percent_step_iterator(percent_hit, num_non_nulls).collect()
|
||||
};
|
||||
let mut output = vec![0u32; idxs.len()];
|
||||
output.copy_from_slice(&idxs[..]);
|
||||
codec.select_batch(&mut output);
|
||||
black_box(output);
|
||||
}
|
||||
|
||||
group.register("codec_to_orig_0.005pct_hit", |codec: &OptionalIndex| {
|
||||
bench_translate_codec_to_orig_util(codec, 0.005);
|
||||
});
|
||||
group.register("codec_to_orig_10pct_hit", |codec: &OptionalIndex| {
|
||||
bench_translate_codec_to_orig_util(codec, 10.0);
|
||||
});
|
||||
group.register("codec_to_orig_full_scan", |codec: &OptionalIndex| {
|
||||
bench_translate_codec_to_orig_util(codec, 100.0);
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
@@ -1,15 +1,12 @@
|
||||
#![feature(test)]
|
||||
|
||||
use std::ops::RangeInclusive;
|
||||
use std::sync::Arc;
|
||||
|
||||
use binggan::{InputGroup, black_box};
|
||||
use common::OwnedBytes;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{random, Rng, SeedableRng};
|
||||
use rand::{Rng, SeedableRng, random};
|
||||
use tantivy_columnar::ColumnValues;
|
||||
use test::Bencher;
|
||||
extern crate test;
|
||||
|
||||
// TODO does this make sense for IPv6 ?
|
||||
fn generate_random() -> Vec<u64> {
|
||||
@@ -47,78 +44,77 @@ fn get_data_50percent_item() -> Vec<u128> {
|
||||
}
|
||||
data.push(SINGLE_ITEM);
|
||||
data.shuffle(&mut rng);
|
||||
let data = data.iter().map(|el| *el as u128).collect::<Vec<_>>();
|
||||
data
|
||||
data.iter().map(|el| *el as u128).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u128_50percent_hit(b: &mut Bencher) {
|
||||
fn main() {
|
||||
let data = get_data_50percent_item();
|
||||
let column = get_u128_column_from_data(&data);
|
||||
let column_range = get_u128_column_from_data(&data);
|
||||
let column_random = get_u128_column_random();
|
||||
|
||||
b.iter(|| {
|
||||
struct Inputs {
|
||||
data: Vec<u128>,
|
||||
column_range: Arc<dyn ColumnValues<u128>>,
|
||||
column_random: Arc<dyn ColumnValues<u128>>,
|
||||
}
|
||||
|
||||
let inputs = Inputs {
|
||||
data,
|
||||
column_range,
|
||||
column_random,
|
||||
};
|
||||
let mut group: InputGroup<Inputs> =
|
||||
InputGroup::new_with_inputs(vec![("u128 benches".to_string(), inputs)]);
|
||||
|
||||
group.register(
|
||||
"intfastfield_getrange_u128_50percent_hit",
|
||||
|inp: &Inputs| {
|
||||
let mut positions = Vec::new();
|
||||
inp.column_range.get_row_ids_for_value_range(
|
||||
*FIFTY_PERCENT_RANGE.start() as u128..=*FIFTY_PERCENT_RANGE.end() as u128,
|
||||
0..inp.data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
black_box(positions.len());
|
||||
},
|
||||
);
|
||||
|
||||
group.register("intfastfield_getrange_u128_single_hit", |inp: &Inputs| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(
|
||||
*FIFTY_PERCENT_RANGE.start() as u128..=*FIFTY_PERCENT_RANGE.end() as u128,
|
||||
0..data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
positions
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u128_single_hit(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let column = get_u128_column_from_data(&data);
|
||||
|
||||
b.iter(|| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(
|
||||
inp.column_range.get_row_ids_for_value_range(
|
||||
*SINGLE_ITEM_RANGE.start() as u128..=*SINGLE_ITEM_RANGE.end() as u128,
|
||||
0..data.len() as u32,
|
||||
0..inp.data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
positions
|
||||
black_box(positions.len());
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u128_hit_all(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let column = get_u128_column_from_data(&data);
|
||||
|
||||
b.iter(|| {
|
||||
group.register("intfastfield_getrange_u128_hit_all", |inp: &Inputs| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(0..=u128::MAX, 0..data.len() as u32, &mut positions);
|
||||
positions
|
||||
inp.column_range.get_row_ids_for_value_range(
|
||||
0..=u128::MAX,
|
||||
0..inp.data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
black_box(positions.len());
|
||||
});
|
||||
}
|
||||
// U128 RANGE END
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_scan_all_fflookup_u128(b: &mut Bencher) {
|
||||
let column = get_u128_column_random();
|
||||
|
||||
b.iter(|| {
|
||||
group.register("intfastfield_scan_all_fflookup_u128", |inp: &Inputs| {
|
||||
let mut a = 0u128;
|
||||
for i in 0u64..column.num_vals() as u64 {
|
||||
a += column.get_val(i as u32);
|
||||
for i in 0u64..inp.column_random.num_vals() as u64 {
|
||||
a += inp.column_random.get_val(i as u32);
|
||||
}
|
||||
a
|
||||
black_box(a);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_jumpy_stride5_u128(b: &mut Bencher) {
|
||||
let column = get_u128_column_random();
|
||||
|
||||
b.iter(|| {
|
||||
let n = column.num_vals();
|
||||
group.register("intfastfield_jumpy_stride5_u128", |inp: &Inputs| {
|
||||
let n = inp.column_random.num_vals();
|
||||
let mut a = 0u128;
|
||||
for i in (0..n / 5).map(|val| val * 5) {
|
||||
a += column.get_val(i);
|
||||
a += inp.column_random.get_val(i);
|
||||
}
|
||||
a
|
||||
black_box(a);
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
#![feature(test)]
|
||||
extern crate test;
|
||||
|
||||
use std::ops::RangeInclusive;
|
||||
use std::sync::Arc;
|
||||
|
||||
use binggan::{InputGroup, black_box};
|
||||
use rand::prelude::*;
|
||||
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
|
||||
use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_column_values};
|
||||
use tantivy_columnar::*;
|
||||
use test::Bencher;
|
||||
|
||||
// Warning: this generates the same permutation at each call
|
||||
fn generate_permutation() -> Vec<u64> {
|
||||
@@ -27,37 +24,11 @@ pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn Colu
|
||||
serialize_and_load_u64_based_column_values(&column, &[codec_type])
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) {
|
||||
let permutation = generate_permutation();
|
||||
let n = permutation.len();
|
||||
b.iter(|| {
|
||||
let mut a = 0u64;
|
||||
for _ in 0..n {
|
||||
a = permutation[a as usize];
|
||||
}
|
||||
a
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_jumpy_fflookup_bitpacked(b: &mut Bencher) {
|
||||
let permutation = generate_permutation();
|
||||
let n = permutation.len();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
|
||||
b.iter(|| {
|
||||
let mut a = 0u64;
|
||||
for _ in 0..n {
|
||||
a = column.get_val(a as u32);
|
||||
}
|
||||
a
|
||||
});
|
||||
}
|
||||
|
||||
const FIFTY_PERCENT_RANGE: RangeInclusive<u64> = 1..=50;
|
||||
const SINGLE_ITEM: u64 = 90;
|
||||
const SINGLE_ITEM_RANGE: RangeInclusive<u64> = 90..=90;
|
||||
const ONE_PERCENT_ITEM_RANGE: RangeInclusive<u64> = 49..=49;
|
||||
|
||||
fn get_data_50percent_item() -> Vec<u128> {
|
||||
let mut rng = StdRng::from_seed([1u8; 32]);
|
||||
|
||||
@@ -69,135 +40,122 @@ fn get_data_50percent_item() -> Vec<u128> {
|
||||
data.push(SINGLE_ITEM);
|
||||
|
||||
data.shuffle(&mut rng);
|
||||
let data = data.iter().map(|el| *el as u128).collect::<Vec<_>>();
|
||||
data
|
||||
data.iter().map(|el| *el as u128).collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
// U64 RANGE START
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u64_50percent_hit(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
|
||||
b.iter(|| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(
|
||||
FIFTY_PERCENT_RANGE,
|
||||
0..data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
positions
|
||||
});
|
||||
}
|
||||
type VecCol = (Vec<u64>, Arc<dyn ColumnValues<u64>>);
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u64_1percent_hit(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
|
||||
|
||||
b.iter(|| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(
|
||||
ONE_PERCENT_ITEM_RANGE,
|
||||
0..data.len() as u32,
|
||||
&mut positions,
|
||||
);
|
||||
positions
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u64_single_hit(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
|
||||
|
||||
b.iter(|| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(SINGLE_ITEM_RANGE, 0..data.len() as u32, &mut positions);
|
||||
positions
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_getrange_u64_hit_all(b: &mut Bencher) {
|
||||
let data = get_data_50percent_item();
|
||||
let data = data.iter().map(|el| *el as u64).collect::<Vec<_>>();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&data, CodecType::Bitpacked);
|
||||
|
||||
b.iter(|| {
|
||||
let mut positions = Vec::new();
|
||||
column.get_row_ids_for_value_range(0..=u64::MAX, 0..data.len() as u32, &mut positions);
|
||||
positions
|
||||
});
|
||||
}
|
||||
// U64 RANGE END
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_stride7_vec(b: &mut Bencher) {
|
||||
fn bench_access() {
|
||||
let permutation = generate_permutation();
|
||||
let n = permutation.len();
|
||||
b.iter(|| {
|
||||
let column_perm: Arc<dyn ColumnValues<u64>> =
|
||||
serialize_and_load(&permutation, CodecType::Bitpacked);
|
||||
|
||||
let permutation_gcd = generate_permutation_gcd();
|
||||
let column_perm_gcd: Arc<dyn ColumnValues<u64>> =
|
||||
serialize_and_load(&permutation_gcd, CodecType::Bitpacked);
|
||||
|
||||
let mut group: InputGroup<VecCol> = InputGroup::new_with_inputs(vec![
|
||||
(
|
||||
"access".to_string(),
|
||||
(permutation.clone(), column_perm.clone()),
|
||||
),
|
||||
(
|
||||
"access_gcd".to_string(),
|
||||
(permutation_gcd.clone(), column_perm_gcd.clone()),
|
||||
),
|
||||
]);
|
||||
|
||||
group.register("stride7_vec", |inp: &VecCol| {
|
||||
let n = inp.0.len();
|
||||
let mut a = 0u64;
|
||||
for i in (0..n / 7).map(|val| val * 7) {
|
||||
a += permutation[i as usize];
|
||||
a += inp.0[i];
|
||||
}
|
||||
a
|
||||
black_box(a);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) {
|
||||
let permutation = generate_permutation();
|
||||
let n = permutation.len();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
|
||||
b.iter(|| {
|
||||
let mut a = 0;
|
||||
group.register("fullscan_vec", |inp: &VecCol| {
|
||||
let mut a = 0u64;
|
||||
for i in 0..inp.0.len() {
|
||||
a += inp.0[i];
|
||||
}
|
||||
black_box(a);
|
||||
});
|
||||
|
||||
group.register("stride7_column_values", |inp: &VecCol| {
|
||||
let n = inp.1.num_vals() as usize;
|
||||
let mut a = 0u64;
|
||||
for i in (0..n / 7).map(|val| val * 7) {
|
||||
a += column.get_val(i as u32);
|
||||
a += inp.1.get_val(i as u32);
|
||||
}
|
||||
a
|
||||
black_box(a);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) {
|
||||
let permutation = generate_permutation();
|
||||
let n = permutation.len();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
|
||||
let column_ref = column.as_ref();
|
||||
b.iter(|| {
|
||||
let mut a = 0u64;
|
||||
for i in 0u32..n as u32 {
|
||||
a += column_ref.get_val(i);
|
||||
}
|
||||
a
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) {
|
||||
let permutation = generate_permutation_gcd();
|
||||
let n = permutation.len();
|
||||
let column: Arc<dyn ColumnValues<u64>> = serialize_and_load(&permutation, CodecType::Bitpacked);
|
||||
b.iter(|| {
|
||||
group.register("fullscan_column_values", |inp: &VecCol| {
|
||||
let mut a = 0u64;
|
||||
let n = inp.1.num_vals() as usize;
|
||||
for i in 0..n {
|
||||
a += column.get_val(i as u32);
|
||||
a += inp.1.get_val(i as u32);
|
||||
}
|
||||
a
|
||||
black_box(a);
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_intfastfield_scan_all_vec(b: &mut Bencher) {
|
||||
let permutation = generate_permutation();
|
||||
b.iter(|| {
|
||||
let mut a = 0u64;
|
||||
for i in 0..permutation.len() {
|
||||
a += permutation[i as usize] as u64;
|
||||
}
|
||||
a
|
||||
});
|
||||
fn bench_range() {
|
||||
let data_50 = get_data_50percent_item();
|
||||
let data_u64 = data_50.iter().map(|el| *el as u64).collect::<Vec<_>>();
|
||||
let column_data: Arc<dyn ColumnValues<u64>> =
|
||||
serialize_and_load(&data_u64, CodecType::Bitpacked);
|
||||
|
||||
let mut group: InputGroup<Arc<dyn ColumnValues<u64>>> =
|
||||
InputGroup::new_with_inputs(vec![("dist_50pct_item".to_string(), column_data.clone())]);
|
||||
|
||||
group.register(
|
||||
"fastfield_getrange_u64_50percent_hit",
|
||||
|col: &Arc<dyn ColumnValues<u64>>| {
|
||||
let mut positions = Vec::new();
|
||||
col.get_row_ids_for_value_range(FIFTY_PERCENT_RANGE, 0..col.num_vals(), &mut positions);
|
||||
black_box(positions.len());
|
||||
},
|
||||
);
|
||||
|
||||
group.register(
|
||||
"fastfield_getrange_u64_1percent_hit",
|
||||
|col: &Arc<dyn ColumnValues<u64>>| {
|
||||
let mut positions = Vec::new();
|
||||
col.get_row_ids_for_value_range(
|
||||
ONE_PERCENT_ITEM_RANGE,
|
||||
0..col.num_vals(),
|
||||
&mut positions,
|
||||
);
|
||||
black_box(positions.len());
|
||||
},
|
||||
);
|
||||
|
||||
group.register(
|
||||
"fastfield_getrange_u64_single_hit",
|
||||
|col: &Arc<dyn ColumnValues<u64>>| {
|
||||
let mut positions = Vec::new();
|
||||
col.get_row_ids_for_value_range(SINGLE_ITEM_RANGE, 0..col.num_vals(), &mut positions);
|
||||
black_box(positions.len());
|
||||
},
|
||||
);
|
||||
|
||||
group.register(
|
||||
"fastfield_getrange_u64_hit_all",
|
||||
|col: &Arc<dyn ColumnValues<u64>>| {
|
||||
let mut positions = Vec::new();
|
||||
col.get_row_ids_for_value_range(0..=u64::MAX, 0..col.num_vals(), &mut positions);
|
||||
black_box(positions.len());
|
||||
},
|
||||
);
|
||||
|
||||
group.run();
|
||||
}
|
||||
|
||||
fn main() {
|
||||
bench_access();
|
||||
bench_range();
|
||||
}
|
||||
|
||||
18
columnar/columnar-cli-inspect/Cargo.toml
Normal file
18
columnar/columnar-cli-inspect/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "tantivy-columnar-inspect"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
tantivy = {path="../..", package="tantivy"}
|
||||
columnar = {path="../", package="tantivy-columnar"}
|
||||
common = {path="../../common", package="tantivy-common"}
|
||||
|
||||
[workspace]
|
||||
members = []
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
#debug-assertions = true
|
||||
#overflow-checks = true
|
||||
54
columnar/columnar-cli-inspect/src/main.rs
Normal file
54
columnar/columnar-cli-inspect/src/main.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use columnar::ColumnarReader;
|
||||
use common::file_slice::{FileSlice, WrapFile};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use tantivy::directory::footer::Footer;
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
println!("Opens a columnar file written by tantivy and validates it.");
|
||||
let path = std::env::args().nth(1).unwrap();
|
||||
|
||||
let path = Path::new(&path);
|
||||
println!("Reading {:?}", path);
|
||||
let _reader = open_and_validate_columnar(path.to_str().unwrap())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn validate_columnar_reader(reader: &ColumnarReader) {
|
||||
let num_rows = reader.num_rows();
|
||||
println!("num_rows: {}", num_rows);
|
||||
let columns = reader.list_columns().unwrap();
|
||||
println!("num columns: {:?}", columns.len());
|
||||
for (col_name, dynamic_column_handle) in columns {
|
||||
let col = dynamic_column_handle.open().unwrap();
|
||||
match col {
|
||||
columnar::DynamicColumn::Bool(_)
|
||||
| columnar::DynamicColumn::I64(_)
|
||||
| columnar::DynamicColumn::U64(_)
|
||||
| columnar::DynamicColumn::F64(_)
|
||||
| columnar::DynamicColumn::IpAddr(_)
|
||||
| columnar::DynamicColumn::DateTime(_)
|
||||
| columnar::DynamicColumn::Bytes(_) => {}
|
||||
columnar::DynamicColumn::Str(str_column) => {
|
||||
let num_vals = str_column.ords().values.num_vals();
|
||||
let num_terms_dict = str_column.num_terms() as u64;
|
||||
let max_ord = str_column.ords().values.iter().max().unwrap_or_default();
|
||||
println!("{col_name:35} num_vals {num_vals:10} \t num_terms_dict {num_terms_dict:8} max_ord: {max_ord:8}",);
|
||||
for ord in str_column.ords().values.iter() {
|
||||
assert!(ord < num_terms_dict);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a columnar file that was written by tantivy and validates it.
|
||||
pub fn open_and_validate_columnar(path: &str) -> io::Result<ColumnarReader> {
|
||||
let wrap_file = WrapFile::new(std::fs::File::open(path)?)?;
|
||||
let slice = FileSlice::new(std::sync::Arc::new(wrap_file));
|
||||
let (_footer, slice) = Footer::extract_footer(slice.clone()).unwrap();
|
||||
let reader = ColumnarReader::open(slice).unwrap();
|
||||
validate_columnar_reader(&reader);
|
||||
Ok(reader)
|
||||
}
|
||||
@@ -66,7 +66,7 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
|
||||
&'a self,
|
||||
docs: &'a [u32],
|
||||
accessor: &Column<T>,
|
||||
) -> impl Iterator<Item = (DocId, T)> + 'a {
|
||||
) -> impl Iterator<Item = (DocId, T)> + 'a + use<'a, T> {
|
||||
if accessor.index.get_cardinality().is_full() {
|
||||
docs.iter().cloned().zip(self.val_cache.iter().cloned())
|
||||
} else {
|
||||
@@ -139,7 +139,7 @@ mod tests {
|
||||
missing_docs.push(missing_doc);
|
||||
});
|
||||
|
||||
assert_eq!(missing_docs, vec![]);
|
||||
assert_eq!(missing_docs, Vec::<u32>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::{fmt, io};
|
||||
|
||||
use sstable::{Dictionary, VoidSSTable};
|
||||
|
||||
use crate::column::Column;
|
||||
use crate::RowId;
|
||||
use crate::column::Column;
|
||||
|
||||
/// Dictionary encoded column.
|
||||
///
|
||||
|
||||
@@ -9,13 +9,14 @@ use std::sync::Arc;
|
||||
use common::BinarySerializable;
|
||||
pub use dictionary_encoded::{BytesColumn, StrColumn};
|
||||
pub use serialize::{
|
||||
open_column_bytes, open_column_str, open_column_u128, open_column_u128_as_compact_u64,
|
||||
open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
|
||||
open_column_bytes, open_column_str, open_column_u64, open_column_u128,
|
||||
open_column_u128_as_compact_u64, serialize_column_mappable_to_u64,
|
||||
serialize_column_mappable_to_u128,
|
||||
};
|
||||
|
||||
use crate::column_index::{ColumnIndex, Set};
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
|
||||
use crate::column_values::{monotonic_map_column, ColumnValues};
|
||||
use crate::column_values::{ColumnValues, monotonic_map_column};
|
||||
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -113,7 +114,7 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Translates a block of docis to row_ids.
|
||||
/// Translates a block of docids to row_ids.
|
||||
///
|
||||
/// returns the row_ids and the matching docids on the same index
|
||||
/// e.g.
|
||||
|
||||
@@ -6,10 +6,10 @@ use common::OwnedBytes;
|
||||
use sstable::Dictionary;
|
||||
|
||||
use crate::column::{BytesColumn, Column};
|
||||
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
|
||||
use crate::column_index::{SerializableColumnIndex, serialize_column_index};
|
||||
use crate::column_values::{
|
||||
CodecType, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
|
||||
load_u64_based_column_values, serialize_column_values_u128, serialize_u64_based_column_values,
|
||||
CodecType, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
|
||||
};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{StrColumn, Version};
|
||||
|
||||
@@ -99,9 +99,9 @@ mod tests {
|
||||
|
||||
use crate::column_index::merge::detect_cardinality;
|
||||
use crate::column_index::multivalued_index::{
|
||||
open_multivalued_index, serialize_multivalued_index, MultiValueIndex,
|
||||
MultiValueIndex, open_multivalued_index, serialize_multivalued_index,
|
||||
};
|
||||
use crate::column_index::{merge_column_index, OptionalIndex, SerializableColumnIndex};
|
||||
use crate::column_index::{OptionalIndex, SerializableColumnIndex, merge_column_index};
|
||||
use crate::{
|
||||
Cardinality, ColumnIndex, MergeRowOrder, RowAddr, RowId, ShuffleMergeOrder, StackMergeOrder,
|
||||
};
|
||||
|
||||
@@ -58,7 +58,7 @@ struct ShuffledIndex<'a> {
|
||||
merge_order: &'a ShuffleMergeOrder,
|
||||
}
|
||||
|
||||
impl<'a> Iterable<u32> for ShuffledIndex<'a> {
|
||||
impl Iterable<u32> for ShuffledIndex<'_> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
|
||||
Box::new(
|
||||
self.merge_order
|
||||
@@ -127,7 +127,7 @@ fn integrate_num_vals(num_vals: impl Iterator<Item = u32>) -> impl Iterator<Item
|
||||
)
|
||||
}
|
||||
|
||||
impl<'a> Iterable<u32> for ShuffledMultivaluedIndex<'a> {
|
||||
impl Iterable<u32> for ShuffledMultivaluedIndex<'_> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
|
||||
let num_vals_per_row = iter_num_values(self.column_indexes, self.merge_order);
|
||||
Box::new(integrate_num_vals(num_vals_per_row))
|
||||
@@ -137,8 +137,8 @@ impl<'a> Iterable<u32> for ShuffledMultivaluedIndex<'a> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::column_index::OptionalIndex;
|
||||
use crate::RowAddr;
|
||||
use crate::column_index::OptionalIndex;
|
||||
|
||||
#[test]
|
||||
fn test_integrate_num_vals_empty() {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::ops::Range;
|
||||
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::column_index::multivalued_index::{MultiValueIndex, SerializableMultivalueIndex};
|
||||
use crate::column_index::serialize::SerializableOptionalIndex;
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder};
|
||||
|
||||
@@ -56,7 +56,7 @@ fn get_doc_ids_with_values<'a>(
|
||||
ColumnIndex::Full => Box::new(doc_range),
|
||||
ColumnIndex::Optional(optional_index) => Box::new(
|
||||
optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row| row + doc_range.start),
|
||||
),
|
||||
ColumnIndex::Multivalued(multivalued_index) => match multivalued_index {
|
||||
@@ -73,7 +73,7 @@ fn get_doc_ids_with_values<'a>(
|
||||
MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new(
|
||||
multivalued_index
|
||||
.optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row| row + doc_range.start),
|
||||
),
|
||||
},
|
||||
@@ -123,7 +123,7 @@ fn get_num_values_iterator<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterable<u32> for StackedStartOffsets<'a> {
|
||||
impl Iterable<u32> for StackedStartOffsets<'_> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
|
||||
let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| {
|
||||
let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32;
|
||||
@@ -177,7 +177,7 @@ impl<'a> Iterable<RowId> for StackedOptionalIndex<'a> {
|
||||
ColumnIndex::Full => Box::new(columnar_row_range),
|
||||
ColumnIndex::Optional(optional_index) => Box::new(
|
||||
optional_index
|
||||
.iter_rows()
|
||||
.iter_docs()
|
||||
.map(move |row_id: RowId| columnar_row_range.start + row_id),
|
||||
),
|
||||
ColumnIndex::Multivalued(_) => {
|
||||
|
||||
@@ -14,7 +14,7 @@ pub use merge::merge_column_index;
|
||||
pub(crate) use multivalued_index::SerializableMultivalueIndex;
|
||||
pub use optional_index::{OptionalIndex, Set};
|
||||
pub use serialize::{
|
||||
open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex,
|
||||
SerializableColumnIndex, SerializableOptionalIndex, open_column_index, serialize_column_index,
|
||||
};
|
||||
|
||||
use crate::column_index::multivalued_index::MultiValueIndex;
|
||||
|
||||
@@ -8,7 +8,7 @@ use common::{CountingWriter, OwnedBytes};
|
||||
use super::optional_index::{open_optional_index, serialize_optional_index};
|
||||
use super::{OptionalIndex, SerializableOptionalIndex, Set};
|
||||
use crate::column_values::{
|
||||
load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues,
|
||||
CodecType, ColumnValues, load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{DocId, RowId, Version};
|
||||
|
||||
@@ -7,7 +7,7 @@ mod set_block;
|
||||
use common::{BinarySerializable, OwnedBytes, VInt};
|
||||
pub use set::{SelectCursor, Set, SetCodec};
|
||||
use set_block::{
|
||||
DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES,
|
||||
DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec,
|
||||
};
|
||||
|
||||
use crate::iterable::Iterable;
|
||||
@@ -80,23 +80,23 @@ impl BlockVariant {
|
||||
/// index is the block index. For each block `byte_start` and `offset` is computed.
|
||||
#[derive(Clone)]
|
||||
pub struct OptionalIndex {
|
||||
num_rows: RowId,
|
||||
num_non_null_rows: RowId,
|
||||
num_docs: RowId,
|
||||
num_non_null_docs: RowId,
|
||||
block_data: OwnedBytes,
|
||||
block_metas: Arc<[BlockMeta]>,
|
||||
}
|
||||
|
||||
impl<'a> Iterable<u32> for &'a OptionalIndex {
|
||||
impl Iterable<u32> for &OptionalIndex {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u32> + '_> {
|
||||
Box::new(self.iter_rows())
|
||||
Box::new(self.iter_docs())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OptionalIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.debug_struct("OptionalIndex")
|
||||
.field("num_rows", &self.num_rows)
|
||||
.field("num_non_null_rows", &self.num_non_null_rows)
|
||||
.field("num_docs", &self.num_docs)
|
||||
.field("num_non_null_docs", &self.num_non_null_docs)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
@@ -123,7 +123,7 @@ enum BlockSelectCursor<'a> {
|
||||
Sparse(<SparseBlock<'a> as Set<u16>>::SelectCursor<'a>),
|
||||
}
|
||||
|
||||
impl<'a> BlockSelectCursor<'a> {
|
||||
impl BlockSelectCursor<'_> {
|
||||
fn select(&mut self, rank: u16) -> u16 {
|
||||
match self {
|
||||
BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank),
|
||||
@@ -141,7 +141,7 @@ pub struct OptionalIndexSelectCursor<'a> {
|
||||
num_null_rows_before_block: RowId,
|
||||
}
|
||||
|
||||
impl<'a> OptionalIndexSelectCursor<'a> {
|
||||
impl OptionalIndexSelectCursor<'_> {
|
||||
fn search_and_load_block(&mut self, rank: RowId) {
|
||||
if rank < self.current_block_end_rank {
|
||||
// we are already in the right block
|
||||
@@ -165,7 +165,7 @@ impl<'a> OptionalIndexSelectCursor<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> SelectCursor<RowId> for OptionalIndexSelectCursor<'a> {
|
||||
impl SelectCursor<RowId> for OptionalIndexSelectCursor<'_> {
|
||||
fn select(&mut self, rank: RowId) -> RowId {
|
||||
self.search_and_load_block(rank);
|
||||
let index_in_block = (rank - self.num_null_rows_before_block) as u16;
|
||||
@@ -259,11 +259,13 @@ impl Set<RowId> for OptionalIndex {
|
||||
|
||||
impl OptionalIndex {
|
||||
pub fn for_test(num_rows: RowId, row_ids: &[RowId]) -> OptionalIndex {
|
||||
assert!(row_ids
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_row_id| last_row_id < num_rows)
|
||||
.unwrap_or(true));
|
||||
assert!(
|
||||
row_ids
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_row_id| last_row_id < num_rows)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
let mut buffer = Vec::new();
|
||||
serialize_optional_index(&row_ids, num_rows, &mut buffer).unwrap();
|
||||
let bytes = OwnedBytes::new(buffer);
|
||||
@@ -271,17 +273,17 @@ impl OptionalIndex {
|
||||
}
|
||||
|
||||
pub fn num_docs(&self) -> RowId {
|
||||
self.num_rows
|
||||
self.num_docs
|
||||
}
|
||||
|
||||
pub fn num_non_nulls(&self) -> RowId {
|
||||
self.num_non_null_rows
|
||||
self.num_non_null_docs
|
||||
}
|
||||
|
||||
pub fn iter_rows(&self) -> impl Iterator<Item = RowId> + '_ {
|
||||
pub fn iter_docs(&self) -> impl Iterator<Item = RowId> + '_ {
|
||||
// TODO optimize
|
||||
let mut select_batch = self.select_cursor();
|
||||
(0..self.num_non_null_rows).map(move |rank| select_batch.select(rank))
|
||||
(0..self.num_non_null_docs).map(move |rank| select_batch.select(rank))
|
||||
}
|
||||
pub fn select_batch(&self, ranks: &mut [RowId]) {
|
||||
let mut select_cursor = self.select_cursor();
|
||||
@@ -505,7 +507,7 @@ fn deserialize_optional_index_block_metadatas(
|
||||
non_null_rows_before_block += num_non_null_rows;
|
||||
}
|
||||
block_metas.resize(
|
||||
((num_rows + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK) as usize,
|
||||
num_rows.div_ceil(ELEMENTS_PER_BLOCK) as usize,
|
||||
BlockMeta {
|
||||
non_null_rows_before_block,
|
||||
start_byte_offset,
|
||||
@@ -519,15 +521,15 @@ pub fn open_optional_index(bytes: OwnedBytes) -> io::Result<OptionalIndex> {
|
||||
let (mut bytes, num_non_empty_blocks_bytes) = bytes.rsplit(2);
|
||||
let num_non_empty_block_bytes =
|
||||
u16::from_le_bytes(num_non_empty_blocks_bytes.as_slice().try_into().unwrap());
|
||||
let num_rows = VInt::deserialize_u64(&mut bytes)? as u32;
|
||||
let num_docs = VInt::deserialize_u64(&mut bytes)? as u32;
|
||||
let block_metas_num_bytes =
|
||||
num_non_empty_block_bytes as usize * SERIALIZED_BLOCK_META_NUM_BYTES;
|
||||
let (block_data, block_metas) = bytes.rsplit(block_metas_num_bytes);
|
||||
let (block_metas, num_non_null_rows) =
|
||||
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_rows);
|
||||
let (block_metas, num_non_null_docs) =
|
||||
deserialize_optional_index_block_metadatas(block_metas.as_slice(), num_docs);
|
||||
let optional_index = OptionalIndex {
|
||||
num_rows,
|
||||
num_non_null_rows,
|
||||
num_docs,
|
||||
num_non_null_docs,
|
||||
block_data,
|
||||
block_metas: block_metas.into(),
|
||||
};
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::io::{self, Write};
|
||||
|
||||
use common::BinarySerializable;
|
||||
|
||||
use crate::column_index::optional_index::{SelectCursor, Set, SetCodec, ELEMENTS_PER_BLOCK};
|
||||
use crate::column_index::optional_index::{ELEMENTS_PER_BLOCK, SelectCursor, Set, SetCodec};
|
||||
|
||||
#[inline(always)]
|
||||
fn get_bit_at(input: u64, n: u16) -> bool {
|
||||
@@ -23,7 +23,6 @@ fn set_bit_at(input: &mut u64, n: u16) {
|
||||
///
|
||||
/// When translating a dense index to the original index, we can use the offset to find the correct
|
||||
/// block. Direct computation is not possible, but we can employ a linear or binary search.
|
||||
|
||||
const ELEMENTS_PER_MINI_BLOCK: u16 = 64;
|
||||
const MINI_BLOCK_BITVEC_NUM_BYTES: usize = 8;
|
||||
const MINI_BLOCK_OFFSET_NUM_BYTES: usize = 2;
|
||||
@@ -109,7 +108,7 @@ pub struct DenseBlockSelectCursor<'a> {
|
||||
dense_block: DenseBlock<'a>,
|
||||
}
|
||||
|
||||
impl<'a> SelectCursor<u16> for DenseBlockSelectCursor<'a> {
|
||||
impl SelectCursor<u16> for DenseBlockSelectCursor<'_> {
|
||||
#[inline]
|
||||
fn select(&mut self, rank: u16) -> u16 {
|
||||
self.block_id = self
|
||||
@@ -175,7 +174,7 @@ impl<'a> Set<u16> for DenseBlock<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> DenseBlock<'a> {
|
||||
impl DenseBlock<'_> {
|
||||
#[inline]
|
||||
fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock {
|
||||
let data_start_pos = mini_block_id as usize * MINI_BLOCK_NUM_BYTES;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
mod dense;
|
||||
mod sparse;
|
||||
|
||||
pub use dense::{DenseBlock, DenseBlockCodec, DENSE_BLOCK_NUM_BYTES};
|
||||
pub use dense::{DENSE_BLOCK_NUM_BYTES, DenseBlock, DenseBlockCodec};
|
||||
pub use sparse::{SparseBlock, SparseBlockCodec};
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -31,7 +31,7 @@ impl<'a> SelectCursor<u16> for SparseBlock<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Set<u16> for SparseBlock<'a> {
|
||||
impl Set<u16> for SparseBlock<'_> {
|
||||
type SelectCursor<'b>
|
||||
= Self
|
||||
where Self: 'b;
|
||||
@@ -69,7 +69,7 @@ fn get_u16(data: &[u8], byte_position: usize) -> u16 {
|
||||
u16::from_le_bytes(bytes)
|
||||
}
|
||||
|
||||
impl<'a> SparseBlock<'a> {
|
||||
impl SparseBlock<'_> {
|
||||
#[inline(always)]
|
||||
fn value_at_idx(&self, data: &[u8], idx: u16) -> u16 {
|
||||
let start_offset: usize = idx as usize * 2;
|
||||
|
||||
@@ -164,7 +164,7 @@ fn test_optional_index_large() {
|
||||
fn test_optional_index_iter_aux(row_ids: &[RowId], num_rows: RowId) {
|
||||
let optional_index = OptionalIndex::for_test(num_rows, row_ids);
|
||||
assert_eq!(optional_index.num_docs(), num_rows);
|
||||
assert!(optional_index.iter_rows().eq(row_ids.iter().copied()));
|
||||
assert!(optional_index.iter_docs().eq(row_ids.iter().copied()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -219,174 +219,3 @@ fn test_optional_index_for_tests() {
|
||||
assert!(!optional_index.contains(3));
|
||||
assert_eq!(optional_index.num_docs(), 4);
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench {
|
||||
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use test::Bencher;
|
||||
|
||||
use super::*;
|
||||
|
||||
const TOTAL_NUM_VALUES: u32 = 1_000_000;
|
||||
fn gen_bools(fill_ratio: f64) -> OptionalIndex {
|
||||
let mut out = Vec::new();
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
let vals: Vec<RowId> = (0..TOTAL_NUM_VALUES)
|
||||
.map(|_| rng.gen_bool(fill_ratio))
|
||||
.enumerate()
|
||||
.filter(|(_pos, val)| *val)
|
||||
.map(|(pos, _)| pos as RowId)
|
||||
.collect();
|
||||
serialize_optional_index(&&vals[..], TOTAL_NUM_VALUES, &mut out).unwrap();
|
||||
|
||||
open_optional_index(OwnedBytes::new(out)).unwrap()
|
||||
}
|
||||
|
||||
fn random_range_iterator(
|
||||
start: u32,
|
||||
end: u32,
|
||||
avg_step_size: u32,
|
||||
avg_deviation: u32,
|
||||
) -> impl Iterator<Item = u32> {
|
||||
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
|
||||
let mut current = start;
|
||||
std::iter::from_fn(move || {
|
||||
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
|
||||
if current >= end {
|
||||
None
|
||||
} else {
|
||||
Some(current)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn n_percent_step_iterator(percent: f32, num_values: u32) -> impl Iterator<Item = u32> {
|
||||
let ratio = percent / 100.0;
|
||||
let step_size = (1f32 / ratio) as u32;
|
||||
let deviation = step_size - 1;
|
||||
random_range_iterator(0, num_values, step_size, deviation)
|
||||
}
|
||||
|
||||
fn walk_over_data(codec: &OptionalIndex, avg_step_size: u32) -> Option<u32> {
|
||||
walk_over_data_from_positions(
|
||||
codec,
|
||||
random_range_iterator(0, TOTAL_NUM_VALUES, avg_step_size, 0),
|
||||
)
|
||||
}
|
||||
|
||||
fn walk_over_data_from_positions(
|
||||
codec: &OptionalIndex,
|
||||
positions: impl Iterator<Item = u32>,
|
||||
) -> Option<u32> {
|
||||
let mut dense_idx: Option<u32> = None;
|
||||
for idx in positions {
|
||||
dense_idx = dense_idx.or(codec.rank_if_exists(idx));
|
||||
}
|
||||
dense_idx
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_1percent_filled_10percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.01f64);
|
||||
bench.iter(|| walk_over_data(&codec, 100));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_5percent_filled_10percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.05f64);
|
||||
bench.iter(|| walk_over_data(&codec, 100));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_5percent_filled_1percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.05f64);
|
||||
bench.iter(|| walk_over_data(&codec, 1000));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_full_scan_1percent_filled(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.01f64);
|
||||
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_full_scan_10percent_filled(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.1f64);
|
||||
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_full_scan_90percent_filled(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.9f64);
|
||||
bench.iter(|| walk_over_data_from_positions(&codec, 0..TOTAL_NUM_VALUES));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_10percent_filled_1percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.1f64);
|
||||
bench.iter(|| walk_over_data(&codec, 100));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_50percent_filled_1percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.5f64);
|
||||
bench.iter(|| walk_over_data(&codec, 100));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_orig_to_codec_90percent_filled_1percent_hit(bench: &mut Bencher) {
|
||||
let codec = gen_bools(0.9f64);
|
||||
bench.iter(|| walk_over_data(&codec, 100));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_1percent_filled_0comma005percent_hit(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.01f64, 0.005f32, bench);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_10percent_filled_0comma005percent_hit(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.1f64, 0.005f32, bench);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_1percent_filled_10percent_hit(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.01f64, 10f32, bench);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_1percent_filled_full_scan(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.01f64, 100f32, bench);
|
||||
}
|
||||
|
||||
fn bench_translate_codec_to_orig_util(
|
||||
percent_filled: f64,
|
||||
percent_hit: f32,
|
||||
bench: &mut Bencher,
|
||||
) {
|
||||
let codec = gen_bools(percent_filled);
|
||||
let num_non_nulls = codec.num_non_nulls();
|
||||
let idxs: Vec<u32> = if percent_hit == 100.0f32 {
|
||||
(0..num_non_nulls).collect()
|
||||
} else {
|
||||
n_percent_step_iterator(percent_hit, num_non_nulls).collect()
|
||||
};
|
||||
let mut output = vec![0u32; idxs.len()];
|
||||
bench.iter(|| {
|
||||
output.copy_from_slice(&idxs[..]);
|
||||
codec.select_batch(&mut output);
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_90percent_filled_0comma005percent_hit(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.9f64, 0.005, bench);
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_translate_codec_to_orig_90percent_filled_full_scan(bench: &mut Bencher) {
|
||||
bench_translate_codec_to_orig_util(0.9f64, 100.0f32, bench);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::io::Write;
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
|
||||
use super::multivalued_index::SerializableMultivalueIndex;
|
||||
use super::OptionalIndex;
|
||||
use super::multivalued_index::SerializableMultivalueIndex;
|
||||
use crate::column_index::ColumnIndex;
|
||||
use crate::column_index::multivalued_index::serialize_multivalued_index;
|
||||
use crate::column_index::optional_index::serialize_optional_index;
|
||||
use crate::column_index::ColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{Cardinality, RowId, Version};
|
||||
|
||||
@@ -31,7 +31,7 @@ pub enum SerializableColumnIndex<'a> {
|
||||
Multivalued(SerializableMultivalueIndex<'a>),
|
||||
}
|
||||
|
||||
impl<'a> SerializableColumnIndex<'a> {
|
||||
impl SerializableColumnIndex<'_> {
|
||||
pub fn get_cardinality(&self) -> Cardinality {
|
||||
match self {
|
||||
SerializableColumnIndex::Full => Cardinality::Full,
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::OwnedBytes;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use test::{self, Bencher};
|
||||
|
||||
use super::*;
|
||||
use crate::column_values::u64_based::*;
|
||||
|
||||
fn get_data() -> Vec<u64> {
|
||||
let mut rng = StdRng::seed_from_u64(2u64);
|
||||
let mut data: Vec<_> = (100..55000_u64)
|
||||
.map(|num| num + rng.gen::<u8>() as u64)
|
||||
.collect();
|
||||
data.push(99_000);
|
||||
data.insert(1000, 2000);
|
||||
data.insert(2000, 100);
|
||||
data.insert(3000, 4100);
|
||||
data.insert(4000, 100);
|
||||
data.insert(5000, 800);
|
||||
data
|
||||
}
|
||||
|
||||
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
|
||||
let mut stats_collector = StatsCollector::default();
|
||||
for val in vals {
|
||||
stats_collector.collect(val);
|
||||
}
|
||||
stats_collector.stats()
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
fn value_iter() -> impl Iterator<Item = u64> {
|
||||
0..20_000
|
||||
}
|
||||
|
||||
fn get_reader_for_bench<Codec: ColumnCodec>(data: &[u64]) -> Codec::ColumnValues {
|
||||
let mut bytes = Vec::new();
|
||||
let stats = compute_stats(data.iter().cloned());
|
||||
let mut codec_serializer = Codec::estimator();
|
||||
for val in data {
|
||||
codec_serializer.collect(*val);
|
||||
}
|
||||
codec_serializer
|
||||
.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes)
|
||||
.unwrap();
|
||||
|
||||
Codec::load(OwnedBytes::new(bytes)).unwrap()
|
||||
}
|
||||
|
||||
fn bench_get<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
|
||||
let col = get_reader_for_bench::<Codec>(data);
|
||||
b.iter(|| {
|
||||
let mut sum = 0u64;
|
||||
for pos in value_iter() {
|
||||
let val = col.get_val(pos as u32);
|
||||
sum = sum.wrapping_add(val);
|
||||
}
|
||||
sum
|
||||
});
|
||||
}
|
||||
|
||||
#[inline(never)]
|
||||
fn bench_get_dynamic_helper(b: &mut Bencher, col: Arc<dyn ColumnValues>) {
|
||||
b.iter(|| {
|
||||
let mut sum = 0u64;
|
||||
for pos in value_iter() {
|
||||
let val = col.get_val(pos as u32);
|
||||
sum = sum.wrapping_add(val);
|
||||
}
|
||||
sum
|
||||
});
|
||||
}
|
||||
|
||||
fn bench_get_dynamic<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
|
||||
let col = Arc::new(get_reader_for_bench::<Codec>(data));
|
||||
bench_get_dynamic_helper(b, col);
|
||||
}
|
||||
fn bench_create<Codec: ColumnCodec>(b: &mut Bencher, data: &[u64]) {
|
||||
let stats = compute_stats(data.iter().cloned());
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
b.iter(|| {
|
||||
bytes.clear();
|
||||
let mut codec_serializer = Codec::estimator();
|
||||
for val in data.iter().take(1024) {
|
||||
codec_serializer.collect(*val);
|
||||
}
|
||||
|
||||
codec_serializer.serialize(&stats, Box::new(data.iter().copied()).as_mut(), &mut bytes)
|
||||
});
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn bench_fastfield_bitpack_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<BitpackedCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_linearinterpol_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<LinearCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_multilinearinterpol_create(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_create::<BlockwiseLinearCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_bitpack_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<BitpackedCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_bitpack_get_dynamic(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get_dynamic::<BitpackedCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<LinearCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_linearinterpol_get_dynamic(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get_dynamic::<LinearCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get::<BlockwiseLinearCodec>(b, &data);
|
||||
}
|
||||
#[bench]
|
||||
fn bench_fastfield_multilinearinterpol_get_dynamic(b: &mut Bencher) {
|
||||
let data: Vec<_> = get_data();
|
||||
bench_get_dynamic::<BlockwiseLinearCodec>(b, &data);
|
||||
}
|
||||
@@ -10,7 +10,7 @@ pub(crate) struct MergedColumnValues<'a, T> {
|
||||
pub(crate) merge_row_order: &'a MergeRowOrder,
|
||||
}
|
||||
|
||||
impl<'a, T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'a, T> {
|
||||
impl<T: Copy + PartialOrd + Debug + 'static> Iterable<T> for MergedColumnValues<'_, T> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
|
||||
match self.merge_row_order {
|
||||
MergeRowOrder::Stack(_) => Box::new(
|
||||
|
||||
@@ -26,13 +26,13 @@ mod monotonic_column;
|
||||
|
||||
pub(crate) use merge::MergedColumnValues;
|
||||
pub use stats::ColumnStats;
|
||||
pub use u128_based::{
|
||||
open_u128_as_compact_u64, open_u128_mapped, serialize_column_values_u128,
|
||||
CompactSpaceU64Accessor,
|
||||
};
|
||||
pub use u64_based::{
|
||||
load_u64_based_column_values, serialize_and_load_u64_based_column_values,
|
||||
serialize_u64_based_column_values, CodecType, ALL_U64_CODEC_TYPES,
|
||||
ALL_U64_CODEC_TYPES, CodecType, load_u64_based_column_values,
|
||||
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
};
|
||||
pub use u128_based::{
|
||||
CompactSpaceU64Accessor, open_u128_as_compact_u64, open_u128_mapped,
|
||||
serialize_column_values_u128,
|
||||
};
|
||||
pub use vec_column::VecColumn;
|
||||
|
||||
@@ -242,6 +242,3 @@ impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnV
|
||||
.get_row_ids_for_value_range(range, doc_id_range, positions)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
mod bench;
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
|
||||
use crate::ColumnValues;
|
||||
use crate::column_values::monotonic_mapping::StrictlyMonotonicFn;
|
||||
|
||||
struct MonotonicMappingColumn<C, T, Input> {
|
||||
from_column: C,
|
||||
@@ -99,10 +99,10 @@ where
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::column_values::VecColumn;
|
||||
use crate::column_values::monotonic_mapping::{
|
||||
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
|
||||
};
|
||||
use crate::column_values::VecColumn;
|
||||
|
||||
#[test]
|
||||
fn test_monotonic_mapping_iter() {
|
||||
|
||||
@@ -24,8 +24,8 @@ use build_compact_space::get_compact_space;
|
||||
use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker};
|
||||
|
||||
use crate::column_values::ColumnValues;
|
||||
use crate::RowId;
|
||||
use crate::column_values::ColumnValues;
|
||||
|
||||
/// The cost per blank is quite hard actually, since blanks are delta encoded, the actual cost of
|
||||
/// blanks depends on the number of blanks.
|
||||
@@ -653,12 +653,14 @@ mod tests {
|
||||
),
|
||||
&[3]
|
||||
);
|
||||
assert!(get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
99998u128..=99998u128,
|
||||
complete_range.clone()
|
||||
)
|
||||
.is_empty());
|
||||
assert!(
|
||||
get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
99998u128..=99998u128,
|
||||
complete_range.clone()
|
||||
)
|
||||
.is_empty()
|
||||
);
|
||||
assert_eq!(
|
||||
&get_positions_for_value_range_helper(
|
||||
&decomp,
|
||||
|
||||
@@ -130,11 +130,11 @@ pub fn open_u128_as_compact_u64(mut bytes: OwnedBytes) -> io::Result<Arc<dyn Col
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::column_values::u64_based::{
|
||||
serialize_and_load_u64_based_column_values, serialize_u64_based_column_values,
|
||||
ALL_U64_CODEC_TYPES,
|
||||
};
|
||||
use crate::column_values::CodecType;
|
||||
use crate::column_values::u64_based::{
|
||||
ALL_U64_CODEC_TYPES, serialize_and_load_u64_based_column_values,
|
||||
serialize_u64_based_column_values,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_serialize_deserialize_u128_header() {
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::ops::{Range, RangeInclusive};
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::{ColumnValues, RowId};
|
||||
@@ -23,11 +23,7 @@ const fn div_ceil(n: u64, q: NonZeroU64) -> u64 {
|
||||
// copied from unstable rust standard library.
|
||||
let d = n / q.get();
|
||||
let r = n % q.get();
|
||||
if r > 0 {
|
||||
d + 1
|
||||
} else {
|
||||
d
|
||||
}
|
||||
if r > 0 { d + 1 } else { d }
|
||||
}
|
||||
|
||||
// The bitpacked codec applies a linear transformation `f` over data that are bitpacked.
|
||||
|
||||
@@ -4,12 +4,12 @@ use std::{io, iter};
|
||||
|
||||
use common::{BinarySerializable, CountingWriter, DeserializeFrom, OwnedBytes};
|
||||
use fastdivide::DividerU64;
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use crate::MonotonicallyMappableToU64;
|
||||
use crate::column_values::u64_based::line::Line;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::column_values::{ColumnValues, VecColumn};
|
||||
use crate::MonotonicallyMappableToU64;
|
||||
|
||||
const BLOCK_SIZE: u32 = 512u32;
|
||||
|
||||
@@ -39,7 +39,7 @@ impl BinarySerializable for Block {
|
||||
}
|
||||
|
||||
fn compute_num_blocks(num_vals: u32) -> u32 {
|
||||
(num_vals + BLOCK_SIZE - 1) / BLOCK_SIZE
|
||||
num_vals.div_ceil(BLOCK_SIZE)
|
||||
}
|
||||
|
||||
pub struct BlockwiseLinearEstimator {
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use std::io;
|
||||
|
||||
use common::{BinarySerializable, OwnedBytes};
|
||||
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
|
||||
use tantivy_bitpacker::{BitPacker, BitUnpacker, compute_num_bits};
|
||||
|
||||
use super::line::Line;
|
||||
use super::ColumnValues;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
use crate::column_values::VecColumn;
|
||||
use super::line::Line;
|
||||
use crate::RowId;
|
||||
use crate::column_values::VecColumn;
|
||||
use crate::column_values::u64_based::{ColumnCodec, ColumnCodecEstimator, ColumnStats};
|
||||
|
||||
const HALF_SPACE: u64 = u64::MAX / 2;
|
||||
const LINE_ESTIMATION_BLOCK_LEN: usize = 512;
|
||||
|
||||
@@ -17,7 +17,7 @@ pub use crate::column_values::u64_based::bitpacked::BitpackedCodec;
|
||||
pub use crate::column_values::u64_based::blockwise_linear::BlockwiseLinearCodec;
|
||||
pub use crate::column_values::u64_based::linear::LinearCodec;
|
||||
pub use crate::column_values::u64_based::stats_collector::StatsCollector;
|
||||
use crate::column_values::{monotonic_map_column, ColumnStats};
|
||||
use crate::column_values::{ColumnStats, monotonic_map_column};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::{ColumnValues, MonotonicallyMappableToU64};
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::num::NonZeroU64;
|
||||
|
||||
use fastdivide::DividerU64;
|
||||
|
||||
use crate::column_values::ColumnStats;
|
||||
use crate::RowId;
|
||||
use crate::column_values::ColumnStats;
|
||||
|
||||
/// Compute the gcd of two non null numbers.
|
||||
///
|
||||
@@ -96,8 +96,8 @@ impl StatsCollector {
|
||||
mod tests {
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use crate::column_values::u64_based::stats_collector::{compute_gcd, StatsCollector};
|
||||
use crate::column_values::u64_based::ColumnStats;
|
||||
use crate::column_values::u64_based::stats_collector::{StatsCollector, compute_gcd};
|
||||
|
||||
fn compute_stats(vals: impl Iterator<Item = u64>) -> ColumnStats {
|
||||
let mut stats_collector = StatsCollector::default();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use proptest::prelude::*;
|
||||
use proptest::{prop_oneof, proptest};
|
||||
use rand::Rng;
|
||||
|
||||
#[test]
|
||||
fn test_serialize_and_load_simple() {
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::net::Ipv6Addr;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::value::NumericalType;
|
||||
use crate::InvalidData;
|
||||
use crate::value::NumericalType;
|
||||
|
||||
/// The column type represents the column type.
|
||||
/// Any changes need to be propagated to `COLUMN_TYPES`.
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::io::{self, Write};
|
||||
use common::{BitSet, CountingWriter, ReadOnlyBitSet};
|
||||
use sstable::{SSTable, Streamer, TermOrdinal, VoidSSTable};
|
||||
|
||||
use super::term_merger::TermMerger;
|
||||
use super::term_merger::{TermMerger, TermsWithSegmentOrd};
|
||||
use crate::column::serialize_column_mappable_to_u64;
|
||||
use crate::column_index::SerializableColumnIndex;
|
||||
use crate::iterable::Iterable;
|
||||
@@ -39,7 +39,7 @@ struct RemappedTermOrdinalsValues<'a> {
|
||||
merge_row_order: &'a MergeRowOrder,
|
||||
}
|
||||
|
||||
impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
|
||||
impl Iterable for RemappedTermOrdinalsValues<'_> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
|
||||
match self.merge_row_order {
|
||||
MergeRowOrder::Stack(_) => self.boxed_iter_stacked(),
|
||||
@@ -50,7 +50,7 @@ impl<'a> Iterable for RemappedTermOrdinalsValues<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> RemappedTermOrdinalsValues<'a> {
|
||||
impl RemappedTermOrdinalsValues<'_> {
|
||||
fn boxed_iter_stacked(&self) -> Box<dyn Iterator<Item = u64> + '_> {
|
||||
let iter = self
|
||||
.bytes_columns
|
||||
@@ -126,14 +126,17 @@ fn serialize_merged_dict(
|
||||
let mut term_ord_mapping = TermOrdinalMapping::default();
|
||||
|
||||
let mut field_term_streams = Vec::new();
|
||||
for column_opt in bytes_columns.iter() {
|
||||
for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
|
||||
if let Some(column) = column_opt {
|
||||
term_ord_mapping.add_segment(column.dictionary.num_terms());
|
||||
let terms: Streamer<VoidSSTable> = column.dictionary.stream()?;
|
||||
field_term_streams.push(terms);
|
||||
field_term_streams.push(TermsWithSegmentOrd { terms, segment_ord });
|
||||
} else {
|
||||
term_ord_mapping.add_segment(0);
|
||||
field_term_streams.push(Streamer::empty());
|
||||
field_term_streams.push(TermsWithSegmentOrd {
|
||||
terms: Streamer::empty(),
|
||||
segment_ord,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,6 +194,7 @@ fn serialize_merged_dict(
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct TermOrdinalMapping {
|
||||
/// Contains the new term ordinals for each segment.
|
||||
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
|
||||
}
|
||||
|
||||
@@ -205,6 +209,6 @@ impl TermOrdinalMapping {
|
||||
}
|
||||
|
||||
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
|
||||
&(self.per_segment_new_term_ordinals[segment_ord as usize])[..]
|
||||
&self.per_segment_new_term_ordinals[segment_ord as usize]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ impl StackMergeOrder {
|
||||
let mut cumulated_row_ids: Vec<RowId> = Vec::with_capacity(columnars.len());
|
||||
let mut cumulated_row_id = 0;
|
||||
for columnar in columnars {
|
||||
cumulated_row_id += columnar.num_rows();
|
||||
cumulated_row_id += columnar.num_docs();
|
||||
cumulated_row_ids.push(cumulated_row_id);
|
||||
}
|
||||
StackMergeOrder { cumulated_row_ids }
|
||||
|
||||
@@ -10,11 +10,11 @@ use std::sync::Arc;
|
||||
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
|
||||
|
||||
use super::writer::ColumnarSerializer;
|
||||
use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
|
||||
use crate::column::{serialize_column_mappable_to_u64, serialize_column_mappable_to_u128};
|
||||
use crate::column_values::MergedColumnValues;
|
||||
use crate::columnar::ColumnarReader;
|
||||
use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
|
||||
use crate::columnar::writer::CompatibleNumericalTypes;
|
||||
use crate::columnar::ColumnarReader;
|
||||
use crate::dynamic_column::DynamicColumn;
|
||||
use crate::{
|
||||
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
|
||||
@@ -80,13 +80,12 @@ pub fn merge_columnar(
|
||||
output: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
let mut serializer = ColumnarSerializer::new(output);
|
||||
let num_rows_per_columnar = columnar_readers
|
||||
let num_docs_per_columnar = columnar_readers
|
||||
.iter()
|
||||
.map(|reader| reader.num_rows())
|
||||
.map(|reader| reader.num_docs())
|
||||
.collect::<Vec<u32>>();
|
||||
|
||||
let columns_to_merge =
|
||||
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
|
||||
let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns)?;
|
||||
for res in columns_to_merge {
|
||||
let ((column_name, _column_type_category), grouped_columns) = res;
|
||||
let grouped_columns = grouped_columns.open(&merge_row_order)?;
|
||||
@@ -94,15 +93,18 @@ pub fn merge_columnar(
|
||||
continue;
|
||||
}
|
||||
|
||||
let column_type = grouped_columns.column_type_after_merge();
|
||||
let column_type_after_merge = grouped_columns.column_type_after_merge();
|
||||
let mut columns = grouped_columns.columns;
|
||||
coerce_columns(column_type, &mut columns)?;
|
||||
// Make sure the number of columns is the same as the number of columnar readers.
|
||||
// Or num_docs_per_columnar would be incorrect.
|
||||
assert_eq!(columns.len(), columnar_readers.len());
|
||||
coerce_columns(column_type_after_merge, &mut columns)?;
|
||||
|
||||
let mut column_serializer =
|
||||
serializer.start_serialize_column(column_name.as_bytes(), column_type);
|
||||
serializer.start_serialize_column(column_name.as_bytes(), column_type_after_merge);
|
||||
merge_column(
|
||||
column_type,
|
||||
&num_rows_per_columnar,
|
||||
column_type_after_merge,
|
||||
&num_docs_per_columnar,
|
||||
columns,
|
||||
&merge_row_order,
|
||||
&mut column_serializer,
|
||||
@@ -128,7 +130,7 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
|
||||
fn merge_column(
|
||||
column_type: ColumnType,
|
||||
num_docs_per_column: &[u32],
|
||||
columns: Vec<Option<DynamicColumn>>,
|
||||
columns_to_merge: Vec<Option<DynamicColumn>>,
|
||||
merge_row_order: &MergeRowOrder,
|
||||
wrt: &mut impl io::Write,
|
||||
) -> io::Result<()> {
|
||||
@@ -138,20 +140,21 @@ fn merge_column(
|
||||
| ColumnType::F64
|
||||
| ColumnType::DateTime
|
||||
| ColumnType::Bool => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
|
||||
Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
if let Some(Column { index: idx, values }) =
|
||||
dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
|
||||
{
|
||||
column_indexes.push(idx);
|
||||
column_values.push(Some(values));
|
||||
} else {
|
||||
column_indexes.push(ColumnIndex::Empty {
|
||||
num_docs: num_docs_per_column[i],
|
||||
});
|
||||
column_values.push(None);
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
match dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic) {
|
||||
Some(Column { index: idx, values }) => {
|
||||
column_indexes.push(idx);
|
||||
column_values.push(Some(values));
|
||||
}
|
||||
None => {
|
||||
column_indexes.push(ColumnIndex::Empty {
|
||||
num_docs: num_docs_per_column[i],
|
||||
});
|
||||
column_values.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
let merged_column_index =
|
||||
@@ -164,10 +167,10 @@ fn merge_column(
|
||||
serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
|
||||
}
|
||||
ColumnType::IpAddr => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
|
||||
Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
|
||||
dynamic_column_opt
|
||||
{
|
||||
@@ -192,9 +195,10 @@ fn merge_column(
|
||||
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
|
||||
}
|
||||
ColumnType::Bytes | ColumnType::Str => {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
|
||||
let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
|
||||
for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
|
||||
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
|
||||
let mut bytes_columns: Vec<Option<BytesColumn>> =
|
||||
Vec::with_capacity(columns_to_merge.len());
|
||||
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
|
||||
match dynamic_column_opt {
|
||||
Some(DynamicColumn::Str(str_column)) => {
|
||||
column_indexes.push(str_column.term_ord_column.index.clone());
|
||||
@@ -248,13 +252,15 @@ impl GroupedColumns {
|
||||
if column_type.len() == 1 {
|
||||
return column_type.into_iter().next().unwrap();
|
||||
}
|
||||
// At the moment, only the numerical categorical column type has more than one possible
|
||||
// At the moment, only the numerical column type category has more than one possible
|
||||
// column type.
|
||||
assert!(self
|
||||
.columns
|
||||
.iter()
|
||||
.flatten()
|
||||
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
|
||||
assert!(
|
||||
self.columns
|
||||
.iter()
|
||||
.flatten()
|
||||
.all(|el| ColumnTypeCategory::from(el.column_type())
|
||||
== ColumnTypeCategory::Numerical)
|
||||
);
|
||||
merged_numerical_columns_type(self.columns.iter().flatten()).into()
|
||||
}
|
||||
}
|
||||
@@ -361,7 +367,7 @@ fn is_empty_after_merge(
|
||||
ColumnIndex::Empty { .. } => true,
|
||||
ColumnIndex::Full => alive_bitset.len() == 0,
|
||||
ColumnIndex::Optional(optional_index) => {
|
||||
for doc in optional_index.iter_rows() {
|
||||
for doc in optional_index.iter_docs() {
|
||||
if alive_bitset.contains(doc) {
|
||||
return false;
|
||||
}
|
||||
@@ -391,7 +397,6 @@ fn is_empty_after_merge(
|
||||
fn group_columns_for_merge<'a>(
|
||||
columnar_readers: &'a [&'a ColumnarReader],
|
||||
required_columns: &'a [(String, ColumnType)],
|
||||
_merge_row_order: &'a MergeRowOrder,
|
||||
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
|
||||
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
|
||||
|
||||
|
||||
@@ -5,28 +5,29 @@ use sstable::TermOrdinal;
|
||||
|
||||
use crate::Streamer;
|
||||
|
||||
pub struct HeapItem<'a> {
|
||||
pub streamer: Streamer<'a>,
|
||||
/// The terms of a column with the ordinal of the segment.
|
||||
pub struct TermsWithSegmentOrd<'a> {
|
||||
pub terms: Streamer<'a>,
|
||||
pub segment_ord: usize,
|
||||
}
|
||||
|
||||
impl<'a> PartialEq for HeapItem<'a> {
|
||||
impl PartialEq for TermsWithSegmentOrd<'_> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.segment_ord == other.segment_ord
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Eq for HeapItem<'a> {}
|
||||
impl Eq for TermsWithSegmentOrd<'_> {}
|
||||
|
||||
impl<'a> PartialOrd for HeapItem<'a> {
|
||||
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
|
||||
impl<'a> PartialOrd for TermsWithSegmentOrd<'a> {
|
||||
fn partial_cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Ord for HeapItem<'a> {
|
||||
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
|
||||
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
|
||||
impl<'a> Ord for TermsWithSegmentOrd<'a> {
|
||||
fn cmp(&self, other: &TermsWithSegmentOrd<'a>) -> Ordering {
|
||||
(&other.terms.key(), &other.segment_ord).cmp(&(&self.terms.key(), &self.segment_ord))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,39 +38,32 @@ impl<'a> Ord for HeapItem<'a> {
|
||||
/// - the term
|
||||
/// - a slice with the ordinal of the segments containing the terms.
|
||||
pub struct TermMerger<'a> {
|
||||
heap: BinaryHeap<HeapItem<'a>>,
|
||||
current_streamers: Vec<HeapItem<'a>>,
|
||||
heap: BinaryHeap<TermsWithSegmentOrd<'a>>,
|
||||
term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>,
|
||||
}
|
||||
|
||||
impl<'a> TermMerger<'a> {
|
||||
/// Stream of merged term dictionary
|
||||
pub fn new(streams: Vec<Streamer<'a>>) -> TermMerger<'a> {
|
||||
pub fn new(term_streams_with_segment: Vec<TermsWithSegmentOrd<'a>>) -> TermMerger<'a> {
|
||||
TermMerger {
|
||||
heap: BinaryHeap::new(),
|
||||
current_streamers: streams
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(ord, streamer)| HeapItem {
|
||||
streamer,
|
||||
segment_ord: ord,
|
||||
})
|
||||
.collect(),
|
||||
term_streams_with_segment,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn matching_segments<'b: 'a>(
|
||||
&'b self,
|
||||
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
|
||||
self.current_streamers
|
||||
self.term_streams_with_segment
|
||||
.iter()
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
|
||||
.map(|heap_item| (heap_item.segment_ord, heap_item.terms.term_ord()))
|
||||
}
|
||||
|
||||
fn advance_segments(&mut self) {
|
||||
let streamers = &mut self.current_streamers;
|
||||
let streamers = &mut self.term_streams_with_segment;
|
||||
let heap = &mut self.heap;
|
||||
for mut heap_item in streamers.drain(..) {
|
||||
if heap_item.streamer.advance() {
|
||||
if heap_item.terms.advance() {
|
||||
heap.push(heap_item);
|
||||
}
|
||||
}
|
||||
@@ -80,18 +74,19 @@ impl<'a> TermMerger<'a> {
|
||||
/// False if there is none.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
self.advance_segments();
|
||||
if let Some(head) = self.heap.pop() {
|
||||
self.current_streamers.push(head);
|
||||
while let Some(next_streamer) = self.heap.peek() {
|
||||
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
|
||||
break;
|
||||
match self.heap.pop() {
|
||||
Some(head) => {
|
||||
self.term_streams_with_segment.push(head);
|
||||
while let Some(next_streamer) = self.heap.peek() {
|
||||
if self.term_streams_with_segment[0].terms.key() != next_streamer.terms.key() {
|
||||
break;
|
||||
}
|
||||
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
|
||||
self.term_streams_with_segment.push(next_heap_it);
|
||||
}
|
||||
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
|
||||
self.current_streamers.push(next_heap_it);
|
||||
true
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +96,6 @@ impl<'a> TermMerger<'a> {
|
||||
/// if and only if advance() has been called before
|
||||
/// and "true" was returned.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
self.current_streamers[0].streamer.key()
|
||||
self.term_streams_with_segment[0].terms.key()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
use itertools::Itertools;
|
||||
use proptest::collection::vec;
|
||||
use proptest::prelude::*;
|
||||
|
||||
use super::*;
|
||||
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};
|
||||
use crate::columnar::{ColumnarReader, MergeRowOrder, StackMergeOrder, merge_columnar};
|
||||
use crate::{Cardinality, ColumnarWriter, DynamicColumn, HasAssociatedColumnType, RowId};
|
||||
|
||||
fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
|
||||
column_name: &str,
|
||||
@@ -26,9 +29,8 @@ fn test_column_coercion_to_u64() {
|
||||
// u64 type
|
||||
let columnar2 = make_columnar("numbers", &[u64::MAX]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -38,9 +40,8 @@ fn test_column_coercion_to_i64() {
|
||||
let columnar1 = make_columnar("numbers", &[-1i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -63,14 +64,8 @@ fn test_group_columns_with_required_column() {
|
||||
let columnar1 = make_columnar("numbers", &[1i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(
|
||||
&[&columnar1, &columnar2],
|
||||
&[("numbers".to_string(), ColumnType::U64)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -80,13 +75,9 @@ fn test_group_columns_required_column_with_no_existing_columns() {
|
||||
let columnar1 = make_columnar("numbers", &[2u64]);
|
||||
let columnar2 = make_columnar("numbers", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<_, _> = group_columns_for_merge(
|
||||
columnars,
|
||||
&[("required_col".to_string(), ColumnType::Str)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
let column_map: BTreeMap<_, _> =
|
||||
group_columns_for_merge(columnars, &[("required_col".to_string(), ColumnType::Str)])
|
||||
.unwrap();
|
||||
assert_eq!(column_map.len(), 2);
|
||||
let columns = &column_map
|
||||
.get(&("required_col".to_string(), ColumnTypeCategory::Str))
|
||||
@@ -102,14 +93,8 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
|
||||
let columnar1 = make_columnar("numbers", &[2i64]);
|
||||
let columnar2 = make_columnar("numbers", &[2i64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(
|
||||
columnars,
|
||||
&[("numbers".to_string(), ColumnType::U64)],
|
||||
&merge_order,
|
||||
)
|
||||
.unwrap();
|
||||
group_columns_for_merge(columnars, &[("numbers".to_string(), ColumnType::U64)]).unwrap();
|
||||
assert_eq!(column_map.len(), 1);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
}
|
||||
@@ -119,9 +104,8 @@ fn test_missing_column() {
|
||||
let columnar1 = make_columnar("numbers", &[-1i64]);
|
||||
let columnar2 = make_columnar("numbers2", &[2u64]);
|
||||
let columnars = &[&columnar1, &columnar2];
|
||||
let merge_order = StackMergeOrder::stack(columnars).into();
|
||||
let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> =
|
||||
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
|
||||
group_columns_for_merge(columnars, &[]).unwrap();
|
||||
assert_eq!(column_map.len(), 2);
|
||||
assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical)));
|
||||
{
|
||||
@@ -224,7 +208,7 @@ fn test_merge_columnar_numbers() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("numbers").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -252,7 +236,7 @@ fn test_merge_columnar_texts() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("texts").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -301,7 +285,7 @@ fn test_merge_columnar_byte() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 4);
|
||||
assert_eq!(columnar_reader.num_docs(), 4);
|
||||
assert_eq!(columnar_reader.num_columns(), 1);
|
||||
let cols = columnar_reader.read_columns("bytes").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -357,7 +341,7 @@ fn test_merge_columnar_byte_with_missing() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 3 + 2 + 3);
|
||||
assert_eq!(columnar_reader.num_docs(), 3 + 2 + 3);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("col").unwrap();
|
||||
let dynamic_column = cols[0].open().unwrap();
|
||||
@@ -409,7 +393,7 @@ fn test_merge_columnar_different_types() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 4);
|
||||
assert_eq!(columnar_reader.num_docs(), 4);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("mixed").unwrap();
|
||||
|
||||
@@ -419,11 +403,11 @@ fn test_merge_columnar_different_types() {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(vals.get_cardinality(), Cardinality::Optional);
|
||||
assert_eq!(vals.values_for_doc(0).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(1).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(2).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(0).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(1).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(2).collect_vec(), Vec::<i64>::new());
|
||||
assert_eq!(vals.values_for_doc(3).collect_vec(), vec![1]);
|
||||
assert_eq!(vals.values_for_doc(4).collect_vec(), vec![]);
|
||||
assert_eq!(vals.values_for_doc(4).collect_vec(), Vec::<i64>::new());
|
||||
|
||||
// text column
|
||||
let dynamic_column = cols[1].open().unwrap();
|
||||
@@ -474,7 +458,7 @@ fn test_merge_columnar_different_empty_cardinality() {
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
assert_eq!(columnar_reader.num_rows(), 2);
|
||||
assert_eq!(columnar_reader.num_docs(), 2);
|
||||
assert_eq!(columnar_reader.num_columns(), 2);
|
||||
let cols = columnar_reader.read_columns("mixed").unwrap();
|
||||
|
||||
@@ -486,3 +470,119 @@ fn test_merge_columnar_different_empty_cardinality() {
|
||||
let dynamic_column = cols[1].open().unwrap();
|
||||
assert_eq!(dynamic_column.get_cardinality(), Cardinality::Optional);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ColumnSpec {
|
||||
column_name: String,
|
||||
/// (row_id, term)
|
||||
terms: Vec<(RowId, Vec<u8>)>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct ColumnarSpec {
|
||||
columns: Vec<ColumnSpec>,
|
||||
}
|
||||
|
||||
/// Generate a random (row_id, term) pair:
|
||||
/// - row_id in [0..10]
|
||||
/// - term is either from POSSIBLE_TERMS or random bytes
|
||||
fn rowid_and_term_strategy() -> impl Strategy<Value = (RowId, Vec<u8>)> {
|
||||
const POSSIBLE_TERMS: &[&[u8]] = &[b"a", b"b", b"allo"];
|
||||
|
||||
let term_strat = prop_oneof![
|
||||
// pick from the fixed list
|
||||
(0..POSSIBLE_TERMS.len()).prop_map(|i| POSSIBLE_TERMS[i].to_vec()),
|
||||
// or random bytes (length 0..10)
|
||||
prop::collection::vec(any::<u8>(), 0..10),
|
||||
];
|
||||
|
||||
(0u32..11, term_strat)
|
||||
}
|
||||
|
||||
/// Generate one ColumnSpec, with a random name and a random list of (row_id, term).
|
||||
/// We sort it by row_id so that data is in ascending order.
|
||||
fn column_spec_strategy() -> impl Strategy<Value = ColumnSpec> {
|
||||
let column_name = prop_oneof![
|
||||
Just("col".to_string()),
|
||||
Just("col2".to_string()),
|
||||
"col.*".prop_map(|s| s),
|
||||
];
|
||||
|
||||
// We'll produce 0..8 (rowid,term) entries for this column
|
||||
let data_strat = vec(rowid_and_term_strategy(), 0..8).prop_map(|mut pairs| {
|
||||
// Sort by row_id
|
||||
pairs.sort_by_key(|(row_id, _)| *row_id);
|
||||
pairs
|
||||
});
|
||||
|
||||
(column_name, data_strat).prop_map(|(name, data)| ColumnSpec {
|
||||
column_name: name,
|
||||
terms: data,
|
||||
})
|
||||
}
|
||||
|
||||
/// Strategy to generate an ColumnarSpec
|
||||
fn columnar_strategy() -> impl Strategy<Value = ColumnarSpec> {
|
||||
vec(column_spec_strategy(), 0..3).prop_map(|columns| ColumnarSpec { columns })
|
||||
}
|
||||
|
||||
/// Strategy to generate multiple ColumnarSpecs, each of which we will treat
|
||||
/// as one "columnar" to be merged together.
|
||||
fn columnars_strategy() -> impl Strategy<Value = Vec<ColumnarSpec>> {
|
||||
vec(columnar_strategy(), 1..4)
|
||||
}
|
||||
|
||||
/// Build a `ColumnarReader` from a `ColumnarSpec`
|
||||
fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
|
||||
let mut writer = ColumnarWriter::default();
|
||||
let mut max_row_id = 0;
|
||||
for col in &spec.columns {
|
||||
for &(row_id, ref term) in &col.terms {
|
||||
writer.record_bytes(row_id, &col.column_name, term);
|
||||
max_row_id = max_row_id.max(row_id);
|
||||
}
|
||||
}
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
writer.serialize(max_row_id + 1, &mut buffer).unwrap();
|
||||
ColumnarReader::open(buffer).unwrap()
|
||||
}
|
||||
|
||||
proptest! {
|
||||
// We just test that the merge_columnar function doesn't crash.
|
||||
#![proptest_config(ProptestConfig::with_cases(256))]
|
||||
#[test]
|
||||
fn test_merge_columnar_bytes_no_crash(columnars in columnars_strategy(), second_merge_columnars in columnars_strategy()) {
|
||||
let columnars: Vec<ColumnarReader> = columnars.iter()
|
||||
.map(build_columnar)
|
||||
.collect();
|
||||
|
||||
let mut out = Vec::new();
|
||||
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
|
||||
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
|
||||
merge_columnar(
|
||||
&columnar_refs,
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut out,
|
||||
).unwrap();
|
||||
|
||||
let merged_reader = ColumnarReader::open(out).unwrap();
|
||||
|
||||
// Merge the second set of columnars with the result of the first merge
|
||||
let mut columnars: Vec<ColumnarReader> = second_merge_columnars.iter()
|
||||
.map(build_columnar)
|
||||
.collect();
|
||||
columnars.push(merged_reader);
|
||||
let mut out = Vec::new();
|
||||
let columnar_refs: Vec<&ColumnarReader> = columnars.iter().collect();
|
||||
let stack_merge_order = StackMergeOrder::stack(&columnar_refs);
|
||||
merge_columnar(
|
||||
&columnar_refs,
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut out,
|
||||
).unwrap();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,9 @@ mod reader;
|
||||
mod writer;
|
||||
|
||||
pub use column_type::{ColumnType, HasAssociatedColumnType};
|
||||
pub use format_version::{Version, CURRENT_VERSION};
|
||||
pub use format_version::{CURRENT_VERSION, Version};
|
||||
#[cfg(test)]
|
||||
pub(crate) use merge::ColumnTypeCategory;
|
||||
pub use merge::{merge_columnar, MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
|
||||
pub use merge::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, merge_columnar};
|
||||
pub use reader::ColumnarReader;
|
||||
pub use writer::ColumnarWriter;
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::{fmt, io, mem};
|
||||
|
||||
use common::file_slice::FileSlice;
|
||||
use common::BinarySerializable;
|
||||
use common::file_slice::FileSlice;
|
||||
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
|
||||
use sstable::{Dictionary, RangeSSTable};
|
||||
|
||||
use crate::columnar::{format_version, ColumnType};
|
||||
use crate::columnar::{ColumnType, format_version};
|
||||
use crate::dynamic_column::DynamicColumnHandle;
|
||||
use crate::{RowId, Version};
|
||||
|
||||
@@ -18,13 +19,13 @@ fn io_invalid_data(msg: String) -> io::Error {
|
||||
pub struct ColumnarReader {
|
||||
column_dictionary: Dictionary<RangeSSTable>,
|
||||
column_data: FileSlice,
|
||||
num_rows: RowId,
|
||||
num_docs: RowId,
|
||||
format_version: Version,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ColumnarReader {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let num_rows = self.num_rows();
|
||||
let num_rows = self.num_docs();
|
||||
let columns = self.list_columns().unwrap();
|
||||
let num_cols = columns.len();
|
||||
let mut debug_struct = f.debug_struct("Columnar");
|
||||
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
|
||||
// Each column is a associated to a given `column_key`,
|
||||
// that starts by `column_name\0column_header`.
|
||||
//
|
||||
// Listing the columns associated to the given column name is therefore equivalent to
|
||||
// listing `column_key` with the prefix `column_name\0`.
|
||||
format!("{}{}", column_name, '\0')
|
||||
}
|
||||
|
||||
fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
|
||||
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
|
||||
}
|
||||
|
||||
impl ColumnarReader {
|
||||
/// Opens a new Columnar file.
|
||||
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
|
||||
@@ -98,13 +112,13 @@ impl ColumnarReader {
|
||||
Ok(ColumnarReader {
|
||||
column_dictionary,
|
||||
column_data,
|
||||
num_rows,
|
||||
num_docs: num_rows,
|
||||
format_version,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn num_rows(&self) -> RowId {
|
||||
self.num_rows
|
||||
pub fn num_docs(&self) -> RowId {
|
||||
self.num_docs
|
||||
}
|
||||
// Iterate over the columns in a sorted way
|
||||
pub fn iter_columns(
|
||||
@@ -144,32 +158,14 @@ impl ColumnarReader {
|
||||
Ok(self.iter_columns()?.collect())
|
||||
}
|
||||
|
||||
fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
|
||||
// Each column is a associated to a given `column_key`,
|
||||
// that starts by `column_name\0column_header`.
|
||||
//
|
||||
// Listing the columns associated to the given column name is therefore equivalent to
|
||||
// listing `column_key` with the prefix `column_name\0`.
|
||||
//
|
||||
// This is in turn equivalent to searching for the range
|
||||
// `[column_name,\0`..column_name\1)`.
|
||||
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
|
||||
let mut start_key = column_name.to_string();
|
||||
start_key.push('\0');
|
||||
let mut end_key = column_name.to_string();
|
||||
end_key.push(1u8 as char);
|
||||
self.column_dictionary
|
||||
.range()
|
||||
.ge(start_key.as_bytes())
|
||||
.lt(end_key.as_bytes())
|
||||
}
|
||||
|
||||
pub async fn read_columns_async(
|
||||
&self,
|
||||
column_name: &str,
|
||||
) -> io::Result<Vec<DynamicColumnHandle>> {
|
||||
let prefix = column_dictionary_prefix_for_column_name(column_name);
|
||||
let stream = self
|
||||
.stream_for_column_range(column_name)
|
||||
.column_dictionary
|
||||
.prefix_range(prefix)
|
||||
.into_stream_async()
|
||||
.await?;
|
||||
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
|
||||
@@ -180,7 +176,35 @@ impl ColumnarReader {
|
||||
/// There can be more than one column associated to a given column name, provided they have
|
||||
/// different types.
|
||||
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
|
||||
let stream = self.stream_for_column_range(column_name).into_stream()?;
|
||||
let prefix = column_dictionary_prefix_for_column_name(column_name);
|
||||
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
|
||||
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
|
||||
}
|
||||
|
||||
pub async fn read_subpath_columns_async(
|
||||
&self,
|
||||
root_path: &str,
|
||||
) -> io::Result<Vec<DynamicColumnHandle>> {
|
||||
let prefix = column_dictionary_prefix_for_subpath(root_path);
|
||||
let stream = self
|
||||
.column_dictionary
|
||||
.prefix_range(prefix)
|
||||
.into_stream_async()
|
||||
.await?;
|
||||
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
|
||||
}
|
||||
|
||||
/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
|
||||
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
|
||||
///
|
||||
/// There can be more than one column associated to each path within the JSON structure,
|
||||
/// provided they have different types.
|
||||
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
|
||||
let prefix = column_dictionary_prefix_for_subpath(root_path);
|
||||
let stream = self
|
||||
.column_dictionary
|
||||
.prefix_range(prefix.as_bytes())
|
||||
.into_stream()?;
|
||||
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
|
||||
}
|
||||
|
||||
@@ -192,6 +216,8 @@ impl ColumnarReader {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
|
||||
|
||||
use crate::{ColumnType, ColumnarReader, ColumnarWriter};
|
||||
|
||||
#[test]
|
||||
@@ -224,6 +250,64 @@ mod tests {
|
||||
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_columns() {
|
||||
let mut columnar_writer = ColumnarWriter::default();
|
||||
columnar_writer.record_column_type("col", ColumnType::U64, false);
|
||||
columnar_writer.record_numerical(1, "col", 1u64);
|
||||
let mut buffer = Vec::new();
|
||||
columnar_writer.serialize(2, &mut buffer).unwrap();
|
||||
let columnar = ColumnarReader::open(buffer).unwrap();
|
||||
{
|
||||
let columns = columnar.read_columns("col").unwrap();
|
||||
assert_eq!(columns.len(), 1);
|
||||
assert_eq!(columns[0].column_type(), ColumnType::U64);
|
||||
}
|
||||
{
|
||||
let columns = columnar.read_columns("other").unwrap();
|
||||
assert_eq!(columns.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_subpath_columns() {
|
||||
let mut columnar_writer = ColumnarWriter::default();
|
||||
columnar_writer.record_str(
|
||||
0,
|
||||
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
|
||||
"hello",
|
||||
);
|
||||
columnar_writer.record_numerical(
|
||||
0,
|
||||
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
|
||||
1i64,
|
||||
);
|
||||
columnar_writer.record_str(1, "col1", "hello");
|
||||
columnar_writer.record_str(0, "col2", "hello");
|
||||
let mut buffer = Vec::new();
|
||||
columnar_writer.serialize(2, &mut buffer).unwrap();
|
||||
|
||||
let columnar = ColumnarReader::open(buffer).unwrap();
|
||||
{
|
||||
let columns = columnar.read_subpath_columns("col1").unwrap();
|
||||
assert_eq!(columns.len(), 2);
|
||||
assert_eq!(columns[0].column_type(), ColumnType::Str);
|
||||
assert_eq!(columns[1].column_type(), ColumnType::I64);
|
||||
}
|
||||
{
|
||||
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
|
||||
assert_eq!(columns.len(), 0);
|
||||
}
|
||||
{
|
||||
let columns = columnar.read_subpath_columns("col2").unwrap();
|
||||
assert_eq!(columns.len(), 0);
|
||||
}
|
||||
{
|
||||
let columns = columnar.read_subpath_columns("other").unwrap();
|
||||
assert_eq!(columns.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Input type forbidden")]
|
||||
fn test_list_columns_strict_typing_panics_on_wrong_types() {
|
||||
|
||||
@@ -42,7 +42,7 @@ impl ColumnWriter {
|
||||
&self,
|
||||
arena: &MemoryArena,
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<V>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<V>> + 'a + use<'a, V> {
|
||||
buffer.clear();
|
||||
self.values.read_to_end(arena, buffer);
|
||||
let mut cursor: &[u8] = &buffer[..];
|
||||
@@ -104,9 +104,10 @@ pub(crate) struct NumericalColumnWriter {
|
||||
|
||||
impl NumericalColumnWriter {
|
||||
pub fn force_numerical_type(&mut self, numerical_type: NumericalType) {
|
||||
assert!(self
|
||||
.compatible_numerical_types
|
||||
.is_type_accepted(numerical_type));
|
||||
assert!(
|
||||
self.compatible_numerical_types
|
||||
.is_type_accepted(numerical_type)
|
||||
);
|
||||
self.compatible_numerical_types = CompatibleNumericalTypes::StaticType(numerical_type);
|
||||
}
|
||||
}
|
||||
@@ -211,7 +212,7 @@ impl NumericalColumnWriter {
|
||||
self,
|
||||
arena: &MemoryArena,
|
||||
buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a + use<'a> {
|
||||
self.column_writer.operation_iterator(arena, buffer)
|
||||
}
|
||||
}
|
||||
@@ -255,7 +256,7 @@ impl StrOrBytesColumnWriter {
|
||||
&self,
|
||||
arena: &MemoryArena,
|
||||
byte_buffer: &'a mut Vec<u8>,
|
||||
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a {
|
||||
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a + use<'a> {
|
||||
self.column_writer.operation_iterator(arena, byte_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,13 +8,13 @@ use std::net::Ipv6Addr;
|
||||
|
||||
use column_operation::ColumnOperation;
|
||||
pub(crate) use column_writers::CompatibleNumericalTypes;
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
use common::CountingWriter;
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
pub(crate) use serializer::ColumnarSerializer;
|
||||
use stacker::{Addr, ArenaHashMap, MemoryArena};
|
||||
|
||||
use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex};
|
||||
use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
|
||||
use crate::column_values::{MonotonicallyMappableToU64, MonotonicallyMappableToU128};
|
||||
use crate::columnar::column_type::ColumnType;
|
||||
use crate::columnar::writer::column_writers::{
|
||||
ColumnWriter, NumericalColumnWriter, StrOrBytesColumnWriter,
|
||||
@@ -285,7 +285,6 @@ impl ColumnarWriter {
|
||||
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
|
||||
);
|
||||
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
|
||||
|
||||
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
|
||||
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
|
||||
for (column_name, column_type, addr) in columns {
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::io::Write;
|
||||
|
||||
use common::json_path_writer::JSON_END_OF_PATH;
|
||||
use common::{BinarySerializable, CountingWriter};
|
||||
use sstable::value::RangeValueWriter;
|
||||
use sstable::RangeSSTable;
|
||||
use sstable::value::RangeValueWriter;
|
||||
|
||||
use crate::columnar::ColumnType;
|
||||
use crate::RowId;
|
||||
use crate::columnar::ColumnType;
|
||||
|
||||
pub struct ColumnarSerializer<W: io::Write> {
|
||||
wrt: CountingWriter<W>,
|
||||
@@ -67,7 +67,7 @@ pub struct ColumnSerializer<'a, W: io::Write> {
|
||||
start_offset: u64,
|
||||
}
|
||||
|
||||
impl<'a, W: io::Write> ColumnSerializer<'a, W> {
|
||||
impl<W: io::Write> ColumnSerializer<'_, W> {
|
||||
pub fn finalize(self) -> io::Result<()> {
|
||||
let end_offset: u64 = self.columnar_serializer.wrt.written_bytes();
|
||||
let byte_range = self.start_offset..end_offset;
|
||||
@@ -80,7 +80,7 @@ impl<'a, W: io::Write> ColumnSerializer<'a, W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
|
||||
impl<W: io::Write> io::Write for ColumnSerializer<'_, W> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.columnar_serializer.wrt.write(buf)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::RowId;
|
||||
use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex};
|
||||
use crate::iterable::Iterable;
|
||||
use crate::RowId;
|
||||
|
||||
/// The `IndexBuilder` interprets a sequence of
|
||||
/// calls of the form:
|
||||
@@ -31,12 +31,13 @@ pub struct OptionalIndexBuilder {
|
||||
|
||||
impl OptionalIndexBuilder {
|
||||
pub fn finish(&mut self, num_rows: RowId) -> impl Iterable<RowId> + '_ {
|
||||
debug_assert!(self
|
||||
.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_doc| last_doc < num_rows)
|
||||
.unwrap_or(true));
|
||||
debug_assert!(
|
||||
self.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|last_doc| last_doc < num_rows)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
&self.docs[..]
|
||||
}
|
||||
|
||||
@@ -48,12 +49,13 @@ impl OptionalIndexBuilder {
|
||||
impl IndexBuilder for OptionalIndexBuilder {
|
||||
#[inline(always)]
|
||||
fn record_row(&mut self, doc: RowId) {
|
||||
debug_assert!(self
|
||||
.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|prev_doc| doc > prev_doc)
|
||||
.unwrap_or(true));
|
||||
debug_assert!(
|
||||
self.docs
|
||||
.last()
|
||||
.copied()
|
||||
.map(|prev_doc| doc > prev_doc)
|
||||
.unwrap_or(true)
|
||||
);
|
||||
self.docs.push(doc);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@ use std::path::PathBuf;
|
||||
use itertools::Itertools;
|
||||
|
||||
use crate::{
|
||||
merge_columnar, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
|
||||
CURRENT_VERSION,
|
||||
CURRENT_VERSION, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder,
|
||||
merge_columnar,
|
||||
};
|
||||
|
||||
const NUM_DOCS: u32 = u16::MAX as u32;
|
||||
|
||||
@@ -6,7 +6,7 @@ use common::file_slice::FileSlice;
|
||||
use common::{ByteCount, DateTime, HasLen, OwnedBytes};
|
||||
|
||||
use crate::column::{BytesColumn, Column, StrColumn};
|
||||
use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn};
|
||||
use crate::column_values::{StrictlyMonotonicFn, monotonic_map_column};
|
||||
use crate::columnar::ColumnType;
|
||||
use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType, Version};
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ pub trait Iterable<T = u64> {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_>;
|
||||
}
|
||||
|
||||
impl<'a, T: Copy> Iterable<T> for &'a [T] {
|
||||
impl<T: Copy> Iterable<T> for &[T] {
|
||||
fn boxed_iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
|
||||
Box::new(self.iter().copied())
|
||||
}
|
||||
|
||||
@@ -17,15 +17,10 @@
|
||||
//! column.
|
||||
//! - [column_values]: Stores the values of a column in a dense format.
|
||||
|
||||
#![cfg_attr(all(feature = "unstable", test), feature(test))]
|
||||
|
||||
#[cfg(test)]
|
||||
#[macro_use]
|
||||
extern crate more_asserts;
|
||||
|
||||
#[cfg(all(test, feature = "unstable"))]
|
||||
extern crate test;
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::io;
|
||||
|
||||
@@ -44,11 +39,11 @@ pub use block_accessor::ColumnBlockAccessor;
|
||||
pub use column::{BytesColumn, Column, StrColumn};
|
||||
pub use column_index::ColumnIndex;
|
||||
pub use column_values::{
|
||||
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
|
||||
ColumnValues, EmptyColumnValues, MonotonicallyMappableToU64, MonotonicallyMappableToU128,
|
||||
};
|
||||
pub use columnar::{
|
||||
merge_columnar, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
|
||||
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, CURRENT_VERSION,
|
||||
CURRENT_VERSION, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
|
||||
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, merge_columnar,
|
||||
};
|
||||
use sstable::VoidSSTable;
|
||||
pub use value::{NumericalType, NumericalValue};
|
||||
|
||||
@@ -380,7 +380,7 @@ fn assert_columnar_eq(
|
||||
right: &ColumnarReader,
|
||||
lenient_on_numerical_value: bool,
|
||||
) {
|
||||
assert_eq!(left.num_rows(), right.num_rows());
|
||||
assert_eq!(left.num_docs(), right.num_docs());
|
||||
let left_columns = left.list_columns().unwrap();
|
||||
let right_columns = right.list_columns().unwrap();
|
||||
assert_eq!(left_columns.len(), right_columns.len());
|
||||
@@ -588,7 +588,7 @@ proptest! {
|
||||
#[test]
|
||||
fn test_single_columnar_builder_proptest(docs in columnar_docs_strategy()) {
|
||||
let columnar = build_columnar(&docs[..]);
|
||||
assert_eq!(columnar.num_rows() as usize, docs.len());
|
||||
assert_eq!(columnar.num_docs() as usize, docs.len());
|
||||
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
|
||||
for (doc_id, doc_vals) in docs.iter().enumerate() {
|
||||
for (col_name, col_val) in doc_vals {
|
||||
@@ -715,8 +715,9 @@ fn test_columnar_merging_number_columns() {
|
||||
// TODO test required_columns
|
||||
// TODO document edge case: required_columns incompatible with values.
|
||||
|
||||
fn columnar_docs_and_remap(
|
||||
) -> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn columnar_docs_and_remap()
|
||||
-> impl Strategy<Value = (Vec<Vec<Vec<(&'static str, ColumnValue)>>>, Vec<RowAddr>)> {
|
||||
proptest::collection::vec(columnar_docs_strategy(), 2..=3).prop_flat_map(
|
||||
|columnars_docs: Vec<Vec<Vec<(&str, ColumnValue)>>>| {
|
||||
let row_addrs: Vec<RowAddr> = columnars_docs
|
||||
@@ -819,7 +820,7 @@ fn test_columnar_merge_empty() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 0);
|
||||
assert_eq!(merged_columnar.num_docs(), 0);
|
||||
assert_eq!(merged_columnar.num_columns(), 0);
|
||||
}
|
||||
|
||||
@@ -845,7 +846,7 @@ fn test_columnar_merge_single_str_column() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 1);
|
||||
assert_eq!(merged_columnar.num_docs(), 1);
|
||||
assert_eq!(merged_columnar.num_columns(), 1);
|
||||
}
|
||||
|
||||
@@ -877,7 +878,7 @@ fn test_delete_decrease_cardinality() {
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
assert_eq!(merged_columnar.num_rows(), 1);
|
||||
assert_eq!(merged_columnar.num_docs(), 1);
|
||||
assert_eq!(merged_columnar.num_columns(), 1);
|
||||
let cols = merged_columnar.read_columns("c").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
[package]
|
||||
name = "tantivy-common"
|
||||
version = "0.7.0"
|
||||
version = "0.10.0"
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
description = "common traits and utility functions used by multiple tantivy subcrates"
|
||||
documentation = "https://docs.rs/tantivy_common/"
|
||||
homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
@@ -13,7 +13,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.4.3"
|
||||
ownedbytes = { version= "0.7", path="../ownedbytes" }
|
||||
ownedbytes = { version= "0.9", path="../ownedbytes" }
|
||||
async-trait = "0.1"
|
||||
time = { version = "0.3.10", features = ["serde-well-known"] }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use binggan::{black_box, BenchRunner};
|
||||
use binggan::{BenchRunner, black_box};
|
||||
use rand::seq::IteratorRandom;
|
||||
use rand::thread_rng;
|
||||
use tantivy_common::{serialize_vint_u32, BitSet, TinySet};
|
||||
use tantivy_common::{BitSet, TinySet, serialize_vint_u32};
|
||||
|
||||
fn bench_vint() {
|
||||
let mut runner = BenchRunner::new();
|
||||
|
||||
@@ -65,11 +65,11 @@ pub fn transform_bound_inner_res<TFrom, TTo>(
|
||||
) -> io::Result<Bound<TTo>> {
|
||||
use self::Bound::*;
|
||||
Ok(match bound {
|
||||
Excluded(ref from_val) => match transform(from_val)? {
|
||||
Excluded(from_val) => match transform(from_val)? {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Excluded(new_val),
|
||||
},
|
||||
Included(ref from_val) => match transform(from_val)? {
|
||||
Included(from_val) => match transform(from_val)? {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Included(new_val),
|
||||
},
|
||||
@@ -85,11 +85,11 @@ pub fn transform_bound_inner<TFrom, TTo>(
|
||||
) -> Bound<TTo> {
|
||||
use self::Bound::*;
|
||||
match bound {
|
||||
Excluded(ref from_val) => match transform(from_val) {
|
||||
Excluded(from_val) => match transform(from_val) {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Excluded(new_val),
|
||||
},
|
||||
Included(ref from_val) => match transform(from_val) {
|
||||
Included(from_val) => match transform(from_val) {
|
||||
TransformBound::NewBound(new_val) => new_val,
|
||||
TransformBound::Existing(new_val) => Included(new_val),
|
||||
},
|
||||
@@ -111,8 +111,8 @@ pub fn map_bound<TFrom, TTo>(
|
||||
) -> Bound<TTo> {
|
||||
use self::Bound::*;
|
||||
match bound {
|
||||
Excluded(ref from_val) => Bound::Excluded(transform(from_val)),
|
||||
Included(ref from_val) => Bound::Included(transform(from_val)),
|
||||
Excluded(from_val) => Bound::Excluded(transform(from_val)),
|
||||
Included(from_val) => Bound::Included(transform(from_val)),
|
||||
Unbounded => Unbounded,
|
||||
}
|
||||
}
|
||||
@@ -123,8 +123,8 @@ pub fn map_bound_res<TFrom, TTo, Err>(
|
||||
) -> Result<Bound<TTo>, Err> {
|
||||
use self::Bound::*;
|
||||
Ok(match bound {
|
||||
Excluded(ref from_val) => Excluded(transform(from_val)?),
|
||||
Included(ref from_val) => Included(transform(from_val)?),
|
||||
Excluded(from_val) => Excluded(transform(from_val)?),
|
||||
Included(from_val) => Included(transform(from_val)?),
|
||||
Unbounded => Unbounded,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::ops::{Deref, Range, RangeBounds};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
@@ -73,7 +74,7 @@ impl FileHandle for WrapFile {
|
||||
{
|
||||
use std::io::{Read, Seek};
|
||||
let mut file = self.file.try_clone()?; // Clone the file to read from it separately
|
||||
// Seek to the start position in the file
|
||||
// Seek to the start position in the file
|
||||
file.seek(io::SeekFrom::Start(start as u64))?;
|
||||
// Read the data into the buffer
|
||||
file.read_exact(&mut buffer)?;
|
||||
@@ -177,6 +178,12 @@ fn combine_ranges<R: RangeBounds<usize>>(orig_range: Range<usize>, rel_range: R)
|
||||
}
|
||||
|
||||
impl FileSlice {
|
||||
/// Creates a FileSlice from a path.
|
||||
pub fn open(path: &Path) -> io::Result<FileSlice> {
|
||||
let wrap_file = WrapFile::new(File::open(path)?)?;
|
||||
Ok(FileSlice::new(Arc::new(wrap_file)))
|
||||
}
|
||||
|
||||
/// Wraps a FileHandle.
|
||||
pub fn new(file_handle: Arc<dyn FileHandle>) -> Self {
|
||||
let num_bytes = file_handle.len();
|
||||
@@ -339,8 +346,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{FileHandle, FileSlice};
|
||||
use crate::file_slice::combine_ranges;
|
||||
use crate::HasLen;
|
||||
use crate::file_slice::combine_ranges;
|
||||
|
||||
#[test]
|
||||
fn test_file_slice() -> io::Result<()> {
|
||||
|
||||
@@ -22,7 +22,7 @@ pub use json_path_writer::JsonPathWriter;
|
||||
pub use ownedbytes::{OwnedBytes, StableDeref};
|
||||
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
|
||||
pub use vint::{
|
||||
read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt, VIntU128,
|
||||
VInt, VIntU128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint,
|
||||
};
|
||||
pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
|
||||
|
||||
@@ -177,8 +177,10 @@ pub(crate) mod test {
|
||||
|
||||
#[test]
|
||||
fn test_f64_order() {
|
||||
assert!(!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
|
||||
.contains(&f64_to_u64(f64::NAN))); // nan is not a number
|
||||
assert!(
|
||||
!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
|
||||
.contains(&f64_to_u64(f64::NAN))
|
||||
); // nan is not a number
|
||||
assert!(f64_to_u64(1.5) > f64_to_u64(1.0)); // same exponent, different mantissa
|
||||
assert!(f64_to_u64(2.0) > f64_to_u64(1.0)); // same mantissa, different exponent
|
||||
assert!(f64_to_u64(2.0) > f64_to_u64(1.5)); // different exponent and mantissa
|
||||
|
||||
@@ -222,7 +222,7 @@ impl BinarySerializable for VInt {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{serialize_vint_u32, BinarySerializable, VInt};
|
||||
use super::{BinarySerializable, VInt, serialize_vint_u32};
|
||||
|
||||
fn aux_test_vint(val: u64) {
|
||||
let mut v = [14u8; 10];
|
||||
|
||||
@@ -87,7 +87,7 @@ impl<W: TerminatingWrite> TerminatingWrite for BufWriter<W> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TerminatingWrite for &'a mut Vec<u8> {
|
||||
impl TerminatingWrite for &mut Vec<u8> {
|
||||
fn terminate_ref(&mut self, _a: AntiCallToken) -> io::Result<()> {
|
||||
self.flush()
|
||||
}
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 30 KiB After Width: | Height: | Size: 7.4 KiB |
@@ -51,7 +51,7 @@ fn main() -> tantivy::Result<()> {
|
||||
|
||||
// Our second field is body.
|
||||
// We want full-text search for it, but we do not
|
||||
// need to be able to be able to retrieve it
|
||||
// need to be able to retrieve it
|
||||
// for our application.
|
||||
//
|
||||
// We can make our index lighter by omitting the `STORED` flag.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
|
||||
name = "ownedbytes"
|
||||
version = "0.7.0"
|
||||
version = "0.9.0"
|
||||
edition = "2021"
|
||||
description = "Expose data as static slice"
|
||||
license = "MIT"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "tantivy-query-grammar"
|
||||
version = "0.22.0"
|
||||
version = "0.25.0"
|
||||
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
|
||||
license = "MIT"
|
||||
categories = ["database-implementations", "data-structures"]
|
||||
@@ -9,7 +9,9 @@ homepage = "https://github.com/quickwit-oss/tantivy"
|
||||
repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
nom = "7"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use nom::{AsChar, IResult, InputLength, InputTakeAtPosition};
|
||||
use serde::Serialize;
|
||||
|
||||
pub(crate) type ErrorList = Vec<LenientErrorInternal>;
|
||||
pub(crate) type JResult<I, O> = IResult<I, (O, ErrorList), Infallible>;
|
||||
@@ -15,7 +16,8 @@ pub(crate) struct LenientErrorInternal {
|
||||
}
|
||||
|
||||
/// A recoverable error and the position it happened at
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct LenientError {
|
||||
pub pos: usize,
|
||||
pub message: String,
|
||||
@@ -184,19 +186,19 @@ macro_rules! tuple_trait_impl(
|
||||
);
|
||||
|
||||
macro_rules! tuple_trait_inner(
|
||||
($it:tt, $self:expr, $input:expr, (), $error_list:expr, $head:ident $($id:ident)+) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, (), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
succ!($it, tuple_trait_inner!($self, i, ( o ), $error_list, $($id)+))
|
||||
});
|
||||
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident $($id:ident)+) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident $($id:ident)+) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
succ!($it, tuple_trait_inner!($self, i, ($($parsed)* , o), $error_list, $($id)+))
|
||||
});
|
||||
($it:tt, $self:expr, $input:expr, ($($parsed:tt)*), $error_list:expr, $head:ident) => ({
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, ($($parsed:tt)*), $error_list:expr_2021, $head:ident) => ({
|
||||
let (i, (o, mut err)) = $self.$it.parse($input.clone())?;
|
||||
$error_list.append(&mut err);
|
||||
|
||||
@@ -326,13 +328,13 @@ macro_rules! alt_trait_impl(
|
||||
);
|
||||
|
||||
macro_rules! alt_trait_inner(
|
||||
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident, $($id_cond:ident $id:ident),+) => (
|
||||
match $self.$it.0.parse($input.clone()) {
|
||||
Err(_) => succ!($it, alt_trait_inner!($self, $input, $($id_cond $id),+)),
|
||||
Ok((input_left, _)) => Some($self.$it.1.parse(input_left)),
|
||||
}
|
||||
);
|
||||
($it:tt, $self:expr, $input:expr, $head_cond:ident $head:ident) => (
|
||||
($it:tt, $self:expr_2021, $input:expr_2021, $head_cond:ident $head:ident) => (
|
||||
None
|
||||
);
|
||||
);
|
||||
@@ -353,3 +355,21 @@ where
|
||||
{
|
||||
move |i: I| l.choice(i.clone()).unwrap_or_else(|| default.parse(i))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_lenient_error_serialization() {
|
||||
let error = LenientError {
|
||||
pos: 42,
|
||||
message: "test error message".to_string(),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_string(&error).unwrap(),
|
||||
"{\"pos\":42,\"message\":\"test error message\"}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#![allow(clippy::derive_partial_eq_without_eq)]
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
mod infallible;
|
||||
mod occur;
|
||||
mod query_grammar;
|
||||
@@ -12,6 +14,8 @@ pub use crate::user_input_ast::{
|
||||
Delimiter, UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral,
|
||||
};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct Error;
|
||||
|
||||
/// Parse a query
|
||||
@@ -24,3 +28,31 @@ pub fn parse_query(query: &str) -> Result<UserInputAst, Error> {
|
||||
pub fn parse_query_lenient(query: &str) -> (UserInputAst, Vec<LenientError>) {
|
||||
parse_to_ast_lenient(query)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{parse_query, parse_query_lenient};
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_serialization() {
|
||||
let ast = parse_query("title:hello OR title:x").unwrap();
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"bool","clauses":[["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}],["should",{"type":"literal","field_name":"title","phrase":"x","delimiter":"none","slop":0,"prefix":false}]]}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_wrong_query() {
|
||||
assert!(parse_query("title:").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_lenient_wrong_query() {
|
||||
let (_, errors) = parse_query_lenient("title:");
|
||||
assert!(errors.len() == 1);
|
||||
let json = serde_json::to_string(&errors).unwrap();
|
||||
assert_eq!(json, r#"[{"pos":6,"message":"expected word"}]"#);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use std::fmt;
|
||||
use std::fmt::Write;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
/// Defines whether a term in a query must be present,
|
||||
/// should be present or must not be present.
|
||||
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq)]
|
||||
#[derive(Debug, Clone, Hash, Copy, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Occur {
|
||||
/// For a given document to be considered for scoring,
|
||||
/// at least one of the queries with the Should or the Must
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::borrow::Cow;
|
||||
use std::iter::once;
|
||||
|
||||
use nom::IResult;
|
||||
use nom::branch::alt;
|
||||
use nom::bytes::complete::tag;
|
||||
use nom::character::complete::{
|
||||
@@ -10,12 +11,11 @@ use nom::combinator::{eof, map, map_res, opt, peek, recognize, value, verify};
|
||||
use nom::error::{Error, ErrorKind};
|
||||
use nom::multi::{many0, many1, separated_list0};
|
||||
use nom::sequence::{delimited, preceded, separated_pair, terminated, tuple};
|
||||
use nom::IResult;
|
||||
|
||||
use super::user_input_ast::{UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral};
|
||||
use crate::Occur;
|
||||
use crate::infallible::*;
|
||||
use crate::user_input_ast::Delimiter;
|
||||
use crate::Occur;
|
||||
|
||||
// Note: '-' char is only forbidden at the beginning of a field name, would be clearer to add it to
|
||||
// special characters.
|
||||
@@ -36,7 +36,7 @@ fn field_name(inp: &str) -> IResult<&str, String> {
|
||||
alt((first_char, escape_sequence())),
|
||||
many0(alt((simple_char, escape_sequence(), char('\\')))),
|
||||
)),
|
||||
char(':'),
|
||||
tuple((multispace0, char(':'), multispace0)),
|
||||
),
|
||||
|(first_char, next)| once(first_char).chain(next).collect(),
|
||||
)(inp)
|
||||
@@ -321,7 +321,17 @@ fn exists(inp: &str) -> IResult<&str, UserInputLeaf> {
|
||||
UserInputLeaf::Exists {
|
||||
field: String::new(),
|
||||
},
|
||||
tuple((multispace0, char('*'))),
|
||||
tuple((
|
||||
multispace0,
|
||||
char('*'),
|
||||
peek(alt((
|
||||
value(
|
||||
"",
|
||||
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
|
||||
),
|
||||
eof,
|
||||
))),
|
||||
)),
|
||||
)(inp)
|
||||
}
|
||||
|
||||
@@ -331,7 +341,14 @@ fn exists_precond(inp: &str) -> IResult<&str, (), ()> {
|
||||
peek(tuple((
|
||||
field_name,
|
||||
multispace0,
|
||||
char('*'), // when we are here, we know it can't be anything but a exists
|
||||
char('*'),
|
||||
peek(alt((
|
||||
value(
|
||||
"",
|
||||
satisfy(|c: char| c.is_whitespace() || ESCAPE_IN_WORD.contains(&c)),
|
||||
),
|
||||
eof,
|
||||
))), // we need to check this isn't a wildcard query
|
||||
))),
|
||||
)(inp)
|
||||
.map_err(|e| e.map(|_| ()))
|
||||
@@ -1013,7 +1030,7 @@ fn rewrite_ast(mut input: UserInputAst) -> UserInputAst {
|
||||
|
||||
fn rewrite_ast_clause(input: &mut (Option<Occur>, UserInputAst)) {
|
||||
match input {
|
||||
(None, UserInputAst::Clause(ref mut clauses)) if clauses.len() == 1 => {
|
||||
(None, UserInputAst::Clause(clauses)) if clauses.len() == 1 => {
|
||||
*input = clauses.pop().unwrap(); // safe because clauses.len() == 1
|
||||
}
|
||||
_ => {}
|
||||
@@ -1266,6 +1283,10 @@ mod test {
|
||||
super::field_name("~my~field:a"),
|
||||
Ok(("a", "~my~field".to_string()))
|
||||
);
|
||||
assert_eq!(
|
||||
super::field_name(".my.field.name : a"),
|
||||
Ok(("a", ".my.field.name".to_string()))
|
||||
);
|
||||
for special_char in SPECIAL_CHARS.iter() {
|
||||
let query = &format!("\\{special_char}my\\{special_char}field:a");
|
||||
assert_eq!(
|
||||
@@ -1359,7 +1380,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_range_parser_lenient() {
|
||||
let literal = |query| literal_infallible(query).unwrap().1 .0.unwrap();
|
||||
let literal = |query| literal_infallible(query).unwrap().1.0.unwrap();
|
||||
|
||||
// same tests as non-lenient
|
||||
let res = literal("title: <hello");
|
||||
@@ -1497,6 +1518,11 @@ mod test {
|
||||
test_is_parse_err(r#"field:(+a -"b c""#, r#"(+"field":a -"field":"b c")"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn field_re_specification() {
|
||||
test_parse_query_to_ast_helper(r#"field:(abc AND b:cde)"#, r#"(+"field":abc +"b":cde)"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_query_single_term() {
|
||||
test_parse_query_to_ast_helper("abc", "abc");
|
||||
@@ -1619,13 +1645,19 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_exist_query() {
|
||||
test_parse_query_to_ast_helper("a:*", "\"a\":*");
|
||||
test_parse_query_to_ast_helper("a: *", "\"a\":*");
|
||||
// an exist followed by default term being b
|
||||
test_is_parse_err("a:*b", "(*\"a\":* *b)");
|
||||
test_parse_query_to_ast_helper("a:*", "$exists(\"a\")");
|
||||
test_parse_query_to_ast_helper("a: *", "$exists(\"a\")");
|
||||
|
||||
// this is a term query (not a phrase prefix)
|
||||
test_parse_query_to_ast_helper(
|
||||
"(hello AND toto:*) OR happy",
|
||||
"(?(+hello +$exists(\"toto\")) ?happy)",
|
||||
);
|
||||
test_parse_query_to_ast_helper("(a:*)", "$exists(\"a\")");
|
||||
|
||||
// these are term/wildcard query (not a phrase prefix)
|
||||
test_parse_query_to_ast_helper("a:b*", "\"a\":b*");
|
||||
test_parse_query_to_ast_helper("a:*b", "\"a\":*b");
|
||||
test_parse_query_to_ast_helper(r#"a:*def*"#, "\"a\":*def*");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1661,4 +1693,15 @@ mod test {
|
||||
fn test_invalid_field() {
|
||||
test_is_parse_err(r#"!bc:def"#, "!bc:def");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_space_before_value() {
|
||||
test_parse_query_to_ast_helper("field : a", r#""field":a"#);
|
||||
test_parse_query_to_ast_helper("field: a", r#""field":a"#);
|
||||
test_parse_query_to_ast_helper("field :a", r#""field":a"#);
|
||||
test_parse_query_to_ast_helper(
|
||||
"field : 'happy tax payer' AND other_field : 1",
|
||||
r#"(+"field":'happy tax payer' +"other_field":1)"#,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::Occur;
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum UserInputLeaf {
|
||||
Literal(UserInputLiteral),
|
||||
All,
|
||||
@@ -47,7 +51,7 @@ impl UserInputLeaf {
|
||||
|
||||
pub(crate) fn set_default_field(&mut self, default_field: String) {
|
||||
match self {
|
||||
UserInputLeaf::Literal(ref mut literal) if literal.field_name.is_none() => {
|
||||
UserInputLeaf::Literal(literal) if literal.field_name.is_none() => {
|
||||
literal.field_name = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::All => {
|
||||
@@ -55,12 +59,8 @@ impl UserInputLeaf {
|
||||
field: default_field,
|
||||
}
|
||||
}
|
||||
UserInputLeaf::Range { ref mut field, .. } if field.is_none() => {
|
||||
*field = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::Set { ref mut field, .. } if field.is_none() => {
|
||||
*field = Some(default_field)
|
||||
}
|
||||
UserInputLeaf::Range { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
UserInputLeaf::Set { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
_ => (), // field was already set, do nothing
|
||||
}
|
||||
}
|
||||
@@ -71,11 +71,11 @@ impl Debug for UserInputLeaf {
|
||||
match self {
|
||||
UserInputLeaf::Literal(literal) => literal.fmt(formatter),
|
||||
UserInputLeaf::Range {
|
||||
ref field,
|
||||
ref lower,
|
||||
ref upper,
|
||||
field,
|
||||
lower,
|
||||
upper,
|
||||
} => {
|
||||
if let Some(ref field) = field {
|
||||
if let Some(field) = field {
|
||||
// TODO properly escape field (in case of \")
|
||||
write!(formatter, "\"{field}\":")?;
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl Debug for UserInputLeaf {
|
||||
Ok(())
|
||||
}
|
||||
UserInputLeaf::Set { field, elements } => {
|
||||
if let Some(ref field) = field {
|
||||
if let Some(field) = field {
|
||||
// TODO properly escape field (in case of \")
|
||||
write!(formatter, "\"{field}\": ")?;
|
||||
}
|
||||
@@ -101,20 +101,22 @@ impl Debug for UserInputLeaf {
|
||||
}
|
||||
UserInputLeaf::All => write!(formatter, "*"),
|
||||
UserInputLeaf::Exists { field } => {
|
||||
write!(formatter, "\"{field}\":*")
|
||||
write!(formatter, "$exists(\"{field}\")")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum Delimiter {
|
||||
SingleQuotes,
|
||||
DoubleQuotes,
|
||||
None,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub struct UserInputLiteral {
|
||||
pub field_name: Option<String>,
|
||||
pub phrase: String,
|
||||
@@ -152,7 +154,9 @@ impl fmt::Debug for UserInputLiteral {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Clone)]
|
||||
#[derive(PartialEq, Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type", content = "value")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum UserInputBound {
|
||||
Inclusive(String),
|
||||
Exclusive(String),
|
||||
@@ -187,11 +191,38 @@ impl UserInputBound {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
#[derive(PartialEq, Clone, Serialize)]
|
||||
#[serde(into = "UserInputAstSerde")]
|
||||
pub enum UserInputAst {
|
||||
Clause(Vec<(Option<Occur>, UserInputAst)>),
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
Boost(Box<UserInputAst>, f64),
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum UserInputAstSerde {
|
||||
Bool {
|
||||
clauses: Vec<(Option<Occur>, UserInputAst)>,
|
||||
},
|
||||
Boost {
|
||||
underlying: Box<UserInputAst>,
|
||||
boost: f64,
|
||||
},
|
||||
#[serde(untagged)]
|
||||
Leaf(Box<UserInputLeaf>),
|
||||
}
|
||||
|
||||
impl From<UserInputAst> for UserInputAstSerde {
|
||||
fn from(ast: UserInputAst) -> Self {
|
||||
match ast {
|
||||
UserInputAst::Clause(clause) => UserInputAstSerde::Bool { clauses: clause },
|
||||
UserInputAst::Boost(underlying, boost) => {
|
||||
UserInputAstSerde::Boost { underlying, boost }
|
||||
}
|
||||
UserInputAst::Leaf(leaf) => UserInputAstSerde::Leaf(leaf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UserInputAst {
|
||||
@@ -232,7 +263,7 @@ impl UserInputAst {
|
||||
.iter_mut()
|
||||
.for_each(|(_, ast)| ast.set_default_field(field.clone())),
|
||||
UserInputAst::Leaf(leaf) => leaf.set_default_field(field),
|
||||
UserInputAst::Boost(ref mut ast, _) => ast.set_default_field(field),
|
||||
UserInputAst::Boost(ast, _) => ast.set_default_field(field),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -285,3 +316,126 @@ impl fmt::Debug for UserInputAst {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_all_leaf_serialization() {
|
||||
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(json, r#"{"type":"all"}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_literal_leaf_serialization() {
|
||||
let literal = UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(literal)));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_leaf_serialization() {
|
||||
let range = UserInputLeaf::Range {
|
||||
field: Some("price".to_string()),
|
||||
lower: UserInputBound::Inclusive("10".to_string()),
|
||||
upper: UserInputBound::Exclusive("100".to_string()),
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(range));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"exclusive","value":"100"}}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range_leaf_unbounded_serialization() {
|
||||
let range = UserInputLeaf::Range {
|
||||
field: Some("price".to_string()),
|
||||
lower: UserInputBound::Inclusive("10".to_string()),
|
||||
upper: UserInputBound::Unbounded,
|
||||
};
|
||||
let ast = UserInputAst::Leaf(Box::new(range));
|
||||
let json = serde_json::to_string(&ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"range","field":"price","lower":{"type":"inclusive","value":"10"},"upper":{"type":"unbounded"}}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_boost_serialization() {
|
||||
let inner_ast = UserInputAst::Leaf(Box::new(UserInputLeaf::All));
|
||||
let boost_ast = UserInputAst::Boost(Box::new(inner_ast), 2.5);
|
||||
let json = serde_json::to_string(&boost_ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"boost","underlying":{"type":"all"},"boost":2.5}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_boost_serialization2() {
|
||||
let boost_ast = UserInputAst::Boost(
|
||||
Box::new(UserInputAst::Clause(vec![
|
||||
(
|
||||
Some(Occur::Must),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
|
||||
),
|
||||
(
|
||||
Some(Occur::Should),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
}))),
|
||||
),
|
||||
])),
|
||||
2.5,
|
||||
);
|
||||
let json = serde_json::to_string(&boost_ast).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"boost","underlying":{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]},"boost":2.5}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clause_serialization() {
|
||||
let clause = UserInputAst::Clause(vec![
|
||||
(
|
||||
Some(Occur::Must),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::All)),
|
||||
),
|
||||
(
|
||||
Some(Occur::Should),
|
||||
UserInputAst::Leaf(Box::new(UserInputLeaf::Literal(UserInputLiteral {
|
||||
field_name: Some("title".to_string()),
|
||||
phrase: "hello".to_string(),
|
||||
delimiter: Delimiter::None,
|
||||
slop: 0,
|
||||
prefix: false,
|
||||
}))),
|
||||
),
|
||||
]);
|
||||
let json = serde_json::to_string(&clause).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"type":"bool","clauses":[["must",{"type":"all"}],["should",{"type":"literal","field_name":"title","phrase":"hello","delimiter":"none","slop":0,"prefix":false}]]}"#
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
|
||||
field: ref field_name,
|
||||
..
|
||||
})
|
||||
| Count(CountAggregation {
|
||||
field: ref field_name,
|
||||
..
|
||||
})
|
||||
| Max(MaxAggregation {
|
||||
field: ref field_name,
|
||||
..
|
||||
@@ -299,6 +295,24 @@ impl AggregationWithAccessor {
|
||||
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
|
||||
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
|
||||
}
|
||||
Count(CountAggregation {
|
||||
field: ref field_name,
|
||||
..
|
||||
}) => {
|
||||
let allowed_column_types = [
|
||||
ColumnType::I64,
|
||||
ColumnType::U64,
|
||||
ColumnType::F64,
|
||||
ColumnType::Str,
|
||||
ColumnType::DateTime,
|
||||
ColumnType::Bool,
|
||||
ColumnType::IpAddr,
|
||||
// ColumnType::Bytes Unsupported
|
||||
];
|
||||
let (accessor, column_type) =
|
||||
get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
|
||||
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
|
||||
}
|
||||
Percentiles(ref percentiles) => {
|
||||
let (accessor, column_type) = get_ff_reader(
|
||||
reader,
|
||||
|
||||
@@ -34,10 +34,10 @@ use crate::aggregation::*;
|
||||
pub struct DateHistogramAggregationReq {
|
||||
#[doc(hidden)]
|
||||
/// Only for validation
|
||||
interval: Option<String>,
|
||||
pub interval: Option<String>,
|
||||
#[doc(hidden)]
|
||||
/// Only for validation
|
||||
calendar_interval: Option<String>,
|
||||
pub calendar_interval: Option<String>,
|
||||
/// The field to aggregate on.
|
||||
pub field: String,
|
||||
/// The format to format dates. Unsupported currently.
|
||||
|
||||
@@ -518,7 +518,7 @@ impl SegmentTermCollector {
|
||||
|term| {
|
||||
let entry = entries[idx];
|
||||
let intermediate_entry = into_intermediate_bucket_entry(entry.0, entry.1)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
.map_err(io::Error::other)?;
|
||||
dict.insert(
|
||||
IntermediateKey::Str(
|
||||
String::from_utf8(term.to_vec()).expect("could not convert to String"),
|
||||
|
||||
@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
|
||||
.column_block_accessor
|
||||
.fetch_block(docs, &agg_accessor.accessor);
|
||||
}
|
||||
for val in agg_accessor.column_block_accessor.iter_vals() {
|
||||
let val1 = f64_from_fastfield_u64(val, &self.field_type);
|
||||
self.stats.collect(val1);
|
||||
if [
|
||||
ColumnType::I64,
|
||||
ColumnType::U64,
|
||||
ColumnType::F64,
|
||||
ColumnType::DateTime,
|
||||
]
|
||||
.contains(&self.field_type)
|
||||
{
|
||||
for val in agg_accessor.column_block_accessor.iter_vals() {
|
||||
let val1 = f64_from_fastfield_u64(val, &self.field_type);
|
||||
self.stats.collect(val1);
|
||||
}
|
||||
} else {
|
||||
for _val in agg_accessor.column_block_accessor.iter_vals() {
|
||||
// we ignore the value and simply record that we got something
|
||||
self.stats.collect(0.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -435,6 +449,11 @@ mod tests {
|
||||
"field": "score",
|
||||
},
|
||||
},
|
||||
"count_str": {
|
||||
"value_count": {
|
||||
"field": "text",
|
||||
},
|
||||
},
|
||||
"range": range_agg
|
||||
}))
|
||||
.unwrap();
|
||||
@@ -500,6 +519,13 @@ mod tests {
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res["count_str"],
|
||||
json!({
|
||||
"value": 7.0,
|
||||
})
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -229,6 +229,7 @@ impl TopHitsAggregationReq {
|
||||
self.sort
|
||||
.iter()
|
||||
.map(|KeyOrder { field, .. }| field.as_str())
|
||||
.chain(self.doc_value_fields.iter().map(|s| s.as_str()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -366,8 +366,12 @@ impl PartialEq for Key {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::Str(l), Self::Str(r)) => l == r,
|
||||
(Self::F64(l), Self::F64(r)) => l == r,
|
||||
_ => false,
|
||||
(Self::F64(l), Self::F64(r)) => l.to_bits() == r.to_bits(),
|
||||
(Self::I64(l), Self::I64(r)) => l == r,
|
||||
(Self::U64(l), Self::U64(r)) => l == r,
|
||||
// we list all variant of left operand to make sure this gets updated when we add
|
||||
// variants to the enum
|
||||
(Self::Str(_) | Self::F64(_) | Self::I64(_) | Self::U64(_), _) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -578,7 +582,7 @@ mod tests {
|
||||
.set_indexing_options(
|
||||
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
|
||||
)
|
||||
.set_fast(None)
|
||||
.set_fast(Some("raw"))
|
||||
.set_stored();
|
||||
let text_field = schema_builder.add_text_field("text", text_fieldtype);
|
||||
let date_field = schema_builder.add_date_field("date", FAST);
|
||||
|
||||
@@ -739,7 +739,7 @@ mod tests {
|
||||
.flat_map(|(c, count)| {
|
||||
let facet = Facet::from(&format!("/facet/{c}"));
|
||||
let doc = doc!(facet_field => facet);
|
||||
iter::repeat(doc).take(count)
|
||||
std::iter::repeat_n(doc, count)
|
||||
})
|
||||
.map(|mut doc| {
|
||||
doc.add_facet(
|
||||
@@ -787,7 +787,7 @@ mod tests {
|
||||
.flat_map(|(c, count)| {
|
||||
let facet = Facet::from(&format!("/facet/{c}"));
|
||||
let doc = doc!(facet_field => facet);
|
||||
iter::repeat(doc).take(count)
|
||||
std::iter::repeat_n(doc, count)
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@ use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
use columnar::ColumnValues;
|
||||
use columnar::{ColumnValues, StrColumn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::Collector;
|
||||
use crate::collector::custom_score_top_collector::CustomScoreTopCollector;
|
||||
use crate::collector::custom_score_top_collector::{
|
||||
CustomScoreTopCollector, CustomScoreTopSegmentCollector,
|
||||
};
|
||||
use crate::collector::top_collector::{ComparableDoc, TopCollector, TopSegmentCollector};
|
||||
use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
|
||||
use crate::collector::{
|
||||
@@ -14,6 +16,7 @@ use crate::collector::{
|
||||
};
|
||||
use crate::fastfield::{FastFieldNotAvailableError, FastValue};
|
||||
use crate::query::Weight;
|
||||
use crate::termdict::TermOrdinal;
|
||||
use crate::{DocAddress, DocId, Order, Score, SegmentOrdinal, SegmentReader, TantivyError};
|
||||
|
||||
struct FastFieldConvertCollector<
|
||||
@@ -83,6 +86,163 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
struct StringConvertCollector {
|
||||
pub collector: CustomScoreTopCollector<ScorerByField, u64>,
|
||||
pub field: String,
|
||||
order: Order,
|
||||
limit: usize,
|
||||
offset: usize,
|
||||
}
|
||||
|
||||
impl Collector for StringConvertCollector {
|
||||
type Fruit = Vec<(String, DocAddress)>;
|
||||
|
||||
type Child = StringConvertSegmentCollector;
|
||||
|
||||
fn for_segment(
|
||||
&self,
|
||||
segment_local_id: crate::SegmentOrdinal,
|
||||
segment: &SegmentReader,
|
||||
) -> crate::Result<Self::Child> {
|
||||
let schema = segment.schema();
|
||||
let field = schema.get_field(&self.field)?;
|
||||
let field_entry = schema.get_field_entry(field);
|
||||
if !field_entry.is_fast() {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is not a fast field.",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let requested_type = crate::schema::Type::Str;
|
||||
let schema_type = field_entry.field_type().value_type();
|
||||
if schema_type != requested_type {
|
||||
return Err(TantivyError::SchemaError(format!(
|
||||
"Field {:?} is of type {schema_type:?}!={requested_type:?}",
|
||||
field_entry.name()
|
||||
)));
|
||||
}
|
||||
let ff = segment
|
||||
.fast_fields()
|
||||
.str(&self.field)?
|
||||
.expect("ff should be a str field");
|
||||
Ok(StringConvertSegmentCollector {
|
||||
collector: self.collector.for_segment(segment_local_id, segment)?,
|
||||
ff,
|
||||
order: self.order.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn requires_scoring(&self) -> bool {
|
||||
self.collector.requires_scoring()
|
||||
}
|
||||
|
||||
fn merge_fruits(
|
||||
&self,
|
||||
child_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
|
||||
) -> crate::Result<Self::Fruit> {
|
||||
if self.limit == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
if self.order.is_desc() {
|
||||
let mut top_collector: TopNComputer<_, _, true> =
|
||||
TopNComputer::new(self.limit + self.offset);
|
||||
for child_fruit in child_fruits {
|
||||
for (feature, doc) in child_fruit {
|
||||
top_collector.push(feature, doc);
|
||||
}
|
||||
}
|
||||
Ok(top_collector
|
||||
.into_sorted_vec()
|
||||
.into_iter()
|
||||
.skip(self.offset)
|
||||
.map(|cdoc| (cdoc.feature, cdoc.doc))
|
||||
.collect())
|
||||
} else {
|
||||
let mut top_collector: TopNComputer<_, _, false> =
|
||||
TopNComputer::new(self.limit + self.offset);
|
||||
for child_fruit in child_fruits {
|
||||
for (feature, doc) in child_fruit {
|
||||
top_collector.push(feature, doc);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(top_collector
|
||||
.into_sorted_vec()
|
||||
.into_iter()
|
||||
.skip(self.offset)
|
||||
.map(|cdoc| (cdoc.feature, cdoc.doc))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct StringConvertSegmentCollector {
|
||||
pub collector: CustomScoreTopSegmentCollector<ScorerByFastFieldReader, u64>,
|
||||
ff: StrColumn,
|
||||
order: Order,
|
||||
}
|
||||
|
||||
impl SegmentCollector for StringConvertSegmentCollector {
|
||||
type Fruit = Vec<(String, DocAddress)>;
|
||||
|
||||
fn collect(&mut self, doc: DocId, score: Score) {
|
||||
self.collector.collect(doc, score);
|
||||
}
|
||||
|
||||
fn harvest(self) -> Vec<(String, DocAddress)> {
|
||||
let top_ordinals: Vec<(TermOrdinal, DocAddress)> = self.collector.harvest();
|
||||
|
||||
// Collect terms.
|
||||
let mut terms: Vec<String> = Vec::with_capacity(top_ordinals.len());
|
||||
let result = if self.order.is_asc() {
|
||||
self.ff.dictionary().sorted_ords_to_term_cb(
|
||||
top_ordinals.iter().map(|(term_ord, _)| u64::MAX - term_ord),
|
||||
|term| {
|
||||
terms.push(
|
||||
std::str::from_utf8(term)
|
||||
.expect("Failed to decode term as unicode")
|
||||
.to_owned(),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
} else {
|
||||
self.ff.dictionary().sorted_ords_to_term_cb(
|
||||
top_ordinals.iter().rev().map(|(term_ord, _)| *term_ord),
|
||||
|term| {
|
||||
terms.push(
|
||||
std::str::from_utf8(term)
|
||||
.expect("Failed to decode term as unicode")
|
||||
.to_owned(),
|
||||
);
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
};
|
||||
|
||||
assert!(
|
||||
result.expect("Failed to read terms from term dictionary"),
|
||||
"Not all terms were matched in segment."
|
||||
);
|
||||
|
||||
// Zip them back with their docs.
|
||||
if self.order.is_asc() {
|
||||
terms
|
||||
.into_iter()
|
||||
.zip(top_ordinals)
|
||||
.map(|(term, (_, doc))| (term, doc))
|
||||
.collect()
|
||||
} else {
|
||||
terms
|
||||
.into_iter()
|
||||
.rev()
|
||||
.zip(top_ordinals)
|
||||
.map(|(term, (_, doc))| (term, doc))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The `TopDocs` collector keeps track of the top `K` documents
|
||||
/// sorted by their score.
|
||||
///
|
||||
@@ -410,6 +570,30 @@ impl TopDocs {
|
||||
}
|
||||
}
|
||||
|
||||
/// Like `order_by_fast_field`, but for a `String` fast field.
|
||||
pub fn order_by_string_fast_field(
|
||||
self,
|
||||
fast_field: impl ToString,
|
||||
order: Order,
|
||||
) -> impl Collector<Fruit = Vec<(String, DocAddress)>> {
|
||||
let limit = self.0.limit;
|
||||
let offset = self.0.offset;
|
||||
let u64_collector = CustomScoreTopCollector::new(
|
||||
ScorerByField {
|
||||
field: fast_field.to_string(),
|
||||
order: order.clone(),
|
||||
},
|
||||
self.0.into_tscore(),
|
||||
);
|
||||
StringConvertCollector {
|
||||
collector: u64_collector,
|
||||
field: fast_field.to_string(),
|
||||
order,
|
||||
limit,
|
||||
offset,
|
||||
}
|
||||
}
|
||||
|
||||
/// Ranks the documents using a custom score.
|
||||
///
|
||||
/// This method offers a convenient way to tweak or replace
|
||||
@@ -786,7 +970,7 @@ impl<Score, D, const R: bool> From<TopNComputerDeser<Score, D, R>> for TopNCompu
|
||||
}
|
||||
}
|
||||
|
||||
impl<Score, D, const R: bool> TopNComputer<Score, D, R>
|
||||
impl<Score, D, const REVERSE_ORDER: bool> TopNComputer<Score, D, REVERSE_ORDER>
|
||||
where
|
||||
Score: PartialOrd + Clone,
|
||||
D: Ord,
|
||||
@@ -807,7 +991,10 @@ where
|
||||
#[inline]
|
||||
pub fn push(&mut self, feature: Score, doc: D) {
|
||||
if let Some(last_median) = self.threshold.clone() {
|
||||
if feature < last_median {
|
||||
if !REVERSE_ORDER && feature > last_median {
|
||||
return;
|
||||
}
|
||||
if REVERSE_ORDER && feature < last_median {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -842,7 +1029,7 @@ where
|
||||
}
|
||||
|
||||
/// Returns the top n elements in sorted order.
|
||||
pub fn into_sorted_vec(mut self) -> Vec<ComparableDoc<Score, D, R>> {
|
||||
pub fn into_sorted_vec(mut self) -> Vec<ComparableDoc<Score, D, REVERSE_ORDER>> {
|
||||
if self.buffer.len() > self.top_n {
|
||||
self.truncate_top_n();
|
||||
}
|
||||
@@ -853,7 +1040,7 @@ where
|
||||
/// Returns the top n elements in stored order.
|
||||
/// Useful if you do not need the elements in sorted order,
|
||||
/// for example when merging the results of multiple segments.
|
||||
pub fn into_vec(mut self) -> Vec<ComparableDoc<Score, D, R>> {
|
||||
pub fn into_vec(mut self) -> Vec<ComparableDoc<Score, D, REVERSE_ORDER>> {
|
||||
if self.buffer.len() > self.top_n {
|
||||
self.truncate_top_n();
|
||||
}
|
||||
@@ -863,9 +1050,11 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use proptest::prelude::*;
|
||||
|
||||
use super::{TopDocs, TopNComputer};
|
||||
use crate::collector::top_collector::ComparableDoc;
|
||||
use crate::collector::Collector;
|
||||
use crate::collector::{Collector, DocSetCollector};
|
||||
use crate::query::{AllQuery, Query, QueryParser};
|
||||
use crate::schema::{Field, Schema, FAST, STORED, TEXT};
|
||||
use crate::time::format_description::well_known::Rfc3339;
|
||||
@@ -960,6 +1149,44 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_topn_computer_asc_prop(
|
||||
limit in 0..10_usize,
|
||||
docs in proptest::collection::vec((0..100_u64, 0..100_u64), 0..100_usize),
|
||||
) {
|
||||
let mut computer: TopNComputer<_, _, false> = TopNComputer::new(limit);
|
||||
for (feature, doc) in &docs {
|
||||
computer.push(*feature, *doc);
|
||||
}
|
||||
let mut comparable_docs = docs.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc }).collect::<Vec<_>>();
|
||||
comparable_docs.sort();
|
||||
comparable_docs.truncate(limit);
|
||||
prop_assert_eq!(
|
||||
computer.into_sorted_vec(),
|
||||
comparable_docs,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_topn_computer_desc_prop(
|
||||
limit in 0..10_usize,
|
||||
docs in proptest::collection::vec((0..100_u64, 0..100_u64), 0..100_usize),
|
||||
) {
|
||||
let mut computer: TopNComputer<_, _, true> = TopNComputer::new(limit);
|
||||
for (feature, doc) in &docs {
|
||||
computer.push(*feature, *doc);
|
||||
}
|
||||
let mut comparable_docs = docs.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc }).collect::<Vec<_>>();
|
||||
comparable_docs.sort();
|
||||
comparable_docs.truncate(limit);
|
||||
prop_assert_eq!(
|
||||
computer.into_sorted_vec(),
|
||||
comparable_docs,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_top_collector_not_at_capacity_without_offset() -> crate::Result<()> {
|
||||
let index = make_index()?;
|
||||
@@ -1066,6 +1293,220 @@ mod tests {
|
||||
assert_eq!(page_0, &page_2[..page_0.len()]);
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(20))]
|
||||
/// Build multiple segments with equal-scoring docs and verify stable ordering
|
||||
/// across pages when increasing limit or offset.
|
||||
#[test]
|
||||
fn proptest_stable_ordering_across_segments_with_pagination(
|
||||
docs_per_segment in proptest::collection::vec(1usize..50, 2..5)
|
||||
) {
|
||||
use crate::indexer::NoMergePolicy;
|
||||
|
||||
// Build an index with multiple segments; all docs will have the same score using AllQuery.
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
|
||||
for num_docs in &docs_per_segment {
|
||||
for _ in 0..*num_docs {
|
||||
writer.add_document(doc!(text => "x")).unwrap();
|
||||
}
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
|
||||
let total_docs: usize = docs_per_segment.iter().sum();
|
||||
// Full result set, first assert all scores are identical.
|
||||
let full_with_scores: Vec<(Score, DocAddress)> = searcher
|
||||
.search(&AllQuery, &TopDocs::with_limit(total_docs))
|
||||
.unwrap();
|
||||
// Sanity: at least one document was returned.
|
||||
prop_assert!(!full_with_scores.is_empty());
|
||||
let first_score = full_with_scores[0].0;
|
||||
prop_assert!(full_with_scores.iter().all(|(score, _)| *score == first_score));
|
||||
|
||||
// Keep only the addresses for the remaining checks.
|
||||
let full: Vec<DocAddress> = full_with_scores
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
|
||||
// Sanity: we actually created multiple segments and have documents.
|
||||
prop_assert!(docs_per_segment.len() >= 2);
|
||||
prop_assert!(total_docs >= 2);
|
||||
|
||||
// 1) Increasing limit should preserve prefix ordering.
|
||||
for k in 1..=total_docs {
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&AllQuery, &TopDocs::with_limit(k))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
prop_assert_eq!(page, full[..k].to_vec());
|
||||
}
|
||||
|
||||
// 2) Offset + limit pages should always match the corresponding slice.
|
||||
// For each offset, check three representative page sizes:
|
||||
// - first page (size 1)
|
||||
// - a middle page (roughly half of remaining)
|
||||
// - the last page (size = remaining)
|
||||
for offset in 0..total_docs {
|
||||
let remaining = total_docs - offset;
|
||||
|
||||
let assert_page_eq = |limit: usize| -> proptest::test_runner::TestCaseResult {
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&AllQuery, &TopDocs::with_limit(limit).and_offset(offset))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
prop_assert_eq!(page, full[offset..offset + limit].to_vec());
|
||||
Ok(())
|
||||
};
|
||||
|
||||
// Smallest page.
|
||||
assert_page_eq(1)?;
|
||||
// A middle-sized page (dedupes to 1 if remaining == 1).
|
||||
assert_page_eq((remaining / 2).max(1))?;
|
||||
// Largest page for this offset.
|
||||
assert_page_eq(remaining)?;
|
||||
}
|
||||
|
||||
// 3) Concatenating fixed-size pages by offset reproduces the full order.
|
||||
for page_size in 1..=total_docs.min(5) {
|
||||
let mut concat: Vec<DocAddress> = Vec::new();
|
||||
let mut offset = 0;
|
||||
while offset < total_docs {
|
||||
let size = page_size.min(total_docs - offset);
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&AllQuery, &TopDocs::with_limit(size).and_offset(offset))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
concat.extend(page);
|
||||
offset += size;
|
||||
}
|
||||
// Avoid moving `full` across loop iterations.
|
||||
prop_assert_eq!(concat, full.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#![proptest_config(ProptestConfig::with_cases(20))]
|
||||
/// Build multiple segments with same-scoring term matches and verify stable ordering
|
||||
/// across pages for a real scoring query (TermQuery with identical TF and fieldnorm).
|
||||
#[test]
|
||||
fn proptest_stable_ordering_across_segments_with_term_query_and_pagination(
|
||||
docs_per_segment in proptest::collection::vec(1usize..50, 2..5)
|
||||
) {
|
||||
use crate::indexer::NoMergePolicy;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::query::TermQuery;
|
||||
use crate::Term;
|
||||
|
||||
// Build an index with multiple segments; each doc has exactly one token "x",
|
||||
// ensuring equal BM25 scores across all matching docs (same TF=1 and fieldnorm=1).
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut writer = index.writer_for_tests().unwrap();
|
||||
writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
|
||||
for num_docs in &docs_per_segment {
|
||||
for _ in 0..*num_docs {
|
||||
writer.add_document(doc!(text => "x")).unwrap();
|
||||
}
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
|
||||
let total_docs: usize = docs_per_segment.iter().sum();
|
||||
let term = Term::from_field_text(text, "x");
|
||||
let tq = TermQuery::new(term, IndexRecordOption::WithFreqs);
|
||||
|
||||
// Full result set, first assert all scores are identical across docs.
|
||||
let full_with_scores: Vec<(Score, DocAddress)> = searcher
|
||||
.search(&tq, &TopDocs::with_limit(total_docs))
|
||||
.unwrap();
|
||||
// Sanity: at least one document was returned.
|
||||
prop_assert!(!full_with_scores.is_empty());
|
||||
let first_score = full_with_scores[0].0;
|
||||
prop_assert!(full_with_scores.iter().all(|(score, _)| *score == first_score));
|
||||
|
||||
// Keep only the addresses for the remaining checks.
|
||||
let full: Vec<DocAddress> = full_with_scores
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
|
||||
// Sanity: we actually created multiple segments and have documents.
|
||||
prop_assert!(docs_per_segment.len() >= 2);
|
||||
prop_assert!(total_docs >= 2);
|
||||
|
||||
// 1) Increasing limit should preserve prefix ordering.
|
||||
for k in 1..=total_docs {
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&tq, &TopDocs::with_limit(k))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
prop_assert_eq!(page, full[..k].to_vec());
|
||||
}
|
||||
|
||||
// 2) Offset + limit pages should always match the corresponding slice.
|
||||
// Check three representative page sizes for each offset: 1, ~half, and remaining.
|
||||
for offset in 0..total_docs {
|
||||
let remaining = total_docs - offset;
|
||||
|
||||
let assert_page_eq = |limit: usize| -> proptest::test_runner::TestCaseResult {
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&tq, &TopDocs::with_limit(limit).and_offset(offset))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
prop_assert_eq!(page, full[offset..offset + limit].to_vec());
|
||||
Ok(())
|
||||
};
|
||||
|
||||
assert_page_eq(1)?;
|
||||
assert_page_eq((remaining / 2).max(1))?;
|
||||
assert_page_eq(remaining)?;
|
||||
}
|
||||
|
||||
// 3) Concatenating fixed-size pages by offset reproduces the full order.
|
||||
for page_size in 1..=total_docs.min(5) {
|
||||
let mut concat: Vec<DocAddress> = Vec::new();
|
||||
let mut offset = 0;
|
||||
while offset < total_docs {
|
||||
let size = page_size.min(total_docs - offset);
|
||||
let page: Vec<DocAddress> = searcher
|
||||
.search(&tq, &TopDocs::with_limit(size).and_offset(offset))
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|(_score, addr)| addr)
|
||||
.collect();
|
||||
concat.extend(page);
|
||||
offset += size;
|
||||
}
|
||||
prop_assert_eq!(concat, full.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_top_0() {
|
||||
@@ -1214,6 +1655,160 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_top_field_collector_string() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let city = schema_builder.add_text_field("city", TEXT | FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "austin",
|
||||
))?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "greenville",
|
||||
))?;
|
||||
index_writer.add_document(doc!(
|
||||
city => "tokyo",
|
||||
))?;
|
||||
index_writer.commit()?;
|
||||
|
||||
fn query(
|
||||
index: &Index,
|
||||
order: Order,
|
||||
limit: usize,
|
||||
offset: usize,
|
||||
) -> crate::Result<Vec<(String, DocAddress)>> {
|
||||
let searcher = index.reader()?.searcher();
|
||||
let top_collector = TopDocs::with_limit(limit)
|
||||
.and_offset(offset)
|
||||
.order_by_string_fast_field("city", order);
|
||||
searcher.search(&AllQuery, &top_collector)
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 3, 0)?,
|
||||
&[
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 2, 0)?,
|
||||
&[
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(&query(&index, Order::Desc, 3, 3)?, &[]);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Desc, 2, 1)?,
|
||||
&[
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 3, 0)?,
|
||||
&[
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 2, 1)?,
|
||||
&[
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
("tokyo".to_owned(), DocAddress::new(0, 2)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
&query(&index, Order::Asc, 2, 0)?,
|
||||
&[
|
||||
("austin".to_owned(), DocAddress::new(0, 0)),
|
||||
("greenville".to_owned(), DocAddress::new(0, 1)),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(&query(&index, Order::Asc, 3, 3)?, &[]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_top_field_collect_string_prop(
|
||||
order in prop_oneof!(Just(Order::Desc), Just(Order::Asc)),
|
||||
limit in 1..256_usize,
|
||||
offset in 0..256_usize,
|
||||
segments_terms in
|
||||
proptest::collection::vec(
|
||||
proptest::collection::vec(0..32_u8, 1..32_usize),
|
||||
0..8_usize,
|
||||
)
|
||||
) {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let city = schema_builder.add_text_field("city", TEXT | FAST);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
|
||||
// A Vec<Vec<u8>>, where the outer Vec represents segments, and the inner Vec
|
||||
// represents terms.
|
||||
for segment_terms in segments_terms.into_iter() {
|
||||
for term in segment_terms.into_iter() {
|
||||
let term = format!("{term:0>3}");
|
||||
index_writer.add_document(doc!(
|
||||
city => term,
|
||||
))?;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
}
|
||||
|
||||
let searcher = index.reader()?.searcher();
|
||||
let top_n_results = searcher.search(&AllQuery, &TopDocs::with_limit(limit)
|
||||
.and_offset(offset)
|
||||
.order_by_string_fast_field("city", order.clone()))?;
|
||||
let all_results = searcher.search(&AllQuery, &DocSetCollector)?.into_iter().map(|doc_address| {
|
||||
// Get the term for this address.
|
||||
// NOTE: We can't determine the SegmentIds that will be generated for Segments
|
||||
// ahead of time, so we can't pre-compute the expected `DocAddress`es.
|
||||
let column = searcher.segment_readers()[doc_address.segment_ord as usize].fast_fields().str("city").unwrap().unwrap();
|
||||
let term_ord = column.term_ords(doc_address.doc_id).next().unwrap();
|
||||
let mut city = Vec::new();
|
||||
column.dictionary().ord_to_term(term_ord, &mut city).unwrap();
|
||||
(String::try_from(city).unwrap(), doc_address)
|
||||
});
|
||||
|
||||
// Using the TopDocs collector should always be equivalent to sorting, skipping the
|
||||
// offset, and then taking the limit.
|
||||
let sorted_docs: Vec<_> = if order.is_desc() {
|
||||
let mut comparable_docs: Vec<ComparableDoc<_, _, true>> =
|
||||
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
|
||||
comparable_docs.sort();
|
||||
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
|
||||
} else {
|
||||
let mut comparable_docs: Vec<ComparableDoc<_, _, false>> =
|
||||
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
|
||||
comparable_docs.sort();
|
||||
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
|
||||
};
|
||||
let expected_docs = sorted_docs.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
|
||||
prop_assert_eq!(
|
||||
expected_docs,
|
||||
top_n_results
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_field_does_not_exist() {
|
||||
@@ -1373,4 +1968,29 @@ mod tests {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_topn_computer_asc() {
|
||||
let mut computer: TopNComputer<u32, u32, false> = TopNComputer::new(2);
|
||||
|
||||
computer.push(1u32, 1u32);
|
||||
computer.push(2u32, 2u32);
|
||||
computer.push(3u32, 3u32);
|
||||
computer.push(2u32, 4u32);
|
||||
computer.push(4u32, 5u32);
|
||||
computer.push(1u32, 6u32);
|
||||
assert_eq!(
|
||||
computer.into_sorted_vec(),
|
||||
&[
|
||||
ComparableDoc {
|
||||
feature: 1u32,
|
||||
doc: 1u32,
|
||||
},
|
||||
ComparableDoc {
|
||||
feature: 1u32,
|
||||
doc: 6u32,
|
||||
}
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ fn create_format() {
|
||||
}
|
||||
|
||||
fn path_for_version(version: &str) -> String {
|
||||
format!("./tests/compat_tests_data/index_v{}/", version)
|
||||
format!("./tests/compat_tests_data/index_v{version}/")
|
||||
}
|
||||
|
||||
/// feature flag quickwit uses a different dictionary type
|
||||
|
||||
@@ -41,16 +41,12 @@ impl Executor {
|
||||
///
|
||||
/// Regardless of the executor (`SingleThread` or `ThreadPool`), panics in the task
|
||||
/// will propagate to the caller.
|
||||
pub fn map<
|
||||
pub fn map<A, R, F>(&self, f: F, args: impl Iterator<Item = A>) -> crate::Result<Vec<R>>
|
||||
where
|
||||
A: Send,
|
||||
R: Send,
|
||||
AIterator: Iterator<Item = A>,
|
||||
F: Sized + Sync + Fn(A) -> crate::Result<R>,
|
||||
>(
|
||||
&self,
|
||||
f: F,
|
||||
args: AIterator,
|
||||
) -> crate::Result<Vec<R>> {
|
||||
{
|
||||
match self {
|
||||
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
|
||||
Executor::ThreadPool(pool) => {
|
||||
@@ -69,8 +65,7 @@ impl Executor {
|
||||
if let Err(err) = fruit_sender_ref.send((idx, fruit)) {
|
||||
error!(
|
||||
"Failed to send search task. It probably means all search \
|
||||
threads have panicked. {:?}",
|
||||
err
|
||||
threads have panicked. {err:?}"
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -214,7 +214,7 @@ impl Searcher {
|
||||
/// It is powerless at making search faster if your index consists in
|
||||
/// one large segment.
|
||||
///
|
||||
/// Also, keep in my multithreading a single query on several
|
||||
/// Also, keep in mind multithreading a single query on several
|
||||
/// threads will not improve your throughput. It can actually
|
||||
/// hurt it. It will however, decrease the average response time.
|
||||
pub fn search_with_executor<C: Collector>(
|
||||
|
||||
@@ -56,7 +56,7 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
|
||||
impl Drop for DirectoryLockGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.directory.delete(&self.path) {
|
||||
error!("Failed to remove the lock file. {:?}", e);
|
||||
error!("Failed to remove the lock file. {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ impl FileWatcher {
|
||||
.map(|current_checksum| current_checksum != checksum)
|
||||
.unwrap_or(true);
|
||||
if metafile_has_changed {
|
||||
info!("Meta file {:?} was modified", path);
|
||||
info!("Meta file {path:?} was modified");
|
||||
current_checksum_opt = Some(checksum);
|
||||
// We actually ignore callbacks failing here.
|
||||
// We just wait for the end of their execution.
|
||||
@@ -75,7 +75,7 @@ impl FileWatcher {
|
||||
let reader = match fs::File::open(path) {
|
||||
Ok(f) => io::BufReader::new(f),
|
||||
Err(e) => {
|
||||
warn!("Failed to open meta file {:?}: {:?}", path, e);
|
||||
warn!("Failed to open meta file {path:?}: {e:?}");
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,3 +1,9 @@
|
||||
//! The footer is a small metadata structure that is appended at the end of every file.
|
||||
//!
|
||||
//! The footer is used to store a checksum of the file content.
|
||||
//! The footer also stores the version of the index format.
|
||||
//! This version is used to detect incompatibility between the index and the library version.
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
|
||||
@@ -20,20 +26,22 @@ type CrcHashU32 = u32;
|
||||
/// A Footer is appended to every file
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Footer {
|
||||
/// The version of the index format
|
||||
pub version: Version,
|
||||
/// The crc32 hash of the body
|
||||
pub crc: CrcHashU32,
|
||||
}
|
||||
|
||||
impl Footer {
|
||||
pub fn new(crc: CrcHashU32) -> Self {
|
||||
pub(crate) fn new(crc: CrcHashU32) -> Self {
|
||||
let version = crate::VERSION.clone();
|
||||
Footer { version, crc }
|
||||
}
|
||||
|
||||
pub fn crc(&self) -> CrcHashU32 {
|
||||
pub(crate) fn crc(&self) -> CrcHashU32 {
|
||||
self.crc
|
||||
}
|
||||
pub fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
|
||||
pub(crate) fn append_footer<W: io::Write>(&self, mut write: &mut W) -> io::Result<()> {
|
||||
let mut counting_write = CountingWriter::wrap(&mut write);
|
||||
counting_write.write_all(serde_json::to_string(&self)?.as_ref())?;
|
||||
let footer_payload_len = counting_write.written_bytes();
|
||||
@@ -42,6 +50,7 @@ impl Footer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extracts the tantivy Footer from the file and returns the footer and the rest of the file
|
||||
pub fn extract_footer(file: FileSlice) -> io::Result<(Footer, FileSlice)> {
|
||||
if file.len() < 4 {
|
||||
return Err(io::Error::new(
|
||||
|
||||
@@ -157,7 +157,7 @@ impl ManagedDirectory {
|
||||
for file_to_delete in files_to_delete {
|
||||
match self.delete(&file_to_delete) {
|
||||
Ok(_) => {
|
||||
info!("Deleted {:?}", file_to_delete);
|
||||
info!("Deleted {file_to_delete:?}");
|
||||
deleted_files.push(file_to_delete);
|
||||
}
|
||||
Err(file_error) => {
|
||||
@@ -170,7 +170,7 @@ impl ManagedDirectory {
|
||||
if !cfg!(target_os = "windows") {
|
||||
// On windows, delete is expected to fail if the file
|
||||
// is mmapped.
|
||||
error!("Failed to delete {:?}", file_to_delete);
|
||||
error!("Failed to delete {file_to_delete:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock, Weak};
|
||||
|
||||
use common::StableDeref;
|
||||
use fs4::FileExt;
|
||||
use fs4::fs_std::FileExt;
|
||||
#[cfg(all(feature = "mmap", unix))]
|
||||
pub use memmap2::Advice;
|
||||
use memmap2::Mmap;
|
||||
@@ -29,7 +29,7 @@ pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
|
||||
|
||||
/// Create a default io error given a string.
|
||||
pub(crate) fn make_io_err(msg: String) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, msg)
|
||||
io::Error::other(msg)
|
||||
}
|
||||
|
||||
/// Returns `None` iff the file exists, can be read, but is empty (and hence
|
||||
@@ -369,7 +369,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
|
||||
|
||||
impl Directory for MmapDirectory {
|
||||
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
|
||||
debug!("Open Read {:?}", path);
|
||||
debug!("Open Read {path:?}");
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
|
||||
@@ -414,7 +414,7 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
|
||||
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
debug!("Open Write {:?}", path);
|
||||
debug!("Open Write {path:?}");
|
||||
let full_path = self.resolve_path(path);
|
||||
|
||||
let open_res = OpenOptions::new()
|
||||
@@ -467,7 +467,7 @@ impl Directory for MmapDirectory {
|
||||
}
|
||||
|
||||
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
|
||||
debug!("Atomic Write {:?}", path);
|
||||
debug!("Atomic Write {path:?}");
|
||||
let full_path = self.resolve_path(path);
|
||||
atomic_write(&full_path, content)?;
|
||||
Ok(())
|
||||
@@ -485,7 +485,9 @@ impl Directory for MmapDirectory {
|
||||
if lock.is_blocking {
|
||||
file.lock_exclusive().map_err(LockError::wrap_io_error)?;
|
||||
} else {
|
||||
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
|
||||
if !file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? {
|
||||
return Err(LockError::LockBusy);
|
||||
}
|
||||
}
|
||||
// dropping the file handle will release the lock.
|
||||
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
|
||||
|
||||
@@ -6,7 +6,7 @@ mod mmap_directory;
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod file_watcher;
|
||||
mod footer;
|
||||
pub mod footer;
|
||||
mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod watch_event_router;
|
||||
|
||||
@@ -191,7 +191,7 @@ impl Directory for RamDirectory {
|
||||
.fs
|
||||
.read()
|
||||
.map_err(|e| OpenReadError::IoError {
|
||||
io_error: Arc::new(io::Error::new(io::ErrorKind::Other, e.to_string())),
|
||||
io_error: Arc::new(io::Error::other(e.to_string())),
|
||||
filepath: path.to_path_buf(),
|
||||
})?
|
||||
.exists(path))
|
||||
|
||||
@@ -90,10 +90,7 @@ impl WatchCallbackList {
|
||||
let _ = sender.send(Ok(()));
|
||||
});
|
||||
if let Err(err) = spawn_res {
|
||||
error!(
|
||||
"Failed to spawn thread to call watch callbacks. Cause: {:?}",
|
||||
err
|
||||
);
|
||||
error!("Failed to spawn thread to call watch callbacks. Cause: {err:?}");
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
@@ -942,7 +942,7 @@ mod tests {
|
||||
|
||||
let numbers = [100, 200, 300];
|
||||
let test_range = |range: RangeInclusive<u64>| {
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
|
||||
let mut vec = vec![];
|
||||
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
|
||||
assert_eq!(vec.len(), expected_count);
|
||||
@@ -1020,7 +1020,7 @@ mod tests {
|
||||
|
||||
let numbers = [1000, 1001, 1003];
|
||||
let test_range = |range: RangeInclusive<u64>| {
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(num)).count();
|
||||
let expected_count = numbers.iter().filter(|num| range.contains(*num)).count();
|
||||
let mut vec = vec![];
|
||||
field.get_row_ids_for_value_range(range, 0..u32::MAX, &mut vec);
|
||||
assert_eq!(vec.len(), expected_count);
|
||||
|
||||
@@ -217,7 +217,7 @@ impl FastFieldReaders {
|
||||
Ok(dynamic_column.into())
|
||||
}
|
||||
|
||||
/// Returning a `dynamic_column_handle`.
|
||||
/// Returns a `dynamic_column_handle`.
|
||||
pub fn dynamic_column_handle(
|
||||
&self,
|
||||
field_name: &str,
|
||||
@@ -234,7 +234,7 @@ impl FastFieldReaders {
|
||||
Ok(dynamic_column_handle_opt)
|
||||
}
|
||||
|
||||
/// Returning all `dynamic_column_handle`.
|
||||
/// Returns all `dynamic_column_handle` that match the given field name.
|
||||
pub fn dynamic_column_handles(
|
||||
&self,
|
||||
field_name: &str,
|
||||
@@ -250,6 +250,22 @@ impl FastFieldReaders {
|
||||
Ok(dynamic_column_handles)
|
||||
}
|
||||
|
||||
/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
|
||||
pub fn dynamic_subpath_column_handles(
|
||||
&self,
|
||||
root_path: &str,
|
||||
) -> crate::Result<Vec<DynamicColumnHandle>> {
|
||||
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
let dynamic_column_handles = self
|
||||
.columnar
|
||||
.read_subpath_columns(&resolved_field_name)?
|
||||
.into_iter()
|
||||
.collect();
|
||||
Ok(dynamic_column_handles)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn list_dynamic_column_handles(
|
||||
&self,
|
||||
@@ -265,6 +281,21 @@ impl FastFieldReaders {
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn list_subpath_dynamic_column_handles(
|
||||
&self,
|
||||
root_path: &str,
|
||||
) -> crate::Result<Vec<DynamicColumnHandle>> {
|
||||
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
let columns = self
|
||||
.columnar
|
||||
.read_subpath_columns_async(&resolved_field_name)
|
||||
.await?;
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
|
||||
/// i64, u64, f64, DateTime).
|
||||
///
|
||||
@@ -476,6 +507,15 @@ mod tests {
|
||||
.iter()
|
||||
.any(|column| column.column_type() == ColumnType::Str));
|
||||
|
||||
println!("*** {:?}", fast_fields.columnar().list_columns());
|
||||
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
|
||||
assert_eq!(json_columns.len(), 0);
|
||||
|
||||
let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
|
||||
assert_eq!(json_subcolumns.len(), 3);
|
||||
|
||||
let foo_subcolumns = fast_fields
|
||||
.dynamic_subpath_column_handles("json.foo")
|
||||
.unwrap();
|
||||
assert_eq!(foo_subcolumns.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user