Compare commits

...

43 Commits

Author SHA1 Message Date
Pascal Seitz
edfb02b47e switch to enum, fix mixed types for cardinality agg 2026-05-05 16:39:51 +08:00
Pascal Seitz
d0fad88bac use bitsets for card agg 2026-05-05 16:39:51 +08:00
Pascal Seitz
351280c0b4 add card bench for high card 2026-05-05 16:39:51 +08:00
James Sewell
4480cf0a98 Enable BMW for single-scorer boolean queries by removing early return in scorer_union (#2915)
The early return for `scorers.len() == 1` in `scorer_union` short-circuits a single TermScorer into `SpecializedScorer::Other`, bypassing the `TermUnion` path that enables block-max WAND (BMW) in `for_each_pruning`.

This was originally addressed in PR #2898 (backed out), which added a special case in `BooleanWeight::for_each_pruning`. PR #2912 (merged as d27ca164a) added a single-scorer fast path inside `block_wand` itself, but did not remove this early return — so a single SHOULD TermScorer still never reaches the BMW path.

Removing the early return lets a single TermScorer with freq reading flow through to `SpecializedScorer::TermUnion`, where `block_wand` → `block_wand_single_scorer` handles it efficiently.
2026-04-28 14:49:53 -07:00
Pascal Seitz
d47abdf104 early cut off for order by sub agg in term agg 2026-04-28 16:59:59 +02:00
Pascal Seitz
c11952eb7c add order by agg benchmark 2026-04-28 16:59:59 +02:00
trinity-1686a
09667ee9c8 Merge pull request #2909 from osyniakov/claude/add-ossf-scorecard-1z6Vn
Add OpenSSF Scorecard workflow
2026-04-28 11:57:36 +02:00
trinity-1686a
333ccf5300 Merge pull request #2896 from osyniakov/claude/fix-issues-5945-5937-eQm1Q
ci: pin GitHub Actions to full commit SHAs and restrict token permissions
2026-04-28 11:57:18 +02:00
Oleksii Syniakov
60a39a4689 Merge branch 'main' into claude/fix-issues-5945-5937-eQm1Q 2026-04-28 10:28:23 +02:00
Oleksii Syniakov
f8f3e4277f remove not neeeded permissions for the public repo 2026-04-28 10:09:30 +02:00
Oleksii Syniakov
ff1433713a bump upload-sarif -> 4.35.2
Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
2026-04-28 10:07:45 +02:00
trinity-1686a
ca139d8eb1 Merge pull request #2910 from quickwit-oss/abdul.andha/composite-agg-after
Composite aggregations: send after key on last page
2026-04-27 23:38:52 +02:00
Abdul Andha
ac508108aa address pr comment 2026-04-27 12:39:38 -04:00
Paul Masurel
63da5a21b2 Optimizing top K using Adrien Grand's ideas (#2865)
* Optimizing top K using Adrien Grand's ideas

https://jpountz.github.io/2025/08/28/compiled-vs-vectorized-search-engine-edition.html

* Suffix-sum pruning for multi-term intersection candidates

After scoring each secondary in Phase 2, check whether remaining
secondaries' block_max scores can still beat the threshold. Skip
to the next candidate early if impossible, avoiding expensive seeks
into later secondaries.

Improves three-term intersection by ~8% on the balanced benchmark
while keeping two-term performance neutral.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Claude CR comment

* Removed 16 term scorer limit.

---------

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-26 12:14:40 +02:00
lif
54cd5bba98 fix: skip sentinel facet ords in harvest to prevent wrong root (#2867)
When a document has the exact registered facet path (not a child),
compute_collapse_mapping_one maps it to a sentinel (u64::MAX, 0).
Without filtering, harvest() passes u64::MAX to ord_to_term which
resolves to the last dictionary entry, producing a spurious facet
from an unrelated branch.

Skip entries where facet_ord == u64::MAX in harvest().

Closes #2494

Signed-off-by: majiayu000 <1835304752@qq.com>
2026-04-25 22:23:30 +02:00
Paul Masurel
d27ca164a9 block_wand: use single-scorer path when there is only one scorer 2026-04-25 16:35:00 +02:00
dependabot[bot]
2f5a48e8b1 Update criterion requirement from 0.5 to 0.8 (#2873)
Updates the requirements on [criterion](https://github.com/criterion-rs/criterion.rs) to permit the latest version.
- [Release notes](https://github.com/criterion-rs/criterion.rs/releases)
- [Changelog](https://github.com/criterion-rs/criterion.rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/criterion-rs/criterion.rs/compare/0.5.0...criterion-v0.8.2)

---
updated-dependencies:
- dependency-name: criterion
  dependency-version: 0.8.2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-25 14:15:53 +02:00
dependabot[bot]
ae0ab907fe Bump actions/checkout from 4 to 6 (#2875)
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 6.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-25 14:15:27 +02:00
dependabot[bot]
7d62e084e7 Bump codecov/codecov-action from 3 to 6 (#2876)
Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 3 to 6.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v3...v6)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-04-25 14:14:54 +02:00
James Sewell
322286ee16 Tighen Block-Max in single-scorer (#2897)
In the Block-Max WAND single-scorer, it uses block_max_score() < threshold,
whereas the multi-term one uses  block_max_score_upperbound <= threshold.

As both of these are guarded later on with if score > threshold we can
use the more efficent form in single-scorer.

Single-scorer block skip (<, should be <=): https://github.com/quickwit-oss/tantivy/blob/main/src/query/boolean_query/block_wand.rs#L231
Multi-scorer block skip (already <=): https://github.com/quickwit-oss/tantivy/blob/main/src/query/boolean_query/block_wand.rs#L179
Single-scorer per-doc guard (>): https://github.com/quickwit-oss/tantivy/blob/main/src/query/boolean_query/block_wand.rs#L246
Multi-scorer per-doc guard (>): https://github.com/quickwit-oss/tantivy/blob/main/src/query/boolean_query/block_wand.rs#L206

This will improve performance when there are many identical scores.
2026-04-25 14:13:07 +02:00
RJ Barman
73ad18fa1e fix: Add space for missing sentinel in allowed bitset when a missing key is provided (#119) (#2907)
## Bug Overview
Under certain conditions, a `terms` aggregation request can cause a
bounds-check panic. Those conditions are:
- The queried field must be a text field
- There must be a segment where the number of distinct terms in it's
dictionary for the queried field is divisible by 64 (i.e.e where
`count(term_dict.keys) % 64 == 0`)
- That same segment must contain at least one document that does not
contain this field.
- The request contain a `missing` key that is a string.
- The request must contain an `include` or `exclude` filter.
For example:
```json
{
    "my_bool": {
        "terms": {
            "field": "title",
            "include": "foo",
            "missing": "__NULL__",
        }
    }
}
```
Check out the added tests in `src/aggregation/bucket/term_agg.rs` to see
this in action

## How the bug happens
### Preparation
While preparing the aggregation nodes:
1) When we've provided a `missing` key, we derive a missing sentinel.
For string keys this column's max value (which for string keys is always
the number of terms in this segment) + 1.
2) for string columns only, we optionally prep an "allowed" `BitSet` for
allowed term ids. (`build_allowed_term_ids_for_str` in
`src/aggregation/agg_data.rs`)
- If no `include` or `exclude` filter is provided, this just returns
`None`, causing this check to be skipped down the line
- Otherwise the bitset is initialized to be able to hold the exact
number of terms in the segments term dictionary, and the bits are set to
signify which terms are to be included in the results.

### Collection
If we have an "allowed" `BitSet`, filter documents against that. For
each document, we check if the `BitSet` contains the documents term id.
For documents without the field, this is the missing sentinel we derived
earlier, minus 1 (to account for zero-based indexing): `(num_terms + 1)
- 1`.However, the `BitSet`s size is only `num_terms`. Normally, this
slips by without a problem, but if `num_terms % 64 == 0`, this will
cause a panic.

### Why `BitSet` panics
`BitSet` is represented under the hood by a boxed slice of `u64`s. When
you go to check a bit using `BitSet::contains`, it must determine which
of those `u64`s the bit is in, and then the position within that `u64`
of the bit.

In cases where the number of terms is not divisible by 64, the `BitSet`
must waste some bits. When we then look up the missing sentinel's bit,
it happens to be one of those wasted bits, for which `BitSet` is happy
to return the value of. For example, if the number of terms was 63:
```rust
let bitset_init_size = 63; // so BitSet's boxed slice has a length of 1, capable of holding 64 bits, term id [0, 62]
let missing_sentinel = 63; // num_terms + 1 - 1;
let byte_pos = missing_sentinel / 64; // 0 - within the valid slice
let bit_pos = missing_sentinel % 64; // 63 - hits the 1 wasted bit
```

But if the number of terms is indeed divisible by 64, then the `BitSet`
is perfectly aligned to the byte boundary:
```rust
let bitset_init_size = 64; // so BitSet's boxed slice has a length of 1, capable of holding 64 bits, term ids [0, 63]
let missing_sentinel = 64; // num_terms + 1 - 1, 
let byte_pos = missing_sentinel / 64; // 1 - idx 1 >= slice length 1
let bit_pos = missing_sentinel % 64; // 0 
```
We try to access a byte outside of the bounds of the boxed slice,
causing a panic from the bounds check to failing.

## Fixing it
The fix is simple. If we need to account for the missing sentinel,
initialize the `BitSet` with capacity for one more bit.

## Tests
- Added a bunch of unit tests that hit these conditions. I ensured they
failed without the fix, and that they now pass.
- All unit tests pass with the fix in place

## Other
- The investigation that led to finding this bug began with
https://github.com/paradedb/paradedb/issues/4746.
2026-04-25 14:11:47 +02:00
Abdul Andha
4fbae92187 send after key on last page 2026-04-24 15:33:26 -04:00
Cameron
89f0cef807 Fix O(2^n) query parser regression for deeply-nested queries (#2905)
* Fix O(2^n) query parser regression for deeply-nested queries

The top-level `ast()` parser used `alt((boolean_expr, single_leaf))` at
every group level. When the group contained a single leaf with no
trailing operand, `boolean_expr` would parse `occur_leaf` (recursing
into the inner group), fail at `multispace1`, backtrack, and then
`single_leaf` would re-parse `occur_leaf` from scratch. Every nesting
level doubled the work, giving O(2^n) time for queries like
`(((((title:test)))))`.

Parse `occur_leaf` once and peek ahead for a trailing operand instead
of backtracking. This keeps parsing O(n) and also avoids the duplicate
parse for simple single-leaf queries.

Fixes #2498.

Measured on the issue reproducer (release build):

    depth   before     after
       20   0.87 s   <1 us
       25  28.23 s   <1 us
       60  (years)   ~5 us

Non-pathological queries are unaffected or slightly faster:

    query                     before     after
    hello                     650 ns     308 ns
    a AND b AND c            1380 ns    1364 ns
    title:rust AND (...)     3426 ns    3460 ns

All 53 existing grammar tests and 56 query_parser tests pass. Adds a
regression test at depth 60 that would not complete under the old
parser.

* Add ignored benchmark for nested query parsing at depth 20/21

Matches the depths from issue #2498 which reported 0.87 s / 1.72 s
under the regression. With the fix these parse in single-digit
microseconds. Runs via:

  cargo test -p tantivy-query-grammar --release bench_deeply_nested \
      -- --ignored --nocapture

* Propagate Err::Failure and Err::Incomplete from operand parser

`alt((boolean_expr, single_leaf))` only retried on `Err::Error` and
propagated `Err::Failure` and `Err::Incomplete`. The replacement was
catching all three with `Err(_)`, which would silently fall back to
a single leaf if any cut point were ever added to `operand_leaf` or
its descendants. Match specifically on `Err::Error` to preserve the
original `alt` semantics.

* Replace inline bench with binggan bench in benches/

Move the nested-query benchmark out of the query-grammar test module
and into a proper binggan benchmark at benches/query_parser_nested.rs,
registered as a harnessless bench in Cargo.toml. Keeps the correctness
regression test (depth 60) in place.

Run with: cargo bench --bench query_parser_nested

* Fix rustfmt import ordering in query_parser_nested bench
2026-04-24 03:54:00 -04:00
Claude
a5d297c75f Add OpenSSF Scorecard workflow
Runs weekly security analysis and uploads SARIF results to GitHub code
scanning. Third-party actions are pinned by commit SHA. Adds the Scorecard
badge to the README.

Based on quickwit-oss/quickwit#5969.
2026-04-24 06:56:58 +00:00
Pascal Seitz
2e16243f9a fix memory consumption for histogram 2026-04-21 13:58:39 +02:00
Pascal Seitz
e015abab8e docs: add 0.26.1 changelog entry for aggregation perf fix 2026-04-21 11:12:37 +02:00
Pascal Seitz
73c711ec74 perf(agg): only measure active parent bucket in composite collect
Same change as 26a589e for SegmentCompositeCollector: get_memory_consumption
summed across all parent_buckets on every block, scaling with outer bucket
cardinality. Pass parent_bucket_id and index the single bucket.
2026-04-21 07:26:58 +02:00
Pascal Seitz
cb037c8079 add inline 2026-04-21 07:26:58 +02:00
Pascal Seitz
ed3453606b agg fix: compute memory consumption only for current bucket 2026-04-21 07:26:58 +02:00
Pascal Seitz
e9641f99c5 add nested term benchmark 2026-04-21 07:26:58 +02:00
Paul Masurel
13d74c3c20 Update binggan requirement from 0.16.0 to 0.16.1 (#2899) 2026-04-20 11:59:47 +02:00
Claude
3a6a3de8d7 ci: update pinned Action SHAs to current latest versions
The previous commit pinned actions to commit SHAs but used stale
version tags (v4.2.2, v2.7.5, old nextest/cargo-llvm-cov refs).
Update to the actual current HEAD of each pinned tag:

  actions/checkout        v4.2.2 → v4.3.1  (34e114876b0b...)
  Swatinem/rust-cache     v2.7.5 → v2.9.1  (c19371144df3...)
  taiki-e/install-action  nextest           (56cc9adf3a3e...)
  taiki-e/install-action  cargo-llvm-cov    (e4b3a0453201...)

actions-rs/toolchain, actions-rs/clippy-check, and
codecov/codecov-action SHAs were already correct.

https://claude.ai/code/session_01VD7Bo8upj3cQwWDf9ni2Ln
2026-04-16 06:49:47 +00:00
Claude
af3c6c0070 ci: pin GitHub Actions to full commit SHAs and restrict token permissions
Fixes two supply chain / token security issues:

- Pin all third-party Actions to immutable full commit SHAs instead of
  mutable version tags (addresses unpinned-dependencies risk, analogous
  to quickwit-oss/quickwit#5937):
    actions/checkout v4.2.2
    actions-rs/toolchain v1.0.7
    Swatinem/rust-cache v2.7.5
    taiki-e/install-action nextest / cargo-llvm-cov
    actions-rs/clippy-check v1.0.7
    codecov/codecov-action v3.1.6

- Add explicit least-privilege `permissions` blocks at workflow and job
  level (addresses excessive GITHUB_TOKEN permissions, analogous to
  quickwit-oss/quickwit#5945):
    default: contents: read
    check job: also grants checks: write (required by clippy-check)

https://claude.ai/code/session_01VD7Bo8upj3cQwWDf9ni2Ln
2026-04-15 20:55:43 +00:00
dependabot[bot]
058afff8b7 Update binggan requirement from 0.15.3 to 0.16.0
Updates the requirements on [binggan](https://github.com/pseitz/binggan) to permit the latest version.
- [Changelog](https://github.com/PSeitz/binggan/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pseitz/binggan/commits)

---
updated-dependencies:
- dependency-name: binggan
  dependency-version: 0.16.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-15 08:58:03 +02:00
Paul Masurel
58aa4b7074 Fix cardinality aggregation using invalid coupons (#2893)
Previously, coupons were computed via murmurhash32 and fed as raw u32
to the HLL sketch, bypassing the sketch's internal hashing and producing
invalid (slot, value) pairs. Switch to Coupon::from_hash from the
datasketches crate which correctly derives coupons, and drop the
now-unused murmurhash32 dependency.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 19:14:30 +02:00
Paul Masurel
04beab3b29 Performance improvement for nested cardinality aggregation
When a string cardinality aggregation is nested it end up being applied to different buckets.
Dictionary encoding relies on a different dictionaries for each segment.

As a result, during segment collection, we only collect term ordinals in a HashSet, and decode them in the
term dictionary at the end of collection.

Before this PR, this decoding phase was done once for each bucket, causing the same work to be done over and over. This PR introduce a coupon cache. The HLL sketch relies on a hash of the string values.

We populate the cache before bucket collection, and get our values from it.

This PR also rename "caching" "buffering" in aggregation (it was never caching), and does several cleanups.
2026-04-10 14:51:00 +02:00
alexanderbianchi
3cd9011f87 Make BucketEntries::iter, PercentileValuesVecEntry fields, and TopNComputer::threshold public (#2890)
These items need to be accessible from the tantivy-datafusion crate:
- BucketEntries::iter() for iterating aggregation bucket results
- PercentileValuesVecEntry.key/.value for reading percentile results
- TopNComputer.threshold for Block-WAND score pruning in the inverted index provider

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Paul Masurel <paul@quickwit.io>
2026-04-09 13:32:31 +02:00
Paul Masurel
d2c1b8bc2c Optimized intersection count using a bitset when the first leg is dense 2026-04-06 12:01:52 -04:00
nuri
a65107135a Use BinaryHeap for score-based top-K collection (#2881)
* Use BinaryHeap for score-based top-K collection

* Use peek_mut and add proptest for TopNHeap

---------

Co-authored-by: nryoo <nryoo@nryooui-MacBookPro.local>
2026-04-04 19:49:05 +02:00
Pascal Seitz
5c344db1bf chore: Release 2026-03-31 17:15:34 +08:00
Pascal Seitz
dc0f31554d unbump for release and update Changelog.md 2026-03-31 17:15:34 +08:00
trinity-1686a
a28ce3ee54 Merge pull request #2869 from quickwit-oss/trinity.pointard/maint
add dependabot cooldown
2026-03-31 09:52:22 +02:00
trinity Pointard
cf9800f981 add dependabot cooldown 2026-03-30 11:36:04 +02:00
55 changed files with 3453 additions and 488 deletions

View File

@@ -6,6 +6,8 @@ updates:
interval: daily
time: "20:00"
open-pull-requests-limit: 10
cooldown:
default-days: 2
- package-ecosystem: "github-actions"
directory: "/"
@@ -13,3 +15,5 @@ updates:
interval: daily
time: "20:00"
open-pull-requests-limit: 10
cooldown:
default-days: 2

View File

@@ -4,6 +4,9 @@ on:
push:
branches: [main]
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -12,16 +15,20 @@ concurrency:
jobs:
coverage:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install Rust
run: rustup toolchain install nightly-2025-12-01 --profile minimal --component llvm-tools-preview
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- uses: taiki-e/install-action@e4b3a0453201addddc06d3a72db90326aad87084 # cargo-llvm-cov
- name: Generate code coverage
run: cargo +nightly-2025-12-01 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6.0.0
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos

View File

@@ -8,6 +8,9 @@ env:
CARGO_TERM_COLOR: always
NUM_FUNCTIONAL_TEST_ITERATIONS: 20000
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -18,10 +21,13 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal

49
.github/workflows/scorecard.yml vendored Normal file
View File

@@ -0,0 +1,49 @@
name: OpenSSF Scorecard
on:
schedule:
- cron: '0 0 * * 0'
push:
branches:
- main
permissions:
contents: read
jobs:
analysis:
name: Scorecards analysis
runs-on: ubuntu-latest
permissions:
# Needed to upload the results to code-scanning dashboard.
security-events: write
# Needed to publish results
id-token: write
steps:
- name: 'Checkout code'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: 'Run analysis'
uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3
with:
results_file: results.sarif
results_format: sarif
repo_token: ${{ secrets.GITHUB_TOKEN }}
publish_results: true
# Upload the results as artifacts.
- name: 'Upload artifact'
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: SARIF file
path: results.sarif
retention-days: 5
# Upload the results to GitHub's code scanning dashboard.
- name: 'Upload to code-scanning'
uses: github/codeql-action/upload-sarif@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
with:
sarif_file: results.sarif

View File

@@ -9,6 +9,9 @@ on:
env:
CARGO_TERM_COLOR: always
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -19,23 +22,27 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
checks: write
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install nightly
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: nightly
profile: minimal
components: rustfmt
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal
components: clippy
- uses: Swatinem/rust-cache@v2
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- name: Check Formatting
run: cargo +nightly fmt --all -- --check
@@ -47,7 +54,7 @@ jobs:
- name: Check Bench Compilation
run: cargo +nightly bench --no-run --profile=dev --all-features
- uses: actions-rs/clippy-check@v1
- uses: actions-rs/clippy-check@b5b5f21f4797c02da247df37026fcd0a5024aa4d # v1.0.7
with:
toolchain: stable
token: ${{ secrets.GITHUB_TOKEN }}
@@ -57,6 +64,9 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
strategy:
matrix:
features:
@@ -67,17 +77,17 @@ jobs:
name: test-${{ matrix.features.label}}
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal
override: true
- uses: taiki-e/install-action@nextest
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@56cc9adf3a3e2c23eafb56e8acaf9d0373cb845a # nextest
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- name: Run tests
run: |

View File

@@ -1,3 +1,9 @@
Tantivy 0.26.1
================================
## Performance
- Fix quadratic runtime in nested term and composite aggregations: memory accounting scanned all parent buckets on every collect instead of just the current parent (@PSeitz @fulmicoton)
Tantivy 0.26 (Unreleased)
================================
@@ -45,6 +51,7 @@ Tantivy 0.26 (Unreleased)
- Add `seek_danger` on `DocSet` for more efficient intersections [#2538](https://github.com/quickwit-oss/tantivy/pull/2538) [#2810](https://github.com/quickwit-oss/tantivy/pull/2810)(@PSeitz @stuhood @fulmicoton)
- Skip column traversal in `RangeDocSet` when query range does not overlap with column bounds [#2783](https://github.com/quickwit-oss/tantivy/pull/2783)(@ChangRui-Ryan)
- Speed up exclude queries by supporting multiple excluded `DocSet`s without intermediate union [#2825](https://github.com/quickwit-oss/tantivy/pull/2825)(@PSeitz)
- Improve union performance for non-score unions with `fill_buffer` and optimized `TinySet` [#2863](https://github.com/quickwit-oss/tantivy/pull/2863)(@PSeitz)
Tantivy 0.25
================================

View File

@@ -57,15 +57,15 @@ measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.6", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.6", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.6", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
columnar = { version = "0.7", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.7", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.7", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.26.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
datasketches = "0.2.0"
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
@@ -75,7 +75,7 @@ typetag = "0.2.21"
winapi = "0.3.9"
[dev-dependencies]
binggan = "0.15.3"
binggan = "0.16.1"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"
@@ -92,7 +92,7 @@ postcard = { version = "1.0.4", features = [
], default-features = false }
[target.'cfg(not(windows))'.dev-dependencies]
criterion = { version = "0.5", default-features = false }
criterion = { version = "0.8", default-features = false }
[dev-dependencies.fail]
version = "0.5.0"
@@ -201,3 +201,11 @@ harness = false
[[bench]]
name = "regex_all_terms"
harness = false
[[bench]]
name = "query_parser_nested"
harness = false
[[bench]]
name = "intersection_bench"
harness = false

View File

@@ -1,6 +1,7 @@
[![Docs](https://docs.rs/tantivy/badge.svg)](https://docs.rs/crate/tantivy/)
[![Build Status](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml/badge.svg)](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/quickwit-oss/tantivy/branch/main/graph/badge.svg)](https://codecov.io/gh/quickwit-oss/tantivy)
[![OpenSSF Scorecard](https://api.scorecard.dev/projects/github.com/quickwit-oss/tantivy/badge)](https://scorecard.dev/viewer/?uri=github.com/quickwit-oss/tantivy)
[![Join the chat at https://discord.gg/MT27AG5EVE](https://shields.io/discord/908281611840282624?label=chat%20on%20discord)](https://discord.gg/MT27AG5EVE)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Crates.io](https://img.shields.io/crates/v/tantivy.svg)](https://crates.io/crates/tantivy)

View File

@@ -63,6 +63,8 @@ fn bench_agg(mut group: InputGroup<Index>) {
register!(group, terms_all_unique_with_avg_sub_agg);
register!(group, terms_many_with_avg_sub_agg);
register!(group, terms_status_with_avg_sub_agg);
register!(group, terms_status_with_terms_zipf_1000_sub_agg);
register!(group, terms_zipf_1000_with_terms_status_sub_agg);
register!(group, terms_status_with_histogram);
register!(group, terms_zipf_1000);
register!(group, terms_zipf_1000_with_histogram);
@@ -77,7 +79,12 @@ fn bench_agg(mut group: InputGroup<Index>) {
register!(group, composite_histogram_calendar);
register!(group, cardinality_agg);
register!(group, cardinality_agg_high_card);
register!(group, cardinality_agg_low_card);
register!(group, terms_status_with_cardinality_agg);
register!(group, terms_100_buckets_with_cardinality_agg);
register!(group, terms_many_with_single_term_order_by_card);
register!(group, terms_many_with_single_term_2_order_by_card);
register!(group, range_agg);
register!(group, range_agg_with_avg_sub_agg);
@@ -165,10 +172,52 @@ fn cardinality_agg(index: &Index) {
});
execute_agg(index, agg_req);
}
// Full-scan cardinality on a near-1M-cardinality string field.
// Hits the dense (PagedBitset) path: every doc has a unique term,
// so the bucket promotes from FxHashSet shortly into the scan.
fn cardinality_agg_high_card(index: &Index) {
let agg_req = json!({
"cardinality": {
"cardinality": {
"field": "text_all_unique_terms"
},
}
});
execute_agg(index, agg_req);
}
// Full-scan cardinality on a tiny-cardinality string field (7 distinct
// values). Stays on the FxHashSet path — the promotion threshold is
// never crossed. Validates no regression on the sparse path.
fn cardinality_agg_low_card(index: &Index) {
let agg_req = json!({
"cardinality": {
"cardinality": {
"field": "text_few_terms_status"
},
}
});
execute_agg(index, agg_req);
}
fn terms_status_with_cardinality_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_few_terms_status" },
"aggs": {
"cardinality": {
"cardinality": {
"field": "text_few_terms_status"
},
}
}
},
});
execute_agg(index, agg_req);
}
fn terms_100_buckets_with_cardinality_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_1000_terms_zipf", "size": 100 },
"aggs": {
"cardinality": {
"cardinality": {
@@ -181,6 +230,58 @@ fn terms_status_with_cardinality_agg(index: &Index) {
execute_agg(index, agg_req);
}
fn terms_many_with_single_term_order_by_card(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_many_terms" },
"aggs": {
"nested_terms": {
"terms": {
"field": "single_term",
"order": { "cardinality": "desc" }
},
"aggs": {
"cardinality": {
"cardinality": { "field": "text_few_terms" }
}
}
}
}
},
});
execute_agg(index, agg_req);
}
// Two-level terms ordered by cardinality at each level: a high-card outer terms
// (text_many_terms) ordered by a cardinality sub-agg, with a nested low-card terms
// (text_few_terms_status) also ordered by a cardinality sub-agg, plus an avg.
fn terms_many_with_single_term_2_order_by_card(index: &Index) {
let agg_req = json!({
"by_ip": {
"terms": {
"field": "text_many_terms",
"order": { "card_few_terms": "desc" }
},
"aggs": {
"card_few_terms": {
"cardinality": { "field": "text_few_terms" }
},
"nested_terms": {
"terms": {
"field": " single_term",
"order": { "distinct_path2": "desc" }
},
"aggs": {
"avg_botscore": { "avg": { "field": "score" } },
"distinct_path2": { "cardinality": { "field": "text_few_terms" } }
}
}
}
}
});
execute_agg(index, agg_req);
}
fn terms_7(index: &Index) {
let agg_req = json!({
"my_texts": { "terms": { "field": "text_few_terms_status" } },
@@ -253,6 +354,30 @@ fn terms_all_unique_with_avg_sub_agg(index: &Index) {
});
execute_agg(index, agg_req);
}
fn terms_status_with_terms_zipf_1000_sub_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_few_terms_status" },
"aggs": {
"nested_terms": { "terms": { "field": "text_1000_terms_zipf" } }
}
}
});
execute_agg(index, agg_req);
}
fn terms_zipf_1000_with_terms_status_sub_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_1000_terms_zipf" },
"aggs": {
"nested_terms": { "terms": { "field": "text_few_terms_status" } }
}
}
});
execute_agg(index, agg_req);
}
fn terms_status_with_histogram(index: &Index) {
let agg_req = json!({
"my_texts": {
@@ -566,7 +691,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let text_field = schema_builder.add_text_field("text", text_fieldtype.clone());
let single_term = schema_builder.add_text_field("single_term", FAST);
let json_field = schema_builder.add_json_field("json", FAST);
let text_field_all_unique_terms =
schema_builder.add_text_field("text_all_unique_terms", STRING | FAST);
@@ -630,6 +756,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
index_writer.add_document(doc!(
json_field => json!({"mixed_type": 10.0}),
json_field => json!({"mixed_type": 10.0}),
single_term => "single_term",
single_term => "single_term",
text_field => "cool",
text_field => "cool",
text_field_all_unique_terms => "cool",
@@ -664,6 +792,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
json!({"mixed_type": many_terms_data.choose(&mut rng).unwrap().to_string()})
};
index_writer.add_document(doc!(
single_term => "single_term",
text_field => "cool",
json_field => json,
text_field_all_unique_terms => format!("unique_term_{}", rng.random::<u64>()),

View File

@@ -0,0 +1,149 @@
// Benchmarks top-K intersection of term scorers (block_wand_intersection).
//
// What's measured:
// - Conjunctive queries (+a +b, +a +b +c) with top-10 by score
// - Varying doc-frequency balance between terms (balanced, skewed, very skewed)
// - Realistic term frequencies (geometric distribution, mostly low)
// - 1M-doc single segment
//
// Run with: cargo bench --bench intersection_bench
use binggan::{black_box, BenchRunner};
use rand::prelude::*;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, ReloadPolicy, Searcher};
const NUM_DOCS: usize = 1_000_000;
struct BenchIndex {
searcher: Searcher,
query_parser: QueryParser,
}
/// Generate term frequency from a geometric-like distribution.
/// Most values are 1, a few are 2-3, rarely higher.
/// p controls the decay: higher p → more weight on tf=1.
fn random_term_freq(rng: &mut StdRng, p: f64) -> u32 {
let mut tf = 1u32;
while tf < 10 && rng.random_bool(1.0 - p) {
tf += 1;
}
tf
}
/// Build an index with three terms (a, b, c) with given doc-frequency probabilities.
/// Each term occurrence has a realistic term frequency (geometric distribution).
/// Field length is padded with filler tokens to create varied fieldnorms.
fn build_index(p_a: f64, p_b: f64, p_c: f64) -> BenchIndex {
let mut schema_builder = Schema::builder();
let body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut rng = StdRng::from_seed([42u8; 32]);
{
let mut writer = index.writer_with_num_threads(1, 500_000_000).unwrap();
for _ in 0..NUM_DOCS {
let mut tokens: Vec<String> = Vec::new();
if rng.random_bool(p_a) {
let tf = random_term_freq(&mut rng, 0.7);
for _ in 0..tf {
tokens.push("aaa".to_string());
}
}
if rng.random_bool(p_b) {
let tf = random_term_freq(&mut rng, 0.7);
for _ in 0..tf {
tokens.push("bbb".to_string());
}
}
if rng.random_bool(p_c) {
let tf = random_term_freq(&mut rng, 0.7);
for _ in 0..tf {
tokens.push("ccc".to_string());
}
}
// Pad with filler to create varied field lengths (5-30 tokens).
let filler_count = rng.random_range(5u32..30u32);
for _ in 0..filler_count {
tokens.push("filler".to_string());
}
let text = tokens.join(" ");
writer.add_document(doc!(body => text)).unwrap();
}
writer.commit().unwrap();
}
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![body]);
BenchIndex {
searcher,
query_parser,
}
}
fn main() {
// Scenarios: (label, p_a, p_b, p_c)
//
// "balanced": all terms ~10% → intersection ~1% of docs
// "skewed": one common (50%), one rare (2%) → intersection ~1%
// "very_skewed": one very common (80%), one very rare (0.5%) → intersection ~0.4%
// "three_balanced": three terms ~20% each → intersection ~0.8%
// "three_skewed": 50% / 10% / 2% → intersection ~0.1%
let scenarios: Vec<(&str, f64, f64, f64)> = vec![
("balanced_10%_10%", 0.10, 0.10, 0.0),
("skewed_50%_2%", 0.50, 0.02, 0.0),
("very_skewed_80%_0.5%", 0.80, 0.005, 0.0),
("three_balanced_20%_20%_20%", 0.20, 0.20, 0.20),
("three_skewed_50%_10%_2%", 0.50, 0.10, 0.02),
];
let mut runner = BenchRunner::new();
for (label, p_a, p_b, p_c) in &scenarios {
let bench_index = build_index(*p_a, *p_b, *p_c);
let mut group = runner.new_group();
group.set_name(format!("intersection — {label}"));
// Two-term intersection
if *p_a > 0.0 && *p_b > 0.0 {
let query_str = "+aaa +bbb";
let query = bench_index.query_parser.parse_query(query_str).unwrap();
let searcher = bench_index.searcher.clone();
group.register(format!("{query_str} top10"), move |_| {
let collector = TopDocs::with_limit(10).order_by_score();
black_box(searcher.search(&query, &collector).unwrap());
1usize
});
}
// Three-term intersection
if *p_c > 0.0 {
let query_str = "+aaa +bbb +ccc";
let query = bench_index.query_parser.parse_query(query_str).unwrap();
let searcher = bench_index.searcher.clone();
group.register(format!("{query_str} top10"), move |_| {
let collector = TopDocs::with_limit(10).order_by_score();
black_box(searcher.search(&query, &collector).unwrap());
1usize
});
}
group.run();
}
}

View File

@@ -0,0 +1,35 @@
// Benchmark for the query grammar parsing deeply nested queries.
//
// Regression guard for https://github.com/quickwit-oss/tantivy/issues/2498:
// at depth 20/21 the old parser took 0.87 s / 1.72 s respectively because
// `ast()` retried `occur_leaf` on backtrack, giving O(2^n) time. With the
// fix parsing is linear and completes in microseconds.
//
// Run with: `cargo bench --bench query_parser_nested`.
use binggan::{black_box, BenchRunner};
use tantivy::query_grammar::parse_query;
fn nested_query(depth: usize, leading_plus: bool) -> String {
let leading = "(".repeat(depth);
let trailing = ")".repeat(depth);
let prefix = if leading_plus { "+" } else { "" };
format!("{prefix}{leading}title:test{trailing}")
}
fn main() {
let mut runner = BenchRunner::new();
for depth in [20, 21] {
for leading_plus in [false, true] {
let query = nested_query(depth, leading_plus);
let label = format!(
"parse_nested_depth_{depth}_{}",
if leading_plus { "plus" } else { "plain" },
);
runner.bench_function(&label, move |_| {
black_box(parse_query(black_box(&query)).unwrap());
});
}
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-bitpacker"
version = "0.9.0"
version = "0.10.0"
edition = "2024"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-columnar"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -12,10 +12,10 @@ categories = ["database-implementations", "data-structures", "compression"]
itertools = "0.14.0"
fastdivide = "0.4.0"
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.10", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.9", path = "../bitpacker/" }
stacker = { version= "0.7", path = "../stacker", package="tantivy-stacker"}
sstable = { version= "0.7", path = "../sstable", package = "tantivy-sstable" }
common = { version= "0.11", path = "../common", package = "tantivy-common" }
tantivy-bitpacker = { version= "0.10", path = "../bitpacker/" }
serde = "1.0.152"
downcast-rs = "2.0.1"
@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.9"
binggan = "0.15.3"
binggan = "0.16.1"
[[bench]]
name = "bench_merge"

View File

@@ -33,14 +33,14 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
&mut self,
docs: &[u32],
accessor: &Column<T>,
missing: Option<T>,
missing_opt: Option<T>,
) {
self.fetch_block(docs, accessor);
// no missing values
if accessor.index.get_cardinality().is_full() {
return;
}
let Some(missing) = missing else {
let Some(missing) = missing_opt else {
return;
};
@@ -191,6 +191,7 @@ where F: FnMut(u32) {
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
use super::*;

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-common"
version = "0.10.0"
version = "0.11.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2024"
@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.15.3"
binggan = "0.16.1"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -47,6 +47,9 @@ impl TinySet {
TinySet(val)
}
/// An empty `TinySet` constant.
pub const EMPTY: TinySet = TinySet(0u64);
/// Returns an empty `TinySet`.
#[inline]
pub fn empty() -> TinySet {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-query-grammar"
version = "0.25.0"
version = "0.26.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]

View File

@@ -1045,18 +1045,43 @@ fn operand_leaf(inp: &str) -> IResult<&str, (Option<BinaryOperand>, Option<Occur
}
fn ast(inp: &str) -> IResult<&str, UserInputAst> {
let boolean_expr = map_res(
separated_pair(occur_leaf, multispace1, many1(operand_leaf)),
|(left, right)| aggregate_binary_expressions(left, right),
);
let single_leaf = map(occur_leaf, |(occur, ast)| {
if occur == Some(Occur::MustNot) {
ast.unary(Occur::MustNot)
} else {
ast
}
});
delimited(multispace0, alt((boolean_expr, single_leaf)), multispace0)(inp)
// Parse `occur_leaf` once, then conditionally extend into a boolean
// expression. The previous implementation used `alt((boolean_expr,
// single_leaf))` which, when the input was a single leaf with no
// following operand, would parse `occur_leaf` once for `boolean_expr`,
// fail at `multispace1`, backtrack, then re-parse `occur_leaf` for
// `single_leaf`. With recursively-nested groups like `(+(+(+a)))`, that
// doubling at every level produced O(2^n) parse time. Parsing once and
// peeking ahead for the operand keeps it O(n).
delimited(
multispace0,
|inp| {
let (rest, first) = occur_leaf(inp)?;
// Only fall back on `Err::Error` (recoverable), mirroring
// `alt`'s behaviour. `Err::Failure` and `Err::Incomplete`
// must propagate so cut points and streaming needs are not
// accidentally swallowed if they are ever introduced in the
// operand parsers.
match preceded(multispace1, many1(operand_leaf))(rest) {
Ok((rest, more)) => {
let combined = aggregate_binary_expressions(first, more)
.map_err(|_| nom::Err::Error(Error::new(inp, ErrorKind::MapRes)))?;
Ok((rest, combined))
}
Err(nom::Err::Error(_)) => {
let (occur, ast) = first;
let single = if occur == Some(Occur::MustNot) {
ast.unary(Occur::MustNot)
} else {
ast
};
Ok((rest, single))
}
Err(e) => Err(e),
}
},
multispace0,
)(inp)
}
fn ast_infallible(inp: &str) -> JResult<&str, UserInputAst> {
@@ -1891,4 +1916,23 @@ mod test {
r#"(+"field":'happy tax payer' +"other_field":1)"#,
);
}
// Regression test for https://github.com/quickwit-oss/tantivy/issues/2498:
// deeply nested parenthesized queries used to take O(2^n) time because the
// top-level `ast()` parser tried `boolean_expr` first and re-parsed the
// inner `occur_leaf` when it backtracked to `single_leaf`. Depth 60 would
// take ~10^18 operations under the regression; with the fix it parses
// instantly. We use `test_parse_query_to_ast_helper` so this test would
// never finish if the regression returned.
#[test]
fn test_parse_deeply_nested_query() {
let depth = 60;
let leading: String = "(".repeat(depth);
let trailing: String = ")".repeat(depth);
let query = format!("{leading}title:test{trailing}");
test_parse_query_to_ast_helper(&query, r#""title":test"#);
let query_with_plus = format!("+{leading}title:test{trailing}");
test_parse_query_to_ast_helper(&query_with_plus, r#""title":test"#);
}
}

View File

@@ -20,8 +20,8 @@ use crate::aggregation::metric::{
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation, MaxAggregation,
MetricAggReqData, MinAggregation, SegmentCardinalityCollector, SegmentExtendedStatsCollector,
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TopHitsAggReqData,
TopHitsSegmentCollector,
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TermOrdSet,
TopHitsAggReqData, TopHitsSegmentCollector, BITSET_MAX_TERM_ORD,
};
use crate::aggregation::segment_agg_result::{
GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
@@ -413,12 +413,38 @@ pub(crate) fn build_segment_agg_collector(
}
AggKind::Cardinality => {
let req_data = &mut req.get_cardinality_req_data_mut(node.idx_in_req_data);
Ok(Box::new(SegmentCardinalityCollector::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
)))
// For str columns, choose the per-bucket entries representation
// based on the segment's column.max_value():
// * small (< BITSET_MAX_TERM_ORD): `BitSet`, pre-allocated, no promotion machinery.
// * large: `TermOrdSet` (sparse FxHashSet that promotes to a paged bitset).
// For non-str columns the `entries` field is unused (values go
// straight into the HLL sketch); we still pick `TermOrdSet`
// because its empty Sparse(FxHashSet) costs nothing.
let is_str = req_data.column_type == ColumnType::Str;
let max_term_ord_inclusive = if is_str {
req_data.accessor.max_value()
} else {
0
};
let collector: Box<dyn SegmentAggregationCollector> =
if is_str && max_term_ord_inclusive < BITSET_MAX_TERM_ORD {
Box::new(SegmentCardinalityCollector::<BitSet>::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
max_term_ord_inclusive,
))
} else {
Box::new(SegmentCardinalityCollector::<TermOrdSet>::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
max_term_ord_inclusive,
))
};
Ok(collector)
}
AggKind::StatsKind(stats_type) => {
let req_data = &mut req.per_request.stats_metric_req_data[node.idx_in_req_data];
@@ -985,8 +1011,12 @@ fn build_terms_or_cardinality_nodes(
let str_col = str_dict_column
.as_ref()
.expect("str_dict_column must exist for string column");
allowed_term_ids =
build_allowed_term_ids_for_str(str_col, &req.include, &req.exclude)?;
allowed_term_ids = build_allowed_term_ids_for_str(
str_col,
&req.include,
&req.exclude,
missing.is_some(),
)?;
};
let idx_in_req_data = data.push_term_req_data(TermsAggReqData {
accessor,
@@ -1002,10 +1032,20 @@ fn build_terms_or_cardinality_nodes(
(idx_in_req_data, AggKind::Terms)
}
TermsOrCardinalityRequest::Cardinality(ref req) => {
// `str_dict_column` is computed once per field; for JSON paths
// with mixed types it's `Some` even on the numeric req_data.
// Cardinality only consults it for the str column path, so
// gate by column_type to avoid driving non-str collectors
// through the coupon-cache path.
let str_dict_column_for_req = if column_type == ColumnType::Str {
str_dict_column.clone()
} else {
None
};
let idx_in_req_data = data.push_cardinality_req_data(CardinalityAggReqData {
accessor,
column_type,
str_dict_column: str_dict_column.clone(),
str_dict_column: str_dict_column_for_req,
missing_value_for_accessor,
name: agg_name.to_string(),
req: req.clone(),
@@ -1025,16 +1065,21 @@ fn build_terms_or_cardinality_nodes(
/// Builds a single BitSet of allowed term ordinals for a string dictionary column according to
/// include/exclude parameters.
///
/// When `reserve_missing_sentinel` is true, the bitset will have 1 additional slot for the missing
/// term ordinal
fn build_allowed_term_ids_for_str(
str_col: &StrColumn,
include: &Option<IncludeExcludeParam>,
exclude: &Option<IncludeExcludeParam>,
reserve_missing_sentinel: bool,
) -> crate::Result<Option<BitSet>> {
let mut allowed: Option<BitSet> = None;
let num_terms = str_col.dictionary().num_terms() as u32;
let missing_sentinel_adjustment = if reserve_missing_sentinel { 1 } else { 0 };
let allowed_capacity = str_col.dictionary().num_terms() as u32 + missing_sentinel_adjustment;
if let Some(include) = include {
// add matches
allowed = Some(BitSet::with_max_value(num_terms));
allowed = Some(BitSet::with_max_value(allowed_capacity));
let allowed = allowed.as_mut().unwrap();
for_each_matching_term_ord(str_col, include, |ord| allowed.insert(ord))?;
};
@@ -1042,7 +1087,7 @@ fn build_allowed_term_ids_for_str(
if let Some(exclude) = exclude {
if allowed.is_none() {
// Start with all terms allowed
allowed = Some(BitSet::with_max_value_and_full(num_terms));
allowed = Some(BitSet::with_max_value_and_full(allowed_capacity));
}
let allowed = allowed.as_mut().unwrap();
for_each_matching_term_ord(str_col, exclude, |ord| allowed.remove(ord))?;

View File

@@ -208,7 +208,8 @@ pub enum BucketEntries<T> {
}
impl<T> BucketEntries<T> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
/// Iterate over all bucket entries.
pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = &'a T> + 'a> {
match self {
BucketEntries::Vec(vec) => Box::new(vec.iter()),
BucketEntries::HashMap(map) => Box::new(map.values()),

View File

@@ -21,7 +21,7 @@ use crate::aggregation::bucket::composite::map::{DynArrayHeapMap, MAX_DYN_ARRAY_
use crate::aggregation::bucket::{
CalendarInterval, CompositeAggregationSource, MissingOrder, Order,
};
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardSubAggCache};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardSubAggBuffer};
use crate::aggregation::intermediate_agg_result::{
CompositeIntermediateKey, IntermediateAggregationResult, IntermediateAggregationResults,
IntermediateBucketResult, IntermediateCompositeBucketEntry, IntermediateCompositeBucketResult,
@@ -119,7 +119,7 @@ pub struct SegmentCompositeCollector {
/// One DynArrayHeapMap per parent bucket.
parent_buckets: Vec<DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>>,
accessor_idx: usize,
sub_agg: Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: BucketIdProvider,
/// Number of sources, needed when creating new DynArrayHeapMaps.
num_sources: usize,
@@ -152,7 +152,7 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let mem_pre = self.get_memory_consumption();
let mem_pre = self.get_memory_consumption(parent_bucket_id);
let composite_agg_data = agg_data.take_composite_req_data(self.accessor_idx);
for doc in docs {
@@ -172,7 +172,7 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
sub_agg.check_flush_local(agg_data)?;
}
let mem_delta = self.get_memory_consumption() - mem_pre;
let mem_delta = self.get_memory_consumption(parent_bucket_id) - mem_pre;
if mem_delta > 0 {
agg_data.context.limits.add_memory_consumed(mem_delta)?;
}
@@ -199,14 +199,22 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Composite is a multi-bucket agg with no single value to extract.
None
}
}
impl SegmentCompositeCollector {
fn get_memory_consumption(&self) -> u64 {
self.parent_buckets
.iter()
.map(|m| m.memory_consumption())
.sum()
fn get_memory_consumption(&self, parent_bucket_id: BucketId) -> u64 {
self.parent_buckets[parent_bucket_id as usize].memory_consumption()
}
pub(crate) fn from_req_and_validate(
@@ -218,7 +226,7 @@ impl SegmentCompositeCollector {
let has_sub_aggregations = !node.children.is_empty();
let sub_agg = if has_sub_aggregations {
let sub_agg_collector = build_segment_agg_collectors(req_data, &node.children)?;
Some(CachedSubAggs::new(sub_agg_collector))
Some(BufferedSubAggs::new(sub_agg_collector))
} else {
None
};
@@ -332,7 +340,7 @@ fn collect_bucket_with_limit(
limit_num_buckets: usize,
buckets: &mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
key: &[InternalValueRepr],
sub_agg: &mut Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: &mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: &mut BucketIdProvider,
) {
let mut record_in_bucket = |bucket: &mut CompositeBucketCollector| {
@@ -488,7 +496,7 @@ struct CompositeKeyVisitor<'a> {
doc_id: crate::DocId,
composite_agg_data: &'a CompositeAggReqData,
buckets: &'a mut DynArrayHeapMap<InternalValueRepr, CompositeBucketCollector>,
sub_agg: &'a mut Option<CachedSubAggs<HighCardSubAggCache>>,
sub_agg: &'a mut Option<BufferedSubAggs<HighCardSubAggBuffer>>,
bucket_id_provider: &'a mut BucketIdProvider,
sub_level_values: SmallVec<[InternalValueRepr; MAX_DYN_ARRAY_SIZE]>,
}

View File

@@ -511,14 +511,14 @@ mod tests {
fn datetime_from_iso_str(date_str: &str) -> common::DateTime {
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
.expect(&format!("Failed to parse date: {}", date_str));
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
let timestamp_secs = dt.unix_timestamp_nanos();
common::DateTime::from_timestamp_nanos(timestamp_secs as i64)
}
fn ms_timestamp_from_iso_str(date_str: &str) -> i64 {
let dt = OffsetDateTime::parse(date_str, &Rfc3339)
.expect(&format!("Failed to parse date: {}", date_str));
.unwrap_or_else(|_| panic!("Failed to parse date: {}", date_str));
(dt.unix_timestamp_nanos() / 1_000_000) as i64
}
@@ -548,7 +548,7 @@ mod tests {
agg_req_json["my_composite"]["composite"]["after"] = after_key.take().unwrap();
}
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
let expected_page_buckets = &expected_buckets_vec[page_idx * page_size
..std::cmp::min((page_idx + 1) * page_size, expected_buckets_vec.len())];
assert_eq!(
@@ -559,34 +559,30 @@ mod tests {
page_size,
agg_req,
);
if page_idx + 1 < page_count {
assert!(
res["my_composite"].get("after_key").is_some(),
"expected after_key on all but last page"
);
after_key = Some(res["my_composite"]["after_key"].clone());
} else if res["my_composite"].get("after_key").is_some() {
// currently we sometime have an after_key on the last page,
// check that the next "page" is empty
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": composite_agg_sources,
"size": page_size,
"after": res["my_composite"]["after_key"].clone(),
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
}
assert!(
res["my_composite"].get("after_key").is_some(),
"expected after_key on every non-empty page"
);
after_key = Some(res["my_composite"]["after_key"].clone());
}
// Using the after_key from the last page must yield an empty page.
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": composite_agg_sources,
"size": page_size,
"after": after_key,
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
}
}
@@ -711,8 +707,28 @@ mod tests {
{"key": {"myterm": "terme"}, "doc_count": 1}
])
);
assert!(res["my_composite"].get("after_key").is_none());
// paginating past last page should be empty
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": [
{"myterm": {"terms": {"field": "string_id"}}}
],
"size": 3,
"after": &res["my_composite"]["after_key"]
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
assert!(res["my_composite"].get("after_key").is_none());
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
Ok(())
}
@@ -820,7 +836,10 @@ mod tests {
{"key": {"myterm": "apple"}, "doc_count": 1}
])
);
assert!(res["fruity_aggreg"].get("after_key").is_none());
assert_eq!(
res["fruity_aggreg"]["after_key"],
json!({"myterm": "str:apple"})
);
Ok(())
}
@@ -1792,7 +1811,14 @@ mod tests {
{"key": {"month": ms_timestamp_from_iso_str("2021-02-01T00:00:00Z"), "category": "books"}, "doc_count": 1},
]),
);
assert!(res["my_composite"].get("after_key").is_none());
let feb_2021_ns = ms_timestamp_from_iso_str("2021-02-01T00:00:00Z") * 1_000_000;
assert_eq!(
res["my_composite"]["after_key"],
json!({
"month": format!("dt:{}", feb_2021_ns),
"category": "str:books"
})
);
Ok(())
}

View File

@@ -6,8 +6,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardSubAggBuffer, SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -503,17 +503,17 @@ struct DocCount {
}
/// Segment collector for filter aggregation
pub struct SegmentFilterCollector<C: SubAggCache> {
pub struct SegmentFilterCollector<B: SubAggBuffer> {
/// Document counts per parent bucket
parent_buckets: Vec<DocCount>,
/// Sub-aggregation collectors
sub_aggregations: Option<CachedSubAggs<C>>,
sub_aggregations: Option<BufferedSubAggs<B>>,
bucket_id_provider: BucketIdProvider,
/// Accessor index for this filter aggregation (to access FilterAggReqData)
accessor_idx: usize,
}
impl<C: SubAggCache> SegmentFilterCollector<C> {
impl<B: SubAggBuffer> SegmentFilterCollector<B> {
/// Create a new filter segment collector following the new agg_data pattern
pub(crate) fn from_req_and_validate(
req: &mut AggregationsSegmentCtx,
@@ -525,7 +525,7 @@ impl<C: SubAggCache> SegmentFilterCollector<C> {
} else {
None
};
let sub_agg_collector = sub_agg_collector.map(CachedSubAggs::new);
let sub_agg_collector = sub_agg_collector.map(BufferedSubAggs::new);
Ok(SegmentFilterCollector {
parent_buckets: Vec::new(),
@@ -547,16 +547,16 @@ pub(crate) fn build_segment_filter_collector(
if is_top_level {
Ok(Box::new(
SegmentFilterCollector::<LowCardSubAggCache>::from_req_and_validate(req, node)?,
SegmentFilterCollector::<LowCardSubAggBuffer>::from_req_and_validate(req, node)?,
))
} else {
Ok(Box::new(
SegmentFilterCollector::<HighCardSubAggCache>::from_req_and_validate(req, node)?,
SegmentFilterCollector::<HighCardSubAggBuffer>::from_req_and_validate(req, node)?,
))
}
}
impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
impl<B: SubAggBuffer> Debug for SegmentFilterCollector<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentFilterCollector")
.field("buckets", &self.parent_buckets)
@@ -566,7 +566,7 @@ impl<C: SubAggCache> Debug for SegmentFilterCollector<C> {
}
}
impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B> {
fn add_intermediate_aggregation_result(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -674,6 +674,17 @@ impl<C: SubAggCache> SegmentAggregationCollector for SegmentFilterCollector<C> {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// TODO: forward into the inner `sub_agg` for nested order paths (`filter.metric`).
None
}
}
/// Intermediate result for filter aggregation

View File

@@ -10,7 +10,7 @@ use crate::aggregation::agg_data::{
};
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::BucketEntry;
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateHistogramBucketEntry,
@@ -258,7 +258,7 @@ pub(crate) struct SegmentHistogramBucketEntry {
impl SegmentHistogramBucketEntry {
pub(crate) fn into_intermediate_bucket_entry(
self,
sub_aggregation: &mut Option<HighCardCachedSubAggs>,
sub_aggregation: &mut Option<HighCardBufferedSubAggs>,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateHistogramBucketEntry> {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
@@ -283,6 +283,11 @@ impl SegmentHistogramBucketEntry {
struct HistogramBuckets {
pub buckets: FxHashMap<i64, SegmentHistogramBucketEntry>,
}
impl HistogramBuckets {
fn memory_consumption(&self) -> u64 {
self.buckets.capacity() as u64 * std::mem::size_of::<SegmentHistogramBucketEntry>() as u64
}
}
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
@@ -291,7 +296,7 @@ pub struct SegmentHistogramCollector {
/// The buckets containing the aggregation data.
/// One Histogram bucket per parent bucket id.
parent_buckets: Vec<HistogramBuckets>,
sub_agg: Option<HighCardCachedSubAggs>,
sub_agg: Option<HighCardBufferedSubAggs>,
accessor_idx: usize,
bucket_id_provider: BucketIdProvider,
}
@@ -324,7 +329,7 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let req = agg_data.take_histogram_req_data(self.accessor_idx);
let mem_pre = self.get_memory_consumption();
let mem_pre = self.get_memory_consumption(parent_bucket_id);
let buckets = &mut self.parent_buckets[parent_bucket_id as usize].buckets;
let bounds = req.bounds;
@@ -358,12 +363,9 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
}
agg_data.put_back_histogram_req_data(self.accessor_idx, req);
let mem_delta = self.get_memory_consumption() - mem_pre;
let mem_delta = self.get_memory_consumption(parent_bucket_id) - mem_pre;
if mem_delta > 0 {
agg_data
.context
.limits
.add_memory_consumed(mem_delta as u64)?;
agg_data.context.limits.add_memory_consumed(mem_delta)?;
}
if let Some(sub_agg) = &mut self.sub_agg {
@@ -392,14 +394,24 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Histogram is a multi-bucket agg with no single value to extract.
None
}
}
impl SegmentHistogramCollector {
fn get_memory_consumption(&self) -> usize {
let self_mem = std::mem::size_of::<Self>();
let buckets_mem = self.parent_buckets.len() * std::mem::size_of::<HistogramBuckets>();
self_mem + buckets_mem
fn get_memory_consumption(&self, parent_bucket_id: BucketId) -> u64 {
self.parent_buckets[parent_bucket_id as usize].memory_consumption()
}
/// Converts the collector result into a intermediate bucket result.
fn add_intermediate_bucket_result(
&mut self,
@@ -444,7 +456,7 @@ impl SegmentHistogramCollector {
max: f64::MAX,
});
req_data.offset = req_data.req.offset.unwrap_or(0.0);
let sub_agg = sub_agg.map(CachedSubAggs::new);
let sub_agg = sub_agg.map(BufferedSubAggs::new);
Ok(Self {
parent_buckets: Default::default(),

View File

@@ -9,8 +9,9 @@ use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::agg_limits::AggregationLimitsGuard;
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -155,13 +156,13 @@ pub(crate) struct SegmentRangeAndBucketEntry {
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
pub struct SegmentRangeCollector<C: SubAggCache> {
pub struct SegmentRangeCollector<B: SubAggBuffer> {
/// The buckets containing the aggregation data.
/// One for each ParentBucketId
parent_buckets: Vec<Vec<SegmentRangeAndBucketEntry>>,
column_type: ColumnType,
pub(crate) accessor_idx: usize,
sub_agg: Option<CachedSubAggs<C>>,
sub_agg: Option<BufferedSubAggs<B>>,
/// Here things get a bit weird. We need to assign unique bucket ids across all
/// parent buckets. So we keep track of the next available bucket id here.
/// This allows a kind of flattening of the bucket ids across all parent buckets.
@@ -178,7 +179,7 @@ pub struct SegmentRangeCollector<C: SubAggCache> {
limits: AggregationLimitsGuard,
}
impl<C: SubAggCache> Debug for SegmentRangeCollector<C> {
impl<B: SubAggBuffer> Debug for SegmentRangeCollector<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SegmentRangeCollector")
.field("parent_buckets_len", &self.parent_buckets.len())
@@ -229,7 +230,7 @@ impl SegmentRangeBucketEntry {
}
}
impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
fn add_intermediate_aggregation_result(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -327,6 +328,17 @@ impl<C: SubAggCache> SegmentAggregationCollector for SegmentRangeCollector<C> {
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Range is a multi-bucket agg with no single value to extract.
None
}
}
/// Build a concrete `SegmentRangeCollector` with either a Vec- or HashMap-backed
/// bucket storage, depending on the column type and aggregation level.
@@ -350,8 +362,8 @@ pub(crate) fn build_segment_range_collector(
};
if is_low_card {
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggCache> {
sub_agg: sub_agg.map(LowCardCachedSubAggs::new),
Ok(Box::new(SegmentRangeCollector::<LowCardSubAggBuffer> {
sub_agg: sub_agg.map(LowCardBufferedSubAggs::new),
column_type: field_type,
accessor_idx,
parent_buckets: Vec::new(),
@@ -359,8 +371,8 @@ pub(crate) fn build_segment_range_collector(
limits: agg_data.context.limits.clone(),
}))
} else {
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggCache> {
sub_agg: sub_agg.map(CachedSubAggs::new),
Ok(Box::new(SegmentRangeCollector::<HighCardSubAggBuffer> {
sub_agg: sub_agg.map(BufferedSubAggs::new),
column_type: field_type,
accessor_idx,
parent_buckets: Vec::new(),
@@ -370,7 +382,7 @@ pub(crate) fn build_segment_range_collector(
}
}
impl<C: SubAggCache> SegmentRangeCollector<C> {
impl<B: SubAggBuffer> SegmentRangeCollector<B> {
pub(crate) fn create_new_buckets(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -554,7 +566,7 @@ mod tests {
pub fn get_collector_from_ranges(
ranges: Vec<RangeAggregationRange>,
field_type: ColumnType,
) -> SegmentRangeCollector<HighCardSubAggCache> {
) -> SegmentRangeCollector<HighCardSubAggBuffer> {
let req = RangeAggregation {
field: "dummy".to_string(),
ranges,

View File

@@ -1,5 +1,4 @@
use std::fmt::Debug;
use std::io;
use std::net::Ipv6Addr;
use columnar::column_values::CompactSpaceU64Accessor;
@@ -17,8 +16,9 @@ use crate::aggregation::agg_data::{
};
use crate::aggregation::agg_limits::MemoryConsumption;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::cached_sub_aggs::{
CachedSubAggs, HighCardSubAggCache, LowCardCachedSubAggs, LowCardSubAggCache, SubAggCache,
use crate::aggregation::buffered_sub_aggs::{
BufferedSubAggs, HighCardSubAggBuffer, LowCardBufferedSubAggs, LowCardSubAggBuffer,
SubAggBuffer,
};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
@@ -352,19 +352,15 @@ pub(crate) fn build_segment_term_collector(
)));
}
// Validate sub aggregation exists when ordering by sub-aggregation.
{
if let OrderTarget::SubAggregation(sub_agg_name) = &terms_req_data.req.order.target {
let (agg_name, _agg_property) = get_agg_name_and_property(sub_agg_name);
node.get_sub_agg(agg_name, &req_data.per_request)
.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"could not find aggregation with name {agg_name} in metric \
sub_aggregations"
))
})?;
}
// Validate that the referenced sub-aggregation exists when ordering by one.
if let OrderTarget::SubAggregation(sub_agg_name) = &terms_req_data.req.order.target {
let (agg_name, _agg_property) = get_agg_name_and_property(sub_agg_name);
node.get_sub_agg(agg_name, &req_data.per_request)
.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"could not find aggregation with name {agg_name} in metric sub_aggregations"
))
})?;
}
// Build sub-aggregation blueprint if there are children.
@@ -391,7 +387,7 @@ pub(crate) fn build_segment_term_collector(
// Decide which bucket storage is best suited for this aggregation.
if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC && !has_sub_aggregations {
let term_buckets = VecTermBucketsNoAgg::new(max_term_id + 1, &mut bucket_id_provider);
let collector: SegmentTermCollector<_, HighCardSubAggCache> = SegmentTermCollector {
let collector: SegmentTermCollector<_, HighCardSubAggBuffer> = SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg: None,
bucket_id_provider,
@@ -401,8 +397,8 @@ pub(crate) fn build_segment_term_collector(
Ok(Box::new(collector))
} else if is_top_level && max_term_id < MAX_NUM_TERMS_FOR_VEC {
let term_buckets = VecTermBuckets::new(max_term_id + 1, &mut bucket_id_provider);
let sub_agg = sub_agg_collector.map(LowCardCachedSubAggs::new);
let collector: SegmentTermCollector<_, LowCardSubAggCache> = SegmentTermCollector {
let sub_agg = sub_agg_collector.map(LowCardBufferedSubAggs::new);
let collector: SegmentTermCollector<_, LowCardSubAggBuffer> = SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
bucket_id_provider,
@@ -414,8 +410,8 @@ pub(crate) fn build_segment_term_collector(
let term_buckets: PagedTermMap =
PagedTermMap::new(max_term_id + 1, &mut bucket_id_provider);
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggCache> =
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
let collector: SegmentTermCollector<PagedTermMap, HighCardSubAggBuffer> =
SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
@@ -427,8 +423,8 @@ pub(crate) fn build_segment_term_collector(
} else {
let term_buckets: HashMapTermBuckets = HashMapTermBuckets::default();
// Build sub-aggregation blueprint (flat pairs)
let sub_agg = sub_agg_collector.map(CachedSubAggs::new);
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggCache> =
let sub_agg = sub_agg_collector.map(BufferedSubAggs::new);
let collector: SegmentTermCollector<HashMapTermBuckets, HighCardSubAggBuffer> =
SegmentTermCollector {
parent_buckets: vec![term_buckets],
sub_agg,
@@ -758,10 +754,10 @@ impl TermAggregationMap for VecTermBuckets {
/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Debug)]
struct SegmentTermCollector<TermMap: TermAggregationMap, C: SubAggCache> {
struct SegmentTermCollector<TermMap: TermAggregationMap, B: SubAggBuffer> {
/// The buckets containing the aggregation data.
parent_buckets: Vec<TermMap>,
sub_agg: Option<CachedSubAggs<C>>,
sub_agg: Option<BufferedSubAggs<B>>,
bucket_id_provider: BucketIdProvider,
max_term_id: u64,
terms_req_data: TermsAggReqData,
@@ -772,8 +768,8 @@ pub(crate) fn get_agg_name_and_property(name: &str) -> (&str, &str) {
(agg_name, agg_property)
}
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
for SegmentTermCollector<TermMap, C>
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
for SegmentTermCollector<TermMap, B>
{
fn add_intermediate_aggregation_result(
&mut self,
@@ -790,8 +786,14 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
let term_req = &self.terms_req_data;
let name = term_req.name.clone();
let bucket =
Self::into_intermediate_bucket_result(term_req, &mut self.sub_agg, bucket, agg_data)?;
let bucket = Self::into_intermediate_bucket_result(
term_req,
self.sub_agg
.as_mut()
.map(BufferedSubAggs::get_sub_agg_collector),
bucket,
agg_data,
)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
Ok(())
}
@@ -803,7 +805,7 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
docs: &[crate::DocId],
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
let mem_pre = self.get_memory_consumption();
let mem_pre = self.get_memory_consumption(parent_bucket_id);
let req_data = &mut self.terms_req_data;
@@ -847,7 +849,7 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
}
}
let mem_delta = self.get_memory_consumption() - mem_pre;
let mem_delta = self.get_memory_consumption(parent_bucket_id) - mem_pre;
if mem_delta > 0 {
agg_data
.context
@@ -881,6 +883,17 @@ impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentAggregationCollector
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Terms is a multi-bucket agg with no single value to extract.
None
}
}
/// Missing value are represented as a sentinel value in the column.
@@ -907,30 +920,53 @@ fn extract_missing_value<T>(
Some((key, bucket))
}
impl<TermMap, C> SegmentTermCollector<TermMap, C>
fn reborrow_opt_collector<'a>(
opt: &'a mut Option<&mut dyn SegmentAggregationCollector>,
) -> Option<&'a mut dyn SegmentAggregationCollector> {
match opt {
Some(inner) => Some(*inner),
None => None,
}
}
fn into_intermediate_bucket_entry(
bucket: Bucket,
sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateTermBucketEntry> {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
if let Some(sub_agg_collector) = sub_agg_collector {
sub_agg_collector.add_intermediate_aggregation_result(
agg_data,
&mut sub_aggregation_res,
bucket.bucket_id,
)?;
}
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: sub_aggregation_res,
})
}
impl<TermMap, B> SegmentTermCollector<TermMap, B>
where
TermMap: TermAggregationMap,
C: SubAggCache,
B: SubAggBuffer,
{
fn get_memory_consumption(&self) -> usize {
self.parent_buckets
.iter()
.map(|b| b.get_memory_consumption())
.sum()
#[inline]
fn get_memory_consumption(&self, parent_bucket_id: BucketId) -> usize {
self.parent_buckets[parent_bucket_id as usize].get_memory_consumption()
}
#[inline]
pub(crate) fn into_intermediate_bucket_result(
term_req: &TermsAggReqData,
sub_agg: &mut Option<CachedSubAggs<C>>,
mut sub_agg_collector: Option<&mut dyn SegmentAggregationCollector>,
term_buckets: TermMap,
agg_data: &AggregationsSegmentCtx,
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
let order_by_sub_aggregation =
matches!(term_req.req.order.target, OrderTarget::SubAggregation(_));
match &term_req.req.order.target {
OrderTarget::Key => {
// We rely on the fact, that term ordinals match the order of the strings
@@ -942,10 +978,37 @@ where
entries.sort_unstable_by_key(|bucket| bucket.0);
}
}
OrderTarget::SubAggregation(_name) => {
// don't sort and cut off since it's hard to make assumptions on the quality of the
// results when cutting off du to unknown nature of the sub_aggregation (possible
// to check).
OrderTarget::SubAggregation(sub_agg_path) => {
// Peek segment-level metric values, sort, then fall through to
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
// by a sub-agg: top-K results are approximate and may differ from the
// global ordering, especially for non-monotonic metrics like avg/min.
let coll = sub_agg_collector.as_deref().ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"Could not find sub-aggregation collector for path {sub_agg_path}"
))
})?;
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
// Fetch values up-front; otherwise sort would re-compute per comparison
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
.into_iter()
.map(|bucket| {
let metric_value = coll
.compute_metric_value(bucket.1.bucket_id, agg_name, agg_prop, agg_data)
.unwrap_or(0.0);
(metric_value, bucket)
})
.collect();
if term_req.req.order.order == Order::Desc {
keyed.sort_unstable_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
}
entries = keyed.into_iter().map(|(_, e)| e).collect();
}
OrderTarget::Count => {
if term_req.req.order.order == Order::Desc {
@@ -956,40 +1019,12 @@ where
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
(0, 0)
} else {
cut_off_buckets(&mut entries, term_req.req.segment_size as usize)
};
let (term_doc_count_before_cutoff, sum_other_doc_count) =
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());
let into_intermediate_bucket_entry =
|bucket: Bucket,
sub_agg: &mut Option<CachedSubAggs<C>>|
-> crate::Result<IntermediateTermBucketEntry> {
if let Some(sub_agg) = sub_agg {
let mut sub_aggregation_res = IntermediateAggregationResults::default();
sub_agg
.get_sub_agg_collector()
.add_intermediate_aggregation_result(
agg_data,
&mut sub_aggregation_res,
bucket.bucket_id,
)?;
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: sub_aggregation_res,
})
} else {
Ok(IntermediateTermBucketEntry {
doc_count: bucket.count,
sub_aggregation: Default::default(),
})
}
};
if term_req.column_type == ColumnType::Str {
let fallback_dict = Dictionary::empty();
let term_dict = term_req
@@ -1000,7 +1035,11 @@ where
if let Some((intermediate_key, bucket)) = extract_missing_value(&mut entries, term_req)
{
let intermediate_entry = into_intermediate_bucket_entry(bucket, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
dict.insert(intermediate_key, intermediate_entry);
}
@@ -1008,19 +1047,28 @@ where
entries.sort_unstable_by_key(|bucket| bucket.0);
let (term_ids, buckets): (Vec<u64>, Vec<Bucket>) = entries.into_iter().unzip();
let mut buckets_it = buckets.into_iter();
term_dict.sorted_ords_to_term_cb(term_ids.into_iter(), |term| {
let bucket = buckets_it.next().unwrap();
let intermediate_entry =
into_intermediate_bucket_entry(bucket, sub_agg).map_err(io::Error::other)?;
let intermediate_entries: Vec<IntermediateTermBucketEntry> = buckets
.into_iter()
.map(|bucket| {
into_intermediate_bucket_entry(
bucket,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)
})
.collect::<crate::Result<_>>()?;
let mut intermediate_entry_it = intermediate_entries.into_iter();
term_dict.sorted_ords_to_term_cb(&term_ids[..], |term| {
let intermediate_entry = intermediate_entry_it.next().unwrap();
dict.insert(
IntermediateKey::Str(
String::from_utf8(term.to_vec()).expect("could not convert to String"),
),
intermediate_entry,
);
Ok(())
})?;
if term_req.req.min_doc_count == 0 {
@@ -1055,14 +1103,22 @@ where
}
} else if term_req.column_type == ColumnType::DateTime {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val = i64::from_u64(val);
let date = format_date(val)?;
dict.insert(IntermediateKey::Str(date), intermediate_entry);
}
} else if term_req.column_type == ColumnType::Bool {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val = bool::from_u64(val);
dict.insert(IntermediateKey::Bool(val), intermediate_entry);
}
@@ -1082,14 +1138,22 @@ where
})?;
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
let val = Ipv6Addr::from_u128(val);
dict.insert(IntermediateKey::IpAddr(val), intermediate_entry);
}
} else {
for (val, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(doc_count, sub_agg)?;
let intermediate_entry = into_intermediate_bucket_entry(
doc_count,
reborrow_opt_collector(&mut sub_agg_collector),
agg_data,
)?;
if term_req.column_type == ColumnType::U64 {
dict.insert(IntermediateKey::U64(val), intermediate_entry);
} else if term_req.column_type == ColumnType::I64 {
@@ -1123,13 +1187,13 @@ where
}
}
impl<TermMap: TermAggregationMap, C: SubAggCache> SegmentTermCollector<TermMap, C> {
impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentTermCollector<TermMap, B> {
#[inline]
fn collect_terms_with_docs(
iter: impl Iterator<Item = (crate::DocId, u64)>,
term_buckets: &mut TermMap,
bucket_id_provider: &mut BucketIdProvider,
sub_agg: &mut CachedSubAggs<C>,
sub_agg: &mut BufferedSubAggs<B>,
) {
for (doc, term_id) in iter {
let bucket_id = term_buckets.term_entry(term_id, bucket_id_provider);
@@ -1202,7 +1266,7 @@ mod tests {
use crate::aggregation::{AggregationLimitsGuard, DistributedAggregationCollector};
use crate::indexer::NoMergePolicy;
use crate::query::AllQuery;
use crate::schema::{IntoIpv6Addr, Schema, FAST, STRING};
use crate::schema::{IntoIpv6Addr, Schema, FAST, INDEXED, STRING, TEXT};
use crate::{Index, IndexWriter};
#[test]
@@ -1731,6 +1795,263 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_order_by_cardinality_desc_single_segment() -> crate::Result<()> {
terms_aggregation_order_by_cardinality_desc(true)
}
#[test]
fn terms_aggregation_order_by_cardinality_desc_multi_segment() -> crate::Result<()> {
terms_aggregation_order_by_cardinality_desc(false)
}
fn terms_aggregation_order_by_cardinality_desc(merge_segments: bool) -> crate::Result<()> {
// Distinct score values per bucket key: A→5, B→1, C→3.
// Order by cardinality desc must yield A, C, B.
let segment_and_terms = vec![vec![
(1.0, "A".to_string()),
(2.0, "A".to_string()),
(3.0, "A".to_string()),
(4.0, "A".to_string()),
(5.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(1.0, "C".to_string()),
(2.0, "C".to_string()),
(3.0, "C".to_string()),
]];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "card": "desc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][0]["card"]["value"], 5.0);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][1]["card"]["value"], 3.0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][2]["card"]["value"], 1.0);
// Asc engages the segment-cutoff path too (monotonic-safe: discarded buckets had
// local card >= cutoff, so merged card >= cutoff and they cannot be globally smallest).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "card": "asc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "A");
// size=2 with desc engages the segment cutoff: must keep top-2 by cardinality (A, C),
// and `sum_other_doc_count` reflects the dropped B (3 docs).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "card": "desc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
// size=2 with asc engages the segment cutoff: must keep bottom-2 by cardinality (B, C).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "card": "asc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
Ok(())
}
#[test]
fn terms_aggregation_order_by_sum_single_segment() -> crate::Result<()> {
terms_aggregation_order_by_sum(true)
}
#[test]
fn terms_aggregation_order_by_sum_multi_segment() -> crate::Result<()> {
terms_aggregation_order_by_sum(false)
}
fn terms_aggregation_order_by_sum(merge_segments: bool) -> crate::Result<()> {
// Per-bucket sums on the U64 `score` column (non-negative => sum is monotonic):
// A → 1+2+3+4+5 = 15, B → 1+1+1 = 3, C → 1+2+3 = 6.
let segment_and_terms = vec![
vec![
(1.0, "A".to_string()),
(2.0, "A".to_string()),
(3.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "C".to_string()),
],
vec![
(4.0, "A".to_string()),
(5.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(2.0, "C".to_string()),
(3.0, "C".to_string()),
],
];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
// Desc on a Sum metric engages the fast path (column is U64).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][0]["total"]["value"], 15.0);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][1]["total"]["value"], 6.0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][2]["total"]["value"], 3.0);
// Asc engages the fast path too — discarded buckets had local sum >= cutoff,
// and merged sum >= local (non-negative addends), so they cannot be globally smallest.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "asc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "A");
// size=2 desc with cutoff: top-2 by sum (A, C).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
// Stats sub-property: ordering by `mystats.sum` on a U64 column also engages.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "mystats.sum": "desc" }
},
"aggs": {
"mystats": { "stats": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
// Sum on a signed column (I64) takes the same cutoff path. Results may be
// approximate near the boundary on adversarial data, but for this dataset the
// top-K is unambiguous.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score_i64" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
// Order by extended_stats sub-property exercises compute_metric_value on the
// ExtendedStats collector. A→max=5, B→max=1, C→max=3, so desc by max → A, C, B.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "ext.max": "desc" }
},
"aggs": {
"ext": { "extended_stats": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
Ok(())
}
#[test]
fn terms_aggregation_test_order_key_single_segment() -> crate::Result<()> {
terms_aggregation_test_order_key_merge_segment(true)
@@ -2896,4 +3217,101 @@ mod tests {
Ok(())
}
fn prep_index_with_n_unique_terms_plus_one_null(n: u64) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED);
let title_field = schema_builder.add_text_field("title", TEXT | FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
// set to one thread to guarantee all docs end up in the same segment
let mut writer = index.writer_with_num_threads(1, 50_000_000)?;
writer.add_document(doc!(
id_field => 0u64,
))?;
for i in 1u64..=n {
let title = format!("foo{i}");
writer.add_document(doc!(
id_field => i,
title_field => title,
))?;
}
writer.commit()?;
Ok(index)
}
#[test]
fn null_bitset_bounds_check_regression() -> crate::Result<()> {
// include cases
for i in 0..=4 {
let index = prep_index_with_n_unique_terms_plus_one_null(i * 64)?;
let normal_req: Aggregations = serde_json::from_value(json!({
"my_bool": {
"terms": {
"field": "title",
"missing": "__NULL__",
"size": 1000,
}
}
}))?;
let include_req: Aggregations = serde_json::from_value(json!({
"my_bool": {
"terms": {
"field": "title",
"include": "foo(.*)",
"missing": "__NULL__",
"size": 1000,
}
}
}))?;
let exclude_req: Aggregations = serde_json::from_value(json!({
"my_bool": {
"terms": {
"field": "title",
"exclude": "foo(.*)",
"missing": "__NULL__",
"size": 1000,
}
}
}))?;
let normal_res = exec_request(normal_req, &index)?;
let normal_buckets = normal_res["my_bool"]["buckets"].as_array().unwrap();
assert_eq!(
normal_buckets.len(),
(i * 64) as usize + 1,
"The normal request should return all 'foo' buckets, plus the missing term bucket",
);
let include_res = exec_request(include_req, &index)?;
eprintln!("include_res: {include_res:?}");
let include_buckets = include_res["my_bool"]["buckets"].as_array().unwrap();
assert_eq!(
include_buckets.len(),
(i * 64) as usize,
"The include request should return all 'foo' buckets, and not the missing term \
bucket",
);
assert!(include_buckets
.iter()
.all(|b| b["key"].as_str().unwrap().starts_with("foo")));
let exclude_res = exec_request(exclude_req, &index)?;
let exclude_buckets = exclude_res["my_bool"]["buckets"].as_array().unwrap();
if i != 0 {
// TODO: Remove this if after fixing exclude + missing bug
assert_eq!(
exclude_buckets.len(),
1,
"The exclude request should exclude all 'foo' buckets, and only the missing \
term bucket",
);
assert_eq!(exclude_buckets[0]["key"], "__NULL__");
}
}
Ok(())
}
}

View File

@@ -5,7 +5,7 @@ use crate::aggregation::agg_data::{
build_segment_agg_collectors, AggRefNode, AggregationsSegmentCtx,
};
use crate::aggregation::bucket::term_agg::TermsAggregation;
use crate::aggregation::cached_sub_aggs::{CachedSubAggs, HighCardCachedSubAggs};
use crate::aggregation::buffered_sub_aggs::{BufferedSubAggs, HighCardBufferedSubAggs};
use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult,
@@ -47,7 +47,7 @@ struct MissingCount {
#[derive(Default, Debug)]
pub struct TermMissingAgg {
accessor_idx: usize,
sub_agg: Option<HighCardCachedSubAggs>,
sub_agg: Option<HighCardBufferedSubAggs>,
/// Idx = parent bucket id, Value = missing count for that bucket
missing_count_per_bucket: Vec<MissingCount>,
bucket_id_provider: BucketIdProvider,
@@ -66,7 +66,7 @@ impl TermMissingAgg {
None
};
let sub_agg = sub_agg.map(CachedSubAggs::new);
let sub_agg = sub_agg.map(BufferedSubAggs::new);
let bucket_id_provider = BucketIdProvider::default();
Ok(Self {
@@ -177,6 +177,17 @@ impl SegmentAggregationCollector for TermMissingAgg {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// TODO: forward to `sub_agg` for nested order paths (`missing_agg>metric`).
None
}
}
#[cfg(test)]

View File

@@ -6,7 +6,7 @@ use crate::aggregation::bucket::MAX_NUM_TERMS_FOR_VEC;
use crate::aggregation::BucketId;
use crate::DocId;
/// A cache for sub-aggregations, storing doc ids per bucket id.
/// A buffer for sub-aggregations, storing doc ids per bucket id.
/// Depending on the cardinality of the parent aggregation, we use different
/// storage strategies.
///
@@ -24,21 +24,21 @@ use crate::DocId;
/// aggregations.
/// What this datastructure does in general is to group docs by bucket id.
#[derive(Debug)]
pub(crate) struct CachedSubAggs<C: SubAggCache> {
cache: C,
pub(crate) struct BufferedSubAggs<B: SubAggBuffer> {
buffer: B,
sub_agg_collector: Box<dyn SegmentAggregationCollector>,
num_docs: usize,
}
pub type LowCardCachedSubAggs = CachedSubAggs<LowCardSubAggCache>;
pub type HighCardCachedSubAggs = CachedSubAggs<HighCardSubAggCache>;
pub type LowCardBufferedSubAggs = BufferedSubAggs<LowCardSubAggBuffer>;
pub type HighCardBufferedSubAggs = BufferedSubAggs<HighCardSubAggBuffer>;
const FLUSH_THRESHOLD: usize = 2048;
/// A trait for caching sub-aggregation doc ids per bucket id.
/// A trait for buffering sub-aggregation doc ids per bucket id.
/// Different implementations can be used depending on the cardinality
/// of the parent aggregation.
pub trait SubAggCache: Debug {
pub trait SubAggBuffer: Debug {
fn new() -> Self;
fn push(&mut self, bucket_id: BucketId, doc_id: DocId);
fn flush_local(
@@ -49,22 +49,22 @@ pub trait SubAggCache: Debug {
) -> crate::Result<()>;
}
impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
impl<Backend: SubAggBuffer + Debug> BufferedSubAggs<Backend> {
pub fn new(sub_agg: Box<dyn SegmentAggregationCollector>) -> Self {
Self {
cache: Backend::new(),
buffer: Backend::new(),
sub_agg_collector: sub_agg,
num_docs: 0,
}
}
pub fn get_sub_agg_collector(&mut self) -> &mut Box<dyn SegmentAggregationCollector> {
&mut self.sub_agg_collector
pub fn get_sub_agg_collector(&mut self) -> &mut dyn SegmentAggregationCollector {
&mut *self.sub_agg_collector
}
#[inline]
pub fn push(&mut self, bucket_id: BucketId, doc_id: DocId) {
self.cache.push(bucket_id, doc_id);
self.buffer.push(bucket_id, doc_id);
self.num_docs += 1;
}
@@ -75,7 +75,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
agg_data: &mut AggregationsSegmentCtx,
) -> crate::Result<()> {
if self.num_docs >= FLUSH_THRESHOLD {
self.cache
self.buffer
.flush_local(&mut self.sub_agg_collector, agg_data, false)?;
self.num_docs = 0;
}
@@ -85,7 +85,7 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
/// Note: this _does_ flush the sub aggregations.
pub fn flush(&mut self, agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
if self.num_docs != 0 {
self.cache
self.buffer
.flush_local(&mut self.sub_agg_collector, agg_data, true)?;
self.num_docs = 0;
}
@@ -94,11 +94,11 @@ impl<Backend: SubAggCache + Debug> CachedSubAggs<Backend> {
}
}
/// Number of partitions for high cardinality sub-aggregation cache.
/// Number of partitions for high cardinality sub-aggregation buffer.
const NUM_PARTITIONS: usize = 16;
#[derive(Debug)]
pub(crate) struct HighCardSubAggCache {
pub(crate) struct HighCardSubAggBuffer {
/// This weird partitioning is used to do some cheap grouping on the bucket ids.
/// bucket ids are dense, e.g. when we don't detect the cardinality as low cardinality,
/// but there are just 16 bucket ids, each bucket id will go to its own partition.
@@ -108,7 +108,7 @@ pub(crate) struct HighCardSubAggCache {
partitions: Box<[PartitionEntry; NUM_PARTITIONS]>,
}
impl HighCardSubAggCache {
impl HighCardSubAggBuffer {
#[inline]
fn clear(&mut self) {
for partition in self.partitions.iter_mut() {
@@ -131,7 +131,7 @@ impl PartitionEntry {
}
}
impl SubAggCache for HighCardSubAggCache {
impl SubAggBuffer for HighCardSubAggBuffer {
fn new() -> Self {
Self {
partitions: Box::new(core::array::from_fn(|_| PartitionEntry::default())),
@@ -173,14 +173,14 @@ impl SubAggCache for HighCardSubAggCache {
}
#[derive(Debug)]
pub(crate) struct LowCardSubAggCache {
/// Cache doc ids per bucket for sub-aggregations.
pub(crate) struct LowCardSubAggBuffer {
/// Buffer doc ids per bucket for sub-aggregations.
///
/// The outer Vec is indexed by BucketId.
per_bucket_docs: Vec<Vec<DocId>>,
}
impl LowCardSubAggCache {
impl LowCardSubAggBuffer {
#[inline]
fn clear(&mut self) {
for v in &mut self.per_bucket_docs {
@@ -189,7 +189,7 @@ impl LowCardSubAggCache {
}
}
impl SubAggCache for LowCardSubAggCache {
impl SubAggBuffer for LowCardSubAggBuffer {
fn new() -> Self {
Self {
per_bucket_docs: Vec::new(),

View File

@@ -1,6 +1,6 @@
use super::agg_req::Aggregations;
use super::agg_result::AggregationResults;
use super::cached_sub_aggs::LowCardCachedSubAggs;
use super::buffered_sub_aggs::LowCardBufferedSubAggs;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::AggContextParams;
// group buffering strategy is chosen explicitly by callers; no need to hash-group on the fly.
@@ -136,7 +136,7 @@ fn merge_fruits(
/// `AggregationSegmentCollector` does the aggregation collection on a segment.
pub struct AggregationSegmentCollector {
aggs_with_accessor: AggregationsSegmentCtx,
agg_collector: LowCardCachedSubAggs,
agg_collector: LowCardBufferedSubAggs,
error: Option<TantivyError>,
}
@@ -152,7 +152,7 @@ impl AggregationSegmentCollector {
let mut agg_data =
build_aggregations_data_from_req(agg, reader, segment_ordinal, context.clone())?;
let mut result =
LowCardCachedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
LowCardBufferedSubAggs::new(build_segment_agg_collectors_root(&mut agg_data)?);
result
.get_sub_agg_collector()
.prepare_max_bucket(0, &agg_data)?; // prepare for bucket zero

View File

@@ -1004,24 +1004,20 @@ impl IntermediateCompositeBucketResult {
) -> crate::Result<BucketResult> {
let trimmed_entry_vec =
trim_composite_buckets(self.entries, &self.orders, self.target_size)?;
let after_key = if trimmed_entry_vec.len() == req.size as usize {
trimmed_entry_vec
.last()
.map(|bucket| {
let (intermediate_key, _entry) = bucket;
intermediate_key
.iter()
.enumerate()
.map(|(idx, intermediate_key)| {
let source = &req.sources[idx];
(source.name().to_string(), intermediate_key.clone().into())
})
.collect()
})
.unwrap()
} else {
FxHashMap::default()
};
let after_key = trimmed_entry_vec
.last()
.map(|bucket| {
let (intermediate_key, _entry) = bucket;
intermediate_key
.iter()
.enumerate()
.map(|(idx, intermediate_key)| {
let source = &req.sources[idx];
(source.name().to_string(), intermediate_key.clone().into())
})
.collect()
})
.unwrap_or_default();
let buckets = trimmed_entry_vec
.into_iter()

File diff suppressed because it is too large Load Diff

View File

@@ -399,6 +399,26 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if self.name != sub_agg_name {
return None;
}
let extended = self.buckets.get(bucket_id as usize)?;
// Finalize is a pure read of accumulators — calling it here for the cutoff sort
// doesn't disturb the eventual intermediate result.
extended
.finalize()
.get_value(sub_agg_property)
.ok()
.flatten()
}
}
#[cfg(test)]

View File

@@ -107,10 +107,9 @@ pub enum PercentileValues {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The entry when requesting percentiles with keyed: false
pub struct PercentileValuesVecEntry {
/// Percentile
/// The percentile key (e.g. 1.0, 5.0, 25.0).
pub key: f64,
/// Value at the percentile
/// The percentile value. `NaN` when there are no values.
pub value: f64,
}

View File

@@ -312,6 +312,26 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if agg_data.get_metric_req_data(self.accessor_idx).name != sub_agg_name {
return None;
}
let percentile: f64 = sub_agg_property.parse().ok()?;
if !(0.0..=100.0).contains(&percentile) {
return None;
}
let bucket = self.buckets.get(bucket_id as usize)?;
// DDSketch.quantile is a pure read; calling it here for the cutoff sort does
// not affect the intermediate state used for the final result.
bucket.sketch.quantile(percentile / 100.0).ok().flatten()
}
}
#[cfg(test)]

View File

@@ -321,6 +321,40 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if self.name != sub_agg_name {
return None;
}
let stats = self.buckets.get(bucket_id as usize)?;
// The property depends on what we're collecting:
// - StatsType::Stats exposes count/sum/min/max/avg via dotted property.
// - Single-value kinds (Sum/Count/Min/Max/Average) expect an empty property and return
// the value they were configured to collect.
let prop = match self.collecting_for {
StatsType::Stats if !sub_agg_property.is_empty() => sub_agg_property,
StatsType::Sum if sub_agg_property.is_empty() => "sum",
StatsType::Count if sub_agg_property.is_empty() => "count",
StatsType::Max if sub_agg_property.is_empty() => "max",
StatsType::Min if sub_agg_property.is_empty() => "min",
StatsType::Average if sub_agg_property.is_empty() => "avg",
_ => return None,
};
match prop {
"count" => Some(stats.count as f64),
"sum" => Some(stats.sum),
"min" if stats.count > 0 => Some(stats.min),
"max" if stats.count > 0 => Some(stats.max),
"avg" if stats.count > 0 => Some(stats.sum / stats.count as f64),
_ => None,
}
}
}
#[inline]

View File

@@ -644,6 +644,17 @@ impl SegmentAggregationCollector for TopHitsSegmentCollector {
);
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// top_hits is not a numeric metric and cannot be used as an order target.
None
}
}
#[cfg(test)]

View File

@@ -133,7 +133,7 @@ mod agg_limits;
pub mod agg_req;
pub mod agg_result;
pub mod bucket;
pub(crate) mod cached_sub_aggs;
pub(crate) mod buffered_sub_aggs;
mod collector;
mod date;
mod error;

View File

@@ -76,6 +76,31 @@ pub trait SegmentAggregationCollector: Debug {
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
Ok(())
}
/// Compute the segment-level metric value of the named direct-child metric for `bucket_id`.
///
/// Used by parent term aggs that order by a sub-aggregation: the parent sorts on
/// this value and cuts off at segment time, matching the approximation tradeoff
/// Elasticsearch makes for any sub-agg ordering.
///
/// `sub_agg_property` is the dotted suffix (e.g. `"sum"` in `mystats.sum`); empty when
/// the metric is a single-value kind such as cardinality.
///
/// Returns `None` only on name mismatch, unknown property, or empty bucket. Implementations
/// may finalize their per-bucket state (e.g. compute a percentile from a sketch); calls
/// must be idempotent so the final intermediate result is unaffected.
///
/// No default impl on purpose: every collector must decide explicitly whether it
/// produces a metric value, forwards into children (single-bucket aggs), or rejects
/// the lookup. A silent `None` default would let a parent term agg's cutoff sort all
/// buckets to the same key and drop arbitrary winners.
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64>;
}
#[derive(Default)]
@@ -137,4 +162,21 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
for agg in &self.aggs {
if let Some(value) =
agg.compute_metric_value(bucket_id, sub_agg_name, sub_agg_property, agg_data)
{
return Some(value);
}
}
None
}
}

View File

@@ -1,5 +1,6 @@
use super::Collector;
use crate::collector::SegmentCollector;
use crate::query::Weight;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
/// `CountCollector` collector only counts how many
@@ -55,6 +56,15 @@ impl Collector for Count {
fn merge_fruits(&self, segment_counts: Vec<usize>) -> crate::Result<usize> {
Ok(segment_counts.into_iter().sum())
}
fn collect_segment(
&self,
weight: &dyn Weight,
_segment_ord: u32,
reader: &SegmentReader,
) -> crate::Result<usize> {
Ok(weight.count(reader)? as usize)
}
}
#[derive(Default)]

View File

@@ -389,6 +389,13 @@ impl SegmentCollector for FacetSegmentCollector {
}
let mut facet = vec![];
let (facet_ord, facet_depth) = self.unique_facet_ords[collapsed_facet_ord];
// u64::MAX is used as a sentinel for unmapped ordinals (e.g. when a
// document has the exact registered facet, not a child of it).
// Passing it to ord_to_term would resolve to the last dictionary
// entry and produce a spurious facet from an unrelated branch.
if facet_ord == u64::MAX {
continue;
}
// TODO handle errors.
if facet_dict.ord_to_term(facet_ord, &mut facet).is_ok() {
if let Some((end_collapsed_facet, _)) = facet
@@ -814,6 +821,63 @@ mod tests {
assert!(!super::is_child_facet(&b"foo\0bar"[..], &b"foo"[..]));
assert!(!super::is_child_facet(&b"foo"[..], &b"foobar\0baz"[..]));
}
// Regression test for https://github.com/quickwit-oss/tantivy/issues/2494
// When a document has the exact registered facet path (not just a child),
// harvest() must not turn the unmapped sentinel into a spurious root entry.
#[test]
fn test_facet_collector_wrong_root() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer: IndexWriter = index.writer_for_tests()?;
let facets: Vec<&str> = vec![
"/science-fiction/asimov",
"/science-fiction/clarke",
"/science-fiction/dick",
"/science-fiction/herbert",
"/science-fiction/orwell",
// This exact match on the registered facet is the bug trigger:
// its ordinal maps to the sentinel (u64::MAX, 0) in the collapse
// mapping, which without the fix resolves to an unrelated term.
"/fantasy/epic-fantasy",
"/fantasy/epic-fantasy/tolkien",
"/fantasy/epic-fantasy/martin",
];
for facet_str in &facets {
index_writer.add_document(doc!(
facet_field => Facet::from(*facet_str)
))?;
}
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
let term = Term::from_facet(facet_field, &Facet::from("/fantasy/epic-fantasy"));
let query = TermQuery::new(term, IndexRecordOption::Basic);
let mut facet_collector = FacetCollector::for_field("facet");
facet_collector.add_facet("/fantasy/epic-fantasy");
let counts: FacetCounts = searcher.search(&query, &facet_collector)?;
let result: Vec<(String, u64)> = counts
.get("/")
.map(|(facet, count)| (facet.to_string(), count))
.collect();
// Only children of /fantasy/epic-fantasy should appear, not /science-fiction
assert_eq!(
result,
vec![
("/fantasy/epic-fantasy/martin".to_string(), 1),
("/fantasy/epic-fantasy/tolkien".to_string(), 1),
]
);
Ok(())
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -1,5 +1,8 @@
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use crate::collector::sort_key::NaturalComparator;
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer, TopNComputer};
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer};
use crate::{DocAddress, DocId, Score};
/// Sort by similarity score.
@@ -25,6 +28,10 @@ impl SortKeyComputer for SortBySimilarityScore {
}
// Sorting by score is special in that it allows for the Block-Wand optimization.
//
// We use a BinaryHeap (TopNHeap) instead of TopNComputer here so that the
// threshold is always the exact K-th best score. TopNComputer only updates its
// threshold every K docs (at truncation), giving Block-WAND a stale bound.
fn collect_segment_top_k(
&self,
k: usize,
@@ -32,12 +39,10 @@ impl SortKeyComputer for SortBySimilarityScore {
reader: &crate::SegmentReader,
segment_ord: u32,
) -> crate::Result<Vec<(Self::SortKey, DocAddress)>> {
let mut top_n: TopNComputer<Score, DocId, Self::Comparator> =
TopNComputer::new_with_comparator(k, self.comparator());
let mut top_n = TopNHeap::new(k);
if let Some(alive_bitset) = reader.alive_bitset() {
let mut threshold = Score::MIN;
top_n.threshold = Some(threshold);
weight.for_each_pruning(Score::MIN, reader, &mut |doc, score| {
if alive_bitset.is_deleted(doc) {
return threshold;
@@ -56,7 +61,7 @@ impl SortKeyComputer for SortBySimilarityScore {
Ok(top_n
.into_vec()
.into_iter()
.map(|cid| (cid.sort_key, DocAddress::new(segment_ord, cid.doc)))
.map(|(score, doc)| (score, DocAddress::new(segment_ord, doc)))
.collect())
}
}
@@ -75,3 +80,204 @@ impl SegmentSortKeyComputer for SortBySimilarityScore {
score
}
}
/// Min-heap entry: higher score = greater, lower doc wins ties.
struct ScoreHeapEntry {
score: Score,
doc: DocId,
}
impl Eq for ScoreHeapEntry {}
impl PartialEq for ScoreHeapEntry {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl PartialOrd for ScoreHeapEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScoreHeapEntry {
fn cmp(&self, other: &Self) -> Ordering {
self.score
.partial_cmp(&other.score)
.unwrap_or(Ordering::Equal)
.then_with(|| other.doc.cmp(&self.doc))
}
}
/// Heap-based top-K for score collection. O(log K) per insert, but the threshold
/// is always tight, so Block-WAND prunes better than with [`TopNComputer`]'s
/// buffer/median approach.
///
/// Like [`TopNComputer`], items must arrive in ascending doc order, and equal
/// scores are rejected (strict `>`) so that lower doc IDs win ties.
///
/// [`TopNComputer`]: crate::collector::TopNComputer
struct TopNHeap {
heap: BinaryHeap<Reverse<ScoreHeapEntry>>,
top_n: usize,
threshold: Option<Score>,
}
impl TopNHeap {
fn new(top_n: usize) -> Self {
TopNHeap {
heap: BinaryHeap::with_capacity(top_n),
top_n,
threshold: None,
}
}
#[inline]
fn push(&mut self, score: Score, doc: DocId) {
if self.heap.len() < self.top_n {
self.heap.push(Reverse(ScoreHeapEntry { score, doc }));
if self.heap.len() == self.top_n {
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
}
} else if let Some(threshold) = self.threshold {
if score > threshold {
// peek_mut + assign is a single sift-down, vs pop + push = two sifts.
if let Some(mut min) = self.heap.peek_mut() {
*min = Reverse(ScoreHeapEntry { score, doc });
}
self.threshold = self.heap.peek().map(|Reverse(entry)| entry.score);
}
}
}
fn into_vec(self) -> Vec<(Score, DocId)> {
self.heap
.into_vec()
.into_iter()
.map(|Reverse(entry)| (entry.score, entry.doc))
.collect()
}
}
#[cfg(test)]
mod tests {
use proptest::prelude::*;
use super::*;
use crate::collector::sort_key::NaturalComparator;
use crate::collector::TopNComputer;
#[test]
fn test_top_n_heap_zero_capacity() {
let mut heap = TopNHeap::new(0);
heap.push(1.0, 0);
heap.push(2.0, 1);
assert!(heap.into_vec().is_empty());
}
#[test]
fn test_top_n_heap_basic() {
let mut heap = TopNHeap::new(2);
heap.push(1.0, 0);
heap.push(3.0, 1);
heap.push(2.0, 2);
let mut results = heap.into_vec();
results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1)));
assert_eq!(results, vec![(3.0, 1), (2.0, 2)]);
}
#[test]
fn test_top_n_heap_threshold_always_accurate() {
let mut heap = TopNHeap::new(2);
assert_eq!(heap.threshold, None);
heap.push(1.0, 0);
assert_eq!(heap.threshold, None);
heap.push(3.0, 1);
assert_eq!(heap.threshold, Some(1.0));
heap.push(2.0, 2); // evicts 1.0
assert_eq!(heap.threshold, Some(2.0));
heap.push(4.0, 3); // evicts 2.0
assert_eq!(heap.threshold, Some(3.0));
}
#[test]
fn test_top_n_heap_tiebreaking_lower_doc_wins() {
let mut heap = TopNHeap::new(2);
heap.push(5.0, 0);
heap.push(5.0, 1);
heap.push(5.0, 2); // rejected: not strictly > threshold
let mut results = heap.into_vec();
results.sort_by_key(|&(_, doc)| doc);
assert_eq!(results, vec![(5.0, 0), (5.0, 1)]);
}
#[test]
fn test_top_n_heap_single_element() {
let mut heap = TopNHeap::new(1);
heap.push(1.0, 0);
assert_eq!(heap.threshold, Some(1.0));
heap.push(0.5, 1); // rejected
heap.push(2.0, 2); // accepted
assert_eq!(heap.threshold, Some(2.0));
let results = heap.into_vec();
assert_eq!(results, vec![(2.0, 2)]);
}
#[test]
fn test_top_n_heap_under_capacity() {
let mut heap = TopNHeap::new(5);
heap.push(3.0, 0);
heap.push(1.0, 1);
heap.push(2.0, 2);
// Only 3 elements, capacity is 5 — all should be kept
assert_eq!(heap.threshold, None);
let mut results = heap.into_vec();
results.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1)));
assert_eq!(results, vec![(3.0, 0), (2.0, 2), (1.0, 1)]);
}
proptest! {
#[test]
fn test_top_n_heap_matches_top_n_computer(
limit in 0..20_usize,
mut docs in proptest::collection::vec((0..1000_u32, 0..1000_u32), 0..200_usize),
) {
// Both require ascending doc order.
docs.sort_by_key(|(_, doc_id)| *doc_id);
docs.dedup_by_key(|(_, doc_id)| *doc_id);
let mut heap = TopNHeap::new(limit);
let mut computer: TopNComputer<Score, DocId, NaturalComparator> =
TopNComputer::new_with_comparator(limit, NaturalComparator);
for &(score_u32, doc) in &docs {
let score = score_u32 as Score;
heap.push(score, doc);
computer.push(score, doc);
}
let mut heap_results = heap.into_vec();
heap_results.sort_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap().then_with(|| a.1.cmp(&b.1))
});
let computer_results: Vec<(Score, DocId)> = computer
.into_sorted_vec()
.into_iter()
.map(|cd| (cd.sort_key, cd.doc))
.collect();
prop_assert_eq!(heap_results, computer_results);
}
}
}

View File

@@ -513,7 +513,9 @@ pub struct TopNComputer<Score, D, C> {
/// The buffer reverses sort order to get top-semantics instead of bottom-semantics
buffer: Vec<ComparableDoc<Score, D>>,
top_n: usize,
pub(crate) threshold: Option<Score>,
/// The current threshold for pruning. Documents with scores at or below
/// this value are skipped by `push()`. Updated when the buffer is truncated.
pub threshold: Option<Score>,
comparator: C,
}

View File

@@ -1,5 +1,7 @@
use std::borrow::{Borrow, BorrowMut};
use common::TinySet;
use crate::fastfield::AliveBitSet;
use crate::DocId;
@@ -14,6 +16,12 @@ pub const TERMINATED: DocId = i32::MAX as u32;
/// exactly this size as long as we can fill the buffer.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
pub const BLOCK_NUM_TINYBITSETS: usize = 16;
/// Number of doc IDs covered by one block: `BLOCK_NUM_TINYBITSETS * 64 = 1024`.
pub const BLOCK_WINDOW: u32 = BLOCK_NUM_TINYBITSETS as u32 * 64;
/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
/// Goes to the next element.
@@ -160,6 +168,31 @@ pub trait DocSet: Send {
self.size_hint() as u64
}
/// Fills a bitmask representing which documents in `[min_doc, min_doc + BLOCK_WINDOW)` are
/// present in this docset.
///
/// The window is divided into `BLOCK_NUM_TINYBITSETS` buckets of 64 docs each.
/// Returns the next doc `>= min_doc + BLOCK_WINDOW`, or `TERMINATED` if exhausted.
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
self.seek(min_doc);
let horizon = min_doc + BLOCK_WINDOW;
loop {
let doc = self.doc();
if doc >= horizon {
return doc;
}
let delta = doc - min_doc;
mask[(delta / 64) as usize].insert_mut(delta % 64);
if self.advance() == TERMINATED {
return TERMINATED;
}
}
}
/// Returns the number documents matching.
/// Calling this method consumes the `DocSet`.
fn count(&mut self, alive_bitset: &AliveBitSet) -> u32 {
@@ -214,6 +247,18 @@ impl DocSet for &mut dyn DocSet {
(**self).seek_danger(target)
}
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
(**self).fill_buffer(buffer)
}
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
(**self).fill_bitset_block(min_doc, mask)
}
fn doc(&self) -> u32 {
(**self).doc()
}
@@ -256,6 +301,15 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.fill_buffer(buffer)
}
fn fill_bitset_block(
&mut self,
min_doc: DocId,
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
) -> DocId {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.fill_bitset_block(min_doc, mask)
}
fn doc(&self) -> DocId {
let unboxed: &TDocSet = self.borrow();
unboxed.doc()

View File

@@ -249,6 +249,12 @@ impl BlockSegmentPostings {
/// Returns the length of the current block.
///
/// Returns the decoded term-frequency buffer for the current block.
#[inline]
pub(crate) fn freq_output_array(&self) -> &[u32] {
self.freq_decoder.output_array()
}
/// All blocks have a length of `NUM_DOCS_PER_BLOCK`,
/// except the last block that may have a length
/// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1`
@@ -298,6 +304,11 @@ impl BlockSegmentPostings {
}
}
#[inline]
pub(crate) fn has_remaining_docs(&self) -> bool {
self.skip_reader.has_remaining_docs()
}
pub(crate) fn block_is_loaded(&self) -> bool {
self.block_loaded
}

View File

@@ -146,6 +146,11 @@ impl SkipReader {
skip_reader
}
#[inline(always)]
pub fn has_remaining_docs(&self) -> bool {
self.remaining_docs != 0
}
pub fn reset(&mut self, data: OwnedBytes, doc_freq: u32) {
self.last_doc_in_block = if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0

View File

@@ -0,0 +1,464 @@
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::query::term_query::TermScorer;
use crate::query::Scorer;
use crate::{DocId, DocSet, Score, TERMINATED};
/// Block-max pruning for top-K over intersection of term scorers.
///
/// Uses the least-frequent term as "leader" to define 128-doc processing windows.
/// For each window, the sum of block_max_scores is compared to the current threshold;
/// if the block can't beat it, the entire block is skipped.
///
/// Within non-skipped blocks, individual documents are pruned by checking whether
/// leader_score + sum(secondary block_max_scores) can exceed the threshold before
/// performing the expensive intersection membership check (seeking into secondary scorers).
///
/// # Preconditions
/// - `scorers` has at least 2 elements
/// - All scorers read frequencies (`FreqReadingOption::ReadFreq`)
pub(crate) fn block_wand_intersection(
mut scorers: Vec<TermScorer>,
mut threshold: Score,
callback: &mut dyn FnMut(DocId, Score) -> Score,
) {
assert!(scorers.len() >= 2);
// Sort by cost (ascending). scorers[0] becomes the "leader" (rarest term).
scorers.sort_by_key(TermScorer::size_hint);
let (leader, secondaries) = scorers.split_first_mut().unwrap();
// Precompute global max scores for early termination checks.
let leader_max_score: Score = leader.max_score();
let secondaries_global_max_sum: Score = secondaries.iter().map(TermScorer::max_score).sum();
// Early exit: no document can possibly beat the threshold.
if leader_max_score + secondaries_global_max_sum <= threshold {
return;
}
// Borrow fieldnorm reader and BM25 weight before the main loop.
// These are immutable references to disjoint fields from block_cursor,
// but Rust's borrow checker can't see through method calls, so we
// extract them once upfront.
let fieldnorm_reader = leader.fieldnorm_reader().clone();
let bm25_weight = leader.bm25_weight().clone();
let mut doc = leader.doc();
let mut secondary_block_max_scores: Box<[f32]> =
vec![0.0f32; secondaries.len()].into_boxed_slice();
let mut secondary_suffix_block_max: Box<[f32]> =
vec![0.0f32; secondaries.len()].into_boxed_slice();
while doc < TERMINATED {
// --- Phase 1: Block-level pruning ---
//
// Position all skip readers on the block containing `doc`.
// seek_block is cheap: it only advances the skip reader, no block decompression.
leader.seek_block(doc);
let leader_block_max: Score = leader.block_max_score();
// Compute the window end as the minimum last_doc_in_block across all scorers.
// This ensures the block_max values are valid for all docs in [doc, window_end].
// Different scorers have independently aligned blocks, so we must use the
// smallest window where all block_max values hold.
let mut window_end: DocId = leader.last_doc_in_block();
let mut secondary_block_max_sum: Score = 0.0;
let num_secondaries = secondaries.len();
for (idx, secondary) in secondaries.iter_mut().enumerate() {
secondary.block_cursor().seek_block(doc);
if !secondary.block_cursor().has_remaining_docs() {
return;
}
window_end = window_end.min(secondary.last_doc_in_block());
let bms = secondary.block_max_score();
secondary_block_max_scores[idx] = bms;
secondary_block_max_sum += bms;
}
if leader_block_max + secondary_block_max_sum <= threshold {
// The entire window cannot beat the threshold. Skip past it.
doc = window_end + 1;
continue;
}
// --- Phase 2: Batch processing within the window ---
//
// Score-first approach: decode the leader's block, filter by threshold,
// then check intersection membership only for survivors. This avoids expensive
// secondary seeks for docs that can't beat the threshold.
let block_cursor = leader.block_cursor();
// seek loads the block and returns the in-block index of the first doc >= `doc`.
let start_idx = block_cursor.seek(doc);
// Use the branchless binary search on the doc decoder to find the first
// index past window_end.
let end_idx = block_cursor
.doc_decoder
.seek_within_block(window_end + 1)
.min(block_cursor.block_len());
let block_docs = &block_cursor.doc_decoder.output_array()[start_idx..end_idx];
let block_freqs = &block_cursor.freq_output_array()[start_idx..end_idx];
// Pass 1: Batch-compute leader BM25 scores and branchlessly filter
// candidates that can't beat the threshold.
//
// The trick: always write to the buffer at `num_candidates`, then
// conditionally advance the count. The compiler can turn this into
// a cmov instead of a branch, avoiding misprediction costs.
let score_threshold = threshold - secondary_block_max_sum;
let mut candidate_doc_ids = [0u32; COMPRESSION_BLOCK_SIZE];
let mut candidate_scores = [0.0f32; COMPRESSION_BLOCK_SIZE];
let mut num_candidates = 0usize;
for (candidate_doc, term_freq) in
block_docs.iter().copied().zip(block_freqs.iter().copied())
{
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(candidate_doc);
let leader_score = bm25_weight.score(fieldnorm_id, term_freq);
candidate_doc_ids[num_candidates] = candidate_doc;
candidate_scores[num_candidates] = leader_score;
num_candidates += (leader_score > score_threshold) as usize;
}
// Precompute suffix sums: suffix[i] = sum of block_max for secondaries[i+1..].
// Used in Phase 2 to prune candidates that can't beat threshold even with
// remaining secondaries contributing their block_max.
if num_candidates == 0 {
doc = window_end + 1;
continue;
}
let mut running = 0.0f32;
for idx in (0..num_secondaries).rev() {
secondary_suffix_block_max[idx] = running;
running += secondary_block_max_scores[idx];
}
// Pass 2: Check intersection membership only for survivors.
// score_threshold may be stale (threshold can increase from callbacks),
// but that's conservative — we may check a few extra candidates, never miss one.
'next_candidate: for candidate_idx in 0..num_candidates {
let candidate_doc = candidate_doc_ids[candidate_idx];
let mut total_score: Score = candidate_scores[candidate_idx];
for (secondary_idx, secondary) in secondaries.iter_mut().enumerate() {
// If a previous candidate already advanced this secondary past
// candidate_doc, the candidate can't be in the intersection.
if secondary.doc() > candidate_doc {
continue 'next_candidate;
}
let seek_result = secondary.seek(candidate_doc);
if seek_result != candidate_doc {
continue 'next_candidate;
}
total_score += secondary.score();
// Prune: even if all remaining secondaries score at their block max,
// can we still beat the threshold?
if total_score + secondary_suffix_block_max[secondary_idx] <= threshold {
continue 'next_candidate;
}
}
// All secondaries matched.
if total_score > threshold {
threshold = callback(candidate_doc, total_score);
if leader_max_score + secondaries_global_max_sum <= threshold {
return;
}
}
}
doc = window_end + 1;
}
}
#[cfg(test)]
mod tests {
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use proptest::prelude::*;
use crate::query::term_query::TermScorer;
use crate::query::{Bm25Weight, Scorer};
use crate::{DocId, DocSet, Score, TERMINATED};
struct Float(Score);
impl Eq for Float {}
impl PartialEq for Float {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl PartialOrd for Float {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Float {
fn cmp(&self, other: &Self) -> Ordering {
other.0.partial_cmp(&self.0).unwrap_or(Ordering::Equal)
}
}
fn nearly_equals(left: Score, right: Score) -> bool {
(left - right).abs() < 0.0001 * (left + right).abs()
}
/// Run block_wand_intersection and collect (doc, score) pairs above threshold.
fn compute_checkpoints_block_wand_intersection(
term_scorers: Vec<TermScorer>,
top_k: usize,
) -> Vec<(DocId, Score)> {
let mut heap: BinaryHeap<Float> = BinaryHeap::with_capacity(top_k);
let mut checkpoints: Vec<(DocId, Score)> = Vec::new();
let mut limit: Score = 0.0;
let callback = &mut |doc, score| {
heap.push(Float(score));
if heap.len() > top_k {
heap.pop().unwrap();
}
if heap.len() == top_k {
limit = heap.peek().unwrap().0;
}
if !nearly_equals(score, limit) {
checkpoints.push((doc, score));
}
limit
};
super::block_wand_intersection(term_scorers, Score::MIN, callback);
checkpoints
}
/// Naive baseline: intersect by iterating all docs.
fn compute_checkpoints_naive_intersection(
mut term_scorers: Vec<TermScorer>,
top_k: usize,
) -> Vec<(DocId, Score)> {
let mut heap: BinaryHeap<Float> = BinaryHeap::with_capacity(top_k);
let mut checkpoints: Vec<(DocId, Score)> = Vec::new();
let mut limit = Score::MIN;
// Sort by cost to use the cheapest as driver.
term_scorers.sort_by_key(|s| s.cost());
let (leader, secondaries) = term_scorers.split_first_mut().unwrap();
let mut doc = leader.doc();
while doc != TERMINATED {
let mut all_match = true;
for secondary in secondaries.iter_mut() {
let secondary_doc = secondary.doc();
let seek_result = if secondary_doc <= doc {
secondary.seek(doc)
} else {
secondary_doc
};
if seek_result != doc {
all_match = false;
break;
}
}
if all_match {
let score: Score =
leader.score() + secondaries.iter_mut().map(|s| s.score()).sum::<Score>();
if score > limit {
heap.push(Float(score));
if heap.len() > top_k {
heap.pop().unwrap();
}
if heap.len() == top_k {
limit = heap.peek().unwrap().0;
}
if !nearly_equals(score, limit) {
checkpoints.push((doc, score));
}
}
}
doc = leader.advance();
}
checkpoints
}
const MAX_TERM_FREQ: u32 = 100u32;
fn posting_list(max_doc: u32) -> BoxedStrategy<Vec<(DocId, u32)>> {
(1..max_doc + 1)
.prop_flat_map(move |doc_freq| {
(
proptest::bits::bitset::sampled(doc_freq as usize, 0..max_doc as usize),
proptest::collection::vec(1u32..MAX_TERM_FREQ, doc_freq as usize),
)
})
.prop_map(|(docset, term_freqs)| {
docset
.iter()
.map(|doc| doc as u32)
.zip(term_freqs.iter().cloned())
.collect::<Vec<_>>()
})
.boxed()
}
#[expect(clippy::type_complexity)]
fn gen_term_scorers(num_scorers: usize) -> BoxedStrategy<(Vec<Vec<(DocId, u32)>>, Vec<u32>)> {
(1u32..100u32)
.prop_flat_map(move |max_doc: u32| {
(
proptest::collection::vec(posting_list(max_doc), num_scorers),
proptest::collection::vec(2u32..10u32 * MAX_TERM_FREQ, max_doc as usize),
)
})
.boxed()
}
fn test_block_wand_intersection_aux(posting_lists: &[Vec<(DocId, u32)>], fieldnorms: &[u32]) {
// Repeat docs 64 times to create multi-block scenarios, matching block_wand.rs test
// strategy.
const REPEAT: usize = 64;
let fieldnorms_expanded: Vec<u32> = fieldnorms
.iter()
.cloned()
.flat_map(|fieldnorm| std::iter::repeat_n(fieldnorm, REPEAT))
.collect();
let postings_lists_expanded: Vec<Vec<(DocId, u32)>> = posting_lists
.iter()
.map(|posting_list| {
posting_list
.iter()
.cloned()
.flat_map(|(doc, term_freq)| {
(0_u32..REPEAT as u32).map(move |offset| {
(
doc * (REPEAT as u32) + offset,
if offset == 0 { term_freq } else { 1 },
)
})
})
.collect::<Vec<(DocId, u32)>>()
})
.collect();
let total_fieldnorms: u64 = fieldnorms_expanded
.iter()
.cloned()
.map(|fieldnorm| fieldnorm as u64)
.sum();
let average_fieldnorm = (total_fieldnorms as Score) / (fieldnorms_expanded.len() as Score);
let max_doc = fieldnorms_expanded.len();
let make_scorers = || -> Vec<TermScorer> {
postings_lists_expanded
.iter()
.map(|postings| {
let bm25_weight = Bm25Weight::for_one_term(
postings.len() as u64,
max_doc as u64,
average_fieldnorm,
);
TermScorer::create_for_test(postings, &fieldnorms_expanded[..], bm25_weight)
})
.collect()
};
for top_k in 1..4 {
let checkpoints_optimized =
compute_checkpoints_block_wand_intersection(make_scorers(), top_k);
let checkpoints_naive = compute_checkpoints_naive_intersection(make_scorers(), top_k);
assert_eq!(
checkpoints_optimized.len(),
checkpoints_naive.len(),
"Mismatch in checkpoint count for top_k={top_k}"
);
for (&(left_doc, left_score), &(right_doc, right_score)) in
checkpoints_optimized.iter().zip(checkpoints_naive.iter())
{
assert_eq!(left_doc, right_doc);
assert!(
nearly_equals(left_score, right_score),
"Score mismatch for doc {left_doc}: {left_score} vs {right_score}"
);
}
}
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[test]
fn test_block_wand_intersection_two_scorers(
(posting_lists, fieldnorms) in gen_term_scorers(2)
) {
test_block_wand_intersection_aux(&posting_lists[..], &fieldnorms[..]);
}
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[test]
fn test_block_wand_intersection_three_scorers(
(posting_lists, fieldnorms) in gen_term_scorers(3)
) {
test_block_wand_intersection_aux(&posting_lists[..], &fieldnorms[..]);
}
}
#[test]
fn test_block_wand_intersection_disjoint() {
// Two posting lists with no overlap — intersection is empty.
let fieldnorms: Vec<u32> = vec![10; 200];
let average_fieldnorm = 10.0;
let postings_a: Vec<(DocId, u32)> = (0..100).map(|d| (d, 1)).collect();
let postings_b: Vec<(DocId, u32)> = (100..200).map(|d| (d, 1)).collect();
let scorer_a = TermScorer::create_for_test(
&postings_a,
&fieldnorms,
Bm25Weight::for_one_term(100, 200, average_fieldnorm),
);
let scorer_b = TermScorer::create_for_test(
&postings_b,
&fieldnorms,
Bm25Weight::for_one_term(100, 200, average_fieldnorm),
);
let checkpoints = compute_checkpoints_block_wand_intersection(vec![scorer_a, scorer_b], 10);
assert!(checkpoints.is_empty());
}
#[test]
fn test_block_wand_intersection_all_overlap() {
// Two posting lists with full overlap.
let fieldnorms: Vec<u32> = vec![10; 50];
let average_fieldnorm = 10.0;
let postings: Vec<(DocId, u32)> = (0..50).map(|d| (d, 3)).collect();
let make_scorer = || {
TermScorer::create_for_test(
&postings,
&fieldnorms,
Bm25Weight::for_one_term(50, 50, average_fieldnorm),
)
};
let checkpoints_opt =
compute_checkpoints_block_wand_intersection(vec![make_scorer(), make_scorer()], 5);
let checkpoints_naive =
compute_checkpoints_naive_intersection(vec![make_scorer(), make_scorer()], 5);
assert_eq!(checkpoints_opt.len(), checkpoints_naive.len());
}
}

View File

@@ -50,7 +50,7 @@ fn block_max_was_too_low_advance_one_scorer(
scorers: &mut [TermScorerWithMaxScore],
pivot_len: usize,
) {
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
debug_assert!(scorers.iter().map(|scorer| scorer.doc()).is_sorted());
let mut scorer_to_seek = pivot_len - 1;
let mut global_max_score = scorers[scorer_to_seek].max_score;
let mut doc_to_seek_after = scorers[scorer_to_seek].last_doc_in_block();
@@ -76,7 +76,7 @@ fn block_max_was_too_low_advance_one_scorer(
scorers[scorer_to_seek].seek(doc_to_seek_after);
restore_ordering(scorers, scorer_to_seek);
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
debug_assert!(scorers.iter().map(|scorer| scorer.doc()).is_sorted());
}
// Given a list of term_scorers and a `ord` and assuming that `term_scorers[ord]` is sorted
@@ -90,7 +90,7 @@ fn restore_ordering(term_scorers: &mut [TermScorerWithMaxScore], ord: usize) {
}
term_scorers.swap(i, i - 1);
}
debug_assert!(is_sorted(term_scorers.iter().map(|scorer| scorer.doc())));
debug_assert!(term_scorers.iter().map(|scorer| scorer.doc()).is_sorted());
}
// Attempts to advance all term_scorers between `&term_scorers[0..before_len]` to the pivot.
@@ -150,17 +150,21 @@ pub fn block_wand(
mut threshold: Score,
callback: &mut dyn FnMut(u32, Score) -> Score,
) {
scorers.retain(|scorer| scorer.doc() < TERMINATED);
if scorers.len() == 1 {
let scorer = scorers.pop().unwrap();
return block_wand_single_scorer(scorer, threshold, callback);
}
let mut scorers: Vec<TermScorerWithMaxScore> = scorers
.iter_mut()
.map(TermScorerWithMaxScore::from)
.collect();
scorers.sort_by_key(|scorer| scorer.doc());
// At this point we need to ensure that the scorers are sorted!
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
scorers.sort_by_key(|scorer| scorer.doc());
while let Some((before_pivot_len, pivot_len, pivot_doc)) =
find_pivot_doc(&scorers[..], threshold)
{
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
debug_assert!(scorers.iter().map(|scorer| scorer.doc()).is_sorted());
debug_assert_ne!(pivot_doc, TERMINATED);
debug_assert!(before_pivot_len < pivot_len);
@@ -228,7 +232,7 @@ pub fn block_wand_single_scorer(
loop {
// We position the scorer on a block that can reach
// the threshold.
while scorer.block_max_score() < threshold {
while scorer.block_max_score() <= threshold {
let last_doc_in_block = scorer.last_doc_in_block();
if last_doc_in_block == TERMINATED {
return;
@@ -286,18 +290,6 @@ impl DerefMut for TermScorerWithMaxScore<'_> {
}
}
fn is_sorted<I: Iterator<Item = DocId>>(mut it: I) -> bool {
if let Some(first) = it.next() {
let mut prev = first;
for doc in it {
if doc < prev {
return false;
}
prev = doc;
}
}
true
}
#[cfg(test)]
mod tests {
use std::cmp::Ordering;

View File

@@ -16,6 +16,7 @@ use crate::{DocId, Score};
enum SpecializedScorer {
TermUnion(Vec<TermScorer>),
TermIntersection(Vec<TermScorer>),
Other(Box<dyn Scorer>),
}
@@ -49,10 +50,9 @@ where
TScoreCombiner: ScoreCombiner,
{
assert!(!scorers.is_empty());
if scorers.len() == 1 {
if scorers.len() == 1 && !scorers[0].is::<TermScorer>() {
return SpecializedScorer::Other(scorers.into_iter().next().unwrap()); //< we checked the size beforehand
}
{
let is_all_term_queries = scorers.iter().all(|scorer| scorer.is::<TermScorer>());
if is_all_term_queries {
@@ -66,6 +66,9 @@ where
{
// Block wand is only available if we read frequencies.
return SpecializedScorer::TermUnion(scorers);
} else if scorers.len() == 1 {
// Single TermScorer without freq reading — unwrap directly.
return SpecializedScorer::Other(Box::new(scorers.into_iter().next().unwrap()));
} else {
return SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
scorers,
@@ -93,6 +96,13 @@ fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
Box::new(union_scorer)
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
.into_iter()
.map(|s| Box::new(s) as Box<dyn Scorer>)
.collect();
intersect_scorers(boxed_scorers, num_docs)
}
SpecializedScorer::Other(scorer) => scorer,
}
}
@@ -297,14 +307,43 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
// Result depends entirely on MUST + any removed AllScorers.
let combined_all_scorer_count = must_special_scorer_counts.num_all_scorers
+ should_special_scorer_counts.num_all_scorers;
let boxed_scorer: Box<dyn Scorer> = effective_must_scorer(
must_scorers,
combined_all_scorer_count,
reader.max_doc(),
num_docs,
)
.unwrap_or_else(|| Box::new(EmptyScorer));
SpecializedScorer::Other(boxed_scorer)
// Try to detect a pure TermScorer intersection for block-max optimization.
// Preconditions: no removed AllScorers, at least 2 scorers, all TermScorer
// with frequency reading enabled.
if combined_all_scorer_count == 0
&& must_scorers.len() >= 2
&& must_scorers.iter().all(|s| s.is::<TermScorer>())
{
let term_scorers: Vec<TermScorer> = must_scorers
.into_iter()
.map(|s| *(s.downcast::<TermScorer>().map_err(|_| ()).unwrap()))
.collect();
if term_scorers
.iter()
.all(|s| s.freq_reading_option() == FreqReadingOption::ReadFreq)
{
SpecializedScorer::TermIntersection(term_scorers)
} else {
let must_scorers: Vec<Box<dyn Scorer>> = term_scorers
.into_iter()
.map(|s| Box::new(s) as Box<dyn Scorer>)
.collect();
let boxed_scorer: Box<dyn Scorer> =
effective_must_scorer(must_scorers, 0, reader.max_doc(), num_docs)
.unwrap_or_else(|| Box::new(EmptyScorer));
SpecializedScorer::Other(boxed_scorer)
}
} else {
let boxed_scorer: Box<dyn Scorer> = effective_must_scorer(
must_scorers,
combined_all_scorer_count,
reader.max_doc(),
num_docs,
)
.unwrap_or_else(|| Box::new(EmptyScorer));
SpecializedScorer::Other(boxed_scorer)
}
}
(ShouldScorersCombinationMethod::Optional(should_scorer), must_scorers) => {
// Optional SHOULD: contributes to scoring but not required for matching.
@@ -463,15 +502,21 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
callback: &mut dyn FnMut(DocId, Score),
) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, &self.score_combiner_fn)?;
let num_docs = reader.num_docs();
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
let mut union_scorer = BufferedUnionScorer::build(
term_scorers,
&self.score_combiner_fn,
reader.num_docs(),
);
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_scorer(&mut union_scorer, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
.into_iter()
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
.collect();
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
for_each_scorer(intersection.as_mut(), callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_scorer(scorer.as_mut(), callback);
}
@@ -485,17 +530,23 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
let num_docs = reader.num_docs();
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
let mut union_scorer = BufferedUnionScorer::build(
term_scorers,
&self.score_combiner_fn,
reader.num_docs(),
);
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
.into_iter()
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
.collect();
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback);
}
@@ -524,6 +575,9 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
SpecializedScorer::TermUnion(term_scorers) => {
super::block_wand(term_scorers, threshold, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
super::block_wand_intersection(term_scorers, threshold, callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_pruning_scorer(scorer.as_mut(), threshold, callback);
}

View File

@@ -1,8 +1,10 @@
mod block_wand;
mod block_wand_intersection;
mod block_wand_union;
mod boolean_query;
mod boolean_weight;
pub(crate) use self::block_wand::{block_wand, block_wand_single_scorer};
pub(crate) use self::block_wand_intersection::block_wand_intersection;
pub(crate) use self::block_wand_union::{block_wand, block_wand_single_scorer};
pub use self::boolean_query::BooleanQuery;
pub use self::boolean_weight::BooleanWeight;

View File

@@ -1,5 +1,7 @@
use common::TinySet;
use super::size_hint::estimate_intersection;
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, BLOCK_NUM_TINYBITSETS, TERMINATED};
use crate::query::term_query::TermScorer;
use crate::query::{EmptyScorer, Scorer};
use crate::{DocId, Score};
@@ -17,7 +19,7 @@ use crate::{DocId, Score};
/// `size_hint` of the intersection.
pub fn intersect_scorers(
mut scorers: Vec<Box<dyn Scorer>>,
num_docs_segment: u32,
segment_num_docs: u32,
) -> Box<dyn Scorer> {
if scorers.is_empty() {
return Box::new(EmptyScorer);
@@ -42,14 +44,14 @@ pub fn intersect_scorers(
left: *(left.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
right: *(right.downcast::<TermScorer>().map_err(|_| ()).unwrap()),
others: scorers,
num_docs: num_docs_segment,
segment_num_docs,
});
}
Box::new(Intersection {
left,
right,
others: scorers,
num_docs: num_docs_segment,
segment_num_docs,
})
}
@@ -58,7 +60,7 @@ pub struct Intersection<TDocSet: DocSet, TOtherDocSet: DocSet = Box<dyn Scorer>>
left: TDocSet,
right: TDocSet,
others: Vec<TOtherDocSet>,
num_docs: u32,
segment_num_docs: u32,
}
fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
@@ -78,7 +80,10 @@ fn go_to_first_doc<TDocSet: DocSet>(docsets: &mut [TDocSet]) -> DocId {
impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
/// num_docs is the number of documents in the segment.
pub(crate) fn new(mut docsets: Vec<TDocSet>, num_docs: u32) -> Intersection<TDocSet, TDocSet> {
pub(crate) fn new(
mut docsets: Vec<TDocSet>,
segment_num_docs: u32,
) -> Intersection<TDocSet, TDocSet> {
let num_docsets = docsets.len();
assert!(num_docsets >= 2);
docsets.sort_by_key(|docset| docset.cost());
@@ -97,7 +102,7 @@ impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
left,
right,
others: docsets,
num_docs,
segment_num_docs,
}
}
}
@@ -214,7 +219,7 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
[self.left.size_hint(), self.right.size_hint()]
.into_iter()
.chain(self.others.iter().map(DocSet::size_hint)),
self.num_docs,
self.segment_num_docs,
)
}
@@ -224,6 +229,91 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
// If there are docsets that are bad at skipping, they should also influence the cost.
self.left.cost()
}
fn count_including_deleted(&mut self) -> u32 {
const DENSITY_THRESHOLD_INVERSE: u32 = 32;
if self
.left
.size_hint()
.saturating_mul(DENSITY_THRESHOLD_INVERSE)
< self.segment_num_docs
{
// Sparse path: if the lead iterator covers less than ~3% of docs,
// the block approach wastes time on mostly-empty blocks.
self.count_including_deleted_sparse()
} else {
// Dense approach. We push documents into a block bitset to then
// perform count using popcount.
self.count_including_deleted_dense()
}
}
}
const EMPTY_BLOCK: [TinySet; BLOCK_NUM_TINYBITSETS] = [TinySet::EMPTY; BLOCK_NUM_TINYBITSETS];
/// ANDs `other` into `mask` in-place. Returns `true` if the result is all zeros.
#[inline]
fn and_blocks_and_return_is_empty(
mask: &mut [TinySet; BLOCK_NUM_TINYBITSETS],
update: &[TinySet; BLOCK_NUM_TINYBITSETS],
) -> bool {
let mut all_empty = true;
for (mask_tinyset, update_tinyset) in mask.iter_mut().zip(update.iter()) {
*mask_tinyset = mask_tinyset.intersect(*update_tinyset);
all_empty &= mask_tinyset.is_empty();
}
all_empty
}
impl<TDocSet: DocSet, TOtherDocSet: DocSet> Intersection<TDocSet, TOtherDocSet> {
fn count_including_deleted_sparse(&mut self) -> u32 {
let mut count = 0u32;
let mut doc = self.doc();
while doc != TERMINATED {
count += 1;
doc = self.advance();
}
count
}
/// Dense block-wise bitmask intersection count.
///
/// Fills a 1024-doc window from each iterator, ANDs the bitmasks together,
/// and popcounts the result. `fill_bitset_block` handles seeking tails forward
/// when they lag behind the current block.
fn count_including_deleted_dense(&mut self) -> u32 {
let mut count = 0u32;
let mut next_base = self.left.doc();
while next_base < TERMINATED {
let base = next_base;
// Fill lead bitmask.
let mut mask = EMPTY_BLOCK;
next_base = next_base.max(self.left.fill_bitset_block(base, &mut mask));
let mut tail_mask = EMPTY_BLOCK;
next_base = next_base.max(self.right.fill_bitset_block(base, &mut tail_mask));
if and_blocks_and_return_is_empty(&mut mask, &tail_mask) {
continue;
}
// AND with each additional tail.
for other in &mut self.others {
let mut other_mask = EMPTY_BLOCK;
next_base = next_base.max(other.fill_bitset_block(base, &mut other_mask));
if and_blocks_and_return_is_empty(&mut mask, &other_mask) {
continue;
}
}
for tinyset in &mask {
count += tinyset.len();
}
}
count
}
}
impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
@@ -421,6 +511,82 @@ mod tests {
}
}
proptest! {
#[test]
fn prop_test_count_including_deleted_matches_default(
a in sorted_deduped_vec(1200, 400),
b in sorted_deduped_vec(1200, 400),
c in sorted_deduped_vec(1200, 400),
num_docs in 1200u32..2000u32,
) {
// Compute expected count via set intersection.
let expected: u32 = a.iter()
.filter(|doc| b.contains(doc) && c.contains(doc))
.count() as u32;
// Test count_including_deleted (dense path).
let make_intersection = || {
Intersection::new(
vec![
VecDocSet::from(a.clone()),
VecDocSet::from(b.clone()),
VecDocSet::from(c.clone()),
],
num_docs,
)
};
let mut intersection = make_intersection();
let count = intersection.count_including_deleted();
prop_assert_eq!(count, expected,
"count_including_deleted mismatch: a={:?}, b={:?}, c={:?}", a, b, c);
}
}
#[test]
fn test_count_including_deleted_two_way() {
let left = VecDocSet::from(vec![1, 3, 9]);
let right = VecDocSet::from(vec![3, 4, 9, 18]);
let mut intersection = Intersection::new(vec![left, right], 100);
assert_eq!(intersection.count_including_deleted(), 2);
}
#[test]
fn test_count_including_deleted_empty() {
let a = VecDocSet::from(vec![1, 3]);
let b = VecDocSet::from(vec![1, 4]);
let c = VecDocSet::from(vec![3, 9]);
let mut intersection = Intersection::new(vec![a, b, c], 100);
assert_eq!(intersection.count_including_deleted(), 0);
}
/// Test with enough documents to exercise the dense path (>= num_docs/32).
#[test]
fn test_count_including_deleted_dense_path() {
// Create dense docsets: many docs relative to segment size.
let docs_a: Vec<u32> = (0..2000).step_by(2).collect(); // even numbers 0..2000
let docs_b: Vec<u32> = (0..2000).step_by(3).collect(); // multiples of 3
let expected = docs_a.iter().filter(|d| *d % 3 == 0).count() as u32;
let a = VecDocSet::from(docs_a);
let b = VecDocSet::from(docs_b);
let mut intersection = Intersection::new(vec![a, b], 2000);
assert_eq!(intersection.count_including_deleted(), expected);
}
/// Test that spans multiple blocks (>1024 docs).
#[test]
fn test_count_including_deleted_multi_block() {
let docs_a: Vec<u32> = (0..5000).collect();
let docs_b: Vec<u32> = (0..5000).step_by(7).collect();
let expected = docs_b.len() as u32; // all of b is in a
let a = VecDocSet::from(docs_a);
let b = VecDocSet::from(docs_b);
let mut intersection = Intersection::new(vec![a, b], 5000);
assert_eq!(intersection.count_including_deleted(), expected);
}
#[test]
fn test_bug_2811_intersection_candidate_should_increase() {
let mut schema_builder = Schema::builder();

View File

@@ -1,6 +1,6 @@
use crate::docset::DocSet;
use crate::fieldnorm::FieldNormReader;
use crate::postings::{FreqReadingOption, Postings, SegmentPostings};
use crate::postings::{BlockSegmentPostings, FreqReadingOption, Postings, SegmentPostings};
use crate::query::bm25::Bm25Weight;
use crate::query::{Explanation, Scorer};
use crate::{DocId, Score};
@@ -95,6 +95,21 @@ impl TermScorer {
pub fn last_doc_in_block(&self) -> DocId {
self.postings.block_cursor.skip_reader().last_doc_in_block()
}
/// Returns a mutable reference to the underlying block cursor.
pub(crate) fn block_cursor(&mut self) -> &mut BlockSegmentPostings {
&mut self.postings.block_cursor
}
/// Returns a reference to the fieldnorm reader for batch lookups.
pub(crate) fn fieldnorm_reader(&self) -> &FieldNormReader {
&self.fieldnorm_reader
}
/// Returns a reference to the BM25 weight for batch score computation.
pub(crate) fn bm25_weight(&self) -> &Bm25Weight {
&self.similarity_weight
}
}
impl DocSet for TermScorer {
@@ -117,6 +132,12 @@ impl DocSet for TermScorer {
fn size_hint(&self) -> u32 {
self.postings.size_hint()
}
// TODO
// It is probably possible to optimize fill_bitset_block for TermScorer,
// working directly with the blocks, enabling vectorization.
// I did not manage to get a performance improvement on Mac ARM,
// and do not have access to x86 to investigate.
}
impl Scorer for TermScorer {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-sstable"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -10,10 +10,10 @@ categories = ["database-implementations", "data-structures", "compression"]
description = "sstables for tantivy"
[dependencies]
common = {version= "0.10", path="../common", package="tantivy-common"}
common = {version= "0.11", path="../common", package="tantivy-common"}
futures-util = "0.3.30"
itertools = "0.14.0"
tantivy-bitpacker = { version= "0.9", path="../bitpacker" }
tantivy-bitpacker = { version= "0.10", path="../bitpacker" }
tantivy-fst = "0.5"
# experimental gives us access to Decompressor::upper_bound
zstd = { version = "0.13", optional = true, features = ["experimental"] }
@@ -23,7 +23,7 @@ zstd-compression = ["zstd"]
[dev-dependencies]
proptest = "1"
criterion = { version = "0.5", default-features = false }
criterion = { version = "0.8", default-features = false }
names = "0.14"
rand = "0.9"

View File

@@ -512,11 +512,13 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Returns the terms for a _sorted_ list of term ordinals.
///
/// Returns true if and only if all terms have been found.
pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
pub fn sorted_ords_to_term_cb(
&self,
mut ords: impl Iterator<Item = TermOrdinal>,
mut cb: F,
ords: &[TermOrdinal],
mut cb: impl FnMut(&[u8]),
) -> io::Result<bool> {
assert!(ords.is_sorted());
let mut ords = ords.iter().copied();
let Some(mut ord) = ords.next() else {
return Ok(true);
};
@@ -538,33 +540,36 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
bytes.extend_from_slice(current_sstable_delta_reader.suffix());
current_block_ordinal += 1;
}
cb(&bytes)?;
cb(&bytes);
// fetch the next ordinal
let Some(next_ord) = ords.next() else {
return Ok(true);
let next_ord = loop {
let Some(next_ord) = ords.next() else {
return Ok(true);
};
if next_ord == ord {
// This is the same ordinal, let's just call the callback directly.
cb(&bytes);
} else {
// we checked it was sorted beforehands
debug_assert!(next_ord > ord);
break next_ord;
}
};
// advance forward if the new ord is different than the one we just processed
// TODO optimization: it is silly to do a binary search to get the block every single
// time.
//
// this allows the input TermOrdinal iterator to contain duplicates, so long as it's
// still sorted
if next_ord < ord {
panic!("Ordinals were not sorted: received {next_ord} after {ord}");
} else if next_ord > ord {
// check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
}
ord = next_ord;
} else {
// The next ord is equal to the previous ord: no need to seek or advance.
// Check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
}
ord = next_ord;
}
}
@@ -671,8 +676,8 @@ mod tests {
use common::OwnedBytes;
use super::Dictionary;
use crate::MonotonicU64SSTable;
use crate::dictionary::TermOrdHit;
use crate::{MonotonicU64SSTable, TermOrdinal};
#[derive(Debug)]
struct PermissionedHandle {
@@ -935,25 +940,24 @@ mod tests {
}
#[test]
fn test_ords_term() {
fn test_sorted_ords_to_term() {
let (dic, _slice) = make_test_sstable();
// Single term
let mut terms = Vec::new();
assert!(
dic.sorted_ords_to_term_cb(100_000..100_001, |term| {
dic.sorted_ords_to_term_cb(&[100_000], |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
assert_eq!(terms, vec![format!("{:05X}", 100_000).into_bytes(),]);
// Single term
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = (100_001..100_002).collect();
assert!(
dic.sorted_ords_to_term_cb(100_001..100_002, |term| {
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -961,9 +965,8 @@ mod tests {
// both terms
let mut terms = Vec::new();
assert!(
dic.sorted_ords_to_term_cb(100_000..100_002, |term| {
dic.sorted_ords_to_term_cb(&[100_000, 100_001], |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -976,10 +979,10 @@ mod tests {
);
// Test cross block
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = (98653..=98655).collect();
assert!(
dic.sorted_ords_to_term_cb(98653..=98655, |term| {
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
Ok(())
})
.unwrap()
);
@@ -991,6 +994,43 @@ mod tests {
format!("{:05X}", 98655).into_bytes(),
]
);
// redundant
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = vec![1, 1, 2];
assert!(
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
})
.unwrap()
);
assert_eq!(
terms,
vec![
format!("{:05X}", 1).into_bytes(),
format!("{:05X}", 1).into_bytes(),
format!("{:05X}", 2).into_bytes(),
]
);
// redundant cross block
let mut terms = Vec::new();
let ords: Vec<TermOrdinal> = vec![98653, 98653, 98654, 98654, 98655, 98655];
assert!(
dic.sorted_ords_to_term_cb(&ords, |term| {
terms.push(term.to_vec());
})
.unwrap()
);
assert_eq!(
terms,
vec![
format!("{:05X}", 98_653).into_bytes(),
format!("{:05X}", 98_653).into_bytes(),
format!("{:05X}", 98_654).into_bytes(),
format!("{:05X}", 98_654).into_bytes(),
format!("{:05X}", 98_655).into_bytes(),
format!("{:05X}", 98_655).into_bytes(),
]
);
}
#[test]

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-stacker"
version = "0.6.0"
version = "0.7.0"
edition = "2024"
license = "MIT"
homepage = "https://github.com/quickwit-oss/tantivy"
@@ -9,7 +9,7 @@ description = "term hashmap used for indexing"
[dependencies]
murmurhash32 = "0.3"
common = { version = "0.10", path = "../common/", package = "tantivy-common" }
common = { version = "0.11", path = "../common/", package = "tantivy-common" }
ahash = { version = "0.8.11", default-features = false, optional = true }
@@ -27,7 +27,7 @@ rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.15.3" }
binggan = { version = "0.16.1" }
rand_distr = "0.5"
[features]

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-tokenizer-api"
version = "0.6.0"
version = "0.7.0"
license = "MIT"
edition = "2021"
description = "Tokenizer API of tantivy"