Compare commits

...

33 Commits

Author SHA1 Message Date
Paul Masurel
468850e9f4 Buffer up to 2048 doc ids in for_each_docset_buffered
The no-score collection path (Weight::for_each_no_score) handed the
collector's collect_block one COLLECT_BLOCK_BUFFER_LEN (64) block at a
time. For aggregations this is the dominant path, and 64 docs per
collect_block under-amortizes the per-call overhead.

for_each_docset_buffered now owns a 2048-element heap buffer and fills
it through successive fill_buffer calls over 64-element windows, flushing
a single larger block to collect_block. fill_buffer keeps its 64-element
window contract, so no DocSet implementation changes.

The buffer is allocated with Box::new_zeroed_slice (stable since 1.92,
hence the MSRV bump) to zero directly on the heap.
2026-06-01 15:25:39 +02:00
Paul Masurel
a27c64998f Cargo clippy fix (#2943)
Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-06-01 14:39:44 +02:00
Paul Masurel
46b3fb9ed3 Relying on upstream version of datasketch and stop using HLL 4. (#2936)
We were relying on a fork for:

a bugfix in LIST serialization
a better API exposing a new Coupon type, required for caching coupons.
We also stop using HLL8 in hope to fix
https://datadoghq.atlassian.net/browse/CLOUDPREM-625

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-05-19 13:29:35 +02:00
trinity-1686a
fbe620b9b4 Merge pull request #2933 from quickwit-oss/1686a/sstable-opt
optimise sstable index access pattern
2026-05-19 11:43:17 +02:00
trinity-1686a
95d8a3989a cr 2026-05-19 11:38:48 +02:00
trinity-1686a
ea61a68db4 skip sstable index binary search when ordinal is in same block 2026-05-16 11:35:38 +02:00
trinity-1686a
c367df37c1 refactor sstable index 2026-05-16 11:30:02 +02:00
Mohammad Dashti
d99a5d4e91 Rename validate_aggregation_fields to validate_aggregation_fields_exist
Applies @PSeitz's review suggestion to make the function name more
descriptive of what it checks. Also adds a doc note clarifying why
validation is opt-in rather than enforced by default.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
2de6f075ce Fixed the example 2026-05-16 15:45:20 +08:00
Mohammad Dashti
18080067c7 Applied PR comment:
I would move it outside of the aggregation. You can fetch the fields from the aggregation request and do a validation in a helper function
2026-05-16 15:45:20 +08:00
Mohammad Dashti
95db7d2e5c Revert "Revert all impl."
This reverts commit d5e0991549a05bf80f19f853f7689ad69f96e7e5.
2026-05-16 15:45:20 +08:00
Mohammad Dashti
fc017c4c74 Applied PR comments. 2026-05-16 15:45:20 +08:00
Mohammad Dashti
141c91d028 Added a flag: strict_validation 2026-05-16 15:45:20 +08:00
Mohammad Dashti
36a83e7c1a Fixed agg validation 2026-05-16 15:45:20 +08:00
jinhelin
be11f8a6a1 Fix opening positions file error 2026-05-14 15:55:59 +08:00
dependabot[bot]
4305e4029e Update binggan requirement from 0.16.1 to 0.17.0
Updates the requirements on [binggan](https://github.com/pseitz/binggan) to permit the latest version.
- [Changelog](https://github.com/PSeitz/binggan/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pseitz/binggan/commits)

---
updated-dependencies:
- dependency-name: binggan
  dependency-version: 0.17.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-12 15:10:20 +08:00
Pascal Seitz
edfb02b47e switch to enum, fix mixed types for cardinality agg 2026-05-05 16:39:51 +08:00
Pascal Seitz
d0fad88bac use bitsets for card agg 2026-05-05 16:39:51 +08:00
Pascal Seitz
351280c0b4 add card bench for high card 2026-05-05 16:39:51 +08:00
James Sewell
4480cf0a98 Enable BMW for single-scorer boolean queries by removing early return in scorer_union (#2915)
The early return for `scorers.len() == 1` in `scorer_union` short-circuits a single TermScorer into `SpecializedScorer::Other`, bypassing the `TermUnion` path that enables block-max WAND (BMW) in `for_each_pruning`.

This was originally addressed in PR #2898 (backed out), which added a special case in `BooleanWeight::for_each_pruning`. PR #2912 (merged as d27ca164a) added a single-scorer fast path inside `block_wand` itself, but did not remove this early return — so a single SHOULD TermScorer still never reaches the BMW path.

Removing the early return lets a single TermScorer with freq reading flow through to `SpecializedScorer::TermUnion`, where `block_wand` → `block_wand_single_scorer` handles it efficiently.
2026-04-28 14:49:53 -07:00
Pascal Seitz
d47abdf104 early cut off for order by sub agg in term agg 2026-04-28 16:59:59 +02:00
Pascal Seitz
c11952eb7c add order by agg benchmark 2026-04-28 16:59:59 +02:00
trinity-1686a
09667ee9c8 Merge pull request #2909 from osyniakov/claude/add-ossf-scorecard-1z6Vn
Add OpenSSF Scorecard workflow
2026-04-28 11:57:36 +02:00
trinity-1686a
333ccf5300 Merge pull request #2896 from osyniakov/claude/fix-issues-5945-5937-eQm1Q
ci: pin GitHub Actions to full commit SHAs and restrict token permissions
2026-04-28 11:57:18 +02:00
Oleksii Syniakov
60a39a4689 Merge branch 'main' into claude/fix-issues-5945-5937-eQm1Q 2026-04-28 10:28:23 +02:00
Oleksii Syniakov
f8f3e4277f remove not neeeded permissions for the public repo 2026-04-28 10:09:30 +02:00
Oleksii Syniakov
ff1433713a bump upload-sarif -> 4.35.2
Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
2026-04-28 10:07:45 +02:00
trinity-1686a
ca139d8eb1 Merge pull request #2910 from quickwit-oss/abdul.andha/composite-agg-after
Composite aggregations: send after key on last page
2026-04-27 23:38:52 +02:00
Abdul Andha
ac508108aa address pr comment 2026-04-27 12:39:38 -04:00
Abdul Andha
4fbae92187 send after key on last page 2026-04-24 15:33:26 -04:00
Claude
a5d297c75f Add OpenSSF Scorecard workflow
Runs weekly security analysis and uploads SARIF results to GitHub code
scanning. Third-party actions are pinned by commit SHA. Adds the Scorecard
badge to the README.

Based on quickwit-oss/quickwit#5969.
2026-04-24 06:56:58 +00:00
Claude
3a6a3de8d7 ci: update pinned Action SHAs to current latest versions
The previous commit pinned actions to commit SHAs but used stale
version tags (v4.2.2, v2.7.5, old nextest/cargo-llvm-cov refs).
Update to the actual current HEAD of each pinned tag:

  actions/checkout        v4.2.2 → v4.3.1  (34e114876b0b...)
  Swatinem/rust-cache     v2.7.5 → v2.9.1  (c19371144df3...)
  taiki-e/install-action  nextest           (56cc9adf3a3e...)
  taiki-e/install-action  cargo-llvm-cov    (e4b3a0453201...)

actions-rs/toolchain, actions-rs/clippy-check, and
codecov/codecov-action SHAs were already correct.

https://claude.ai/code/session_01VD7Bo8upj3cQwWDf9ni2Ln
2026-04-16 06:49:47 +00:00
Claude
af3c6c0070 ci: pin GitHub Actions to full commit SHAs and restrict token permissions
Fixes two supply chain / token security issues:

- Pin all third-party Actions to immutable full commit SHAs instead of
  mutable version tags (addresses unpinned-dependencies risk, analogous
  to quickwit-oss/quickwit#5937):
    actions/checkout v4.2.2
    actions-rs/toolchain v1.0.7
    Swatinem/rust-cache v2.7.5
    taiki-e/install-action nextest / cargo-llvm-cov
    actions-rs/clippy-check v1.0.7
    codecov/codecov-action v3.1.6

- Add explicit least-privilege `permissions` blocks at workflow and job
  level (addresses excessive GITHUB_TOKEN permissions, analogous to
  quickwit-oss/quickwit#5945):
    default: contents: read
    check job: also grants checks: write (required by clippy-check)

https://claude.ai/code/session_01VD7Bo8upj3cQwWDf9ni2Ln
2026-04-15 20:55:43 +00:00
41 changed files with 1907 additions and 529 deletions

View File

@@ -4,6 +4,9 @@ on:
push:
branches: [main]
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -12,16 +15,20 @@ concurrency:
jobs:
coverage:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install Rust
run: rustup toolchain install nightly-2025-12-01 --profile minimal --component llvm-tools-preview
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- uses: taiki-e/install-action@e4b3a0453201addddc06d3a72db90326aad87084 # cargo-llvm-cov
- name: Generate code coverage
run: cargo +nightly-2025-12-01 llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v6
uses: codecov/codecov-action@57e3a136b779b570ffcdbf80b3bdc90e7fab3de2 # v6.0.0
continue-on-error: true
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos

View File

@@ -8,6 +8,9 @@ env:
CARGO_TERM_COLOR: always
NUM_FUNCTIONAL_TEST_ITERATIONS: 20000
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -18,10 +21,13 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal

49
.github/workflows/scorecard.yml vendored Normal file
View File

@@ -0,0 +1,49 @@
name: OpenSSF Scorecard
on:
schedule:
- cron: '0 0 * * 0'
push:
branches:
- main
permissions:
contents: read
jobs:
analysis:
name: Scorecards analysis
runs-on: ubuntu-latest
permissions:
# Needed to upload the results to code-scanning dashboard.
security-events: write
# Needed to publish results
id-token: write
steps:
- name: 'Checkout code'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: 'Run analysis'
uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3
with:
results_file: results.sarif
results_format: sarif
repo_token: ${{ secrets.GITHUB_TOKEN }}
publish_results: true
# Upload the results as artifacts.
- name: 'Upload artifact'
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: SARIF file
path: results.sarif
retention-days: 5
# Upload the results to GitHub's code scanning dashboard.
- name: 'Upload to code-scanning'
uses: github/codeql-action/upload-sarif@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
with:
sarif_file: results.sarif

View File

@@ -9,6 +9,9 @@ on:
env:
CARGO_TERM_COLOR: always
permissions:
contents: read
# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@@ -19,23 +22,27 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
checks: write
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install nightly
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: nightly
profile: minimal
components: rustfmt
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal
components: clippy
- uses: Swatinem/rust-cache@v2
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- name: Check Formatting
run: cargo +nightly fmt --all -- --check
@@ -47,7 +54,7 @@ jobs:
- name: Check Bench Compilation
run: cargo +nightly bench --no-run --profile=dev --all-features
- uses: actions-rs/clippy-check@v1
- uses: actions-rs/clippy-check@b5b5f21f4797c02da247df37026fcd0a5024aa4d # v1.0.7
with:
toolchain: stable
token: ${{ secrets.GITHUB_TOKEN }}
@@ -57,6 +64,9 @@ jobs:
runs-on: ubuntu-latest
permissions:
contents: read
strategy:
matrix:
features:
@@ -67,17 +77,17 @@ jobs:
name: test-${{ matrix.features.label}}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install stable
uses: actions-rs/toolchain@v1
uses: actions-rs/toolchain@16499b5e05bf2e26879000db0c1d13f7e13fa3af # v1.0.7
with:
toolchain: stable
profile: minimal
override: true
- uses: taiki-e/install-action@nextest
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@56cc9adf3a3e2c23eafb56e8acaf9d0373cb845a # nextest
- uses: Swatinem/rust-cache@c19371144df3bb44fab255c43d04cbc2ab54d1c4 # v2.9.1
- name: Run tests
run: |

View File

@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2021"
rust-version = "1.86"
rust-version = "1.92"
exclude = ["benches/*.json", "benches/*.txt"]
[dependencies]
@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.10", path = "./bitpacker" }
common = { version = "0.11", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.7", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.4", features = ["use_serde"] }
datasketches = { git = "https://github.com/fulmicoton-dd/datasketches-rust", rev = "7635fb8" }
datasketches = { version = "0.3.0", features = ["hll"] }
futures-util = { version = "0.3.28", optional = true }
futures-channel = { version = "0.3.28", optional = true }
fnv = "1.0.7"
@@ -75,7 +75,7 @@ typetag = "0.2.21"
winapi = "0.3.9"
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"

View File

@@ -1,6 +1,7 @@
[![Docs](https://docs.rs/tantivy/badge.svg)](https://docs.rs/crate/tantivy/)
[![Build Status](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml/badge.svg)](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/quickwit-oss/tantivy/branch/main/graph/badge.svg)](https://codecov.io/gh/quickwit-oss/tantivy)
[![OpenSSF Scorecard](https://api.scorecard.dev/projects/github.com/quickwit-oss/tantivy/badge)](https://scorecard.dev/viewer/?uri=github.com/quickwit-oss/tantivy)
[![Join the chat at https://discord.gg/MT27AG5EVE](https://shields.io/discord/908281611840282624?label=chat%20on%20discord)](https://discord.gg/MT27AG5EVE)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Crates.io](https://img.shields.io/crates/v/tantivy.svg)](https://crates.io/crates/tantivy)

View File

@@ -79,8 +79,12 @@ fn bench_agg(mut group: InputGroup<Index>) {
register!(group, composite_histogram_calendar);
register!(group, cardinality_agg);
register!(group, cardinality_agg_high_card);
register!(group, cardinality_agg_low_card);
register!(group, terms_status_with_cardinality_agg);
register!(group, terms_100_buckets_with_cardinality_agg);
register!(group, terms_many_with_single_term_order_by_card);
register!(group, terms_many_with_single_term_2_order_by_card);
register!(group, range_agg);
register!(group, range_agg_with_avg_sub_agg);
@@ -168,6 +172,32 @@ fn cardinality_agg(index: &Index) {
});
execute_agg(index, agg_req);
}
// Full-scan cardinality on a near-1M-cardinality string field.
// Hits the dense (PagedBitset) path: every doc has a unique term,
// so the bucket promotes from FxHashSet shortly into the scan.
fn cardinality_agg_high_card(index: &Index) {
let agg_req = json!({
"cardinality": {
"cardinality": {
"field": "text_all_unique_terms"
},
}
});
execute_agg(index, agg_req);
}
// Full-scan cardinality on a tiny-cardinality string field (7 distinct
// values). Stays on the FxHashSet path — the promotion threshold is
// never crossed. Validates no regression on the sparse path.
fn cardinality_agg_low_card(index: &Index) {
let agg_req = json!({
"cardinality": {
"cardinality": {
"field": "text_few_terms_status"
},
}
});
execute_agg(index, agg_req);
}
fn terms_status_with_cardinality_agg(index: &Index) {
let agg_req = json!({
"my_texts": {
@@ -200,6 +230,58 @@ fn terms_100_buckets_with_cardinality_agg(index: &Index) {
execute_agg(index, agg_req);
}
fn terms_many_with_single_term_order_by_card(index: &Index) {
let agg_req = json!({
"my_texts": {
"terms": { "field": "text_many_terms" },
"aggs": {
"nested_terms": {
"terms": {
"field": "single_term",
"order": { "cardinality": "desc" }
},
"aggs": {
"cardinality": {
"cardinality": { "field": "text_few_terms" }
}
}
}
}
},
});
execute_agg(index, agg_req);
}
// Two-level terms ordered by cardinality at each level: a high-card outer terms
// (text_many_terms) ordered by a cardinality sub-agg, with a nested low-card terms
// (text_few_terms_status) also ordered by a cardinality sub-agg, plus an avg.
fn terms_many_with_single_term_2_order_by_card(index: &Index) {
let agg_req = json!({
"by_ip": {
"terms": {
"field": "text_many_terms",
"order": { "card_few_terms": "desc" }
},
"aggs": {
"card_few_terms": {
"cardinality": { "field": "text_few_terms" }
},
"nested_terms": {
"terms": {
"field": " single_term",
"order": { "distinct_path2": "desc" }
},
"aggs": {
"avg_botscore": { "avg": { "field": "score" } },
"distinct_path2": { "cardinality": { "field": "text_few_terms" } }
}
}
}
}
});
execute_agg(index, agg_req);
}
fn terms_7(index: &Index) {
let agg_req = json!({
"my_texts": { "terms": { "field": "text_few_terms_status" } },
@@ -609,7 +691,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let text_field = schema_builder.add_text_field("text", text_fieldtype.clone());
let single_term = schema_builder.add_text_field("single_term", FAST);
let json_field = schema_builder.add_json_field("json", FAST);
let text_field_all_unique_terms =
schema_builder.add_text_field("text_all_unique_terms", STRING | FAST);
@@ -673,6 +756,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
index_writer.add_document(doc!(
json_field => json!({"mixed_type": 10.0}),
json_field => json!({"mixed_type": 10.0}),
single_term => "single_term",
single_term => "single_term",
text_field => "cool",
text_field => "cool",
text_field_all_unique_terms => "cool",
@@ -707,6 +792,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
json!({"mixed_type": many_terms_data.choose(&mut rng).unwrap().to_string()})
};
index_writer.add_document(doc!(
single_term => "single_term",
text_field => "cool",
json_field => json,
text_field_all_unique_terms => format!("unique_term_{}", rng.random::<u64>()),

View File

@@ -23,7 +23,7 @@ downcast-rs = "2.0.1"
proptest = "1"
more-asserts = "0.3.1"
rand = "0.9"
binggan = "0.16.1"
binggan = "0.17.0"
[[bench]]
name = "bench_merge"

View File

@@ -19,6 +19,6 @@ time = { version = "0.3.47", features = ["serde-well-known"] }
serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.16.1"
binggan = "0.17.0"
proptest = "1.0.0"
rand = "0.9"

View File

@@ -121,7 +121,7 @@ pub struct FileSlice {
impl fmt::Debug for FileSlice {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
write!(f, "FileSlice({:?}, {:?})", self.data, self.range)
}
}

View File

@@ -20,8 +20,8 @@ use crate::aggregation::metric::{
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation, MaxAggregation,
MetricAggReqData, MinAggregation, SegmentCardinalityCollector, SegmentExtendedStatsCollector,
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TopHitsAggReqData,
TopHitsSegmentCollector,
SegmentPercentilesCollector, StatsAggregation, StatsType, SumAggregation, TermOrdSet,
TopHitsAggReqData, TopHitsSegmentCollector, BITSET_MAX_TERM_ORD,
};
use crate::aggregation::segment_agg_result::{
GenericSegmentAggregationResultsCollector, SegmentAggregationCollector,
@@ -413,12 +413,38 @@ pub(crate) fn build_segment_agg_collector(
}
AggKind::Cardinality => {
let req_data = &mut req.get_cardinality_req_data_mut(node.idx_in_req_data);
Ok(Box::new(SegmentCardinalityCollector::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
)))
// For str columns, choose the per-bucket entries representation
// based on the segment's column.max_value():
// * small (< BITSET_MAX_TERM_ORD): `BitSet`, pre-allocated, no promotion machinery.
// * large: `TermOrdSet` (sparse FxHashSet that promotes to a paged bitset).
// For non-str columns the `entries` field is unused (values go
// straight into the HLL sketch); we still pick `TermOrdSet`
// because its empty Sparse(FxHashSet) costs nothing.
let is_str = req_data.column_type == ColumnType::Str;
let max_term_ord_inclusive = if is_str {
req_data.accessor.max_value()
} else {
0
};
let collector: Box<dyn SegmentAggregationCollector> =
if is_str && max_term_ord_inclusive < BITSET_MAX_TERM_ORD {
Box::new(SegmentCardinalityCollector::<BitSet>::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
max_term_ord_inclusive,
))
} else {
Box::new(SegmentCardinalityCollector::<TermOrdSet>::from_req(
req_data.column_type,
node.idx_in_req_data,
req_data.accessor.clone(),
req_data.missing_value_for_accessor,
max_term_ord_inclusive,
))
};
Ok(collector)
}
AggKind::StatsKind(stats_type) => {
let req_data = &mut req.per_request.stats_metric_req_data[node.idx_in_req_data];
@@ -1006,10 +1032,20 @@ fn build_terms_or_cardinality_nodes(
(idx_in_req_data, AggKind::Terms)
}
TermsOrCardinalityRequest::Cardinality(ref req) => {
// `str_dict_column` is computed once per field; for JSON paths
// with mixed types it's `Some` even on the numeric req_data.
// Cardinality only consults it for the str column path, so
// gate by column_type to avoid driving non-str collectors
// through the coupon-cache path.
let str_dict_column_for_req = if column_type == ColumnType::Str {
str_dict_column.clone()
} else {
None
};
let idx_in_req_data = data.push_cardinality_req_data(CardinalityAggReqData {
accessor,
column_type,
str_dict_column: str_dict_column.clone(),
str_dict_column: str_dict_column_for_req,
missing_value_for_accessor,
name: agg_name.to_string(),
req: req.clone(),

View File

@@ -115,6 +115,71 @@ pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
fast_field_names
}
/// Validates that all fields referenced in the aggregation request exist in the schema
/// and are configured as fast fields.
///
/// This is a convenience function for upfront validation before executing aggregations.
/// Returns an error if any field doesn't exist or is not a fast field.
///
/// Validation is intentionally opt-in rather than baked into aggregation execution: the
/// default lenient behavior (returning empty results for missing fields) supports
/// schema evolution and federated queries where the same request runs against segments
/// or indices with different schemas.
///
/// # Example
/// ```
/// use tantivy::aggregation::agg_req::{Aggregations, validate_aggregation_fields_exist};
/// use tantivy::schema::{Schema, FAST};
/// use tantivy::Index;
///
/// # fn main() -> tantivy::Result<()> {
/// // Create a simple index
/// let mut schema_builder = Schema::builder();
/// schema_builder.add_f64_field("price", FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// // Parse aggregation request
/// let agg_req: Aggregations = serde_json::from_str(r#"{
/// "avg_price": { "avg": { "field": "price" } }
/// }"#)?;
///
/// let reader = index.reader()?;
/// let searcher = reader.searcher();
///
/// // Validate fields before executing
/// for segment_reader in searcher.segment_readers() {
/// validate_aggregation_fields_exist(&agg_req, segment_reader)?;
/// }
/// # Ok(())
/// # }
/// ```
pub fn validate_aggregation_fields_exist(
aggs: &Aggregations,
reader: &crate::SegmentReader,
) -> crate::Result<()> {
let field_names = get_fast_field_names(aggs);
let schema = reader.schema();
for field_name in field_names {
// Check if the field is either directly in the schema or could be part of a json field
// present in the schema, and verify it's a fast field.
if let Some((field, _path)) = schema.find_field(&field_name) {
let field_type = schema.get_field_entry(field).field_type();
if !field_type.is_fast() {
return Err(crate::TantivyError::SchemaError(format!(
"Field '{}' is not a fast field. Aggregations require fast fields.",
field_name
)));
}
} else {
return Err(crate::TantivyError::FieldNotFound(field_name));
}
}
Ok(())
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// All aggregation types.
pub enum AggregationVariants {

View File

@@ -1436,3 +1436,46 @@ fn test_aggregation_on_json_object_mixed_numerical_segments() {
)
);
}
#[test]
fn test_aggregation_field_validation_helper() {
// Test the standalone validation helper function for field validation
let index = get_test_index_2_segments(false).unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
// Test with invalid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "nonexistent_field" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_err());
match result {
Err(crate::TantivyError::FieldNotFound(field_name)) => {
assert_eq!(field_name, "nonexistent_field");
}
_ => panic!("Expected FieldNotFound error, got: {:?}", result),
}
// Test with valid field
let agg_req: Aggregations = serde_json::from_str(
r#"{
"avg_test": {
"avg": { "field": "score" }
}
}"#,
)
.unwrap();
let result =
crate::aggregation::agg_req::validate_aggregation_fields_exist(&agg_req, segment_reader);
assert!(result.is_ok());
}

View File

@@ -199,6 +199,17 @@ impl SegmentAggregationCollector for SegmentCompositeCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Composite is a multi-bucket agg with no single value to extract.
None
}
}
impl SegmentCompositeCollector {

View File

@@ -559,34 +559,30 @@ mod tests {
page_size,
agg_req,
);
if page_idx + 1 < page_count {
assert!(
res["my_composite"].get("after_key").is_some(),
"expected after_key on all but last page"
);
after_key = Some(res["my_composite"]["after_key"].clone());
} else if res["my_composite"].get("after_key").is_some() {
// currently we sometime have an after_key on the last page,
// check that the next "page" is empty
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": composite_agg_sources,
"size": page_size,
"after": res["my_composite"]["after_key"].clone(),
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
}
assert!(
res["my_composite"].get("after_key").is_some(),
"expected after_key on every non-empty page"
);
after_key = Some(res["my_composite"]["after_key"].clone());
}
// Using the after_key from the last page must yield an empty page.
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": composite_agg_sources,
"size": page_size,
"after": after_key,
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), index).unwrap();
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
}
}
@@ -711,8 +707,28 @@ mod tests {
{"key": {"myterm": "terme"}, "doc_count": 1}
])
);
assert!(res["my_composite"].get("after_key").is_none());
// paginating past last page should be empty
let agg_req_json = json!({
"my_composite": {
"composite": {
"sources": [
{"myterm": {"terms": {"field": "string_id"}}}
],
"size": 3,
"after": &res["my_composite"]["after_key"]
}
}
});
let agg_req: Aggregations = serde_json::from_value(agg_req_json).unwrap();
let res = exec_request(agg_req.clone(), &index).unwrap();
assert!(res["my_composite"].get("after_key").is_none());
assert_eq!(
res["my_composite"]["buckets"],
json!([]),
"expected no buckets when using after_key from last page, query: {:?}",
agg_req
);
Ok(())
}
@@ -820,7 +836,10 @@ mod tests {
{"key": {"myterm": "apple"}, "doc_count": 1}
])
);
assert!(res["fruity_aggreg"].get("after_key").is_none());
assert_eq!(
res["fruity_aggreg"]["after_key"],
json!({"myterm": "str:apple"})
);
Ok(())
}
@@ -1792,7 +1811,14 @@ mod tests {
{"key": {"month": ms_timestamp_from_iso_str("2021-02-01T00:00:00Z"), "category": "books"}, "doc_count": 1},
]),
);
assert!(res["my_composite"].get("after_key").is_none());
let feb_2021_ns = ms_timestamp_from_iso_str("2021-02-01T00:00:00Z") * 1_000_000;
assert_eq!(
res["my_composite"]["after_key"],
json!({
"month": format!("dt:{}", feb_2021_ns),
"category": "str:books"
})
);
Ok(())
}

View File

@@ -674,6 +674,17 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentFilterCollector<B>
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// TODO: forward into the inner `sub_agg` for nested order paths (`filter.metric`).
None
}
}
/// Intermediate result for filter aggregation

View File

@@ -394,6 +394,17 @@ impl SegmentAggregationCollector for SegmentHistogramCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Histogram is a multi-bucket agg with no single value to extract.
None
}
}
impl SegmentHistogramCollector {

View File

@@ -328,6 +328,17 @@ impl<B: SubAggBuffer> SegmentAggregationCollector for SegmentRangeCollector<B> {
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Range is a multi-bucket agg with no single value to extract.
None
}
}
/// Build a concrete `SegmentRangeCollector` with either a Vec- or HashMap-backed
/// bucket storage, depending on the column type and aggregation level.

View File

@@ -352,19 +352,15 @@ pub(crate) fn build_segment_term_collector(
)));
}
// Validate sub aggregation exists when ordering by sub-aggregation.
{
if let OrderTarget::SubAggregation(sub_agg_name) = &terms_req_data.req.order.target {
let (agg_name, _agg_property) = get_agg_name_and_property(sub_agg_name);
node.get_sub_agg(agg_name, &req_data.per_request)
.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"could not find aggregation with name {agg_name} in metric \
sub_aggregations"
))
})?;
}
// Validate that the referenced sub-aggregation exists when ordering by one.
if let OrderTarget::SubAggregation(sub_agg_name) = &terms_req_data.req.order.target {
let (agg_name, _agg_property) = get_agg_name_and_property(sub_agg_name);
node.get_sub_agg(agg_name, &req_data.per_request)
.ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"could not find aggregation with name {agg_name} in metric sub_aggregations"
))
})?;
}
// Build sub-aggregation blueprint if there are children.
@@ -887,6 +883,17 @@ impl<TermMap: TermAggregationMap, B: SubAggBuffer> SegmentAggregationCollector
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// Terms is a multi-bucket agg with no single value to extract.
None
}
}
/// Missing value are represented as a sentinel value in the column.
@@ -960,9 +967,6 @@ where
) -> crate::Result<IntermediateBucketResult> {
let mut entries: Vec<(u64, Bucket)> = term_buckets.into_vec();
let order_by_sub_aggregation =
matches!(term_req.req.order.target, OrderTarget::SubAggregation(_));
match &term_req.req.order.target {
OrderTarget::Key => {
// We rely on the fact, that term ordinals match the order of the strings
@@ -974,10 +978,37 @@ where
entries.sort_unstable_by_key(|bucket| bucket.0);
}
}
OrderTarget::SubAggregation(_name) => {
// don't sort and cut off since it's hard to make assumptions on the quality of the
// results when cutting off du to unknown nature of the sub_aggregation (possible
// to check).
OrderTarget::SubAggregation(sub_agg_path) => {
// Peek segment-level metric values, sort, then fall through to
// `cut_off_buckets`. Like Elasticsearch, we always cut off when ordering
// by a sub-agg: top-K results are approximate and may differ from the
// global ordering, especially for non-monotonic metrics like avg/min.
let coll = sub_agg_collector.as_deref().ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"Could not find sub-aggregation collector for path {sub_agg_path}"
))
})?;
let (agg_name, agg_prop) = get_agg_name_and_property(sub_agg_path);
// Fetch values up-front; otherwise sort would re-compute per comparison
let mut keyed: Vec<(f64, (u64, Bucket))> = entries
.into_iter()
.map(|bucket| {
let metric_value = coll
.compute_metric_value(bucket.1.bucket_id, agg_name, agg_prop, agg_data)
.unwrap_or(0.0);
(metric_value, bucket)
})
.collect();
if term_req.req.order.order == Order::Desc {
keyed.sort_unstable_by(|a, b| {
b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)
});
} else {
keyed.sort_unstable_by(|a, b| {
a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)
});
}
entries = keyed.into_iter().map(|(_, e)| e).collect();
}
OrderTarget::Count => {
if term_req.req.order.order == Order::Desc {
@@ -988,11 +1019,8 @@ where
}
}
let (term_doc_count_before_cutoff, sum_other_doc_count) = if order_by_sub_aggregation {
(0, 0)
} else {
cut_off_buckets(&mut entries, term_req.req.segment_size as usize)
};
let (term_doc_count_before_cutoff, sum_other_doc_count) =
cut_off_buckets(&mut entries, term_req.req.segment_size as usize);
let mut dict: FxHashMap<IntermediateKey, IntermediateTermBucketEntry> = Default::default();
dict.reserve(entries.len());
@@ -1767,6 +1795,263 @@ mod tests {
Ok(())
}
#[test]
fn terms_aggregation_order_by_cardinality_desc_single_segment() -> crate::Result<()> {
terms_aggregation_order_by_cardinality_desc(true)
}
#[test]
fn terms_aggregation_order_by_cardinality_desc_multi_segment() -> crate::Result<()> {
terms_aggregation_order_by_cardinality_desc(false)
}
fn terms_aggregation_order_by_cardinality_desc(merge_segments: bool) -> crate::Result<()> {
// Distinct score values per bucket key: A→5, B→1, C→3.
// Order by cardinality desc must yield A, C, B.
let segment_and_terms = vec![vec![
(1.0, "A".to_string()),
(2.0, "A".to_string()),
(3.0, "A".to_string()),
(4.0, "A".to_string()),
(5.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(1.0, "C".to_string()),
(2.0, "C".to_string()),
(3.0, "C".to_string()),
]];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "card": "desc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][0]["card"]["value"], 5.0);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][1]["card"]["value"], 3.0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][2]["card"]["value"], 1.0);
// Asc engages the segment-cutoff path too (monotonic-safe: discarded buckets had
// local card >= cutoff, so merged card >= cutoff and they cannot be globally smallest).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "card": "asc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "A");
// size=2 with desc engages the segment cutoff: must keep top-2 by cardinality (A, C),
// and `sum_other_doc_count` reflects the dropped B (3 docs).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "card": "desc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
// size=2 with asc engages the segment cutoff: must keep bottom-2 by cardinality (B, C).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "card": "asc" }
},
"aggs": {
"card": { "cardinality": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
Ok(())
}
#[test]
fn terms_aggregation_order_by_sum_single_segment() -> crate::Result<()> {
terms_aggregation_order_by_sum(true)
}
#[test]
fn terms_aggregation_order_by_sum_multi_segment() -> crate::Result<()> {
terms_aggregation_order_by_sum(false)
}
fn terms_aggregation_order_by_sum(merge_segments: bool) -> crate::Result<()> {
// Per-bucket sums on the U64 `score` column (non-negative => sum is monotonic):
// A → 1+2+3+4+5 = 15, B → 1+1+1 = 3, C → 1+2+3 = 6.
let segment_and_terms = vec![
vec![
(1.0, "A".to_string()),
(2.0, "A".to_string()),
(3.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "C".to_string()),
],
vec![
(4.0, "A".to_string()),
(5.0, "A".to_string()),
(1.0, "B".to_string()),
(1.0, "B".to_string()),
(2.0, "C".to_string()),
(3.0, "C".to_string()),
],
];
let index = get_test_index_from_values_and_terms(merge_segments, &segment_and_terms)?;
// Desc on a Sum metric engages the fast path (column is U64).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][0]["total"]["value"], 15.0);
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][1]["total"]["value"], 6.0);
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][2]["total"]["value"], 3.0);
// Asc engages the fast path too — discarded buckets had local sum >= cutoff,
// and merged sum >= local (non-negative addends), so they cannot be globally smallest.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "asc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "B");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "A");
// size=2 desc with cutoff: top-2 by sum (A, C).
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"size": 2,
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"].as_array().unwrap().len(), 2);
// Stats sub-property: ordering by `mystats.sum` on a U64 column also engages.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "mystats.sum": "desc" }
},
"aggs": {
"mystats": { "stats": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
// Sum on a signed column (I64) takes the same cutoff path. Results may be
// approximate near the boundary on adversarial data, but for this dataset the
// top-K is unambiguous.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "total": "desc" }
},
"aggs": {
"total": { "sum": { "field": "score_i64" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
// Order by extended_stats sub-property exercises compute_metric_value on the
// ExtendedStats collector. A→max=5, B→max=1, C→max=3, so desc by max → A, C, B.
let agg_req: Aggregations = serde_json::from_value(json!({
"my_texts": {
"terms": {
"field": "string_id",
"order": { "ext.max": "desc" }
},
"aggs": {
"ext": { "extended_stats": { "field": "score" } }
}
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["my_texts"]["buckets"][0]["key"], "A");
assert_eq!(res["my_texts"]["buckets"][1]["key"], "C");
assert_eq!(res["my_texts"]["buckets"][2]["key"], "B");
Ok(())
}
#[test]
fn terms_aggregation_test_order_key_single_segment() -> crate::Result<()> {
terms_aggregation_test_order_key_merge_segment(true)

View File

@@ -177,6 +177,17 @@ impl SegmentAggregationCollector for TermMissingAgg {
}
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// TODO: forward to `sub_agg` for nested order paths (`missing_agg>metric`).
None
}
}
#[cfg(test)]

View File

@@ -1004,24 +1004,20 @@ impl IntermediateCompositeBucketResult {
) -> crate::Result<BucketResult> {
let trimmed_entry_vec =
trim_composite_buckets(self.entries, &self.orders, self.target_size)?;
let after_key = if trimmed_entry_vec.len() == req.size as usize {
trimmed_entry_vec
.last()
.map(|bucket| {
let (intermediate_key, _entry) = bucket;
intermediate_key
.iter()
.enumerate()
.map(|(idx, intermediate_key)| {
let source = &req.sources[idx];
(source.name().to_string(), intermediate_key.clone().into())
})
.collect()
})
.unwrap()
} else {
FxHashMap::default()
};
let after_key = trimmed_entry_vec
.last()
.map(|bucket| {
let (intermediate_key, _entry) = bucket;
intermediate_key
.iter()
.enumerate()
.map(|(idx, intermediate_key)| {
let source = &req.sources[idx];
(source.name().to_string(), intermediate_key.clone().into())
})
.collect()
})
.unwrap_or_default();
let buckets = trimmed_entry_vec
.into_iter()

View File

@@ -4,6 +4,7 @@ use std::io;
use columnar::column_values::CompactSpaceU64Accessor;
use columnar::{Column, ColumnType, Dictionary, StrColumn};
use common::{BitSet, TinySet};
use datasketches::hll::{Coupon, HllSketch, HllType, HllUnion};
use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -20,6 +21,12 @@ use crate::TantivyError;
/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4).
const LG_K: u8 = 11;
/// Promote FxHashSet<u64> -> PagedBitset at ~3% density (`len * 32 >
/// dict_num_terms`). Past this point the bitset (~`dict_num_terms / 7.5`
/// bytes) is smaller than the hashset (~10 B/entry minimum) and avoids
/// the per-insert hash.
const PROMOTION_RATIO: u64 = 32;
/// # Cardinality
///
/// The cardinality aggregation allows for computing an estimate
@@ -159,8 +166,12 @@ impl CouponCache {
let should_use_dense =
highest_term_ord < 1_000_000u64 || highest_term_ord < num_terms as u64 * 3u64;
if should_use_dense {
let mut coupon_map: Vec<Coupon> = vec![Coupon::EMPTY; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons.into_iter()) {
// We don't really care about the value here. We will populate all the values we will
// read anyway.
let uninitialized_coupon = Coupon::from_hash(0);
let mut coupon_map: Vec<Coupon> =
vec![uninitialized_coupon; highest_term_ord as usize + 1];
for (term_ord, coupon) in term_ords.into_iter().zip(coupons) {
coupon_map[term_ord as usize] = coupon;
}
CouponCache::Dense {
@@ -177,9 +188,263 @@ impl CouponCache {
}
}
pub(crate) struct SegmentCardinalityCollector {
// =================================================================
// PagedBitset: a sparse bitset indexed by term_ord.
//
// Used as the dense alternative to FxHashSet<u64> once a string
// cardinality bucket has accumulated enough unique term ordinals.
// Memory is bounded to (touched pages) * (page bytes), not
// (max_term_ord / 8).
//
// Page geometry mirrors `PagedTermMap` in `term_agg.rs`: 1024 ords
// per page, lazy `Vec<Option<Box<Page>>>` directory.
// =================================================================
const BITSET_PAGE_SHIFT: u32 = 10;
const BITSET_PAGE_BITS: u64 = 1u64 << BITSET_PAGE_SHIFT; // 1024
const BITSET_PAGE_MASK: u64 = BITSET_PAGE_BITS - 1;
const BITSET_WORDS_PER_PAGE: usize = (BITSET_PAGE_BITS / 64) as usize; // 16
#[derive(Clone)]
struct PagedBitsetPage {
words: [TinySet; BITSET_WORDS_PER_PAGE],
}
impl PagedBitsetPage {
fn new() -> Self {
Self {
words: [TinySet::empty(); BITSET_WORDS_PER_PAGE],
}
}
}
pub(crate) struct PagedBitset {
pages: Vec<Option<Box<PagedBitsetPage>>>,
/// Cached number of set bits, maintained on insert.
count: u64,
}
impl PagedBitset {
/// Allocates a directory big enough to hold ords up to and including
/// `max_term_ord`. Pages are allocated lazily on first set.
fn with_max_term_ord(max_term_ord: u64) -> Self {
let max_page_idx = (max_term_ord >> BITSET_PAGE_SHIFT) as usize;
let num_pages = max_page_idx + 1;
Self {
pages: vec![None; num_pages],
count: 0,
}
}
#[inline]
fn insert(&mut self, term_ord: u64) {
let page_idx = (term_ord >> BITSET_PAGE_SHIFT) as usize;
let intra = term_ord & BITSET_PAGE_MASK;
let word_idx = (intra >> 6) as usize;
let bit_idx = (intra & 63) as u32;
let page = match &mut self.pages[page_idx] {
Some(p) => p,
None => {
self.pages[page_idx] = Some(Box::new(PagedBitsetPage::new()));
self.pages[page_idx].as_mut().unwrap()
}
};
if page.words[word_idx].insert_mut(bit_idx) {
self.count += 1;
}
}
/// Number of set bits. O(1).
#[inline]
fn len(&self) -> u64 {
self.count
}
/// Iterate set ords in ascending order.
fn iter_sorted(&self) -> impl Iterator<Item = u64> + '_ {
self.pages
.iter()
.enumerate()
.filter_map(|(page_idx, page_opt)| page_opt.as_ref().map(|p| (page_idx, p)))
.flat_map(|(page_idx, page)| {
let page_base_ord = (page_idx as u64) << BITSET_PAGE_SHIFT;
page.words
.iter()
.enumerate()
.flat_map(move |(word_idx, &word)| {
let word_base_ord = page_base_ord + (word_idx as u64) * 64;
word.into_iter()
.map(move |bit| word_base_ord + u64::from(bit))
})
})
}
}
/// Threshold below which we use `BitSet` instead of `TermOrdSet`.
///
/// Both `BitSet` and `FxHashSet<u64>` have the same 32-byte struct, so the comparison is heap only:
/// * `BitSet` at T=256: 5 `TinySet` words covering 258 bits (with the missing-value sentinel) =
/// 40 bytes.
/// * `FxHashSet<u64>` after one insert: 4-bucket hashbrown table ≈ 56 bytes
pub(crate) const BITSET_MAX_TERM_ORD: u64 = 256;
// =================================================================
// TermOrdAccumulator: per-bucket abstraction over the entries set.
//
// Implementations:
// - `BitSet` (from `common`): used when `column.max_value()` is small (< BITSET_MAX_TERM_ORD).
// Pre-allocated, no promotion.
// - `TermOrdSet`: adaptive, starts as FxHashSet and promotes to a paged bitset when occupancy
// crosses the density threshold (only if promotion is enabled — typically gated on top-level
// aggregation).
//
// The trait lets `SegmentCardinalityCollector` be generic over the choice
// so the hot collect() loop monomorphizes to a direct call (no enum
// dispatch per insert).
// =================================================================
pub(crate) trait TermOrdAccumulator: Sized {
/// Construct an empty accumulator.
/// `max_term_ord_inclusive` is the largest term_ord that may be
/// inserted (used to size pre-allocated bitsets and the dense bitset
/// on promotion).
fn new(max_term_ord_inclusive: u64) -> Self;
fn insert(&mut self, term_ord: u64);
/// Bulk insert. Implementations may override to hoist any inner
/// dispatch outside the loop. Default loops `insert`.
#[inline]
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
for ord in ords {
self.insert(ord);
}
}
/// Hook called once per ingested block. Adaptive impls use this to
/// decide on sparse->dense promotion.
fn maybe_compact(&mut self) {}
fn len(&self) -> usize;
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_;
}
impl TermOrdAccumulator for BitSet {
#[inline]
fn new(max_term_ord_inclusive: u64) -> Self {
// `BitSet::with_max_value(M)` accepts ords in [0, M).
// We need ords up to and including `max_term_ord_inclusive`, plus
// the missing-value sentinel `column.max_value() + 1`.
BitSet::with_max_value((max_term_ord_inclusive + 2) as u32)
}
#[inline]
fn insert(&mut self, term_ord: u64) {
BitSet::insert(self, term_ord as u32);
}
#[inline]
fn len(&self) -> usize {
BitSet::len(self)
}
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
// `BitSet` itself doesn't expose iteration, but
// `BitSet::tinyset(bucket)` does. Walk per-bucket and yield each
// set bit. The capacity is `max_value()`; iterating to
// `div_ceil(64)` covers every possible ord exactly once.
let num_buckets = self.max_value().div_ceil(64);
(0..num_buckets).flat_map(move |bucket| {
let chunk_base = u64::from(bucket) * 64;
self.tinyset(bucket)
.into_iter()
.map(move |bit| chunk_base + u64::from(bit))
})
}
}
// =================================================================
// TermOrdSet: adaptive sparse->dense accumulator.
//
// Starts as an FxHashSet (cheap when few ords are seen). When occupancy
// crosses `len * PROMOTION_RATIO > max_term_ord_inclusive`, drains into
// a `PagedBitset` and continues dense. Promotion is one-way.
// =================================================================
pub(crate) struct TermOrdSet {
inner: TermOrdSetInner,
/// Largest term_ord that may be inserted. Used for both sizing the
/// dense bitset on promotion and as the promotion-threshold reference.
max_term_ord_inclusive: u64,
}
enum TermOrdSetInner {
Sparse(FxHashSet<u64>),
Dense(PagedBitset),
}
impl TermOrdAccumulator for TermOrdSet {
fn new(max_term_ord_inclusive: u64) -> Self {
Self {
inner: TermOrdSetInner::Sparse(FxHashSet::default()),
max_term_ord_inclusive,
}
}
#[inline]
fn insert(&mut self, term_ord: u64) {
match &mut self.inner {
TermOrdSetInner::Sparse(set) => {
set.insert(term_ord);
}
TermOrdSetInner::Dense(bitset) => bitset.insert(term_ord),
}
}
/// Hoist the Sparse/Dense match outside the per-ord loop so that a
/// block of inserts dispatches once.
fn extend_from_iter<I: IntoIterator<Item = u64>>(&mut self, ords: I) {
match &mut self.inner {
TermOrdSetInner::Sparse(set) => {
for ord in ords {
set.insert(ord);
}
}
TermOrdSetInner::Dense(bitset) => {
for ord in ords {
bitset.insert(ord);
}
}
}
}
fn maybe_compact(&mut self) {
let TermOrdSetInner::Sparse(set) = &mut self.inner else {
return;
};
if set.len() as u64 * PROMOTION_RATIO <= self.max_term_ord_inclusive {
return;
}
// Size for ord <= max_term_ord_inclusive plus the missing sentinel
// (column.max_value() + 1, which may equal max_term_ord_inclusive
// when the column references every dictionary term).
let mut bitset = PagedBitset::with_max_term_ord(self.max_term_ord_inclusive + 1);
let set = std::mem::take(set);
for ord in set {
bitset.insert(ord);
}
self.inner = TermOrdSetInner::Dense(bitset);
}
fn len(&self) -> usize {
match &self.inner {
TermOrdSetInner::Sparse(set) => set.len(),
TermOrdSetInner::Dense(bitset) => bitset.len() as usize,
}
}
fn iter_ords(&self) -> impl Iterator<Item = u64> + '_ {
match &self.inner {
TermOrdSetInner::Sparse(set) => itertools::Either::Left(set.iter().copied()),
TermOrdSetInner::Dense(bitset) => itertools::Either::Right(bitset.iter_sorted()),
}
}
}
pub(crate) struct SegmentCardinalityCollector<S: TermOrdAccumulator> {
/// Buckets are Some(_) until they get consumed by into_intermediate_results().
buckets: Vec<Option<SegmentCardinalityCollectorBucket>>,
buckets: Vec<Option<SegmentCardinalityCollectorBucket<S>>>,
accessor_idx: usize,
/// The column accessor to access the fast field values.
accessor: Column<u64>,
@@ -188,9 +453,13 @@ pub(crate) struct SegmentCardinalityCollector {
/// The missing value normalized to the internal u64 representation of the field type.
missing_value_for_accessor: Option<u64>,
coupon_cache: Option<CouponCache>,
/// Largest term_ord that may be inserted into a bucket. For str columns
/// this is `accessor.max_value()`; for non-str columns this is unused
/// (no inserts go into `entries`) and set to 0.
max_term_ord_inclusive: u64,
}
impl Debug for SegmentCardinalityCollector {
impl<S: TermOrdAccumulator> Debug for SegmentCardinalityCollector<S> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("SegmentCardinalityCollector")
.field("column_type", &self.column_type)
@@ -202,16 +471,21 @@ impl Debug for SegmentCardinalityCollector {
}
}
pub(crate) struct SegmentCardinalityCollectorBucket {
cardinality: CardinalityCollector,
entries: FxHashSet<u64>,
/// Per-bucket state. Shape depends on column kind: str columns dedup
/// term ords and only build the HLL sketch at finalization (saves the
/// ~96 B `CardinalityCollector` per bucket during collect); numeric/IpAddr
/// columns feed the sketch directly during collect.
pub(crate) enum SegmentCardinalityCollectorBucket<S: TermOrdAccumulator> {
Str(S),
Numeric(CardinalityCollector),
}
impl SegmentCardinalityCollectorBucket {
impl<S: TermOrdAccumulator> SegmentCardinalityCollectorBucket<S> {
#[inline(always)]
pub fn new(column_type: ColumnType) -> Self {
Self {
cardinality: CardinalityCollector::new(column_type as u8),
entries: FxHashSet::default(),
pub fn new(column_type: ColumnType, max_term_ord_inclusive: u64) -> Self {
if column_type == ColumnType::Str {
Self::Str(S::new(max_term_ord_inclusive))
} else {
Self::Numeric(CardinalityCollector::new(column_type as u8))
}
}
@@ -222,37 +496,57 @@ impl SegmentCardinalityCollectorBucket {
//
// If the column is str, then the values are dictionary encoded
// and have not been added to the sketch yet.
// We need to resolves the term ords accumulated in self.entries
// with the coupon cache, and append the results to the sketch.
// We need to resolves the term ords accumulated in the str entries
// with the coupon cache, and append the results to a fresh sketch.
fn into_intermediate_metric_result(
mut self,
self,
coupon_cache_opt: Option<&CouponCache>,
) -> crate::Result<IntermediateMetricResult> {
if let Some(coupon_cache) = coupon_cache_opt {
assert!(self.cardinality.sketch.is_empty());
append_to_sketch(&self.entries, coupon_cache, &mut self.cardinality);
}
Ok(IntermediateMetricResult::Cardinality(self.cardinality))
let cardinality = match self {
Self::Str(entries) => {
let mut cardinality = CardinalityCollector::new(ColumnType::Str as u8);
if let Some(coupon_cache) = coupon_cache_opt {
// Sketch must be empty for str columns: coupons are appended here
// from the term_ord set (and not directly during collection).
assert!(cardinality.sketch.is_empty());
append_to_sketch(&entries, coupon_cache, &mut cardinality);
}
cardinality
}
Self::Numeric(cardinality) => cardinality,
};
Ok(IntermediateMetricResult::Cardinality(cardinality))
}
}
/// Builds a coupon cache from the given buckets, dictionary, and optional missing value.
/// Returns a mapping from term_ord to the hash (coupon) of the associated term.
fn build_coupon_cache(
buckets: &[Option<SegmentCardinalityCollectorBucket>],
fn build_coupon_cache<S: TermOrdAccumulator>(
buckets: &[Option<SegmentCardinalityCollectorBucket<S>>],
dictionary: &Dictionary,
missing_value_opt: Option<&Key>,
) -> io::Result<CouponCache> {
let term_ords_capacity: usize = buckets
.iter()
.flatten()
.map(|bucket| bucket.entries.len())
.max()
.unwrap_or(0)
* 2;
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(term_ords_capacity, FxBuildHasher);
// Caller restricts this to str cardinality collectors, so every
// present bucket must be the `Str` variant. Pass 1 validates and
// computes the capacity hint; pass 2 inserts.
let mut max_bucket_len = 0usize;
for bucket in buckets.iter().flatten() {
term_ords_set.extend(bucket.entries.iter().copied());
match bucket {
SegmentCardinalityCollectorBucket::Str(entries) => {
max_bucket_len = max_bucket_len.max(entries.len());
}
SegmentCardinalityCollectorBucket::Numeric(_) => {
return Err(io::Error::other(
"build_coupon_cache invoked with a non-str bucket",
));
}
}
}
let mut term_ords_set = FxHashSet::with_capacity_and_hasher(max_bucket_len * 2, FxBuildHasher);
for bucket in buckets.iter().flatten() {
if let SegmentCardinalityCollectorBucket::Str(entries) = bucket {
term_ords_set.extend(entries.iter_ords());
}
}
let mut term_ords: Vec<u64> = term_ords_set.into_iter().collect();
term_ords.sort_unstable();
@@ -284,8 +578,8 @@ fn build_coupon_cache(
Ok(CouponCache::new(term_ords, coupons, missing_coupon_opt))
}
fn append_to_sketch(
term_ords: &FxHashSet<u64>,
fn append_to_sketch<S: TermOrdAccumulator>(
term_ords: &S,
coupon_cache: &CouponCache,
sketch: &mut CardinalityCollector,
) {
@@ -294,7 +588,7 @@ fn append_to_sketch(
coupon_map,
missing_coupon_opt,
} => {
for &term_ord in term_ords {
for term_ord in term_ords.iter_ords() {
if let Some(coupon) = coupon_map
.get(term_ord as usize)
.copied()
@@ -308,8 +602,8 @@ fn append_to_sketch(
coupon_map,
missing_coupon_opt,
} => {
for term_ord in term_ords {
if let Some(coupon) = coupon_map.get(term_ord).copied().or(*missing_coupon_opt) {
for term_ord in term_ords.iter_ords() {
if let Some(coupon) = coupon_map.get(&term_ord).copied().or(*missing_coupon_opt) {
sketch.insert_coupon(coupon);
}
}
@@ -317,12 +611,13 @@ fn append_to_sketch(
}
}
impl SegmentCardinalityCollector {
impl<S: TermOrdAccumulator> SegmentCardinalityCollector<S> {
pub fn from_req(
column_type: ColumnType,
accessor_idx: usize,
accessor: Column<u64>,
missing_value_for_accessor: Option<u64>,
max_term_ord_inclusive: u64,
) -> Self {
Self {
buckets: Vec::new(),
@@ -331,6 +626,7 @@ impl SegmentCardinalityCollector {
accessor,
missing_value_for_accessor,
coupon_cache: None,
max_term_ord_inclusive,
}
}
@@ -347,7 +643,9 @@ impl SegmentCardinalityCollector {
}
}
impl SegmentAggregationCollector for SegmentCardinalityCollector {
impl<S: TermOrdAccumulator + 'static> SegmentAggregationCollector
for SegmentCardinalityCollector<S>
{
fn add_intermediate_aggregation_result(
&mut self,
agg_data: &AggregationsSegmentCtx,
@@ -402,31 +700,41 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
));
};
let col_block_accessor = &agg_data.column_block_accessor;
if self.column_type == ColumnType::Str {
for term_ord in col_block_accessor.iter_vals() {
bucket.entries.insert(term_ord);
match bucket {
SegmentCardinalityCollectorBucket::Str(entries) => {
// Promotion check runs on the pre-block state: the first call
// sees an empty set (no-op), and the last block of inserts
// doesn't trigger a promotion of a set we won't grow further.
// The trait dispatches once per block (via `extend_from_iter`)
// for adaptive variants and inlines to a tight loop for the
// BitSet path.
entries.maybe_compact();
entries.extend_from_iter(col_block_accessor.iter_vals());
}
} else if self.column_type == ColumnType::IpAddr {
let compact_space_accessor = self
.accessor
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(
crate::aggregation::AggregationError::InternalError(
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
.to_string(),
),
)
})?;
for val in col_block_accessor.iter_vals() {
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
bucket.cardinality.insert(val);
}
} else {
for val in col_block_accessor.iter_vals() {
bucket.cardinality.insert(val);
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
if self.column_type == ColumnType::IpAddr {
let compact_space_accessor = self
.accessor
.values
.clone()
.downcast_arc::<CompactSpaceU64Accessor>()
.map_err(|_| {
TantivyError::AggregationError(
crate::aggregation::AggregationError::InternalError(
"Type mismatch: Could not downcast to CompactSpaceU64Accessor"
.to_string(),
),
)
})?;
for val in col_block_accessor.iter_vals() {
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
cardinality.insert(val);
}
} else {
for val in col_block_accessor.iter_vals() {
cardinality.insert(val);
}
}
}
}
@@ -439,12 +747,40 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
_agg_data: &AggregationsSegmentCtx,
) -> crate::Result<()> {
if max_bucket as usize >= self.buckets.len() {
let column_type = self.column_type;
let max_term_ord_inclusive = self.max_term_ord_inclusive;
self.buckets.resize_with(max_bucket as usize + 1, || {
Some(SegmentCardinalityCollectorBucket::new(self.column_type))
Some(SegmentCardinalityCollectorBucket::<S>::new(
column_type,
max_term_ord_inclusive,
))
});
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
let req_data = &agg_data.get_cardinality_req_data(self.accessor_idx);
if req_data.name != sub_agg_name || !sub_agg_property.is_empty() {
return None;
}
let bucket = self.buckets.get(bucket_id as usize)?.as_ref()?;
// For string columns the sketch isn't built until finalization; the
// term_ord set's len is the exact distinct count. For numeric columns
// the sketch is populated during collect.
match bucket {
SegmentCardinalityCollectorBucket::Str(entries) => Some(entries.len() as f64),
SegmentCardinalityCollectorBucket::Numeric(cardinality) => {
Some(cardinality.sketch.estimate().trunc())
}
}
}
}
#[derive(Clone, Debug)]
@@ -489,7 +825,7 @@ impl<'de> Deserialize<'de> for CardinalityCollector {
impl CardinalityCollector {
fn new(salt: u8) -> Self {
Self {
sketch: HllSketch::new(LG_K, HllType::Hll4),
sketch: HllSketch::new(LG_K, HllType::Hll8),
salt,
}
}
@@ -520,7 +856,7 @@ impl CardinalityCollector {
let mut union = HllUnion::new(LG_K);
union.update(&self.sketch);
union.update(&right.sketch);
self.sketch = union.to_sketch(HllType::Hll4);
self.sketch = union.to_sketch(HllType::Hll8);
Ok(())
}
}
@@ -592,6 +928,134 @@ mod tests {
Ok(())
}
/// Build a single-segment string-cardinality index with 32 unique terms.
/// `column.max_value() = 31` is well below `BITSET_MAX_TERM_ORD`,
/// so the bucket exercises the `BitSet` path end to end.
#[test]
fn cardinality_aggregation_test_str_bitset() -> crate::Result<()> {
let terms: Vec<String> = (0..32).map(|i| format!("term_{i}")).collect();
let term_refs: Vec<Vec<&str>> = terms.iter().map(|t| vec![t.as_str()]).collect::<Vec<_>>();
// single segment so we have a single dictionary of 32 terms.
let index = get_test_index_from_terms(true, &term_refs)?;
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": { "field": "string_id" }
},
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
assert_eq!(res["cardinality"]["value"], 32.0);
Ok(())
}
/// `BitSet` path with a `missing` parameter: the column-level missing
/// sentinel (`column.max_value() + 1`) flows into the bitset, the
/// dict lookup filter at finalization drops it, and the missing
/// coupon is applied separately.
#[test]
fn cardinality_aggregation_test_str_bitset_with_missing() {
let mut schema_builder = Schema::builder();
let name_field = schema_builder.add_text_field("name", STRING | FAST);
let index = Index::create_in_ram(schema_builder.build());
let mut writer = index.writer_for_tests().unwrap();
for i in 0..16 {
let term = format!("t{i:02}");
writer.add_document(doc!(name_field => term)).unwrap();
}
// One empty doc, exercising the missing sentinel.
writer.add_document(doc!()).unwrap();
writer.commit().unwrap();
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "name",
"missing": "MISSING_SENTINEL_KEY",
}
},
}))
.unwrap();
let res = exec_request(agg_req, &index).unwrap();
// 16 distinct real terms + 1 distinct "missing" value = 17.
assert_eq!(res["cardinality"]["value"], 17.0);
}
/// Unit-test the PagedBitset itself: cross-page inserts produce sorted
/// iteration, len() matches the inserted set, and duplicates are
/// idempotent.
#[test]
fn paged_bitset_basic() {
use super::PagedBitset;
// Span several pages: BITSET_PAGE_BITS = 1024, so ords > 1024 land
// on the second page, > 2048 on the third, etc.
let ords = [0u64, 1, 63, 64, 1023, 1024, 1025, 4096, 4097, 9999, 10_000];
let max_ord = *ords.iter().max().unwrap();
let mut bitset = PagedBitset::with_max_term_ord(max_ord);
for &ord in &ords {
bitset.insert(ord);
// Idempotent: inserting again must not increase count.
bitset.insert(ord);
}
assert_eq!(bitset.len(), ords.len() as u64);
let collected: Vec<u64> = bitset.iter_sorted().collect();
let mut expected: Vec<u64> = ords.to_vec();
expected.sort_unstable();
assert_eq!(collected, expected);
}
/// Unit-test `TermOrdSet`: starts Sparse, promotes to Dense on
/// `maybe_compact` once the density threshold is crossed, and
/// `iter_ords()` yields the same set in either state. Ords spanning
/// multiple paged-bitset pages exercise the Dense iter ordering.
#[test]
fn term_ord_set_promotes_on_maybe_compact() {
use super::{TermOrdAccumulator, TermOrdSet, PROMOTION_RATIO};
// Pick max so promotion needs few inserts: len * RATIO > max with
// RATIO=32 and max=64 trips at len=3 (3*32=96 > 64).
let max_term_ord = 64u64;
let mut set = <TermOrdSet as TermOrdAccumulator>::new(max_term_ord);
// Two inserts: should stay Sparse after maybe_compact (2 * RATIO = 64, not > 64).
set.insert(0);
set.insert(7);
set.maybe_compact();
assert_eq!(set.len(), 2);
// Third insert promotes on next maybe_compact.
set.insert(20);
assert_eq!(set.len(), 3);
// Sanity check: at len=3, 3 * PROMOTION_RATIO = 96 > 64.
assert!(3u64 * PROMOTION_RATIO > max_term_ord);
set.maybe_compact();
// Post-promotion: extending continues to work.
set.insert(15);
set.insert(15); // dup
assert_eq!(set.len(), 4);
let mut collected: Vec<u64> = set.iter_ords().collect();
collected.sort_unstable();
assert_eq!(collected, vec![0, 7, 15, 20]);
}
/// Unit-test the `BitSet` impl of `TermOrdAccumulator`: insert,
/// dedup, and iter_ords order.
#[test]
fn bitset_accumulator_basic() {
use common::BitSet;
use super::TermOrdAccumulator;
let mut set = <BitSet as TermOrdAccumulator>::new(255);
for ord in [0u64, 1, 63, 64, 65, 128, 200, 200, 0] {
<BitSet as TermOrdAccumulator>::insert(&mut set, ord);
}
assert_eq!(<BitSet as TermOrdAccumulator>::len(&set), 7);
let collected: Vec<u64> = set.iter_ords().collect();
assert_eq!(collected, vec![0, 1, 63, 64, 65, 128, 200]);
}
#[test]
fn cardinality_aggregation_u64() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
@@ -683,6 +1147,42 @@ mod tests {
Ok(())
}
/// A JSON path that resolves to both a Str column and a numeric column
/// produces two collector instances per segment — one with `Str` buckets
/// and one with `Numeric` buckets. Their `IntermediateMetricResult`s must
/// merge into the union cardinality.
#[test]
fn cardinality_aggregation_json_str_and_numeric() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_json_field("json", FAST);
let index = Index::create_in_ram(schema_builder.build());
{
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(field => json!({"value": "hello"})))?;
writer.add_document(doc!(field => json!({"value": "world"})))?;
writer.add_document(doc!(field => json!({"value": "hello"})))?; // dup str
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?;
writer.add_document(doc!(field => json!({"value": i64::from_u64(42u64)})))?;
writer.add_document(doc!(field => json!({"value": i64::from_u64(7u64)})))?; // dup num
writer.commit()?;
}
let agg_req: Aggregations = serde_json::from_value(json!({
"cardinality": {
"cardinality": {
"field": "json.value"
},
}
}))
.unwrap();
let res = exec_request(agg_req, &index)?;
// 4 distinct values: "hello", "world", 7, 42.
assert_eq!(res["cardinality"]["value"], 4.0);
Ok(())
}
#[test]
fn cardinality_collector_serde_roundtrip() {
use super::CardinalityCollector;

View File

@@ -399,6 +399,26 @@ impl SegmentAggregationCollector for SegmentExtendedStatsCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if self.name != sub_agg_name {
return None;
}
let extended = self.buckets.get(bucket_id as usize)?;
// Finalize is a pure read of accumulators — calling it here for the cutoff sort
// doesn't disturb the eventual intermediate result.
extended
.finalize()
.get_value(sub_agg_property)
.ok()
.flatten()
}
}
#[cfg(test)]

View File

@@ -312,6 +312,26 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if agg_data.get_metric_req_data(self.accessor_idx).name != sub_agg_name {
return None;
}
let percentile: f64 = sub_agg_property.parse().ok()?;
if !(0.0..=100.0).contains(&percentile) {
return None;
}
let bucket = self.buckets.get(bucket_id as usize)?;
// DDSketch.quantile is a pure read; calling it here for the cutoff sort does
// not affect the intermediate state used for the final result.
bucket.sketch.quantile(percentile / 100.0).ok().flatten()
}
}
#[cfg(test)]

View File

@@ -321,6 +321,40 @@ impl<const COLUMN_TYPE_ID: u8> SegmentAggregationCollector
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
if self.name != sub_agg_name {
return None;
}
let stats = self.buckets.get(bucket_id as usize)?;
// The property depends on what we're collecting:
// - StatsType::Stats exposes count/sum/min/max/avg via dotted property.
// - Single-value kinds (Sum/Count/Min/Max/Average) expect an empty property and return
// the value they were configured to collect.
let prop = match self.collecting_for {
StatsType::Stats if !sub_agg_property.is_empty() => sub_agg_property,
StatsType::Sum if sub_agg_property.is_empty() => "sum",
StatsType::Count if sub_agg_property.is_empty() => "count",
StatsType::Max if sub_agg_property.is_empty() => "max",
StatsType::Min if sub_agg_property.is_empty() => "min",
StatsType::Average if sub_agg_property.is_empty() => "avg",
_ => return None,
};
match prop {
"count" => Some(stats.count as f64),
"sum" => Some(stats.sum),
"min" if stats.count > 0 => Some(stats.min),
"max" if stats.count > 0 => Some(stats.max),
"avg" if stats.count > 0 => Some(stats.sum / stats.count as f64),
_ => None,
}
}
}
#[inline]

View File

@@ -644,6 +644,17 @@ impl SegmentAggregationCollector for TopHitsSegmentCollector {
);
Ok(())
}
fn compute_metric_value(
&self,
_bucket_id: BucketId,
_sub_agg_name: &str,
_sub_agg_property: &str,
_agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
// top_hits is not a numeric metric and cannot be used as an order target.
None
}
}
#[cfg(test)]

View File

@@ -76,6 +76,31 @@ pub trait SegmentAggregationCollector: Debug {
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
Ok(())
}
/// Compute the segment-level metric value of the named direct-child metric for `bucket_id`.
///
/// Used by parent term aggs that order by a sub-aggregation: the parent sorts on
/// this value and cuts off at segment time, matching the approximation tradeoff
/// Elasticsearch makes for any sub-agg ordering.
///
/// `sub_agg_property` is the dotted suffix (e.g. `"sum"` in `mystats.sum`); empty when
/// the metric is a single-value kind such as cardinality.
///
/// Returns `None` only on name mismatch, unknown property, or empty bucket. Implementations
/// may finalize their per-bucket state (e.g. compute a percentile from a sketch); calls
/// must be idempotent so the final intermediate result is unaffected.
///
/// No default impl on purpose: every collector must decide explicitly whether it
/// produces a metric value, forwards into children (single-bucket aggs), or rejects
/// the lookup. A silent `None` default would let a parent term agg's cutoff sort all
/// buckets to the same key and drop arbitrary winners.
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64>;
}
#[derive(Default)]
@@ -137,4 +162,21 @@ impl SegmentAggregationCollector for GenericSegmentAggregationResultsCollector {
}
Ok(())
}
fn compute_metric_value(
&self,
bucket_id: BucketId,
sub_agg_name: &str,
sub_agg_property: &str,
agg_data: &AggregationsSegmentCtx,
) -> Option<f64> {
for agg in &self.aggs {
if let Some(value) =
agg.compute_metric_value(bucket_id, sub_agg_name, sub_agg_property, agg_data)
{
return Some(value);
}
}
None
}
}

View File

@@ -301,11 +301,14 @@ pub trait SegmentCollector: 'static {
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score);
/// The query pushes the scored document to the collector via this method.
/// The query pushes the matched documents to the collector via this method.
/// This method is used when the collector does not require scoring.
///
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
/// buffer size passed to the collector.
/// `docs` is a block of matched doc ids. Doc ids are produced in increasing
/// order, in windows of [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN),
/// but several windows are accumulated before being flushed here, so the
/// block may be larger than `COLLECT_BLOCK_BUFFER_LEN`. Implementations must
/// not assume any particular maximum length.
fn collect_block(&mut self, docs: &[DocId]) {
for doc in docs {
self.collect(*doc, 0.0);

View File

@@ -52,7 +52,7 @@ impl<T: FastValue> SortKeyComputer for SortByStaticFastValue<T> {
if schema_type != T::to_type() {
return Err(crate::TantivyError::SchemaError(format!(
"Field `{}` is of type {schema_type:?}, not of the type {:?}.",
&self.field,
self.field,
T::to_type()
)));
}

View File

@@ -11,9 +11,14 @@ use crate::DocId;
/// to compare `[u32; 4]`.
pub const TERMINATED: DocId = i32::MAX as u32;
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
/// Passed results to `collect_block` will not exceed this size and will be
/// exactly this size as long as we can fill the buffer.
/// Window size used by [`DocSet::fill_buffer`]: a single `fill_buffer` call
/// writes at most this many doc ids, and exactly this many as long as the
/// `DocSet` is not exhausted.
///
/// Note that this is *not* the maximum length of the slice passed to
/// `SegmentCollector::collect_block`: the collection loop accumulates several
/// such windows into a larger buffer before flushing it, so `collect_block`
/// may receive a block larger than `COLLECT_BLOCK_BUFFER_LEN`.
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].

View File

@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
use fnv::FnvHashMap;
use itertools::Itertools;
use crate::directory::error::OpenReadError;
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
@@ -159,12 +160,10 @@ impl SegmentReader {
let postings_file = segment.open_read(SegmentComponent::Postings)?;
let postings_composite = CompositeFile::open(&postings_file)?;
let positions_composite = {
if let Ok(positions_file) = segment.open_read(SegmentComponent::Positions) {
CompositeFile::open(&positions_file)?
} else {
CompositeFile::empty()
}
let positions_composite = match segment.open_read(SegmentComponent::Positions) {
Ok(positions_file) => CompositeFile::open(&positions_file)?,
Err(OpenReadError::FileDoesNotExist(_)) => CompositeFile::empty(),
Err(open_read_error) => return Err(open_read_error.into()),
};
let schema = segment.schema();
@@ -323,7 +322,7 @@ impl SegmentReader {
// Without expand dots enabled dots need to be escaped.
let escaped_json_path = json_path.replace('.', "\\.");
let full_path = format!("{field_name}.{escaped_json_path}");
let full_path_unescaped = format!("{}.{}", field_name, &json_path);
let full_path_unescaped = format!("{}.{}", field_name, json_path);
map_to_canonical.insert(full_path_unescaped, full_path.to_string());
full_path
} else {

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::index::SegmentReader;
use crate::postings::FreqReadingOption;
use crate::query::disjunction::Disjunction;
@@ -9,8 +8,8 @@ use crate::query::score_combiner::{DoNothingCombiner, ScoreCombiner};
use crate::query::term_query::TermScorer;
use crate::query::weight::{for_each_docset_buffered, for_each_pruning_scorer, for_each_scorer};
use crate::query::{
intersect_scorers, AllScorer, BufferedUnionScorer, EmptyScorer, Exclude, Explanation,
Intersection, Occur, RequiredOptionalScorer, Scorer, Weight,
intersect_scorers, AllScorer, BufferedUnionScorer, EmptyScorer, Exclude, Explanation, Occur,
RequiredOptionalScorer, Scorer, Weight,
};
use crate::{DocId, Score};
@@ -50,10 +49,9 @@ where
TScoreCombiner: ScoreCombiner,
{
assert!(!scorers.is_empty());
if scorers.len() == 1 {
if scorers.len() == 1 && !scorers[0].is::<TermScorer>() {
return SpecializedScorer::Other(scorers.into_iter().next().unwrap()); //< we checked the size beforehand
}
{
let is_all_term_queries = scorers.iter().all(|scorer| scorer.is::<TermScorer>());
if is_all_term_queries {
@@ -67,6 +65,9 @@ where
{
// Block wand is only available if we read frequencies.
return SpecializedScorer::TermUnion(scorers);
} else if scorers.len() == 1 {
// Single TermScorer without freq reading — unwrap directly.
return SpecializedScorer::Other(Box::new(scorers.into_iter().next().unwrap()));
} else {
return SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
scorers,
@@ -529,13 +530,12 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
) -> crate::Result<()> {
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
let num_docs = reader.num_docs();
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
let mut union_scorer =
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
for_each_docset_buffered(&mut union_scorer, callback);
}
SpecializedScorer::TermIntersection(term_scorers) => {
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
@@ -543,10 +543,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
.collect();
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback);
for_each_docset_buffered(intersection.as_mut(), callback);
}
SpecializedScorer::Other(mut scorer) => {
for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback);
for_each_docset_buffered(scorer.as_mut(), callback);
}
}
Ok(())

View File

@@ -1,5 +1,5 @@
use super::term_scorer::TermScorer;
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::docset::DocSet;
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
@@ -92,13 +92,11 @@ impl Weight for TermWeight {
) -> crate::Result<()> {
match self.specialized_scorer(reader, 1.0)? {
TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut term_scorer, &mut buffer, callback);
for_each_docset_buffered(&mut term_scorer, callback);
}
TermOrEmptyOrAllScorer::Empty => {}
TermOrEmptyOrAllScorer::AllMatch(mut all_scorer) => {
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut all_scorer, &mut buffer, callback);
for_each_docset_buffered(&mut all_scorer, callback);
}
};

View File

@@ -17,18 +17,56 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
}
}
/// Iterates through all of the documents matched by the DocSet
/// `DocSet`.
/// Number of `COLLECT_BLOCK_BUFFER_LEN`-sized windows accumulated into the large
/// buffer before it is flushed to the collector via `collect_block`.
const NUM_WINDOWS_PER_BLOCK: usize = 32;
/// Size of the buffer accumulated before invoking the callback (2_048 = 32 * 64).
/// `fill_buffer` keeps writing `COLLECT_BLOCK_BUFFER_LEN`-sized windows; this only
/// changes how much we accumulate before flushing.
const LARGE_COLLECT_BUFFER_LEN: usize = COLLECT_BLOCK_BUFFER_LEN * NUM_WINDOWS_PER_BLOCK;
/// Iterates through all of the documents matched by the `DocSet`, flushing
/// blocks of up to `LARGE_COLLECT_BUFFER_LEN` doc ids to `callback`.
///
/// `fill_buffer` only ever writes `COLLECT_BLOCK_BUFFER_LEN` doc ids at a time,
/// so we accumulate several such windows into a single larger buffer before
/// handing it to the collector. This amortizes the per-`collect_block` overhead
/// (virtual dispatch, aggregation setup) over more documents.
#[inline]
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
docset: &mut T,
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
mut callback: impl FnMut(&[DocId]),
) {
// Heap-allocated once per call (i.e. once per segment in the no-score path).
// `new_zeroed_slice` zeroes directly on the heap, avoiding a 2_048-element
// stack temporary.
// SAFETY: an all-zero bit pattern is a valid value for every `DocId` (u32),
// so the zeroed slice is fully initialized.
let mut buffer: Box<[DocId]> =
unsafe { Box::new_zeroed_slice(LARGE_COLLECT_BUFFER_LEN).assume_init() };
loop {
let num_items = docset.fill_buffer(buffer);
callback(&buffer[..num_items]);
if num_items != buffer.len() {
let mut filled = 0;
let mut reached_end = false;
// Fill the large buffer one `COLLECT_BLOCK_BUFFER_LEN` window at a time.
// `chunks_exact_mut` yields windows of exactly `COLLECT_BLOCK_BUFFER_LEN`
// because `LARGE_COLLECT_BUFFER_LEN` is a multiple of it (empty remainder).
// The windows are contiguous and filled in order, so the doc ids always
// occupy the contiguous prefix `buffer[..filled]`.
for window in buffer.chunks_exact_mut(COLLECT_BLOCK_BUFFER_LEN) {
// SAFETY: each `window` is a slice of exactly `COLLECT_BLOCK_BUFFER_LEN`
// elements, so reinterpreting its start pointer as a fixed-size array
// reference of that length is valid.
let window: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN] =
unsafe { &mut *window.as_mut_ptr().cast::<[DocId; COLLECT_BLOCK_BUFFER_LEN]>() };
let num_items = docset.fill_buffer(window);
filled += num_items;
if num_items != COLLECT_BLOCK_BUFFER_LEN {
reached_end = true;
break;
}
}
callback(&buffer[..filled]);
if reached_end {
break;
}
}
@@ -104,9 +142,7 @@ pub trait Weight: Send + Sync + 'static {
callback: &mut dyn FnMut(&[DocId]),
) -> crate::Result<()> {
let mut docset = self.scorer(reader, 1.0)?;
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
for_each_docset_buffered(&mut docset, &mut buffer, callback);
for_each_docset_buffered(&mut docset, callback);
Ok(())
}

View File

@@ -94,13 +94,7 @@ impl SkipIndex {
byte_range: 0..first_layer_len,
};
for layer in &self.layers {
if let Some(checkpoint) =
layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)
{
cur_checkpoint = checkpoint;
} else {
return None;
}
cur_checkpoint = layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)?;
}
Some(cur_checkpoint)
}

View File

@@ -14,11 +14,8 @@ use itertools::Itertools;
use tantivy_fst::Automaton;
use tantivy_fst::automaton::AlwaysMatch;
use crate::sstable_index_v3::SSTableIndexV3Empty;
use crate::streamer::{Streamer, StreamerBuilder};
use crate::{
BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, SSTableIndexV3, TermOrdinal, VoidSSTable,
};
use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal, VoidSSTable};
/// An SSTable is a sorted map that associates sorted `&[u8]` keys
/// to any kind of typed values.
@@ -288,33 +285,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = match version {
2 => SSTableIndex::V2(
crate::sstable_index_v2::SSTableIndex::load(sstable_index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
),
3 => {
let (sstable_index_bytes, mut footerv3_len_bytes) = sstable_index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(
SSTableIndexV3::load(sstable_index_bytes, store_offset).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?,
)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
let sstable_index = SSTableIndex::open(version, index_offset, sstable_index_bytes)?;
Ok(Dictionary {
sstable_slice,
@@ -525,10 +496,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
// Open the block for the first ordinal.
let mut bytes = Vec::new();
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
let (mut current_block_addr, block_id) = self.sstable_index.get_and_locate_with_ord(ord);
let mut current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
let mut current_block_ordinal = current_block_addr.first_ordinal;
let mut current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX);
loop {
// move to the ord inside the current block
@@ -557,17 +533,19 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
}
};
// TODO optimization: it is silly to do a binary search to get the block every single
// time.
//
// Check if block changed for new term_ord
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
if new_block_addr != current_block_addr {
if next_ord >= current_block_end_bound {
let (new_block_addr, block_id) =
self.sstable_index.get_and_locate_with_ord(next_ord);
current_block_addr = new_block_addr;
current_block_ordinal = current_block_addr.first_ordinal;
current_sstable_delta_reader =
self.sstable_delta_reader_block(current_block_addr.clone())?;
bytes.clear();
current_block_end_bound = self
.sstable_index
.get_block(block_id + 1)
.map(|block_addr| block_addr.first_ordinal)
.unwrap_or(u64::MAX)
}
ord = next_ord;
}

319
sstable/src/index/mod.rs Normal file
View File

@@ -0,0 +1,319 @@
pub(crate) mod v2;
pub(crate) mod v3;
use std::io::{self, Read, Write};
use std::ops::Range;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_fst::{Automaton, MapBuilder};
use crate::{TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(v2::SSTableIndex),
V3(v3::SSTableIndexV3),
V3Empty(v3::SSTableIndexV3Empty),
}
impl SSTableIndex {
pub(crate) fn open(
version: u32,
index_offset: u64,
index_bytes: OwnedBytes,
) -> io::Result<Self> {
let index = match version {
2 => {
SSTableIndex::V2(v2::SSTableIndex::load(index_bytes).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?)
}
3 => {
let (index_bytes, mut footerv3_len_bytes) = index_bytes.rsplit(8);
let store_offset = u64::deserialize(&mut footerv3_len_bytes)?;
if store_offset != 0 {
SSTableIndex::V3(v3::SSTableIndexV3::load(index_bytes, store_offset).map_err(
|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"),
)?)
} else {
// if store_offset is zero, there is no index, so we build a pseudo-index
// assuming a single block of sstable covering everything.
SSTableIndex::V3Empty(v3::SSTableIndexV3Empty::load(index_offset as usize))
}
}
_ => {
return Err(io::Error::other(format!(
"Unsupported sstable version, expected one of [2, 3], found {version}"
)));
}
};
Ok(index)
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_and_locate_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_and_locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_and_locate_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = v3::BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
}
}
#[cfg(test)]
mod tests {
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
}

View File

@@ -77,6 +77,13 @@ impl SSTableIndex {
self.get_block(self.locate_with_ord(ord)).unwrap()
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let location = self.locate_with_ord(ord);
// locate_with_ord always returns an index within range
let block_addr = self.get_block(location).unwrap();
(block_addr, location as u64)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,

View File

@@ -1,106 +1,14 @@
use std::io::{self, Read, Write};
use std::ops::Range;
use std::sync::Arc;
use common::{BinarySerializable, FixedSize, OwnedBytes};
use tantivy_bitpacker::{BitPacker, compute_num_bits};
use tantivy_fst::raw::Fst;
use tantivy_fst::{Automaton, IntoStreamer, Map, MapBuilder, Streamer};
use tantivy_fst::{Automaton, IntoStreamer, Map, Streamer};
use super::{BlockAddr, BlockStartAddr};
use crate::block_match_automaton::can_block_match_automaton;
use crate::{SSTableDataCorruption, TermOrdinal, common_prefix_len};
#[derive(Debug, Clone)]
pub enum SSTableIndex {
V2(crate::sstable_index_v2::SSTableIndex),
V3(SSTableIndexV3),
V3Empty(SSTableIndexV3Empty),
}
impl SSTableIndex {
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block(block_id as usize),
SSTableIndex::V3(v3_index) => v3_index.get_block(block_id),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block(block_id),
}
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<u64> {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_key(key).map(|i| i as u64),
SSTableIndex::V3(v3_index) => v3_index.locate_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_key(key),
}
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_key(key),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_key(key),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_key(key),
}
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> u64 {
match self {
SSTableIndex::V2(v2_index) => v2_index.locate_with_ord(ord) as u64,
SSTableIndex::V3(v3_index) => v3_index.locate_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.locate_with_ord(ord),
}
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
match self {
SSTableIndex::V2(v2_index) => v2_index.get_block_with_ord(ord),
SSTableIndex::V3(v3_index) => v3_index.get_block_with_ord(ord),
SSTableIndex::V3Empty(v3_empty) => v3_empty.get_block_with_ord(ord),
}
}
pub fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
) -> impl Iterator<Item = (u64, BlockAddr)> + 'a {
match self {
SSTableIndex::V2(v2_index) => {
BlockIter::V2(v2_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3(v3_index) => {
BlockIter::V3(v3_index.get_block_for_automaton(automaton))
}
SSTableIndex::V3Empty(v3_empty) => {
BlockIter::V3Empty(std::iter::once((0, v3_empty.block_addr.clone())))
}
}
}
}
enum BlockIter<V2, V3, T> {
V2(V2),
V3(V3),
V3Empty(std::iter::Once<T>),
}
impl<V2: Iterator<Item = T>, V3: Iterator<Item = T>, T> Iterator for BlockIter<V2, V3, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::V2(v2) => v2.next(),
BlockIter::V3(v3) => v3.next(),
BlockIter::V3Empty(once) => once.next(),
}
}
}
use crate::{SSTableDataCorruption, TermOrdinal};
#[derive(Debug, Clone)]
pub struct SSTableIndexV3 {
@@ -160,6 +68,11 @@ impl SSTableIndexV3 {
self.block_addr_store.binary_search_ord(ord).1
}
pub(crate) fn get_and_locate_with_ord(&self, ord: TermOrdinal) -> (BlockAddr, u64) {
let (location, block_addr) = self.block_addr_store.binary_search_ord(ord);
(block_addr, location)
}
pub(crate) fn get_block_for_automaton<'a>(
&'a self,
automaton: &'a impl Automaton,
@@ -216,7 +129,7 @@ impl<A: Automaton> Iterator for GetBlockForAutomaton<'_, A> {
#[derive(Debug, Clone)]
pub struct SSTableIndexV3Empty {
block_addr: BlockAddr,
pub block_addr: BlockAddr,
}
impl SSTableIndexV3Empty {
@@ -230,8 +143,8 @@ impl SSTableIndexV3Empty {
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, _block_id: u64) -> Option<BlockAddr> {
Some(self.block_addr.clone())
pub(crate) fn get_block(&self, block_id: u64) -> Option<BlockAddr> {
(block_id == 0).then(|| self.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
@@ -256,146 +169,9 @@ impl SSTableIndexV3Empty {
pub(crate) fn get_block_with_ord(&self, _ord: TermOrdinal) -> BlockAddr {
self.block_addr.clone()
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub first_ordinal: u64,
pub byte_range: Range<usize>,
}
impl BlockAddr {
fn to_block_start(&self) -> BlockStartAddr {
BlockStartAddr {
first_ordinal: self.first_ordinal,
byte_range_start: self.byte_range.start,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BlockStartAddr {
first_ordinal: u64,
byte_range_start: usize,
}
impl BlockStartAddr {
fn to_block_addr(&self, byte_range_end: usize) -> BlockAddr {
BlockAddr {
first_ordinal: self.first_ordinal,
byte_range: self.byte_range_start..byte_range_end,
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
/// and yet strictly smaller than the first key in the next block.
pub last_key_or_greater: Vec<u8>,
pub block_addr: BlockAddr,
}
impl BinarySerializable for BlockStartAddr {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
let start = self.byte_range_start as u64;
start.serialize(writer)?;
self.first_ordinal.serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let byte_range_start = u64::deserialize(reader)? as usize;
let first_ordinal = u64::deserialize(reader)?;
Ok(BlockStartAddr {
first_ordinal,
byte_range_start,
})
}
// Provided method
fn num_bytes(&self) -> u64 {
BlockStartAddr::SIZE_IN_BYTES as u64
}
}
impl FixedSize for BlockStartAddr {
const SIZE_IN_BYTES: usize = 2 * u64::SIZE_IN_BYTES;
}
/// Given that left < right,
/// mutates `left into a shorter byte string left'` that
/// matches `left <= left' < right`.
fn find_shorter_str_in_between(left: &mut Vec<u8>, right: &[u8]) {
assert!(&left[..] < right);
let common_len = common_prefix_len(left, right);
if left.len() == common_len {
return;
}
// It is possible to do one character shorter in some case,
// but it is not worth the extra complexity
for pos in (common_len + 1)..left.len() {
if left[pos] != u8::MAX {
left[pos] += 1;
left.truncate(pos + 1);
return;
}
}
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
blocks: Vec<BlockMeta>,
}
impl SSTableIndexBuilder {
/// In order to make the index as light as possible, we
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
first_ordinal,
},
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<u64> {
if self.blocks.len() <= 1 {
return Ok(0);
}
let counting_writer = common::CountingWriter::wrap(wrt);
let mut map_builder = MapBuilder::new(counting_writer).map_err(fst_error_to_io_error)?;
for (i, block) in self.blocks.iter().enumerate() {
map_builder
.insert(&block.last_key_or_greater, i as u64)
.map_err(fst_error_to_io_error)?;
}
let counting_writer = map_builder.into_inner().map_err(fst_error_to_io_error)?;
let written_bytes = counting_writer.written_bytes();
let mut wrt = counting_writer.finish();
let mut block_store_writer = BlockAddrStoreWriter::new();
for block in &self.blocks {
block_store_writer.write_block_meta(block.block_addr.clone())?;
}
block_store_writer.serialize(&mut wrt)?;
Ok(written_bytes)
}
}
fn fst_error_to_io_error(error: tantivy_fst::Error) -> io::Error {
match error {
tantivy_fst::Error::Fst(fst_error) => io::Error::other(fst_error),
tantivy_fst::Error::Io(ioerror) => ioerror,
pub(crate) fn get_and_locate_with_ord(&self, _ord: TermOrdinal) -> (BlockAddr, u64) {
(self.block_addr.clone(), 0)
}
}
@@ -647,14 +423,14 @@ fn binary_search(max: u64, cmp_fn: impl Fn(u64) -> std::cmp::Ordering) -> Result
Err(left)
}
struct BlockAddrStoreWriter {
pub(crate) struct BlockAddrStoreWriter {
buffer_block_metas: Vec<u8>,
buffer_addrs: Vec<u8>,
block_addrs: Vec<BlockAddr>,
}
impl BlockAddrStoreWriter {
fn new() -> Self {
pub(crate) fn new() -> Self {
BlockAddrStoreWriter {
buffer_block_metas: Vec::new(),
buffer_addrs: Vec::new(),
@@ -662,7 +438,7 @@ impl BlockAddrStoreWriter {
}
}
fn flush_block(&mut self) -> io::Result<()> {
pub(crate) fn flush_block(&mut self) -> io::Result<()> {
if self.block_addrs.is_empty() {
return Ok(());
}
@@ -741,7 +517,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
pub(crate) fn write_block_meta(&mut self, block_addr: BlockAddr) -> io::Result<()> {
self.block_addrs.push(block_addr);
if self.block_addrs.len() >= STORE_BLOCK_LEN {
self.flush_block()?;
@@ -749,7 +525,7 @@ impl BlockAddrStoreWriter {
Ok(())
}
fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
pub(crate) fn serialize<W: std::io::Write>(&mut self, wrt: &mut W) -> io::Result<()> {
self.flush_block()?;
let len = self.buffer_block_metas.len() as u64;
len.serialize(wrt)?;
@@ -824,8 +600,9 @@ mod tests {
use common::OwnedBytes;
use super::*;
use crate::SSTableDataCorruption;
use crate::block_match_automaton::tests::EqBuffer;
use crate::index::BlockMeta;
use crate::{SSTableDataCorruption, SSTableIndexBuilder};
#[test]
fn test_sstable_index() {
@@ -874,36 +651,7 @@ mod tests {
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}
#[track_caller]
fn test_find_shorter_str_in_between_aux(left: &[u8], right: &[u8]) {
let mut left_buf = left.to_vec();
super::find_shorter_str_in_between(&mut left_buf, right);
assert!(left_buf.len() <= left.len());
assert!(left <= &left_buf);
assert!(&left_buf[..] < right);
}
#[test]
fn test_find_shorter_str_in_between() {
test_find_shorter_str_in_between_aux(b"", b"hello");
test_find_shorter_str_in_between_aux(b"abc", b"abcd");
test_find_shorter_str_in_between_aux(b"abcd", b"abd");
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[1]);
test_find_shorter_str_in_between_aux(&[0, 0, 0], &[0, 0, 1]);
test_find_shorter_str_in_between_aux(&[0, 0, 255, 255, 255, 0u8], &[0, 1]);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_find_shorter_str(left in any::<Vec<u8>>(), right in any::<Vec<u8>>()) {
if left < right {
test_find_shorter_str_in_between_aux(&left, &right);
}
}
}
// use proptest::prelude::*;
#[test]
fn test_find_best_slop() {

View File

@@ -47,9 +47,8 @@ pub mod merge;
mod streamer;
pub mod value;
mod sstable_index_v3;
pub use sstable_index_v3::{BlockAddr, SSTableIndex, SSTableIndexBuilder, SSTableIndexV3};
mod sstable_index_v2;
mod index;
pub use index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
pub(crate) mod vint;
pub use dictionary::{Dictionary, TermOrdHit};
pub use streamer::{Streamer, StreamerBuilder};

View File

@@ -27,7 +27,7 @@ rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.16.1" }
binggan = { version = "0.17.0" }
rand_distr = "0.5"
[features]