Compare commits

...

19 Commits

Author SHA1 Message Date
cong.xie
c69835dc91 fix format 2026-02-04 09:03:42 -05:00
cong.xie
037f387817 feat(aggregation): expose sketches for percentiles and cardinality
This change extends the multi-step query support to percentiles and
cardinality aggregations by exposing their underlying sketches.

Changes:
- Add CardinalityMetricResult struct with value and HLL sketch
- Update PercentilesMetricResult to include DDSketch
- Update MetricResult::Cardinality to use CardinalityMetricResult
- Update finalization to include sketches in results
- Add tests verifying sketch data is present in results

JSON output changes:

Percentiles:
  Before: { "values": {...} }
  After:  { "values": {...}, "sketch": {...} }

Cardinality:
  Before: { "value": 10.0 }
  After:  { "value": 10.0, "sketch": {...} }

The sketch fields enable downstream systems to merge results across
multiple query steps using the raw sketch data.
2026-02-03 11:56:51 -05:00
cong.xie
06c67b656c feat(aggregation): expose sum and count in Average metric result
This change modifies the Average aggregation to return sum and count
alongside the computed average value, enabling downstream systems to
properly merge results across multiple query steps.

Changes:
- Add AverageMetricResult struct with value, sum, and count fields
- Add sum() and count() getter methods to IntermediateAverage
- Update MetricResult::Average to use AverageMetricResult
- Update finalization to populate sum/count from intermediate result
- Update tests to expect new JSON format

JSON output changes from:
  { "value": 2.5 }
to:
  { "value": 2.5, "sum": 15.0, "count": 6 }

This is a breaking change for JSON consumers expecting the old format.
2026-02-03 10:31:54 -05:00
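
Why exposing `sum` and `count` matters for merging can be sketched as follows. This is a minimal illustration, not tantivy's code: `AverageParts` and its methods are hypothetical names, and the field types are assumptions based on the JSON shape above.

```rust
/// Hypothetical stand-in for the `sum` and `count` fields of the new JSON
/// output. The name and the field types are assumptions for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AverageParts {
    pub sum: f64,
    pub count: u64,
}

impl AverageParts {
    /// Merges two partial results by adding sums and counts.
    pub fn merge(self, other: AverageParts) -> AverageParts {
        AverageParts {
            sum: self.sum + other.sum,
            count: self.count + other.count,
        }
    }

    /// Recomputes the average; `None` when no values were collected.
    pub fn value(self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}
```

Averaging the two `value` fields directly would weight both steps equally regardless of how many documents each one saw, which is why the merged average has to be recomputed from the summed parts.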
PSeitz
28db952131 Add regex search and merge segments benchmark (#2826)
* add merge_segments benchmark

* add regex search bench
2026-02-02 17:28:02 +01:00
PSeitz
98ebbf922d faster exclude queries (#2825)
* faster exclude queries

Faster exclude queries with multiple terms.

Changes `Exclude` to be able to exclude multiple DocSets, instead of
putting the docsets into a union.
Use `seek_danger` in `Exclude`.

closes #2822

* replace unwrap with match
2026-01-30 17:06:41 +01:00
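
The idea of excluding several docsets without first building their union can be sketched on plain sorted slices. This is only an illustration under that assumption: `exclude_all` is a hypothetical helper, and it does not use tantivy's `DocSet` trait or `seek_danger`.

```rust
/// Returns the docs of `included` that appear in none of the `excluded`
/// lists. Every input is assumed to be sorted in ascending order.
/// Hypothetical helper, not part of tantivy.
pub fn exclude_all(included: &[u32], excluded: &[Vec<u32>]) -> Vec<u32> {
    // One forward-only cursor per excluded list, so each list is scanned once.
    let mut cursors = vec![0usize; excluded.len()];
    let mut out = Vec::new();
    'candidates: for &doc in included {
        for (list, cursor) in excluded.iter().zip(cursors.iter_mut()) {
            // Advance this list's cursor up to the candidate doc.
            while *cursor < list.len() && list[*cursor] < doc {
                *cursor += 1;
            }
            if *cursor < list.len() && list[*cursor] == doc {
                // The candidate is excluded; skip the remaining lists.
                continue 'candidates;
            }
        }
        out.push(doc);
    }
    out
}
```

Each excluded list keeps its own cursor, so no merged structure is needed and a candidate is rejected as soon as any single list contains it.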
Paul Masurel
4a89e74597 Fix rfc3339 typos and add Claude Code skills (#2823)
Closes #2817
2026-01-30 12:00:28 +01:00
Alex Lazar
4d99e51e50 Bump oneshot to 0.1.13 per dependabot (#2821) 2026-01-30 11:42:01 +01:00
trinity-1686a
9b619998bd Merge pull request #2816 from evance-br/fix-closing-paren-elastic-range 2026-01-27 17:00:08 +01:00
Evance Soumaoro
765c448945 uncomment commented code when testing 2026-01-27 13:19:41 +00:00
Evance Soumaoro
943594ebaa uncomment commented code when testing 2026-01-27 13:08:38 +00:00
Evance Soumaoro
df17daae0d fix closing parenthesis error on elastic range queries for lenient parser 2026-01-27 13:01:14 +00:00
Paul Masurel
0ae94baef5 Remove temp file (#2815)
Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-01-27 09:22:11 +01:00
Paul Masurel
3f448ecf79 Bugfix on intersection. (#2812)
The intersection algorithm made it possible to call .seek(..) with values
lower than the current doc id, breaking the DocSet contract.

The fix removes the optimization that caused left.seek(..) to be replaced
by a simpler left.advance(..).

Simply doing so led to a performance regression.
I therefore integrated that idea within SegmentPostings.seek.

We now attempt to check the next doc systematically on seek,
PROVIDED the block is already loaded.

Closes #2811

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-01-27 09:21:09 +01:00
Paul Masurel
b86caeefe2 Major bugfix in intersection
A bug was added with the `seek_into_the_danger_zone()` optimization

(Spotted and fixed by Stu)

The contract says seek_into_the_danger_zone returns true if the target doc is part of the docset.

The blanket implementation goes like this.

```
let current_doc = self.doc();
if current_doc < target {
     self.seek(target);
}
self.doc() == target
```

So it will return true if target is TERMINATED, where really TERMINATED does not belong to the docset.


The fix tries to clarify the contracts and fixes the intersection algorithm.
We observe a small but across-the-board improvement in intersection performance.

---------

Co-authored-by: Stu Hood <stuhood@gmail.com>
Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-01-23 18:44:10 +01:00
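
The failure mode described in this commit message can be reproduced with a toy docset. The sketch below makes several assumptions: `VecDocSet` is hypothetical, the sentinel value chosen for `TERMINATED` is arbitrary, and the guarded variant is just one possible way to honor the contract, not necessarily what the commit does.

```rust
pub type DocId = u32;
/// Sentinel returned once the docset is exhausted (value assumed here).
pub const TERMINATED: DocId = DocId::MAX;

/// Hypothetical docset backed by a sorted vector.
pub struct VecDocSet {
    docs: Vec<DocId>,
    cursor: usize,
}

impl VecDocSet {
    pub fn new(docs: Vec<DocId>) -> Self {
        VecDocSet { docs, cursor: 0 }
    }

    /// Current doc, or TERMINATED when past the end.
    pub fn doc(&self) -> DocId {
        self.docs.get(self.cursor).copied().unwrap_or(TERMINATED)
    }

    /// Advances to the first doc >= target.
    pub fn seek(&mut self, target: DocId) -> DocId {
        while self.doc() < target {
            self.cursor += 1;
        }
        self.doc()
    }

    /// The blanket implementation quoted in the commit message.
    pub fn seek_danger_naive(&mut self, target: DocId) -> bool {
        if self.doc() < target {
            self.seek(target);
        }
        self.doc() == target
    }

    /// One possible guard: TERMINATED is never a member of the docset.
    pub fn seek_danger_guarded(&mut self, target: DocId) -> bool {
        target != TERMINATED && self.seek_danger_naive(target)
    }
}
```

With the naive version, seeking to `TERMINATED` exhausts the docset and then compares the sentinel with itself, so it reports a match for a document that does not exist.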
ChangRui-Ryan
abf1e64f4d add benchmark for string search and get (#2795) 2026-01-19 11:50:41 +01:00
trinity-1686a
12977bc7c4 upgrade some dependencies (#2802)
including rand, which had a few breaking changes
2026-01-14 10:19:09 +01:00
trinity-1686a
0c94eb94c3 Merge pull request #2799 from jollygreenlaser/lru 2026-01-13 22:47:35 +01:00
Paul Masurel
c92e831dde Minor refactoring in PostingsSerializer (#2801)
Removes the Write generics argument in PostingsSerializer.
This removes useless generic.
Prepares the path for codecs.
Removes one useless CountingWrite layer.
etc.

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-01-12 13:53:43 +01:00
Alex Lazar
947c0d5f40 Bump lru to 0.16.3 per dependabot 2026-01-09 23:25:51 -08:00
75 changed files with 1715 additions and 379 deletions

@@ -0,0 +1,125 @@
---
name: rationalize-deps
description: Analyze Cargo.toml dependencies and attempt to remove unused features to reduce compile times and binary size
---
# Rationalize Dependencies
This skill analyzes Cargo.toml dependencies to identify and remove unused features.
## Overview
Many crates enable features by default that may not be needed. This skill:
1. Identifies dependencies with default features enabled
2. Tests if `default-features = false` works
3. Identifies which specific features are actually needed
4. Verifies compilation after changes
## Step 1: Identify the target
Ask the user which crate(s) to analyze:
- A specific crate name (e.g., "tokio", "serde")
- A specific workspace member (e.g., "quickwit-search")
- "all" to scan the entire workspace
## Step 2: Analyze current dependencies
For the workspace Cargo.toml (`quickwit/Cargo.toml`), list dependencies that:
- Do NOT have `default-features = false`
- Have default features that might be unnecessary
Run: `cargo tree -p <crate> -f "{p} {f}" --edges features` to see what features are actually used.
## Step 3: For each candidate dependency
### 3a: Check the crate's default features
Look up the crate on crates.io or check its Cargo.toml to understand:
- What features are enabled by default
- What each feature provides
Use: `cargo metadata --format-version=1 | jq '.packages[] | select(.name == "<crate>") | .features'`
### 3b: Try disabling default features
Modify the dependency in `quickwit/Cargo.toml`:
From:
```toml
some-crate = { version = "1.0" }
```
To:
```toml
some-crate = { version = "1.0", default-features = false }
```
### 3c: Run cargo check
Run: `cargo check --workspace` (or target specific packages for faster feedback)
If compilation fails:
1. Read the error messages to identify which features are needed
2. Add only the required features explicitly:
```toml
some-crate = { version = "1.0", default-features = false, features = ["needed-feature"] }
```
3. Re-run cargo check
### 3d: Binary search for minimal features
If there are many default features, use binary search:
1. Start with no features
2. If it fails, add half the default features
3. Continue until you find the minimal set
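
The search for a minimal feature set can be sketched as a small routine. Note that this is a greedy one-at-a-time variant, not the binary search described above, chosen because it is short and easy to check. `minimize_features` is a hypothetical helper, and the `builds` closure is an assumed stand-in for running `cargo check` with `default-features = false` and the given feature list.

```rust
/// Greedily drops features that the build does not need.
/// `builds` is a hypothetical callback standing in for a `cargo check` run
/// with exactly the given features enabled.
pub fn minimize_features<F>(default_features: &[&str], mut builds: F) -> Vec<String>
where
    F: FnMut(&[String]) -> bool,
{
    let mut kept: Vec<String> = default_features.iter().map(|s| s.to_string()).collect();
    let mut i = 0;
    while i < kept.len() {
        // Try the current set without feature `i`.
        let mut candidate = kept.clone();
        candidate.remove(i);
        if builds(&candidate) {
            // The feature was unnecessary: keep the smaller set and retry
            // the same index, which now points at the next feature.
            kept = candidate;
        } else {
            // The feature is required: move on.
            i += 1;
        }
    }
    kept
}
```

This costs one check per default feature. Bisecting as in step 3d can need fewer checks when most features are unnecessary, at the price of more bookkeeping.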
## Step 4: Document findings
For each dependency analyzed, report:
- Original configuration
- New configuration (if changed)
- Features that were removed
- Any features that are required
## Step 5: Verify full build
After all changes, run:
```bash
cargo check --workspace --all-targets
cargo test --workspace --no-run
```
## Common Patterns
### Serde
Often only needs `derive`:
```toml
serde = { version = "1.0", default-features = false, features = ["derive", "std"] }
```
### Tokio
Identify which runtime features are actually used:
```toml
tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "macros", "sync"] }
```
### Reqwest
Often doesn't need all TLS backends:
```toml
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"] }
```
## Rollback
If changes cause issues:
```bash
git checkout quickwit/Cargo.toml
cargo check --workspace
```
## Tips
- Start with large crates that have many default features (tokio, reqwest, hyper)
- Use `cargo bloat --crates` to identify large dependencies
- Check `cargo tree -d` for duplicate dependencies that might indicate feature conflicts
- Some features are needed only for tests - consider using `[dev-dependencies]` features

@@ -0,0 +1,60 @@
---
name: simple-pr
description: Create a simple PR from staged changes with an auto-generated commit message
disable-model-invocation: true
---
# Simple PR
Follow these steps to create a simple PR from staged changes:
## Step 1: Check workspace state
Run: `git status`
Verify that all changes have been staged (no unstaged changes). If there are unstaged changes, abort and ask the user to stage their changes first with `git add`.
Also verify that we are on the `main` branch. If not, abort and ask the user to switch to main first.
## Step 2: Ensure main is up to date
Run: `git pull origin main`
This ensures we're working from the latest code.
## Step 3: Review staged changes
Run: `git diff --cached`
Review the staged changes to understand what the PR will contain.
## Step 4: Generate commit message
Based on the staged changes, generate a concise commit message (1-2 sentences) that describes the "why" rather than the "what".
Display the proposed commit message to the user and ask for confirmation before proceeding.
## Step 5: Create a new branch
Get the git username: `git config user.name | tr ' ' '-' | tr '[:upper:]' '[:lower:]'`
Create a short, descriptive branch name based on the changes (e.g., `fix-typo-in-readme`, `add-retry-logic`, `update-deps`).
Create and checkout the branch: `git checkout -b {username}/{short-descriptive-name}`
## Step 6: Commit changes
Commit with the message from step 4:
```
git commit -m "{commit-message}"
```
## Step 7: Push and open a PR
Push the branch and open a PR:
```
git push -u origin {branch-name}
gh pr create --title "{commit-message-title}" --body "{longer-description-if-needed}"
```
Report the PR URL to the user when complete.
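
The branch naming from step 5 can be expressed as a small function. This is a sketch only: `branch_name` is a hypothetical helper, and it differs slightly from the shell pipeline in that it trims surrounding whitespace and lowercases with Rust's Unicode rules rather than the locale rules used by `tr`.

```rust
/// Builds `{username}/{short-descriptive-name}` from a git user name.
/// Hypothetical helper mirroring `tr ' ' '-' | tr '[:upper:]' '[:lower:]'`.
pub fn branch_name(user_name: &str, short_description: &str) -> String {
    // Replace spaces with dashes, then lowercase the result.
    let user = user_name.trim().replace(' ', "-").to_lowercase();
    format!("{user}/{short_description}")
}
```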

@@ -15,7 +15,7 @@ rust-version = "1.85"
exclude = ["benches/*.json", "benches/*.txt"]
[dependencies]
-oneshot = "0.1.7"
+oneshot = "0.1.13"
base64 = "0.22.0"
byteorder = "1.4.3"
crc32fast = "1.3.2"
@@ -27,7 +27,7 @@ regex = { version = "1.5.5", default-features = false, features = [
aho-corasick = "1.0"
tantivy-fst = "0.5"
memmap2 = { version = "0.9.0", optional = true }
-lz4_flex = { version = "0.11", default-features = false, optional = true }
+lz4_flex = { version = "0.12", default-features = false, optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
tempfile = { version = "3.12.0", optional = true }
log = "0.4.16"
@@ -50,7 +50,7 @@ fail = { version = "0.5.0", optional = true }
time = { version = "0.3.35", features = ["serde-well-known"] }
smallvec = "1.8.0"
rayon = "1.5.2"
-lru = "0.12.0"
+lru = "0.16.3"
fastdivide = "0.4.0"
itertools = "0.14.0"
measure_time = "0.9.0"
@@ -76,7 +76,7 @@ winapi = "0.3.9"
[dev-dependencies]
binggan = "0.14.2"
-rand = "0.8.5"
+rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"
pretty_assertions = "1.2.1"
@@ -85,7 +85,7 @@ test-log = "0.2.10"
futures = "0.3.21"
paste = "1.0.11"
more-asserts = "0.3.1"
-rand_distr = "0.4.3"
+rand_distr = "0.5"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",
@@ -189,3 +189,16 @@ harness = false
[[bench]]
name = "bool_queries_with_range"
harness = false
[[bench]]
name = "str_search_and_get"
harness = false
[[bench]]
name = "merge_segments"
harness = false
[[bench]]
name = "regex_all_terms"
harness = false

@@ -1,8 +1,8 @@
use binggan::plugins::PeakMemAllocPlugin;
use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
-use rand::distributions::WeightedIndex;
-use rand::prelude::SliceRandom;
+use rand::distr::weighted::WeightedIndex;
use rand::rngs::StdRng;
+use rand::seq::IndexedRandom;
use rand::{Rng, SeedableRng};
use rand_distr::Distribution;
use serde_json::json;
@@ -532,7 +532,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
// Prepare 1000 unique terms sampled using a Zipf distribution.
// Exponent ~1.1 approximates top-20 terms covering around ~20%.
let terms_1000: Vec<String> = (1..=1000).map(|i| format!("term_{i}")).collect();
-let zipf_1000 = rand_distr::Zipf::new(1000, 1.1f64).unwrap();
+let zipf_1000 = rand_distr::Zipf::new(1000.0, 1.1f64).unwrap();
{
let mut rng = StdRng::from_seed([1u8; 32]);
@@ -576,8 +576,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
}
let _val_max = 1_000_000.0;
for _ in 0..doc_with_value {
-let val: f64 = rng.gen_range(0.0..1_000_000.0);
-let json = if rng.gen_bool(0.1) {
+let val: f64 = rng.random_range(0.0..1_000_000.0);
+let json = if rng.random_bool(0.1) {
// 10% are numeric values
json!({ "mixed_type": val })
} else {
@@ -586,7 +586,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
index_writer.add_document(doc!(
text_field => "cool",
json_field => json,
-text_field_all_unique_terms => format!("unique_term_{}", rng.gen::<u64>()),
+text_field_all_unique_terms => format!("unique_term_{}", rng.random::<u64>()),
text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
text_field_few_terms_status => status_field_data[log_level_distribution.sample(&mut rng)].0,
text_field_1000_terms_zipf => terms_1000[zipf_1000.sample(&mut rng) as usize - 1].as_str(),

@@ -55,29 +55,29 @@ fn build_shared_indices(num_docs: usize, p_a: f32, p_b: f32, p_c: f32) -> (Bench
{
let mut writer = index.writer_with_num_threads(1, 500_000_000).unwrap();
for _ in 0..num_docs {
-let has_a = rng.gen_bool(p_a as f64);
-let has_b = rng.gen_bool(p_b as f64);
-let has_c = rng.gen_bool(p_c as f64);
-let score = rng.gen_range(0u64..100u64);
-let score2 = rng.gen_range(0u64..100_000u64);
+let has_a = rng.random_bool(p_a as f64);
+let has_b = rng.random_bool(p_b as f64);
+let has_c = rng.random_bool(p_c as f64);
+let score = rng.random_range(0u64..100u64);
+let score2 = rng.random_range(0u64..100_000u64);
let mut title_tokens: Vec<&str> = Vec::new();
let mut body_tokens: Vec<&str> = Vec::new();
if has_a {
-if rng.gen_bool(0.1) {
+if rng.random_bool(0.1) {
title_tokens.push("a");
} else {
body_tokens.push("a");
}
}
if has_b {
-if rng.gen_bool(0.1) {
+if rng.random_bool(0.1) {
title_tokens.push("b");
} else {
body_tokens.push("b");
}
}
if has_c {
-if rng.gen_bool(0.1) {
+if rng.random_bool(0.1) {
title_tokens.push("c");
} else {
body_tokens.push("c");

@@ -36,13 +36,13 @@ fn build_shared_indices(num_docs: usize, p_title_a: f32, distribution: &str) ->
"dense" => {
for doc_id in 0..num_docs {
// Always add title to avoid empty documents
-let title_token = if rng.gen_bool(p_title_a as f64) {
+let title_token = if rng.random_bool(p_title_a as f64) {
"a"
} else {
"b"
};
-let num_rand = rng.gen_range(0u64..1000u64);
+let num_rand = rng.random_range(0u64..1000u64);
let num_asc = (doc_id / 10000) as u64;
@@ -60,13 +60,13 @@ fn build_shared_indices(num_docs: usize, p_title_a: f32, distribution: &str) ->
"sparse" => {
for doc_id in 0..num_docs {
// Always add title to avoid empty documents
-let title_token = if rng.gen_bool(p_title_a as f64) {
+let title_token = if rng.random_bool(p_title_a as f64) {
"a"
} else {
"b"
};
-let num_rand = rng.gen_range(0u64..10000000u64);
+let num_rand = rng.random_range(0u64..10000000u64);
let num_asc = doc_id as u64;

224
benches/merge_segments.rs Normal file

@@ -0,0 +1,224 @@
// Benchmarks segment merging
//
// Notes:
// - Input segments are kept intact (no deletes / no IndexWriter merge).
// - Output is written to a `NullDirectory` that discards all files except
// fieldnorms (needed for merging).
use std::collections::HashMap;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use binggan::{black_box, BenchRunner};
use rand::prelude::*;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use tantivy::directory::{
AntiCallToken, Directory, FileHandle, OwnedBytes, TerminatingWrite, WatchCallback, WatchHandle,
WritePtr,
};
use tantivy::indexer::{merge_filtered_segments, NoMergePolicy};
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, HasLen, Index, IndexSettings, Segment};
#[derive(Clone, Default, Debug)]
struct NullDirectory {
blobs: Arc<RwLock<HashMap<PathBuf, OwnedBytes>>>,
}
struct NullWriter;
impl Write for NullWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl TerminatingWrite for NullWriter {
fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
Ok(())
}
}
struct InMemoryWriter {
path: PathBuf,
buffer: Vec<u8>,
blobs: Arc<RwLock<HashMap<PathBuf, OwnedBytes>>>,
}
impl Write for InMemoryWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl TerminatingWrite for InMemoryWriter {
fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
let bytes = OwnedBytes::new(std::mem::take(&mut self.buffer));
self.blobs.write().unwrap().insert(self.path.clone(), bytes);
Ok(())
}
}
#[derive(Debug, Default)]
struct NullFileHandle;
impl HasLen for NullFileHandle {
fn len(&self) -> usize {
0
}
}
impl FileHandle for NullFileHandle {
fn read_bytes(&self, _range: std::ops::Range<usize>) -> io::Result<OwnedBytes> {
unimplemented!()
}
}
impl Directory for NullDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
if let Some(bytes) = self.blobs.read().unwrap().get(path) {
return Ok(Arc::new(bytes.clone()));
}
Ok(Arc::new(NullFileHandle))
}
fn delete(&self, _path: &Path) -> Result<(), DeleteError> {
Ok(())
}
fn exists(&self, _path: &Path) -> Result<bool, OpenReadError> {
Ok(true)
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let path_buf = path.to_path_buf();
if path.to_string_lossy().ends_with(".fieldnorm") {
let writer = InMemoryWriter {
path: path_buf,
buffer: Vec::new(),
blobs: Arc::clone(&self.blobs),
};
Ok(io::BufWriter::new(Box::new(writer)))
} else {
Ok(io::BufWriter::new(Box::new(NullWriter)))
}
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
if let Some(bytes) = self.blobs.read().unwrap().get(path) {
return Ok(bytes.as_slice().to_vec());
}
Err(OpenReadError::FileDoesNotExist(path.to_path_buf()))
}
fn atomic_write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
Ok(())
}
fn sync_directory(&self) -> io::Result<()> {
Ok(())
}
fn watch(&self, _watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> {
Ok(WatchHandle::empty())
}
}
struct MergeScenario {
#[allow(dead_code)]
index: Index,
segments: Vec<Segment>,
settings: IndexSettings,
label: String,
}
fn build_index(
num_segments: usize,
docs_per_segment: usize,
tokens_per_doc: usize,
vocab_size: usize,
) -> MergeScenario {
let mut schema_builder = Schema::builder();
let body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
assert!(vocab_size > 0);
let total_tokens = num_segments * docs_per_segment * tokens_per_doc;
let use_unique_terms = vocab_size >= total_tokens;
let mut rng = StdRng::from_seed([7u8; 32]);
let mut next_token_id: u64 = 0;
{
let mut writer = index.writer_with_num_threads(1, 256_000_000).unwrap();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..num_segments {
for _ in 0..docs_per_segment {
let mut tokens = Vec::with_capacity(tokens_per_doc);
for _ in 0..tokens_per_doc {
let token_id = if use_unique_terms {
let id = next_token_id;
next_token_id += 1;
id
} else {
rng.random_range(0..vocab_size as u64)
};
tokens.push(format!("term_{token_id}"));
}
writer.add_document(doc!(body => tokens.join(" "))).unwrap();
}
writer.commit().unwrap();
}
}
let segments = index.searchable_segments().unwrap();
let settings = index.settings().clone();
let label = format!(
"segments={}, docs/seg={}, tokens/doc={}, vocab={}",
num_segments, docs_per_segment, tokens_per_doc, vocab_size
);
MergeScenario {
index,
segments,
settings,
label,
}
}
fn main() {
let scenarios = vec![
build_index(8, 50_000, 12, 8),
build_index(16, 50_000, 12, 8),
build_index(16, 100_000, 12, 8),
build_index(8, 50_000, 8, 8 * 50_000 * 8),
];
let mut runner = BenchRunner::new();
for scenario in scenarios {
let mut group = runner.new_group();
group.set_name(format!("merge_segments inv_index — {}", scenario.label));
let segments = scenario.segments.clone();
let settings = scenario.settings.clone();
group.register("merge", move |_| {
let output_dir = NullDirectory::default();
let filter_doc_ids = vec![None; segments.len()];
let merged_index =
merge_filtered_segments(&segments, settings.clone(), filter_doc_ids, output_dir)
.unwrap();
black_box(merged_index);
});
group.run();
}
}
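
The vocabulary rule in `build_index` decides whether a scenario produces all-unique terms or terms drawn at random from a small vocabulary. The sketch below isolates that rule so it can be checked against the four scenarios in `main`. `uses_unique_terms` is a hypothetical helper that mirrors the `use_unique_terms` computation, not a function from the benchmark.

```rust
/// Mirrors the `use_unique_terms` computation from `build_index`.
/// Hypothetical helper extracted for illustration.
pub fn uses_unique_terms(
    num_segments: usize,
    docs_per_segment: usize,
    tokens_per_doc: usize,
    vocab_size: usize,
) -> bool {
    assert!(vocab_size > 0);
    // Total number of tokens written across all segments.
    let total_tokens = num_segments * docs_per_segment * tokens_per_doc;
    // Every token gets a fresh term only when the vocabulary is at least
    // as large as the number of tokens.
    vocab_size >= total_tokens
}
```

Under this rule the first three scenarios (vocabulary of 8) sample terms at random, while the last one, whose vocabulary equals its total token count, gives every token its own term.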

@@ -33,7 +33,7 @@ fn build_shared_indices(num_docs: usize, distribution: &str) -> BenchIndex {
match distribution {
"dense" => {
for doc_id in 0..num_docs {
-let num_rand = rng.gen_range(0u64..1000u64);
+let num_rand = rng.random_range(0u64..1000u64);
let num_asc = (doc_id / 10000) as u64;
writer
@@ -46,7 +46,7 @@ fn build_shared_indices(num_docs: usize, distribution: &str) -> BenchIndex {
}
"sparse" => {
for doc_id in 0..num_docs {
-let num_rand = rng.gen_range(0u64..10000000u64);
+let num_rand = rng.random_range(0u64..10000000u64);
let num_asc = doc_id as u64;
writer

@@ -97,20 +97,20 @@ fn get_index_0_to_100() -> Index {
let num_vals = 100_000;
let docs: Vec<_> = (0..num_vals)
.map(|_i| {
-let id_name = if rng.gen_bool(0.01) {
+let id_name = if rng.random_bool(0.01) {
"veryfew".to_string() // 1%
-} else if rng.gen_bool(0.1) {
+} else if rng.random_bool(0.1) {
"few".to_string() // 9%
} else {
"most".to_string() // 90%
};
Doc {
id_name,
-id: rng.gen_range(0..100),
+id: rng.random_range(0..100),
// Multiply by 1000, so that we create most buckets in the compact space
// The benches depend on this range to select n-percent of elements with the
// methods below.
-ip: Ipv6Addr::from_u128(rng.gen_range(0..100) * 1000),
+ip: Ipv6Addr::from_u128(rng.random_range(0..100) * 1000),
}
})
.collect();

113
benches/regex_all_terms.rs Normal file

@@ -0,0 +1,113 @@
// Benchmarks regex query that matches all terms in a synthetic index.
//
// Corpus model:
// - N unique terms: t000000, t000001, ...
// - M docs
// - K tokens per doc: doc i gets terms derived from (i, token_index)
//
// Query:
// - Regex "t.*" to match all terms
//
// Run with:
// - cargo bench --bench regex_all_terms
//
use std::fmt::Write;
use binggan::{black_box, BenchRunner};
use tantivy::collector::Count;
use tantivy::query::RegexQuery;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, ReloadPolicy};
const HEAP_SIZE_BYTES: usize = 200_000_000;
#[derive(Clone, Copy)]
struct BenchConfig {
num_terms: usize,
num_docs: usize,
tokens_per_doc: usize,
}
fn main() {
let configs = default_configs();
let mut runner = BenchRunner::new();
for config in configs {
let (index, text_field) = build_index(config, HEAP_SIZE_BYTES);
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.expect("reader");
let searcher = reader.searcher();
let query = RegexQuery::from_pattern("t.*", text_field).expect("regex query");
let mut group = runner.new_group();
group.set_name(format!(
"regex_all_terms_t{}_d{}_k{}",
config.num_terms, config.num_docs, config.tokens_per_doc
));
group.register("regex_count", move |_| {
let count = searcher.search(&query, &Count).expect("search");
black_box(count);
});
group.run();
}
}
fn default_configs() -> Vec<BenchConfig> {
vec![
BenchConfig {
num_terms: 10_000,
num_docs: 100_000,
tokens_per_doc: 1,
},
BenchConfig {
num_terms: 10_000,
num_docs: 100_000,
tokens_per_doc: 8,
},
BenchConfig {
num_terms: 100_000,
num_docs: 100_000,
tokens_per_doc: 1,
},
BenchConfig {
num_terms: 100_000,
num_docs: 100_000,
tokens_per_doc: 8,
},
]
}
fn build_index(config: BenchConfig, heap_size_bytes: usize) -> (Index, tantivy::schema::Field) {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let term_width = config.num_terms.to_string().len();
{
let mut writer = index
.writer_with_num_threads(1, heap_size_bytes)
.expect("writer");
let mut buffer = String::new();
for doc_id in 0..config.num_docs {
buffer.clear();
for token_idx in 0..config.tokens_per_doc {
if token_idx > 0 {
buffer.push(' ');
}
let term_id = (doc_id * config.tokens_per_doc + token_idx) % config.num_terms;
write!(&mut buffer, "t{term_id:0term_width$}").expect("write token");
}
writer
.add_document(doc!(text_field => buffer.as_str()))
.expect("add_document");
}
writer.commit().expect("commit");
}
(index, text_field)
}
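
The corpus model from the header comment can be tried without an index. The sketch below isolates the token generation from `build_index`. `doc_text` is a hypothetical helper, but the arithmetic and the format string are taken from the loop above.

```rust
use std::fmt::Write;

/// Builds the text of one document the same way `build_index` does.
/// Hypothetical helper extracted for illustration.
pub fn doc_text(doc_id: usize, tokens_per_doc: usize, num_terms: usize) -> String {
    // Pad term ids to the number of digits in `num_terms`.
    let term_width = num_terms.to_string().len();
    let mut buffer = String::new();
    for token_idx in 0..tokens_per_doc {
        if token_idx > 0 {
            buffer.push(' ');
        }
        // Terms cycle through the vocabulary as documents are generated.
        let term_id = (doc_id * tokens_per_doc + token_idx) % num_terms;
        write!(&mut buffer, "t{term_id:0term_width$}").expect("writing to a String cannot fail");
    }
    buffer
}
```

The padding width follows `num_terms`, so the six-digit form `t000000` shown in the header comment only appears in the 100,000-term configurations. With 10,000 terms the ids are five digits wide.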

@@ -0,0 +1,421 @@
// This benchmark compares different approaches for retrieving string values:
//
// 1. Fast Field Approach: retrieves string values via term_ords() and ord_to_str()
//
// 2. Doc Store Approach: retrieves string values via searcher.doc() and field extraction
//
// The benchmark includes various data distributions:
// - Dense Sequential: Sequential document IDs with dense data
// - Dense Random: Random document IDs with dense data
// - Sparse Sequential: Sequential document IDs with sparse data
// - Sparse Random: Random document IDs with sparse data
use std::ops::Bound;
use binggan::{black_box, BenchGroup, BenchRunner};
use rand::prelude::*;
use rand::rngs::StdRng;
use rand::SeedableRng;
use tantivy::collector::{Count, DocSetCollector};
use tantivy::query::RangeQuery;
use tantivy::schema::document::TantivyDocument;
use tantivy::schema::{Schema, Value, FAST, STORED, STRING};
use tantivy::{doc, Index, ReloadPolicy, Searcher, Term};
#[derive(Clone)]
struct BenchIndex {
#[allow(dead_code)]
index: Index,
searcher: Searcher,
}
fn build_shared_indices(num_docs: usize, distribution: &str) -> BenchIndex {
// Schema with string fast field and stored field for doc access
let mut schema_builder = Schema::builder();
let f_str_fast = schema_builder.add_text_field("str_fast", STRING | STORED | FAST);
let f_str_stored = schema_builder.add_text_field("str_stored", STRING | STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
// Populate index with stable RNG for reproducibility.
let mut rng = StdRng::from_seed([7u8; 32]);
{
let mut writer = index.writer_with_num_threads(1, 4_000_000_000).unwrap();
match distribution {
"dense_random" => {
for _doc_id in 0..num_docs {
let suffix = rng.gen_range(0u64..1000u64);
let str_val = format!("str_{:03}", suffix);
writer
.add_document(doc!(
f_str_fast=>str_val.clone(),
f_str_stored=>str_val,
))
.unwrap();
}
}
"dense_sequential" => {
for doc_id in 0..num_docs {
let suffix = doc_id as u64 % 1000;
let str_val = format!("str_{:03}", suffix);
writer
.add_document(doc!(
f_str_fast=>str_val.clone(),
f_str_stored=>str_val,
))
.unwrap();
}
}
"sparse_random" => {
for _doc_id in 0..num_docs {
let suffix = rng.gen_range(0u64..1000000u64);
let str_val = format!("str_{:07}", suffix);
writer
.add_document(doc!(
f_str_fast=>str_val.clone(),
f_str_stored=>str_val,
))
.unwrap();
}
}
"sparse_sequential" => {
for doc_id in 0..num_docs {
let suffix = doc_id as u64;
let str_val = format!("str_{:07}", suffix);
writer
.add_document(doc!(
f_str_fast=>str_val.clone(),
f_str_stored=>str_val,
))
.unwrap();
}
}
_ => {
panic!("Unsupported distribution type");
}
}
writer.commit().unwrap();
}
// Prepare reader/searcher once.
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
let searcher = reader.searcher();
BenchIndex { index, searcher }
}
fn main() {
// Prepare corpora with varying scenarios
let scenarios = vec![
(
"dense_random_search_low_range".to_string(),
1_000_000,
"dense_random",
0,
9,
),
(
"dense_random_search_high_range".to_string(),
1_000_000,
"dense_random",
990,
999,
),
(
"dense_sequential_search_low_range".to_string(),
1_000_000,
"dense_sequential",
0,
9,
),
(
"dense_sequential_search_high_range".to_string(),
1_000_000,
"dense_sequential",
990,
999,
),
(
"sparse_random_search_low_range".to_string(),
1_000_000,
"sparse_random",
0,
9999,
),
(
"sparse_random_search_high_range".to_string(),
1_000_000,
"sparse_random",
990_000,
999_999,
),
(
"sparse_sequential_search_low_range".to_string(),
1_000_000,
"sparse_sequential",
0,
9999,
),
(
"sparse_sequential_search_high_range".to_string(),
1_000_000,
"sparse_sequential",
990_000,
999_999,
),
];
let mut runner = BenchRunner::new();
for (scenario_id, n, distribution, range_low, range_high) in scenarios {
let bench_index = build_shared_indices(n, distribution);
let mut group = runner.new_group();
group.set_name(scenario_id);
let field = bench_index.searcher.schema().get_field("str_fast").unwrap();
let (lower_str, upper_str) =
if distribution == "dense_sequential" || distribution == "dense_random" {
(
format!("str_{:03}", range_low),
format!("str_{:03}", range_high),
)
} else {
(
format!("str_{:07}", range_low),
format!("str_{:07}", range_high),
)
};
let lower_term = Term::from_field_text(field, &lower_str);
let upper_term = Term::from_field_text(field, &upper_str);
let query = RangeQuery::new(Bound::Included(lower_term), Bound::Included(upper_term));
run_benchmark_tasks(&mut group, &bench_index, query, range_low, range_high);
group.run();
}
}
/// Run all benchmark tasks for a given range query
fn run_benchmark_tasks(
bench_group: &mut BenchGroup,
bench_index: &BenchIndex,
query: RangeQuery,
range_low: u64,
range_high: u64,
) {
// Test count of matching documents
add_bench_task_count(
bench_group,
bench_index,
query.clone(),
range_low,
range_high,
);
// Test fetching all DocIds of matching documents
add_bench_task_docset(
bench_group,
bench_index,
query.clone(),
range_low,
range_high,
);
// Test fetching all string fast field values of matching documents
add_bench_task_fetch_all_strings(
bench_group,
bench_index,
query.clone(),
range_low,
range_high,
);
// Test fetching all string values of matching documents through doc() method
add_bench_task_fetch_all_strings_from_doc(
bench_group,
bench_index,
query,
range_low,
range_high,
);
}
fn add_bench_task_count(
bench_group: &mut BenchGroup,
bench_index: &BenchIndex,
query: RangeQuery,
range_low: u64,
range_high: u64,
) {
let task_name = format!("string_search_count_[{}-{}]", range_low, range_high);
let search_task = CountSearchTask {
searcher: bench_index.searcher.clone(),
query,
};
bench_group.register(task_name, move |_| black_box(search_task.run()));
}
fn add_bench_task_docset(
bench_group: &mut BenchGroup,
bench_index: &BenchIndex,
query: RangeQuery,
range_low: u64,
range_high: u64,
) {
let task_name = format!("string_fetch_all_docset_[{}-{}]", range_low, range_high);
let search_task = DocSetSearchTask {
searcher: bench_index.searcher.clone(),
query,
};
bench_group.register(task_name, move |_| black_box(search_task.run()));
}
fn add_bench_task_fetch_all_strings(
bench_group: &mut BenchGroup,
bench_index: &BenchIndex,
query: RangeQuery,
range_low: u64,
range_high: u64,
) {
let task_name = format!(
"string_fastfield_fetch_all_strings_[{}-{}]",
range_low, range_high
);
let search_task = FetchAllStringsSearchTask {
searcher: bench_index.searcher.clone(),
query,
};
bench_group.register(task_name, move |_| {
let result = black_box(search_task.run());
result.len()
});
}
fn add_bench_task_fetch_all_strings_from_doc(
bench_group: &mut BenchGroup,
bench_index: &BenchIndex,
query: RangeQuery,
range_low: u64,
range_high: u64,
) {
let task_name = format!(
"string_doc_fetch_all_strings_[{}-{}]",
range_low, range_high
);
let search_task = FetchAllStringsFromDocTask {
searcher: bench_index.searcher.clone(),
query,
};
bench_group.register(task_name, move |_| {
let result = black_box(search_task.run());
result.len()
});
}
struct CountSearchTask {
searcher: Searcher,
query: RangeQuery,
}
impl CountSearchTask {
#[inline(never)]
pub fn run(&self) -> usize {
self.searcher.search(&self.query, &Count).unwrap()
}
}
struct DocSetSearchTask {
searcher: Searcher,
query: RangeQuery,
}
impl DocSetSearchTask {
#[inline(never)]
pub fn run(&self) -> usize {
let result = self.searcher.search(&self.query, &DocSetCollector).unwrap();
result.len()
}
}
struct FetchAllStringsSearchTask {
searcher: Searcher,
query: RangeQuery,
}
impl FetchAllStringsSearchTask {
#[inline(never)]
pub fn run(&self) -> Vec<String> {
let doc_addresses = self.searcher.search(&self.query, &DocSetCollector).unwrap();
let mut docs = doc_addresses.into_iter().collect::<Vec<_>>();
docs.sort();
let mut strings = Vec::with_capacity(docs.len());
for doc_address in docs {
let segment_reader = &self.searcher.segment_readers()[doc_address.segment_ord as usize];
let str_column_opt = segment_reader.fast_fields().str("str_fast");
if let Ok(Some(str_column)) = str_column_opt {
let doc_id = doc_address.doc_id;
let term_ord = str_column.term_ords(doc_id).next().unwrap();
let mut str_buffer = String::new();
if str_column.ord_to_str(term_ord, &mut str_buffer).is_ok() {
strings.push(str_buffer);
}
}
}
strings
}
}
struct FetchAllStringsFromDocTask {
searcher: Searcher,
query: RangeQuery,
}
impl FetchAllStringsFromDocTask {
#[inline(never)]
pub fn run(&self) -> Vec<String> {
let doc_addresses = self.searcher.search(&self.query, &DocSetCollector).unwrap();
let mut docs = doc_addresses.into_iter().collect::<Vec<_>>();
docs.sort();
let mut strings = Vec::with_capacity(docs.len());
let str_stored_field = self
.searcher
.schema()
.get_field("str_stored")
.expect("str_stored field should exist");
for doc_address in docs {
// Get the document from the doc store (row store access)
if let Ok(doc) = self.searcher.doc::<TantivyDocument>(doc_address) {
// Extract string values from the stored field
if let Some(field_value) = doc.get_first(str_stored_field) {
if let Some(text) = field_value.as_value().as_str() {
strings.push(text.to_string());
}
}
}
}
strings
}
}

View File

@@ -18,5 +18,5 @@ homepage = "https://github.com/quickwit-oss/tantivy"
bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker1x"] }
[dev-dependencies]
rand = "0.8"
rand = "0.9"
proptest = "1"

View File

@@ -4,8 +4,8 @@ extern crate test;
#[cfg(test)]
mod tests {
use rand::rng;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tantivy_bitpacker::{BitPacker, BitUnpacker, BlockedBitpacker};
use test::Bencher;
@@ -27,7 +27,7 @@ mod tests {
let num_els = 1_000_000u32;
let bit_unpacker = BitUnpacker::new(bit_width);
let data = create_bitpacked_data(bit_width, num_els);
let idxs: Vec<u32> = (0..num_els).choose_multiple(&mut thread_rng(), 100_000);
let idxs: Vec<u32> = (0..num_els).choose_multiple(&mut rng(), 100_000);
b.iter(|| {
let mut out = 0u64;
for &idx in &idxs {

View File

@@ -22,7 +22,7 @@ downcast-rs = "2.0.1"
[dev-dependencies]
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8"
rand = "0.9"
binggan = "0.14.0"
[[bench]]

View File

@@ -9,7 +9,7 @@ use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_co
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.map(|num| num + rng.random::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);

View File

@@ -6,7 +6,7 @@ use tantivy_columnar::column_values::{CodecType, serialize_u64_based_column_valu
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.map(|num| num + rng.random::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);

View File

@@ -8,7 +8,7 @@ const TOTAL_NUM_VALUES: u32 = 1_000_000;
fn gen_optional_index(fill_ratio: f64) -> OptionalIndex {
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let vals: Vec<u32> = (0..TOTAL_NUM_VALUES)
.map(|_| rng.gen_bool(fill_ratio))
.map(|_| rng.random_bool(fill_ratio))
.enumerate()
.filter(|(_pos, val)| *val)
.map(|(pos, _)| pos as u32)
@@ -25,7 +25,7 @@ fn random_range_iterator(
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let mut current = start;
std::iter::from_fn(move || {
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
current += rng.random_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
if current >= end { None } else { Some(current) }
})
}

View File

@@ -39,7 +39,7 @@ fn get_data_50percent_item() -> Vec<u128> {
let mut data = vec![];
for _ in 0..300_000 {
let val = rng.gen_range(1..=100);
let val = rng.random_range(1..=100);
data.push(val);
}
data.push(SINGLE_ITEM);

View File

@@ -34,7 +34,7 @@ fn get_data_50percent_item() -> Vec<u128> {
let mut data = vec![];
for _ in 0..300_000 {
let val = rng.gen_range(1..=100);
let val = rng.random_range(1..=100);
data.push(val);
}
data.push(SINGLE_ITEM);

View File

@@ -268,7 +268,7 @@ mod tests {
#[test]
fn linear_interpol_fast_field_rand() {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
for _ in 0..50 {
let mut data = (0..10_000).map(|_| rng.next_u64()).collect::<Vec<_>>();
create_and_validate::<LinearCodec>(&data, "random");

View File

@@ -122,7 +122,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
assert_eq!(vals, buffer);
if !vals.is_empty() {
let test_rand_idx = rand::thread_rng().gen_range(0..=vals.len() - 1);
let test_rand_idx = rand::rng().random_range(0..=vals.len() - 1);
let expected_positions: Vec<u32> = vals
.iter()
.enumerate()

View File

@@ -21,5 +21,5 @@ serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.14.0"
proptest = "1.0.0"
rand = "0.8.4"
rand = "0.9"

View File

@@ -1,6 +1,6 @@
use binggan::{BenchRunner, black_box};
use rand::rng;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tantivy_common::{BitSet, TinySet, serialize_vint_u32};
fn bench_vint() {
@@ -17,7 +17,7 @@ fn bench_vint() {
black_box(out);
});
let vals: Vec<u32> = (0..20_000).choose_multiple(&mut thread_rng(), 100_000);
let vals: Vec<u32> = (0..20_000).choose_multiple(&mut rng(), 100_000);
runner.bench_function("bench_vint_rand", move |_| {
let mut out = 0u64;
for val in vals.iter().cloned() {

View File

@@ -416,7 +416,7 @@ mod tests {
use std::collections::HashSet;
use ownedbytes::OwnedBytes;
use rand::distributions::Bernoulli;
use rand::distr::Bernoulli;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

View File

@@ -60,7 +60,7 @@ At indexing, tantivy will try to interpret number and strings as different type
priority order.
Numbers will be interpreted as u64, i64 and f64 in that order.
Strings will be interpreted as rfc3999 dates or simple strings.
Strings will be interpreted as rfc3339 dates or simple strings.
The first working type is picked and is the only term that is emitted for indexing.
Note this interpretation happens on a per-document basis, and there is no effort to try to sniff
@@ -81,7 +81,7 @@ Will be interpreted as
(my_path.my_segment, String, 233) or (my_path.my_segment, u64, 233)
```
Likewise, we need to emit two tokens if the query contains an rfc3999 date.
Likewise, we need to emit two tokens if the query contains an rfc3339 date.
Indeed the date could have been actually a single token inside the text of a document at ingestion time. Generally speaking, we will always at least emit a string token in query parsing, and sometimes more.
If one more json field is defined, things get even more complicated.
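
As an illustration of the priority order described above, here is a minimal sketch of "first working type wins" for numbers. The `Interpreted` enum and `interpret_number` function are hypothetical names chosen for this example and are not tantivy APIs; the sketch parses from a string for simplicity and leaves out the rfc3339 date case, which would need a date-parsing crate.

```rust
/// Hypothetical result type: which numeric interpretation was picked.
#[derive(Debug, PartialEq)]
enum Interpreted {
    U64(u64),
    I64(i64),
    F64(f64),
}

/// Tries u64, then i64, then f64, and returns the first type that parses.
/// Returns `None` when the input is not a number at all.
fn interpret_number(raw: &str) -> Option<Interpreted> {
    if let Ok(val) = raw.parse::<u64>() {
        return Some(Interpreted::U64(val));
    }
    if let Ok(val) = raw.parse::<i64>() {
        return Some(Interpreted::I64(val));
    }
    raw.parse::<f64>().ok().map(Interpreted::F64)
}
```

With this ordering, `233` is only ever seen as a `u64`, a negative integer falls through to `i64`, and a value with a fractional part ends up as `f64`.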

View File

@@ -560,7 +560,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
(
(
value((), tag(">=")),
map(word_infallible("", false), |(bound, err)| {
map(word_infallible(")", false), |(bound, err)| {
(
(
bound
@@ -574,7 +574,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
),
(
value((), tag("<=")),
map(word_infallible("", false), |(bound, err)| {
map(word_infallible(")", false), |(bound, err)| {
(
(
UserInputBound::Unbounded,
@@ -588,7 +588,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
),
(
value((), tag(">")),
map(word_infallible("", false), |(bound, err)| {
map(word_infallible(")", false), |(bound, err)| {
(
(
bound
@@ -602,7 +602,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
),
(
value((), tag("<")),
map(word_infallible("", false), |(bound, err)| {
map(word_infallible(")", false), |(bound, err)| {
(
(
UserInputBound::Unbounded,
@@ -1323,6 +1323,14 @@ mod test {
test_parse_query_to_ast_helper("<a", "{\"*\" TO \"a\"}");
test_parse_query_to_ast_helper("<=a", "{\"*\" TO \"a\"]");
test_parse_query_to_ast_helper("<=bsd", "{\"*\" TO \"bsd\"]");
test_parse_query_to_ast_helper("(<=42)", "{\"*\" TO \"42\"]");
test_parse_query_to_ast_helper("(<=42 )", "{\"*\" TO \"42\"]");
test_parse_query_to_ast_helper("(age:>5)", "\"age\":{\"5\" TO \"*\"}");
test_parse_query_to_ast_helper(
"(title:bar AND age:>12)",
"(+\"title\":bar +\"age\":{\"12\" TO \"*\"})",
);
}
#[test]

View File

@@ -10,7 +10,8 @@ use serde::{Deserialize, Serialize};
use super::bucket::GetDocCount;
use super::metric::{
ExtendedStats, PercentilesMetricResult, SingleMetricResult, Stats, TopHitsMetricResult,
AverageMetricResult, CardinalityMetricResult, ExtendedStats, PercentilesMetricResult,
SingleMetricResult, Stats, TopHitsMetricResult,
};
use super::{AggregationError, Key};
use crate::TantivyError;
@@ -81,8 +82,8 @@ impl AggregationResult {
#[serde(untagged)]
/// MetricResult
pub enum MetricResult {
/// Average metric result.
Average(SingleMetricResult),
/// Average metric result with sum and count for multi-step merging.
Average(AverageMetricResult),
/// Count metric result.
Count(SingleMetricResult),
/// Max metric result.
@@ -99,8 +100,8 @@ pub enum MetricResult {
Percentiles(PercentilesMetricResult),
/// Top hits metric result
TopHits(TopHitsMetricResult),
/// Cardinality metric result
Cardinality(SingleMetricResult),
/// Cardinality metric result with HLL sketch for multi-step merging.
Cardinality(CardinalityMetricResult),
}
impl MetricResult {
@@ -119,7 +120,7 @@ impl MetricResult {
MetricResult::TopHits(_) => Err(TantivyError::AggregationError(
AggregationError::InvalidRequest("top_hits can't be used to order".to_string()),
)),
MetricResult::Cardinality(card) => Ok(card.value),
MetricResult::Cardinality(card) => Ok(card.value), // CardinalityMetricResult.value
}
}
}

View File

@@ -1359,10 +1359,10 @@ fn test_aggregation_on_json_object_mixed_types() {
&serde_json::json!({
"rangeagg": {
"buckets": [
{ "average_in_range": { "value": -20.5 }, "doc_count": 1, "key": "*-3", "to": 3.0 },
{ "average_in_range": { "value": 10.0 }, "doc_count": 1, "from": 3.0, "key": "3-19", "to": 19.0 },
{ "average_in_range": { "value": null }, "doc_count": 0, "from": 19.0, "key": "19-20", "to": 20.0 },
{ "average_in_range": { "value": null }, "doc_count": 0, "from": 20.0, "key": "20-*" }
{ "average_in_range": { "value": -20.5, "sum": -20.5, "count": 1 }, "doc_count": 1, "key": "*-3", "to": 3.0 },
{ "average_in_range": { "value": 10.0, "sum": 10.0, "count": 1 }, "doc_count": 1, "from": 3.0, "key": "3-19", "to": 19.0 },
{ "average_in_range": { "value": null, "sum": 0.0, "count": 0 }, "doc_count": 0, "from": 19.0, "key": "19-20", "to": 20.0 },
{ "average_in_range": { "value": null, "sum": 0.0, "count": 0 }, "doc_count": 0, "from": 20.0, "key": "20-*" }
]
},
"termagg": {

View File

@@ -838,7 +838,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 } // (999 + 799) / 2
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 } // (999 + 799) / 2
}
});
@@ -868,7 +868,7 @@ mod tests {
let expected = json!({
"furniture": {
"doc_count": 0,
"avg_price": { "value": null }
"avg_price": { "value": null, "sum": 0.0, "count": 0 }
}
});
@@ -904,7 +904,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
},
"in_stock": {
"doc_count": 3, // apple, samsung, penguin
@@ -1000,7 +1000,7 @@ mod tests {
let expected = json!({
"premium_electronics": {
"doc_count": 1, // Only apple (999) is >= 800 in tantivy's range semantics
"avg_rating": { "value": 4.5 }
"avg_rating": { "value": 4.5, "sum": 4.5, "count": 1 }
}
});
@@ -1032,7 +1032,7 @@ mod tests {
let expected = json!({
"in_stock": {
"doc_count": 3, // apple, samsung, penguin
"avg_price": { "value": 607.67 } // (999 + 799 + 25) / 3 ≈ 607.67
"avg_price": { "value": 607.67, "sum": 1823.0, "count": 3 } // (999 + 799 + 25) / 3 ≈ 607.67
},
"out_of_stock": {
"doc_count": 1, // nike
@@ -1183,7 +1183,7 @@ mod tests {
"doc_count": 4,
"electronics_branch": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
},
"in_stock_branch": {
"doc_count": 3,
@@ -1259,7 +1259,7 @@ mod tests {
"doc_count": 2, // apple (999), samsung (799)
"electronics": {
"doc_count": 2, // both are electronics
"avg_rating": { "value": 4.35 } // (4.5 + 4.2) / 2
"avg_rating": { "value": 4.35, "sum": 8.7, "count": 2 } // (4.5 + 4.2) / 2
},
"in_stock": {
"doc_count": 2, // both are in stock
@@ -1321,12 +1321,12 @@ mod tests {
{
"key": "samsung",
"doc_count": 1,
"avg_price": { "value": 799.0 }
"avg_price": { "value": 799.0, "sum": 799.0, "count": 1 }
},
{
"key": "apple",
"doc_count": 1,
"avg_price": { "value": 999.0 }
"avg_price": { "value": 999.0, "sum": 999.0, "count": 1 }
}
],
"sum_other_doc_count": 0,
@@ -1370,7 +1370,7 @@ mod tests {
"sum": 1798.0,
"avg": 899.0
},
"rating_avg": { "value": 4.35 },
"rating_avg": { "value": 4.35, "sum": 8.7, "count": 2 },
"count": { "value": 2.0 }
}
});
@@ -1411,7 +1411,7 @@ mod tests {
let expected = json!({
"electronics": {
"doc_count": 0,
"avg_price": { "value": null }
"avg_price": { "value": null, "sum": 0.0, "count": 0 }
}
});
@@ -1698,13 +1698,15 @@ mod tests {
let filter_expected = json!({
"electronics": {
"doc_count": 2,
"avg_price": { "value": 899.0 }
"avg_price": { "value": 899.0, "sum": 1798.0, "count": 2 }
}
});
let separate_expected = json!({
"result": {
"value": 899.0
"value": 899.0,
"sum": 1798.0,
"count": 2
}
});

View File

@@ -1222,7 +1222,9 @@ mod tests {
res["histogram"]["buckets"][0],
json!({
"avg": {
"value": Value::Null
"value": Value::Null,
"sum": 0.0,
"count": 0
},
"doc_count": 0,
"key": 2.0,

View File

@@ -19,8 +19,9 @@ use super::bucket::{
GetDocCount, Order, OrderTarget, RangeAggregation, TermsAggregation,
};
use super::metric::{
IntermediateAverage, IntermediateCount, IntermediateExtendedStats, IntermediateMax,
IntermediateMin, IntermediateStats, IntermediateSum, PercentilesCollector, TopHitsTopNComputer,
AverageMetricResult, CardinalityMetricResult, IntermediateAverage, IntermediateCount,
IntermediateExtendedStats, IntermediateMax, IntermediateMin, IntermediateStats,
IntermediateSum, PercentilesCollector, TopHitsTopNComputer,
};
use super::segment_agg_result::AggregationLimitsGuard;
use super::{format_date, AggregationError, Key, SerializedKey};
@@ -325,7 +326,11 @@ impl IntermediateMetricResult {
fn into_final_metric_result(self, req: &Aggregation) -> MetricResult {
match self {
IntermediateMetricResult::Average(intermediate_avg) => {
MetricResult::Average(intermediate_avg.finalize().into())
MetricResult::Average(AverageMetricResult {
value: intermediate_avg.finalize(),
sum: intermediate_avg.sum(),
count: intermediate_avg.count(),
})
}
IntermediateMetricResult::Count(intermediate_count) => {
MetricResult::Count(intermediate_count.finalize().into())
@@ -353,7 +358,11 @@ impl IntermediateMetricResult {
MetricResult::TopHits(top_hits.into_final_result())
}
IntermediateMetricResult::Cardinality(cardinality) => {
MetricResult::Cardinality(cardinality.finalize().into())
let value = cardinality.finalize();
MetricResult::Cardinality(CardinalityMetricResult {
value,
sketch: Some(cardinality),
})
}
}
}
@@ -820,7 +829,7 @@ impl IntermediateRangeBucketEntry {
};
// If we have a date type on the histogram buckets, we add the `key_as_string` field as
// rfc339
// rfc3339
if column_type == Some(ColumnType::DateTime) {
if let Some(val) = range_bucket_entry.to {
let key_as_string = format_date(val as i64)?;

View File

@@ -63,6 +63,16 @@ impl IntermediateAverage {
pub fn finalize(&self) -> Option<f64> {
self.stats.finalize().avg
}
/// Returns the sum of all collected values.
pub fn sum(&self) -> f64 {
self.stats.sum
}
/// Returns the count of all collected values.
pub fn count(&self) -> u64 {
self.stats.count
}
}
#[cfg(test)]

View File

@@ -340,7 +340,7 @@ impl PartialEq for CardinalityCollector {
impl CardinalityCollector {
/// Compute the final cardinality estimate.
pub fn finalize(self) -> Option<f64> {
pub fn finalize(&self) -> Option<f64> {
Some(self.sketch.clone().count().trunc())
}

View File

@@ -93,6 +93,41 @@ impl From<Option<f64>> for SingleMetricResult {
}
}
/// Average metric result with intermediate data for merging.
///
/// Unlike [`SingleMetricResult`], this struct includes the raw `sum` and `count`
/// values that can be used for multi-step query merging.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AverageMetricResult {
/// The computed average value. None if no documents matched.
pub value: Option<f64>,
/// The sum of all values (for multi-step merging).
pub sum: f64,
/// The count of all values (for multi-step merging).
pub count: u64,
}
/// Cardinality metric result with computed value and raw HLL sketch for multi-step merging.
///
/// The `value` field contains the computed cardinality estimate.
/// The `sketch` field contains the serialized HyperLogLog++ sketch that can be used
/// for merging results across multiple query steps.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CardinalityMetricResult {
/// The computed cardinality estimate.
pub value: Option<f64>,
/// The serialized HyperLogLog++ sketch for multi-step merging.
#[serde(skip_serializing_if = "Option::is_none")]
pub sketch: Option<CardinalityCollector>,
}
impl PartialEq for CardinalityMetricResult {
fn eq(&self, other: &Self) -> bool {
// Only compare values, not sketch (sketch comparison is complex)
self.value == other.value
}
}
/// This is the wrapper of percentile entries, which can be vector or hashmap
/// depending on if it's keyed or not.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -111,13 +146,26 @@ pub struct PercentileValuesVecEntry {
value: f64,
}
/// Single-metric aggregations use this common result structure.
/// Percentiles metric result with computed values and raw sketch for multi-step merging.
///
/// Main reason to wrap it in value is to match elasticsearch output structure.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The `values` field contains the computed percentile values.
/// The `sketch` field contains the serialized DDSketch that can be used for merging
/// results across multiple query steps.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PercentilesMetricResult {
/// The result of the percentile metric.
/// The computed percentile values.
pub values: PercentileValues,
/// The serialized DDSketch for multi-step merging.
/// This is the raw sketch data that can be deserialized and merged with other sketches.
#[serde(skip_serializing_if = "Option::is_none")]
pub sketch: Option<PercentilesCollector>,
}
impl PartialEq for PercentilesMetricResult {
fn eq(&self, other: &Self) -> bool {
// Only compare values, not sketch (sketch comparison is complex)
self.values == other.values
}
}
/// The top_hits metric results entry
@@ -198,4 +246,105 @@ mod tests {
assert_eq!(aggregations_res_json["price_min"]["value"], 0.0);
assert_eq!(aggregations_res_json["price_sum"]["value"], 15.0);
}
#[test]
fn test_average_returns_sum_and_count() {
let mut schema_builder = Schema::builder();
let field_options = NumericOptions::default().set_fast();
let field = schema_builder.add_f64_field("price", field_options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// Add documents with values 0, 1, 2, 3, 4, 5
// sum = 15, count = 6, avg = 2.5
for i in 0..6 {
index_writer
.add_document(doc!(
field => i as f64,
))
.unwrap();
}
index_writer.commit().unwrap();
let aggregations_json = r#"{ "price_avg": { "avg": { "field": "price" } } }"#;
let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
let collector = AggregationCollector::from_aggs(aggregations, Default::default());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let aggregations_res_json = serde_json::to_value(aggregations_res).unwrap();
// Verify all three fields are present and correct
assert_eq!(aggregations_res_json["price_avg"]["value"], 2.5);
assert_eq!(aggregations_res_json["price_avg"]["sum"], 15.0);
assert_eq!(aggregations_res_json["price_avg"]["count"], 6);
}
#[test]
fn test_percentiles_returns_sketch() {
let mut schema_builder = Schema::builder();
let field_options = NumericOptions::default().set_fast();
let field = schema_builder.add_f64_field("latency", field_options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// Add documents with latency values
for i in 0..100 {
index_writer
.add_document(doc!(
field => i as f64,
))
.unwrap();
}
index_writer.commit().unwrap();
let aggregations_json =
r#"{ "latency_percentiles": { "percentiles": { "field": "latency" } } }"#;
let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
let collector = AggregationCollector::from_aggs(aggregations, Default::default());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let aggregations_res_json = serde_json::to_value(aggregations_res).unwrap();
// Verify percentile values are present
assert!(aggregations_res_json["latency_percentiles"]["values"].is_object());
// Verify sketch is present (serialized DDSketch)
assert!(aggregations_res_json["latency_percentiles"]["sketch"].is_object());
}
#[test]
fn test_cardinality_returns_sketch() {
let mut schema_builder = Schema::builder();
let field_options = NumericOptions::default().set_fast();
let field = schema_builder.add_u64_field("user_id", field_options);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
// Add documents with some duplicate user_ids
for i in 0..50 {
index_writer
.add_document(doc!(
field => (i % 10) as u64, // 10 unique values
))
.unwrap();
}
index_writer.commit().unwrap();
let aggregations_json = r#"{ "unique_users": { "cardinality": { "field": "user_id" } } }"#;
let aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
let collector = AggregationCollector::from_aggs(aggregations, Default::default());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let aggregations_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap();
let aggregations_res_json = serde_json::to_value(aggregations_res).unwrap();
// Verify cardinality value is present and approximately correct
let cardinality = aggregations_res_json["unique_users"]["value"]
.as_f64()
.unwrap();
assert!(cardinality >= 9.0 && cardinality <= 11.0); // HLL is approximate
// Verify sketch is present (serialized HyperLogLog++)
assert!(aggregations_res_json["unique_users"]["sketch"].is_object());
}
}
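
The reason for exposing `sum` and `count` next to `value` is that averages from separate query steps cannot be combined from `value` alone. A minimal sketch of how a downstream consumer might merge them follows; `AvgPart` and `merge_avg` are hypothetical names that only mirror the three JSON fields shown in this change, not types from tantivy.

```rust
/// Hypothetical mirror of the JSON fields of one average result.
#[derive(Clone, Debug, PartialEq)]
struct AvgPart {
    value: Option<f64>,
    sum: f64,
    count: u64,
}

/// Merges partial averages by adding sums and counts, then recomputing
/// the average. `value` is `None` when no values were collected.
fn merge_avg(parts: &[AvgPart]) -> AvgPart {
    let sum: f64 = parts.iter().map(|part| part.sum).sum();
    let count: u64 = parts.iter().map(|part| part.count).sum();
    let value = if count == 0 {
        None
    } else {
        Some(sum / count as f64)
    };
    AvgPart { value, sum, count }
}
```

Merging a step with sum 15 over 6 values and a step with sum 40 over 4 values gives 55 / 10 = 5.5, whereas averaging the two per-step averages (2.5 and 10.0) would give 6.25.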

View File

@@ -178,6 +178,9 @@ fn format_percentile(percentile: f64) -> String {
impl PercentilesCollector {
/// Convert result into final result. This will query the quantiles from the underlying
/// quantile collector.
///
/// The result includes both the computed percentile values and the raw DDSketch
/// for multi-step query merging.
pub fn into_final_result(self, req: &PercentilesAggregationReq) -> PercentilesMetricResult {
let percentiles: &[f64] = req
.percents
@@ -210,7 +213,15 @@ impl PercentilesCollector {
.collect(),
)
};
PercentilesMetricResult { values }
PercentilesMetricResult {
values,
sketch: Some(self),
}
}
/// Returns a reference to the underlying DDSketch.
pub fn sketch(&self) -> &sketches_ddsketch::DDSketch {
&self.sketch
}
fn new() -> Self {

View File

@@ -486,9 +486,9 @@ mod tests {
use std::collections::BTreeSet;
use columnar::Dictionary;
use rand::distributions::Uniform;
use rand::distr::Uniform;
use rand::prelude::SliceRandom;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use super::{FacetCollector, FacetCounts};
use crate::collector::facet_collector::compress_mapping;
@@ -731,7 +731,7 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let uniform = Uniform::new_inclusive(1, 100_000);
let uniform = Uniform::new_inclusive(1, 100_000).unwrap();
let mut docs: Vec<TantivyDocument> =
vec![("a", 10), ("b", 100), ("c", 7), ("d", 12), ("e", 21)]
.into_iter()
@@ -741,14 +741,11 @@ mod tests {
std::iter::repeat_n(doc, count)
})
.map(|mut doc| {
doc.add_facet(
facet_field,
&format!("/facet/{}", thread_rng().sample(uniform)),
);
doc.add_facet(facet_field, &format!("/facet/{}", rng().sample(uniform)));
doc
})
.collect();
docs[..].shuffle(&mut thread_rng());
docs[..].shuffle(&mut rng());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for doc in docs {
@@ -822,8 +819,8 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::rng;
use rand::seq::SliceRandom;
use rand::thread_rng;
use test::Bencher;
use crate::collector::FacetCollector;
@@ -846,7 +843,7 @@ mod bench {
}
}
// 40425 docs
docs[..].shuffle(&mut thread_rng());
docs[..].shuffle(&mut rng());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for doc in docs {

View File

@@ -160,7 +160,7 @@ mod tests {
expected: &[(crate::Score, usize)],
) {
let mut vals: Vec<(crate::Score, usize)> = (0..10).map(|val| (val as f32, val)).collect();
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
let vals_merged = merge_top_k(vals.into_iter(), doc_range, ComparatorEnum::from(order));
assert_eq!(&vals_merged, expected);
}

View File

@@ -676,7 +676,7 @@ mod tests {
let num_segments = reader.searcher().segment_readers().len();
assert!(num_segments <= 4);
let num_components_except_deletes_and_tempstore =
crate::index::SegmentComponent::iterator().len() - 2;
crate::index::SegmentComponent::iterator().len() - 1;
let max_num_mmapped = num_components_except_deletes_and_tempstore * num_segments;
assert_eventually(|| {
let num_mmapped = mmap_directory.get_cache_info().mmapped.len();

View File

@@ -51,31 +51,55 @@ pub trait DocSet: Send {
doc
}
/// Seeks to the target if possible and returns true if the target is in the DocSet.
/// !!!Dragons ahead!!!
/// In spirit, this is an approximate and dangerous version of `seek`.
///
/// It can leave the DocSet in an `invalid` state and might return a
/// lower bound of what the result of Seek would have been.
///
///
/// More accurately it returns either:
/// - Found if the target is in the docset. In that case, the DocSet is left in a valid state.
/// - SeekLowerBound(seek_lower_bound) if the target is not in the docset. In that case, the
/// DocSet can be left in an invalid state. The DocSet should then only receive calls to
/// `seek_danger(..)` until it returns `Found` and gets back to a valid state.
///
/// `seek_lower_bound` can be any `DocId` (in the docset or not) as long as it is in
/// `(target .. seek_result] U {TERMINATED}` where `seek_result` is the first document in the
/// docset greater than `target`.
///
/// `seek_danger` may return `SeekLowerBound(TERMINATED)`.
///
/// Calling `seek_danger` with TERMINATED as a target is allowed,
/// and should always return SeekLowerBound(TERMINATED) or anything larger as TERMINATED is NOT in
/// the DocSet.
///
/// DocSets that already have an efficient `seek` method don't need to implement
/// `seek_into_the_danger_zone`. All wrapper DocSets should forward
/// `seek_into_the_danger_zone` to the underlying DocSet.
/// `seek_danger`.
///
/// ## API Behaviour
/// If `seek_into_the_danger_zone` is returning true, a call to `doc()` has to return target.
/// If `seek_into_the_danger_zone` is returning false, a call to `doc()` may return any doc
/// between the last doc that matched and target or a doc that is a valid next hit after
/// target. The DocSet is considered to be in an invalid state until
/// `seek_into_the_danger_zone` returns true again.
///
/// `target` needs to be equal or larger than `doc` when in a valid state.
///
/// Consecutive calls are not allowed to have decreasing `target` values.
///
/// # Warning
/// This is an advanced API used by intersection. The API contract is tricky, avoid using it.
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
let current_doc = self.doc();
if current_doc < target {
self.seek(target);
/// Consecutive calls to seek_danger are guaranteed to have strictly increasing `target`
/// values.
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
if target >= TERMINATED {
debug_assert!(target == TERMINATED);
// No need to advance.
return SeekDangerResult::SeekLowerBound(target);
}
// The default implementation does not include any
// `danger zone` behavior.
//
// It does not leave the scorer in an invalid state.
// For this reason, we can safely call `self.doc()`.
let mut doc = self.doc();
if doc < target {
doc = self.seek(target);
}
if doc == target {
SeekDangerResult::Found
} else {
SeekDangerResult::SeekLowerBound(doc)
}
self.doc() == target
}
/// Fills a given mutable buffer with the next doc ids from the
@@ -166,6 +190,17 @@ pub trait DocSet: Send {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SeekDangerResult {
/// The target was found in the DocSet.
Found,
/// The target was not found in the DocSet.
/// We return a range in which the value could be.
/// The returned value can be any DocId that is <= the first document
/// in the docset after the target.
SeekLowerBound(DocId),
}
impl DocSet for &mut dyn DocSet {
fn advance(&mut self) -> u32 {
(**self).advance()
@@ -175,8 +210,8 @@ impl DocSet for &mut dyn DocSet {
(**self).seek(target)
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
(**self).seek_into_the_danger_zone(target)
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
(**self).seek_danger(target)
}
fn doc(&self) -> u32 {
@@ -211,9 +246,9 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.seek(target)
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.seek_into_the_danger_zone(target)
unboxed.seek_danger(target)
}
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {
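
To make the `seek_danger` contract more concrete, here is a minimal, self-contained sketch of how an intersection could drive it. `VecDocSet` and `intersect` are hypothetical and written only for this example; the `seek_danger` body follows the default implementation shown in this diff, so this docset is never actually left in an invalid state. `TERMINATED` is assumed to be `i32::MAX as u32`.

```rust
type DocId = u32;
// Assumed sentinel value marking the end of a docset.
const TERMINATED: DocId = i32::MAX as u32;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SeekDangerResult {
    Found,
    SeekLowerBound(DocId),
}

/// Hypothetical docset backed by a sorted vector of doc ids.
struct VecDocSet {
    docs: Vec<DocId>,
    cursor: usize,
}

impl VecDocSet {
    fn new(docs: Vec<DocId>) -> Self {
        VecDocSet { docs, cursor: 0 }
    }

    /// Current doc, or TERMINATED once the cursor is past the end.
    fn doc(&self) -> DocId {
        self.docs.get(self.cursor).copied().unwrap_or(TERMINATED)
    }

    /// Advances to the first doc >= target and returns it.
    fn seek(&mut self, target: DocId) -> DocId {
        while self.doc() < target {
            self.cursor += 1;
        }
        self.doc()
    }

    /// Same logic as the default implementation in the diff.
    fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
        if target >= TERMINATED {
            return SeekDangerResult::SeekLowerBound(target);
        }
        let mut doc = self.doc();
        if doc < target {
            doc = self.seek(target);
        }
        if doc == target {
            SeekDangerResult::Found
        } else {
            SeekDangerResult::SeekLowerBound(doc)
        }
    }
}

/// Intersects two docsets. `lead` proposes candidates, `other` is only
/// probed through `seek_danger` with strictly increasing targets.
fn intersect(mut lead: VecDocSet, mut other: VecDocSet) -> Vec<DocId> {
    let mut hits = Vec::new();
    let mut candidate = lead.doc();
    while candidate != TERMINATED {
        match other.seek_danger(candidate) {
            SeekDangerResult::Found => {
                hits.push(candidate);
                candidate = lead.seek(candidate + 1);
            }
            // The lower bound is always greater than the candidate,
            // so the lead can skip directly to it.
            SeekDangerResult::SeekLowerBound(lower) => {
                candidate = lead.seek(lower);
            }
        }
    }
    hits
}
```

The point of returning a lower bound instead of `false` is visible in the `SeekLowerBound` arm: the caller learns how far it can safely skip, rather than only that the target was missing.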

View File

@@ -162,7 +162,7 @@ mod tests {
mod bench {
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use rand::rng;
use test::Bencher;
use super::AliveBitSet;
@@ -176,7 +176,7 @@ mod bench {
}
fn remove_rand(raw: &mut Vec<u32>) {
let i = (0..raw.len()).choose(&mut thread_rng()).unwrap();
let i = (0..raw.len()).choose(&mut rng()).unwrap();
raw.remove(i);
}

View File

@@ -879,7 +879,7 @@ mod tests {
const ONE_HOUR_IN_MICROSECS: i64 = 3_600 * 1_000_000;
let times: Vec<DateTime> = std::iter::repeat_with(|| {
// +- One hour.
let t = T0 + rng.gen_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
let t = T0 + rng.random_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
DateTime::from_timestamp_micros(t)
})
.take(1_000)

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
@@ -29,7 +29,7 @@ fn test_functional_store() -> crate::Result<()> {
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut rng = rng();
let mut index_writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
@@ -38,9 +38,9 @@ fn test_functional_store() -> crate::Result<()> {
let mut doc_id = 0u64;
for _iteration in 0..get_num_iterations() {
let num_docs: usize = rng.gen_range(0..4);
let num_docs: usize = rng.random_range(0..4);
if !doc_set.is_empty() {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let doc_to_remove_id = rng.random_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
@@ -70,10 +70,10 @@ const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat \
non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
fn get_text() -> String {
use rand::seq::SliceRandom;
let mut rng = thread_rng();
use rand::seq::IndexedRandom;
let mut rng = rng();
let tokens: Vec<_> = LOREM.split(' ').collect();
let random_val = rng.gen_range(0..20);
let random_val = rng.random_range(0..20);
(0..random_val)
.map(|_| tokens.choose(&mut rng).unwrap())
@@ -101,7 +101,7 @@ fn test_functional_indexing_unsorted() -> crate::Result<()> {
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let mut rng = thread_rng();
let mut rng = rng();
let mut index_writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
@@ -110,7 +110,7 @@ fn test_functional_indexing_unsorted() -> crate::Result<()> {
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..get_num_iterations() {
let random_val = rng.gen_range(0..20);
let random_val = rng.random_range(0..20);
if random_val == 0 {
index_writer.commit()?;
committed_docs.extend(&uncommitted_docs);

View File

@@ -1,8 +1,6 @@
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
@@ -37,7 +35,6 @@ impl SegmentMetaInventory {
let inner = InnerSegmentMeta {
segment_id,
max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: None,
};
SegmentMeta::from(self.inventory.track(inner))
@@ -85,15 +82,6 @@ impl SegmentMeta {
self.tracked.segment_id
}
/// Removes the Component::TempStore from the alive list and
/// therefore marks the temp docstore file to be deleted by
/// the garbage collection.
pub fn untrack_temp_docstore(&self) {
self.tracked
.include_temp_doc_store
.store(false, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
@@ -111,20 +99,9 @@ impl SegmentMeta {
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
if self
.tracked
.include_temp_doc_store
.load(std::sync::atomic::Ordering::Relaxed)
{
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
} else {
SegmentComponent::iterator()
.filter(|comp| *comp != &SegmentComponent::TempStore)
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
/// Returns the relative path of a component of our segment.
@@ -138,7 +115,6 @@ impl SegmentMeta {
SegmentComponent::Positions => ".pos".to_string(),
SegmentComponent::Terms => ".term".to_string(),
SegmentComponent::Store => ".store".to_string(),
SegmentComponent::TempStore => ".store.temp".to_string(),
SegmentComponent::FastFields => ".fast".to_string(),
SegmentComponent::FieldNorms => ".fieldnorm".to_string(),
SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
@@ -183,7 +159,6 @@ impl SegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
});
SegmentMeta { tracked }
}
@@ -202,7 +177,6 @@ impl SegmentMeta {
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
@@ -214,14 +188,6 @@ struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
pub deletes: Option<DeleteMeta>,
/// If you want to avoid the SegmentComponent::TempStore file to be covered by
/// garbage collection and deleted, set this to true. This is used during merge.
#[serde(skip)]
#[serde(default = "default_temp_store")]
pub(crate) include_temp_doc_store: Arc<AtomicBool>,
}
fn default_temp_store() -> Arc<AtomicBool> {
Arc::new(AtomicBool::new(false))
}
impl InnerSegmentMeta {

View File

@@ -23,8 +23,6 @@ pub enum SegmentComponent {
/// Accessing a document from the store is relatively slow, as it
/// requires to decompress the entire block it belongs to.
Store,
/// Temporary storage of the documents, before streamed to `Store`.
TempStore,
/// Bitset describing which document of the segment is alive.
/// (It was representing deleted docs but changed to represent alive docs from v0.17)
Delete,
@@ -33,14 +31,13 @@ pub enum SegmentComponent {
impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 7] = [
SegmentComponent::Postings,
SegmentComponent::Positions,
SegmentComponent::FastFields,
SegmentComponent::FieldNorms,
SegmentComponent::Terms,
SegmentComponent::Store,
SegmentComponent::TempStore,
SegmentComponent::Delete,
];
SEGMENT_COMPONENTS.iter()

View File

@@ -218,7 +218,7 @@ fn index_documents<D: Document>(
let alive_bitset_opt = apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
let meta = segment_with_max_doc.meta().clone();
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
segment_updater.schedule_add_segment(segment_entry).wait()?;

View File

@@ -377,7 +377,7 @@ pub mod tests {
use common::{BinarySerializable, FixedSize};
use query_grammar::{UserInputAst, UserInputLeaf, UserInputLiteral};
use rand::distributions::{Bernoulli, Uniform};
use rand::distr::{Bernoulli, Uniform};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use time::OffsetDateTime;
@@ -428,7 +428,7 @@ pub mod tests {
pub fn generate_nonunique_unsorted(max_value: u32, n_elems: usize) -> Vec<u32> {
let seed: [u8; 32] = [1; 32];
StdRng::from_seed(seed)
.sample_iter(&Uniform::new(0u32, max_value))
.sample_iter(&Uniform::new(0u32, max_value).unwrap())
.take(n_elems)
.collect::<Vec<u32>>()
}

View File

@@ -303,10 +303,10 @@ impl BlockSegmentPostings {
}
pub(crate) fn load_block(&mut self) {
let offset = self.skip_reader.byte_offset();
if self.block_is_loaded() {
return;
}
let offset = self.skip_reader.byte_offset();
match self.skip_reader.block_info() {
BlockInfo::BitPacked {
doc_num_bits,

View File

@@ -397,7 +397,10 @@ mod bench {
let mut seed: [u8; 32] = [0; 32];
seed[31] = seed_val;
let mut rng = StdRng::from_seed(seed);
(0u32..).filter(|_| rng.gen_bool(ratio)).take(n).collect()
(0u32..)
.filter(|_| rng.random_bool(ratio))
.take(n)
.collect()
}
pub fn generate_array(n: usize, ratio: f64) -> Vec<u32> {

View File

@@ -604,13 +604,13 @@ mod bench {
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = TantivyDocument::default();
if rng.gen_bool(1f64 / 15f64) {
if rng.random_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
if rng.random_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
if rng.random_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");

View File

@@ -70,13 +70,13 @@ impl SegmentPostings {
let mut buffer = Vec::new();
{
let mut postings_serializer =
PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None);
PostingsSerializer::new(0.0, IndexRecordOption::Basic, None);
postings_serializer.new_term(docs.len() as u32, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32);
}
postings_serializer
.close_term(docs.len() as u32)
.close_term(docs.len() as u32, &mut buffer)
.expect("In memory Serialization should never fail.");
}
let block_segment_postings = BlockSegmentPostings::open(
@@ -115,7 +115,6 @@ impl SegmentPostings {
})
.unwrap_or(0.0);
let mut postings_serializer = PostingsSerializer::new(
&mut buffer,
average_field_norm,
IndexRecordOption::WithFreqs,
fieldnorm_reader,
@@ -125,7 +124,7 @@ impl SegmentPostings {
postings_serializer.write_doc(doc, tf);
}
postings_serializer
.close_term(doc_and_tfs.len() as u32)
.close_term(doc_and_tfs.len() as u32, &mut buffer)
.unwrap();
let block_segment_postings = BlockSegmentPostings::open(
doc_and_tfs.len() as u32,
@@ -169,12 +168,20 @@ impl DocSet for SegmentPostings {
self.doc()
}
#[inline]
fn seek(&mut self, target: DocId) -> DocId {
debug_assert!(self.doc() <= target);
if self.doc() >= target {
return self.doc();
}
// As an optimization, if the block is already loaded, we can
// cheaply check the next doc.
self.cur = (self.cur + 1).min(COMPRESSION_BLOCK_SIZE - 1);
if self.doc() >= target {
return self.doc();
}
// Delegate block-local search to BlockSegmentPostings::seek, which returns
// the in-block index of the first doc >= target.
self.cur = self.block_cursor.seek(target);

View File

@@ -104,10 +104,12 @@ impl InvertedIndexSerializer {
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
current_term_info: TermInfo,
term_open: bool,
postings_write: &'a mut CountingWriter<WritePtr>,
postings_start_offset: u64,
}
impl<'a> FieldSerializer<'a> {
@@ -128,27 +130,30 @@ impl<'a> FieldSerializer<'a> {
.as_ref()
.map(|ff_reader| total_num_tokens as Score / ff_reader.num_docs() as Score)
.unwrap_or(0.0);
let postings_serializer = PostingsSerializer::new(
postings_write,
average_fieldnorm,
index_record_option,
fieldnorm_reader,
);
let postings_serializer =
PostingsSerializer::new(average_fieldnorm, index_record_option, fieldnorm_reader);
let positions_serializer_opt = if index_record_option.has_positions() {
Some(PositionSerializer::new(positions_write))
} else {
None
};
let postings_start_offset = postings_write.written_bytes();
Ok(FieldSerializer {
term_dictionary_builder,
postings_serializer,
positions_serializer_opt,
current_term_info: TermInfo::default(),
term_open: false,
postings_write,
postings_start_offset,
})
}
fn postings_offset(&self) -> usize {
(self.postings_write.written_bytes() - self.postings_start_offset) as usize
}
fn current_term_info(&self) -> TermInfo {
let positions_start =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
@@ -156,7 +161,7 @@ impl<'a> FieldSerializer<'a> {
} else {
0u64
} as usize;
let addr = self.postings_serializer.written_bytes() as usize;
let addr = self.postings_offset();
TermInfo {
doc_freq: 0,
postings_range: addr..addr,
@@ -213,21 +218,22 @@ impl<'a> FieldSerializer<'a> {
crate::fail_point!("FieldSerializer::close_term", |msg: Option<String>| {
Err(io::Error::new(io::ErrorKind::Other, format!("{msg:?}")))
});
if self.term_open {
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_range.end =
self.postings_serializer.written_bytes() as usize;
if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.close_term()?;
self.current_term_info.positions_range.end =
positions_serializer.written_bytes() as usize;
}
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
if !self.term_open {
return Ok(());
};
self.postings_serializer
.close_term(self.current_term_info.doc_freq, self.postings_write)?;
self.current_term_info.postings_range.end = self.postings_offset();
if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.close_term()?;
self.current_term_info.positions_range.end =
positions_serializer.written_bytes() as usize;
}
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
Ok(())
}
@@ -237,7 +243,7 @@ impl<'a> FieldSerializer<'a> {
if let Some(positions_serializer) = self.positions_serializer_opt {
positions_serializer.close()?;
}
self.postings_serializer.close()?;
self.postings_write.flush()?;
self.term_dictionary_builder.finish()?;
Ok(())
}
@@ -291,8 +297,7 @@ impl Block {
}
}
pub struct PostingsSerializer<W: Write> {
output_write: CountingWriter<W>,
pub struct PostingsSerializer {
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
@@ -310,16 +315,13 @@ pub struct PostingsSerializer<W: Write> {
term_has_freq: bool,
}
impl<W: Write> PostingsSerializer<W> {
impl PostingsSerializer {
pub fn new(
write: W,
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer<W> {
) -> PostingsSerializer {
PostingsSerializer {
output_write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
@@ -422,11 +424,11 @@ impl<W: Write> PostingsSerializer<W> {
}
}
fn close(mut self) -> io::Result<()> {
self.postings_write.flush()
}
pub fn close_term(&mut self, doc_freq: u32) -> io::Result<()> {
pub fn close_term(
&mut self,
doc_freq: u32,
output_write: &mut impl std::io::Write,
) -> io::Result<()> {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
@@ -451,26 +453,16 @@ impl<W: Write> PostingsSerializer<W> {
}
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(&mut self.output_write)?;
self.output_write.write_all(skip_data)?;
VInt(skip_data.len() as u64).serialize(output_write)?;
output_write.write_all(skip_data)?;
}
self.output_write.write_all(&self.postings_write[..])?;
output_write.write_all(&self.postings_write[..])?;
self.skip_write.clear();
self.postings_write.clear();
self.bm25_weight = None;
Ok(())
}
/// Returns the number of bytes written in the postings write object
/// at this point.
/// When called before writing the postings of a term, this value is used as
/// start offset.
/// When called after writing the postings of a term, this value is used as a
/// end offset.
fn written_bytes(&self) -> u64 {
self.output_write.written_bytes()
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;

View File

@@ -291,18 +291,6 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
}
};
let exclude_scorer_opt: Option<Box<dyn Scorer>> = if exclude_scorers.is_empty() {
None
} else {
let exclude_specialized_scorer: SpecializedScorer =
scorer_union(exclude_scorers, DoNothingCombiner::default, num_docs);
Some(into_box_scorer(
exclude_specialized_scorer,
DoNothingCombiner::default,
num_docs,
))
};
let include_scorer = match (should_scorers, must_scorers) {
(ShouldScorersCombinationMethod::Ignored, must_scorers) => {
// No SHOULD clauses (or they were absorbed into MUST).
@@ -380,16 +368,23 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
}
}
};
if let Some(exclude_scorer) = exclude_scorer_opt {
let include_scorer_boxed =
into_box_scorer(include_scorer, &score_combiner_fn, num_docs);
Ok(SpecializedScorer::Other(Box::new(Exclude::new(
include_scorer_boxed,
exclude_scorer,
))))
} else {
Ok(include_scorer)
if exclude_scorers.is_empty() {
return Ok(include_scorer);
}
let include_scorer_boxed = into_box_scorer(include_scorer, &score_combiner_fn, num_docs);
let scorer: Box<dyn Scorer> = if exclude_scorers.len() == 1 {
let exclude_scorer = exclude_scorers.pop().unwrap();
match exclude_scorer.downcast::<TermScorer>() {
// Cast to TermScorer succeeded
Ok(exclude_scorer) => Box::new(Exclude::new(include_scorer_boxed, *exclude_scorer)),
// We get back the original Box<dyn Scorer>
Err(exclude_scorer) => Box::new(Exclude::new(include_scorer_boxed, exclude_scorer)),
}
} else {
Box::new(Exclude::new(include_scorer_boxed, exclude_scorers))
};
Ok(SpecializedScorer::Other(scorer))
}
}

View File

@@ -1,6 +1,6 @@
use std::fmt;
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
use crate::docset::{SeekDangerResult, COLLECT_BLOCK_BUFFER_LEN};
use crate::fastfield::AliveBitSet;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::{DocId, DocSet, Score, SegmentReader, Term};
@@ -104,8 +104,8 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
fn seek(&mut self, target: DocId) -> DocId {
self.underlying.seek(target)
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
self.underlying.seek_into_the_danger_zone(target)
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
self.underlying.seek_danger(target)
}
fn fill_buffer(&mut self, buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN]) -> usize {

View File

@@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use crate::docset::SeekDangerResult;
use crate::query::score_combiner::DoNothingCombiner;
use crate::query::{ScoreCombiner, Scorer};
use crate::{DocId, DocSet, Score, TERMINATED};
@@ -67,10 +68,12 @@ impl<T: Scorer> DocSet for ScorerWrapper<T> {
self.current_doc = doc_id;
doc_id
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
let found = self.scorer.seek_into_the_danger_zone(target);
self.current_doc = self.scorer.doc();
found
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
let result = self.scorer.seek_danger(target);
if result == SeekDangerResult::Found {
self.current_doc = target;
}
result
}
fn doc(&self) -> DocId {

View File

@@ -1,48 +1,71 @@
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::query::Scorer;
use crate::{DocId, Score};
#[inline]
fn is_within<TDocSetExclude: DocSet>(docset: &mut TDocSetExclude, doc: DocId) -> bool {
docset.doc() <= doc && docset.seek(doc) == doc
}
/// Filters a given `DocSet` by removing the docs from a given `DocSet`.
/// An exclusion set is a set of documents
/// that should be excluded from a given DocSet.
///
/// The excluding docset has no impact on scoring.
pub struct Exclude<TDocSet, TDocSetExclude> {
underlying_docset: TDocSet,
excluding_docset: TDocSetExclude,
/// It can be a single DocSet, or a Vec of DocSets.
pub trait ExclusionSet: Send {
/// Returns `true` if the given `doc` is in the exclusion set.
fn contains(&mut self, doc: DocId) -> bool;
}
impl<TDocSet, TDocSetExclude> Exclude<TDocSet, TDocSetExclude>
impl<TDocSet: DocSet> ExclusionSet for TDocSet {
#[inline]
fn contains(&mut self, doc: DocId) -> bool {
self.seek_danger(doc) == SeekDangerResult::Found
}
}
impl<TDocSet: DocSet> ExclusionSet for Vec<TDocSet> {
#[inline]
fn contains(&mut self, doc: DocId) -> bool {
for docset in self.iter_mut() {
if docset.seek_danger(doc) == SeekDangerResult::Found {
return true;
}
}
false
}
}
/// Filters a given `DocSet` by removing the docs from an exclusion set.
///
/// The excluding docsets have no impact on scoring.
pub struct Exclude<TDocSet, TExclusionSet> {
underlying_docset: TDocSet,
exclusion_set: TExclusionSet,
}
impl<TDocSet, TExclusionSet> Exclude<TDocSet, TExclusionSet>
where
TDocSet: DocSet,
TDocSetExclude: DocSet,
TExclusionSet: ExclusionSet,
{
/// Creates a new `Exclude`.
pub fn new(
mut underlying_docset: TDocSet,
mut excluding_docset: TDocSetExclude,
) -> Exclude<TDocSet, TDocSetExclude> {
mut exclusion_set: TExclusionSet,
) -> Exclude<TDocSet, TExclusionSet> {
while underlying_docset.doc() != TERMINATED {
let target = underlying_docset.doc();
if !is_within(&mut excluding_docset, target) {
if !exclusion_set.contains(target) {
break;
}
underlying_docset.advance();
}
Exclude {
underlying_docset,
excluding_docset,
exclusion_set,
}
}
}
impl<TDocSet, TDocSetExclude> DocSet for Exclude<TDocSet, TDocSetExclude>
impl<TDocSet, TExclusionSet> DocSet for Exclude<TDocSet, TExclusionSet>
where
TDocSet: DocSet,
TDocSetExclude: DocSet,
TExclusionSet: ExclusionSet,
{
fn advance(&mut self) -> DocId {
loop {
@@ -50,7 +73,7 @@ where
if candidate == TERMINATED {
return TERMINATED;
}
if !is_within(&mut self.excluding_docset, candidate) {
if !self.exclusion_set.contains(candidate) {
return candidate;
}
}
@@ -61,7 +84,7 @@ where
if candidate == TERMINATED {
return TERMINATED;
}
if !is_within(&mut self.excluding_docset, candidate) {
if !self.exclusion_set.contains(candidate) {
return candidate;
}
self.advance()
@@ -79,10 +102,10 @@ where
}
}
impl<TScorer, TDocSetExclude> Scorer for Exclude<TScorer, TDocSetExclude>
impl<TScorer, TExclusionSet> Scorer for Exclude<TScorer, TExclusionSet>
where
TScorer: Scorer,
TDocSetExclude: DocSet + 'static,
TExclusionSet: ExclusionSet + 'static,
{
#[inline]
fn score(&mut self) -> Score {

View File

@@ -1,5 +1,5 @@
use super::size_hint::estimate_intersection;
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::query::term_query::TermScorer;
use crate::query::{EmptyScorer, Scorer};
use crate::{DocId, Score};
@@ -84,6 +84,14 @@ impl<TDocSet: DocSet> Intersection<TDocSet, TDocSet> {
docsets.sort_by_key(|docset| docset.cost());
go_to_first_doc(&mut docsets);
let left = docsets.remove(0);
debug_assert!({
let doc = left.doc();
if doc == TERMINATED {
true
} else {
docsets.iter().all(|docset| docset.doc() == doc)
}
});
let right = docsets.remove(0);
Intersection {
left,
@@ -108,46 +116,61 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
#[inline]
fn advance(&mut self) -> DocId {
let (left, right) = (&mut self.left, &mut self.right);
let mut candidate = left.advance();
if candidate == TERMINATED {
return TERMINATED;
}
loop {
// In the first part we look for a document in the intersection
// of the two rarest `DocSet` in the intersection.
// Invariant:
// - candidate is always <= to the next document in the intersection.
// - candidate strictly increases at every occurrence of the loop.
let mut candidate = left.doc() + 1;
loop {
if right.seek_into_the_danger_zone(candidate) {
break;
}
let right_doc = right.doc();
// TODO: Think about which value would make sense here
// It depends on the DocSet implementation, when a seek would outweigh an advance.
if right_doc > candidate.wrapping_add(100) {
candidate = left.seek(right_doc);
} else {
candidate = left.advance();
}
if candidate == TERMINATED {
return TERMINATED;
}
}
// Termination: candidate strictly increases.
'outer: while candidate < TERMINATED {
// As we enter the loop, we should always have candidate < next_doc.
debug_assert_eq!(left.doc(), right.doc());
// test the remaining scorers
if self
.others
.iter_mut()
.all(|docset| docset.seek_into_the_danger_zone(candidate))
candidate = left.seek(candidate);
// Left is positioned on `candidate`.
debug_assert_eq!(left.doc(), candidate);
if let SeekDangerResult::SeekLowerBound(seek_lower_bound) = right.seek_danger(candidate)
{
debug_assert_eq!(candidate, self.left.doc());
debug_assert_eq!(candidate, self.right.doc());
debug_assert!(self.others.iter().all(|docset| docset.doc() == candidate));
return candidate;
debug_assert!(
seek_lower_bound == TERMINATED || seek_lower_bound > candidate,
"seek_lower_bound {seek_lower_bound} must be greater than candidate \
{candidate}"
);
candidate = seek_lower_bound;
continue;
}
candidate = left.advance();
// Left and right are positioned on `candidate`.
debug_assert_eq!(right.doc(), candidate);
for other in &mut self.others {
if let SeekDangerResult::SeekLowerBound(seek_lower_bound) =
other.seek_danger(candidate)
{
// One of the scorers does not match, let's restart at the top of the loop.
debug_assert!(
seek_lower_bound == TERMINATED || seek_lower_bound > candidate,
"seek_lower_bound {seek_lower_bound} must be greater than candidate \
{candidate}"
);
candidate = seek_lower_bound;
continue 'outer;
}
}
// At this point all scorers are in a valid state, aligned on the next document in the
// intersection.
debug_assert!(self.others.iter().all(|docset| docset.doc() == candidate));
return candidate;
}
// We make sure our docset is in a valid state.
// In particular, we want .doc() to return TERMINATED.
left.seek(TERMINATED);
TERMINATED
}
fn seek(&mut self, target: DocId) -> DocId {
@@ -166,13 +189,19 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
///
/// Some implementations may choose to advance past the target if beneficial for performance.
/// The return value is `SeekDangerResult::Found` if the target is in the docset, and
/// `SeekDangerResult::SeekLowerBound` otherwise.
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
self.left.seek_into_the_danger_zone(target)
&& self.right.seek_into_the_danger_zone(target)
&& self
.others
.iter_mut()
.all(|docset| docset.seek_into_the_danger_zone(target))
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
if let SeekDangerResult::SeekLowerBound(new_target) = self.left.seek_danger(target) {
return SeekDangerResult::SeekLowerBound(new_target);
}
if let SeekDangerResult::SeekLowerBound(new_target) = self.right.seek_danger(target) {
return SeekDangerResult::SeekLowerBound(new_target);
}
for docset in &mut self.others {
if let SeekDangerResult::SeekLowerBound(new_target) = docset.seek_danger(target) {
return SeekDangerResult::SeekLowerBound(new_target);
}
}
SeekDangerResult::Found
}
#[inline]
@@ -215,9 +244,12 @@ mod tests {
use proptest::prelude::*;
use super::Intersection;
use crate::collector::Count;
use crate::docset::{DocSet, TERMINATED};
use crate::postings::tests::test_skip_against_unoptimized;
use crate::query::VecDocSet;
use crate::query::{QueryParser, VecDocSet};
use crate::schema::{Schema, TEXT};
use crate::Index;
#[test]
fn test_intersection() {
@@ -304,6 +336,58 @@ mod tests {
assert_eq!(intersection.doc(), TERMINATED);
}
#[test]
fn test_intersection_abc() {
let a = VecDocSet::from(vec![2, 3, 6]);
let b = VecDocSet::from(vec![1, 3, 5]);
let c = VecDocSet::from(vec![1, 3, 5]);
let mut intersection = Intersection::new(vec![c, b, a], 10);
let mut docs = Vec::new();
use crate::DocSet;
while intersection.doc() != TERMINATED {
docs.push(intersection.doc());
intersection.advance();
}
assert_eq!(&docs, &[3]);
}
#[test]
fn test_intersection_termination() {
use crate::query::score_combiner::DoNothingCombiner;
use crate::query::{BufferedUnionScorer, ConstScorer, VecDocSet};
let a1 = ConstScorer::new(VecDocSet::from(vec![0u32, 10000]), 1.0);
let a2 = ConstScorer::new(VecDocSet::from(vec![0u32, 10000]), 1.0);
let mut b_scorers = vec![];
for _ in 0..2 {
// Union matches 0 and 10000.
b_scorers.push(ConstScorer::new(VecDocSet::from(vec![0, 10000]), 1.0));
}
// That's the union of two scorers matching 0 and 10_000.
let union = BufferedUnionScorer::build(b_scorers, DoNothingCombiner::default, 30000);
// Mismatching scorer: matches 0 and 20000. We then append more docs at the end to ensure it
// is last.
let mut m_docs = vec![0, 20000];
for i in 30000..30100 {
m_docs.push(i);
}
let m = ConstScorer::new(VecDocSet::from(m_docs), 1.0);
// Costs: A1=2, A2=2, Union=4, M=102.
// Sorted: A1, A2, Union, M.
// Left=A1, Right=A2, Others=[Union, M].
let mut intersection = crate::query::intersect_scorers(
vec![Box::new(a1), Box::new(a2), Box::new(union), Box::new(m)],
40000,
);
while intersection.doc() != TERMINATED {
intersection.advance();
}
}
// Strategy to generate sorted and deduplicated vectors of u32 document IDs
fn sorted_deduped_vec(max_val: u32, max_size: usize) -> impl Strategy<Value = Vec<u32>> {
prop::collection::vec(0..max_val, 0..max_size).prop_map(|mut vec| {
@@ -335,6 +419,30 @@ mod tests {
}
assert_eq!(intersection.doc(), TERMINATED);
}
}
#[test]
fn test_bug_2811_intersection_candidate_should_increase() {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer
.add_document(doc!(text_field=>"hello happy tax"))
.unwrap();
writer.add_document(doc!(text_field=>"hello")).unwrap();
writer.add_document(doc!(text_field=>"hello")).unwrap();
writer.add_document(doc!(text_field=>"happy tax")).unwrap();
writer.commit().unwrap();
let query_parser = QueryParser::for_index(&index, Vec::new());
let query = query_parser
.parse_query(r#"+text:hello +text:"happy tax""#)
.unwrap();
let searcher = index.reader().unwrap().searcher();
let c = searcher.search(&*query, &Count).unwrap();
assert_eq!(c, 1);
}
}
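The reworked `advance` loop uses the lower bound returned by `seek_danger` as the next candidate, so the candidate strictly increases on every iteration. A reduced two-docset sketch of that control flow follows. It assumes a hypothetical `ToyDocSet` and a `TERMINATED` value chosen for this sketch, and it collects all matches into a `Vec` instead of implementing `DocSet`.

```rust
/// Sentinel meaning "no more documents". The value is an assumption of this sketch.
const TERMINATED: u32 = u32::MAX;

enum SeekDangerResult {
    Found,
    SeekLowerBound(u32),
}

/// Hypothetical toy docset over a sorted, deduplicated vector.
struct ToyDocSet {
    docs: Vec<u32>,
    cursor: usize,
}

impl ToyDocSet {
    fn new(docs: Vec<u32>) -> Self {
        ToyDocSet { docs, cursor: 0 }
    }

    fn doc(&self) -> u32 {
        self.docs.get(self.cursor).copied().unwrap_or(TERMINATED)
    }

    /// Moves forward to the first document >= target and returns it.
    fn seek(&mut self, target: u32) -> u32 {
        while self.doc() < target {
            self.cursor += 1;
        }
        self.doc()
    }

    fn seek_danger(&mut self, target: u32) -> SeekDangerResult {
        let doc = self.seek(target);
        if doc == target {
            SeekDangerResult::Found
        } else {
            SeekDangerResult::SeekLowerBound(doc)
        }
    }
}

/// Intersects two docsets, jumping the candidate to each reported lower bound.
fn intersect(mut left: ToyDocSet, mut right: ToyDocSet) -> Vec<u32> {
    let mut hits = Vec::new();
    let mut candidate = left.doc();
    while candidate < TERMINATED {
        // Position left on the first doc >= candidate.
        candidate = left.seek(candidate);
        if candidate == TERMINATED {
            break;
        }
        match right.seek_danger(candidate) {
            SeekDangerResult::Found => {
                hits.push(candidate);
                // Cannot overflow: candidate < TERMINATED == u32::MAX here.
                candidate += 1;
            }
            // The bound is > candidate (or TERMINATED), so the loop makes progress.
            SeekDangerResult::SeekLowerBound(lower_bound) => candidate = lower_bound,
        }
    }
    hits
}
```

With `left = [2, 3, 6, 10]` and `right = [1, 3, 5, 10]`, the candidate moves 2, 3, 4 (seeked to 6), 10, and the function returns `[3, 10]`.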

View File

@@ -43,7 +43,7 @@ pub use self::boost_query::{BoostQuery, BoostWeight};
pub use self::const_score_query::{ConstScoreQuery, ConstScorer};
pub use self::disjunction_max_query::DisjunctionMaxQuery;
pub use self::empty_query::{EmptyQuery, EmptyScorer, EmptyWeight};
pub use self::exclude::Exclude;
pub use self::exclude::{Exclude, ExclusionSet};
pub use self::exist_query::ExistsQuery;
pub use self::explanation::Explanation;
#[cfg(test)]

View File

@@ -1,4 +1,4 @@
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::fieldnorm::FieldNormReader;
use crate::postings::Postings;
use crate::query::bm25::Bm25Weight;
@@ -194,11 +194,16 @@ impl<TPostings: Postings> DocSet for PhrasePrefixScorer<TPostings> {
self.advance()
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
if self.phrase_scorer.seek_into_the_danger_zone(target) {
self.matches_prefix()
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
let seek_res = self.phrase_scorer.seek_danger(target);
if seek_res != SeekDangerResult::Found {
return seek_res;
}
// The intersection matched. Now let's see if we match the prefix.
if self.matches_prefix() {
SeekDangerResult::Found
} else {
false
SeekDangerResult::SeekLowerBound(target + 1)
}
}

View File

@@ -1,6 +1,6 @@
use std::cmp::Ordering;
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::fieldnorm::FieldNormReader;
use crate::postings::Postings;
use crate::query::bm25::Bm25Weight;
@@ -530,12 +530,23 @@ impl<TPostings: Postings> DocSet for PhraseScorer<TPostings> {
self.advance()
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
debug_assert!(target >= self.doc());
if self.intersection_docset.seek_into_the_danger_zone(target) && self.phrase_match() {
return true;
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
debug_assert!(
target >= self.doc(),
"target ({}) should be greater than or equal to doc ({})",
target,
self.doc()
);
let seek_res = self.intersection_docset.seek_danger(target);
if seek_res != SeekDangerResult::Found {
return seek_res;
}
// The intersection matched. Now let's see if we match the phrase.
if self.phrase_match() {
SeekDangerResult::Found
} else {
SeekDangerResult::SeekLowerBound(target + 1)
}
false
}
fn doc(&self) -> DocId {

View File

@@ -311,7 +311,7 @@ mod tests {
#![proptest_config(ProptestConfig::with_cases(50))]
#[test]
fn test_phrase_regex_with_random_strings(mut random_strings in proptest::collection::vec("[c-z ]{0,10}", 1..100), num_occurrences in 1..150_usize) {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
// Insert "aaa ccc" the specified number of times into the list
for _ in 0..num_occurrences {

View File

@@ -429,7 +429,7 @@ mod tests {
docs.push(doc);
}
docs.shuffle(&mut rand::thread_rng());
docs.shuffle(&mut rand::rng());
let mut docs_it = docs.into_iter();
for doc in (&mut docs_it).take(50) {
index_writer.add_document(doc)?;

View File

@@ -491,7 +491,7 @@ mod tests {
use common::DateTime;
use proptest::prelude::*;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::seq::IndexedRandom;
use rand::SeedableRng;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

View File

@@ -1,6 +1,6 @@
use std::marker::PhantomData;
use crate::docset::DocSet;
use crate::docset::{DocSet, SeekDangerResult};
use crate::query::score_combiner::ScoreCombiner;
use crate::query::Scorer;
use crate::{DocId, Score};
@@ -56,9 +56,9 @@ where
self.req_scorer.seek(target)
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
self.score_cache = None;
self.req_scorer.seek_into_the_danger_zone(target)
self.req_scorer.seek_danger(target)
}
fn doc(&self) -> DocId {

View File

@@ -105,6 +105,7 @@ impl DocSet for TermScorer {
#[inline]
fn seek(&mut self, target: DocId) -> DocId {
debug_assert!(target >= self.doc());
self.postings.seek(target)
}
@@ -304,10 +305,10 @@ mod tests {
let mut writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
use rand::Rng;
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 {
let term_freq = rng.gen_range(1..10000);
let term_freq = rng.random_range(1..10000);
let words: Vec<&str> = std::iter::repeat_n("bbbb", term_freq).collect();
let text = words.join(" ");
writer.add_document(doc!(text_field=>text))?;

View File

@@ -1,6 +1,6 @@
use common::TinySet;
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::query::score_combiner::{DoNothingCombiner, ScoreCombiner};
use crate::query::size_hint::estimate_union;
use crate::query::Scorer;
@@ -225,25 +225,47 @@ where
}
}
fn seek_into_the_danger_zone(&mut self, target: DocId) -> bool {
fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
if target >= TERMINATED {
return SeekDangerResult::SeekLowerBound(TERMINATED);
}
if self.is_in_horizon(target) {
// Our value is within the buffered horizon and the docset may already have been
// processed and removed, so we need to use seek, which uses the regular advance.
self.seek(target) == target
} else {
// The docsets are not in the buffered range, so we can use seek_into_the_danger_zone
// of the underlying docsets
let is_hit = self
.docsets
.iter_mut()
.any(|docset| docset.seek_into_the_danger_zone(target));
let seek_doc = self.seek(target);
if seek_doc == target {
return SeekDangerResult::Found;
} else {
return SeekDangerResult::SeekLowerBound(seek_doc);
};
}
// The API requires the DocSet to be in a valid state when `seek_into_the_danger_zone`
// returns true.
if is_hit {
self.seek(target);
// The docsets are not in the buffered range, so we can use seek_danger
// of the underlying docsets
let mut is_hit = false;
let mut min_new_target = TERMINATED;
for docset in self.docsets.iter_mut() {
match docset.seek_danger(target) {
SeekDangerResult::Found => {
is_hit = true;
break;
}
SeekDangerResult::SeekLowerBound(new_target) => {
min_new_target = min_new_target.min(new_target);
}
}
is_hit
}
// The API requires the DocSet to be in a valid state when `seek_danger`
// returns Found.
if is_hit {
// The doc is found. Let's make sure we position the union on the target
// to bring it back to a valid state.
self.seek(target);
SeekDangerResult::Found
} else {
SeekDangerResult::SeekLowerBound(min_new_target)
}
}
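
Note on the hunk above: outside the buffered horizon, the union stops at the first child that reports `Found`, and otherwise returns the smallest lower bound reported by any child. The sketch below reproduces only that combination step over a toy sorted-vector docset. `SortedDocs` and `union_seek_danger` are hypothetical names, the enum is a local stand-in for the one in the diff, and the `TERMINATED` value is an assumption made so the example is self-contained.

```rust
// Local stand-ins, assumed for illustration only.
type DocId = u32;
const TERMINATED: DocId = i32::MAX as u32; // assumed sentinel value

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SeekDangerResult {
    Found,
    SeekLowerBound(DocId),
}

/// Toy docset backed by a strictly sorted vector of doc ids.
struct SortedDocs {
    docs: Vec<DocId>,
    cursor: usize,
}

impl SortedDocs {
    fn new(docs: Vec<DocId>) -> Self {
        SortedDocs { docs, cursor: 0 }
    }

    fn seek_danger(&mut self, target: DocId) -> SeekDangerResult {
        // Skip every doc strictly below the target.
        while self.cursor < self.docs.len() && self.docs[self.cursor] < target {
            self.cursor += 1;
        }
        match self.docs.get(self.cursor) {
            Some(&doc) if doc == target => SeekDangerResult::Found,
            // The next doc is above the target: report it as the lower bound.
            Some(&doc) => SeekDangerResult::SeekLowerBound(doc),
            // Exhausted: nothing left to match.
            None => SeekDangerResult::SeekLowerBound(TERMINATED),
        }
    }
}

/// Combine child results the way the union hunk does: any hit wins,
/// otherwise keep the minimum of the reported lower bounds.
fn union_seek_danger(docsets: &mut [SortedDocs], target: DocId) -> SeekDangerResult {
    let mut min_new_target = TERMINATED;
    for docset in docsets.iter_mut() {
        match docset.seek_danger(target) {
            SeekDangerResult::Found => return SeekDangerResult::Found,
            SeekDangerResult::SeekLowerBound(new_target) => {
                min_new_target = min_new_target.min(new_target);
            }
        }
    }
    SeekDangerResult::SeekLowerBound(min_new_target)
}
```

Taking the minimum is what keeps the bound safe for a union: a doc matches if any child contains it, so the earliest candidate across all children is the first doc the caller may need to look at. The real scorer additionally re-seeks itself to the target on a hit to restore a valid state, which this sketch omits.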

View File

@@ -14,7 +14,7 @@ mod tests {
use common::BitSet;
use super::{SimpleUnion, *};
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::postings::tests::test_skip_against_unoptimized;
use crate::query::score_combiner::DoNothingCombiner;
use crate::query::union::bitset_union::BitSetPostingUnion;
@@ -254,6 +254,27 @@ mod tests {
vec![1, 2, 3, 7, 8, 9, 99, 100, 101, 500, 20000],
);
}
#[test]
fn test_buffered_union_seek_into_danger_zone_terminated() {
let scorer1 = ConstScorer::new(VecDocSet::from(vec![1, 2]), 1.0);
let scorer2 = ConstScorer::new(VecDocSet::from(vec![2, 3]), 1.0);
let mut union_scorer =
BufferedUnionScorer::build(vec![scorer1, scorer2], DoNothingCombiner::default, 100);
// Advance to end
while union_scorer.doc() != TERMINATED {
union_scorer.advance();
}
assert_eq!(union_scorer.doc(), TERMINATED);
assert_eq!(
union_scorer.seek_danger(TERMINATED),
SeekDangerResult::SeekLowerBound(TERMINATED)
);
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -17,6 +17,9 @@ pub struct VecDocSet {
impl From<Vec<DocId>> for VecDocSet {
fn from(doc_ids: Vec<DocId>) -> VecDocSet {
// We do not use `slice::is_sorted`, as we want to check for doc ids to be strictly
// sorted.
assert!(doc_ids.windows(2).all(|w| w[0] < w[1]));
VecDocSet { doc_ids, cursor: 0 }
}
}

View File

@@ -124,7 +124,6 @@ impl SegmentSpaceUsage {
FieldNorms => PerField(self.fieldnorms().clone()),
Terms => PerField(self.termdict().clone()),
SegmentComponent::Store => ComponentSpaceUsage::Store(self.store().clone()),
SegmentComponent::TempStore => ComponentSpaceUsage::Store(self.store().clone()),
Delete => Basic(self.deletes()),
}
}

View File

@@ -95,7 +95,7 @@ impl<'a> TermMerger<'a> {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::distr::Alphanumeric;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use test::{self, Bencher};
use super::TermMerger;
@@ -117,9 +117,9 @@ mod bench {
let buffer: Vec<u8> = {
let mut terms = vec![];
for _i in 0..num_terms {
let rand_string: String = thread_rng()
let rand_string: String = rng()
.sample_iter(&Alphanumeric)
.take(thread_rng().gen_range(30..42))
.take(rng().random_range(30..42))
.map(char::from)
.collect();
terms.push(rand_string);

View File

@@ -25,7 +25,7 @@ zstd-compression = ["zstd"]
proptest = "1"
criterion = { version = "0.5", default-features = false }
names = "0.14"
rand = "0.8"
rand = "0.9"
[[bench]]
name = "stream_bench"

View File

@@ -10,9 +10,9 @@ use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
const CHARSET: &[u8] = b"abcdefghij";
fn generate_key(rng: &mut impl Rng) -> String {
let len = rng.gen_range(3..12);
let len = rng.random_range(3..12);
std::iter::from_fn(|| {
let idx = rng.gen_range(0..CHARSET.len());
let idx = rng.random_range(0..CHARSET.len());
Some(CHARSET[idx] as char)
})
.take(len)

View File

@@ -23,12 +23,12 @@ name = "hashmap"
path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }
rand_distr = "0.4.3"
rand_distr = "0.5"
[features]
compare_hash_only = ["ahash"] # Compare hash only, not the key in the Hashmap

View File

@@ -90,10 +90,10 @@ fn bench_vint() {
}
// benchmark zipfs distribution numbers
{
use rand::distributions::Distribution;
use rand::distr::Distribution;
use rand::rngs::StdRng;
let mut rng = StdRng::from_seed([3u8; 32]);
let zipf = zipf::ZipfDistribution::new(10_000, 1.03).unwrap();
let zipf = rand_distr::Zipf::new(10_000.0f64, 1.03).unwrap();
let numbers: Vec<[u8; 8]> = (0..num_numbers)
.map(|_| zipf.sample(&mut rng).to_le_bytes())
.collect();

View File

@@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
ahash = "0.8.7"
rand = "0.8.5"
rand_distr = "0.4.3"
rand = "0.9"
rand_distr = "0.5"
tantivy-stacker = { version = "0.2.0", path = ".." }
[workspace]

View File

@@ -14,7 +14,7 @@ fn test_with_seed(seed: u64) {
let mut hash_map = AHashMap::new();
let mut arena_hashmap = ArenaHashMap::default();
let mut rng = StdRng::seed_from_u64(seed);
let key_count = rng.gen_range(1_000..=1_000_000);
let key_count = rng.random_range(1_000..=1_000_000);
let exp = Exp::new(0.05).unwrap();
for _ in 0..key_count {