Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-02-19 22:30:36 +00:00)

Compare commits: 34 commits, `postings-w…` ... `congxie/re…`
| SHA1 |
|---|
| 9f764cb0c5 |
| 18fedd9384 |
| 2098fca47f |
| 1251b40c93 |
| 09a49b872c |
| b9ace002ce |
| 2dc4e9ef78 |
| aeea65f61d |
| 4211d5a1ed |
| d50c7a1daf |
| cf760fd5b6 |
| df04c7d8f1 |
| 68626bf3a1 |
| 7eca33143e |
| 698f073f88 |
| cdd24b7ee5 |
| 5562ce6037 |
| 09b6ececa7 |
| 8018016e46 |
| 6bf185dc3f |
| bb141abe22 |
| f1c29ba972 |
| ae0554a6a5 |
| 0d7abe5d23 |
| 28db952131 |
| 98ebbf922d |
| 4a89e74597 |
| 4d99e51e50 |
| a55e4069e4 |
| 1fd30c62be |
| 9b619998bd |
| 765c448945 |
| 943594ebaa |
| df17daae0d |
.claude/skills/rationalize-deps/SKILL.md (new file, 125 lines)
@@ -0,0 +1,125 @@
---
name: rationalize-deps
description: Analyze Cargo.toml dependencies and attempt to remove unused features to reduce compile times and binary size
---

# Rationalize Dependencies

This skill analyzes Cargo.toml dependencies to identify and remove unused features.

## Overview

Many crates enable features by default that may not be needed. This skill:
1. Identifies dependencies with default features enabled
2. Tests if `default-features = false` works
3. Identifies which specific features are actually needed
4. Verifies compilation after changes

## Step 1: Identify the target

Ask the user which crate(s) to analyze:
- A specific crate name (e.g., "tokio", "serde")
- A specific workspace member (e.g., "quickwit-search")
- "all" to scan the entire workspace

## Step 2: Analyze current dependencies

For the workspace Cargo.toml (`quickwit/Cargo.toml`), list dependencies that:
- Do NOT have `default-features = false`
- Have default features that might be unnecessary

Run: `cargo tree -p <crate> -f "{p} {f}" --edges features` to see what features are actually used.

## Step 3: For each candidate dependency

### 3a: Check the crate's default features

Look up the crate on crates.io or check its Cargo.toml to understand:
- What features are enabled by default
- What each feature provides

Use: `cargo metadata --format-version=1 | jq '.packages[] | select(.name == "<crate>") | .features'`

### 3b: Try disabling default features

Modify the dependency in `quickwit/Cargo.toml`:

From:
```toml
some-crate = { version = "1.0" }
```

To:
```toml
some-crate = { version = "1.0", default-features = false }
```

### 3c: Run cargo check

Run: `cargo check --workspace` (or target specific packages for faster feedback)

If compilation fails:
1. Read the error messages to identify which features are needed
2. Add only the required features explicitly:
```toml
some-crate = { version = "1.0", default-features = false, features = ["needed-feature"] }
```
3. Re-run `cargo check`

### 3d: Binary search for minimal features

If there are many default features, use binary search:
1. Start with no features
2. If it fails, add half the default features
3. Continue until you find the minimal set (see the sketch below)
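A minimal sketch of that trial loop, assuming a placeholder crate `some-crate` and made-up feature names; in practice the candidate sets come from the crate's actual default feature list:

```bash
# Sketch only: grow the feature set until `cargo check` passes.
# "some-crate" and the feature names are placeholders, not real requirements.
for features in "" "derive" "derive,std"; do
    if [ -z "$features" ]; then
        cargo add some-crate --no-default-features
    else
        cargo add some-crate --no-default-features --features "$features"
    fi
    if cargo check --workspace --quiet; then
        echo "Minimal working feature set: '$features'"
        break
    fi
done
```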
## Step 4: Document findings

For each dependency analyzed, report:
- Original configuration
- New configuration (if changed)
- Features that were removed
- Any features that are required

## Step 5: Verify full build

After all changes, run:
```bash
cargo check --workspace --all-targets
cargo test --workspace --no-run
```

## Common Patterns

### Serde
Often only needs `derive` and `std`:
```toml
serde = { version = "1.0", default-features = false, features = ["derive", "std"] }
```

### Tokio
Identify which runtime features are actually used:
```toml
tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "macros", "sync"] }
```

### Reqwest
Often doesn't need all TLS backends:
```toml
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"] }
```

## Rollback

If changes cause issues:
```bash
git checkout quickwit/Cargo.toml
cargo check --workspace
```

## Tips

- Start with large crates that have many default features (tokio, reqwest, hyper)
- Use `cargo bloat --crates` to identify large dependencies (see the example below)
- Check `cargo tree -d` for duplicate dependencies that might indicate feature conflicts
- Some features are needed only for tests; consider enabling them via `[dev-dependencies]` features instead
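A hedged example of those inspection commands; note that `cargo-bloat` is a third-party subcommand and must be installed separately:

```bash
cargo install cargo-bloat   # one-time install of the third-party subcommand
cargo bloat --crates        # largest crates contributing to binary size
cargo tree -d               # duplicate dependency versions in the graph
```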
.claude/skills/simple-pr/SKILL.md (new file, 60 lines)
@@ -0,0 +1,60 @@
---
name: simple-pr
description: Create a simple PR from staged changes with an auto-generated commit message
disable-model-invocation: true
---

# Simple PR

Follow these steps to create a simple PR from staged changes:

## Step 1: Check workspace state

Run: `git status`

Verify that all changes have been staged (no unstaged changes). If there are unstaged changes, abort and ask the user to stage their changes first with `git add`.

Also verify that we are on the `main` branch. If not, abort and ask the user to switch to main first.

## Step 2: Ensure main is up to date

Run: `git pull origin main`

This ensures we're working from the latest code.

## Step 3: Review staged changes

Run: `git diff --cached`

Review the staged changes to understand what the PR will contain.

## Step 4: Generate commit message

Based on the staged changes, generate a concise commit message (1-2 sentences) that describes the "why" rather than the "what".

Display the proposed commit message to the user and ask for confirmation before proceeding.

## Step 5: Create a new branch

Get the git username: `git config user.name | tr ' ' '-' | tr '[:upper:]' '[:lower:]'`

Create a short, descriptive branch name based on the changes (e.g., `fix-typo-in-readme`, `add-retry-logic`, `update-deps`).

Create and check out the branch: `git checkout -b {username}/{short-descriptive-name}`
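For example, the two commands above can be combined as follows; the branch name `fix-typo-in-readme` is purely illustrative:

```bash
# Sketch of step 5: derive the branch prefix from the git username.
# The branch name below is an example, not a required convention.
username=$(git config user.name | tr ' ' '-' | tr '[:upper:]' '[:lower:]')
git checkout -b "${username}/fix-typo-in-readme"
```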
## Step 6: Commit changes

Commit with the message from step 4:
```
git commit -m "{commit-message}"
```

## Step 7: Push and open a PR

Push the branch and open a PR:
```
git push -u origin {branch-name}
gh pr create --title "{commit-message-title}" --body "{longer-description-if-needed}"
```

Report the PR URL to the user when complete.
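As a concrete illustration of step 7 with the placeholders filled in (the branch name, title, and body are made up for this example; real values come from the generated commit message):

```bash
# Example invocation only; substitute the actual branch name and message.
git push -u origin jane-doe/fix-typo-in-readme
gh pr create --title "Fix typo in README" --body "Corrects a misspelled word in the quick start section."
```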
Cargo.toml (15 changed lines)

@@ -15,7 +15,7 @@ rust-version = "1.85"
 exclude = ["benches/*.json", "benches/*.txt"]
 
 [dependencies]
-oneshot = "0.1.7"
+oneshot = "0.1.13"
 base64 = "0.22.0"
 byteorder = "1.4.3"
 crc32fast = "1.3.2"
@@ -64,8 +64,8 @@ query-grammar = { version = "0.25.0", path = "./query-grammar", package = "tanti
 tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
 common = { version = "0.10", path = "./common/", package = "tantivy-common" }
 tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
-sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
-hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
+sketches-ddsketch = { git = "https://github.com/quickwit-oss/rust-sketches-ddsketch.git", rev = "555caf1", features = ["use_serde"] }
+datasketches = "0.2.0"
 futures-util = { version = "0.3.28", optional = true }
 futures-channel = { version = "0.3.28", optional = true }
 fnv = "1.0.7"
@@ -193,3 +193,12 @@ harness = false
 [[bench]]
 name = "str_search_and_get"
 harness = false
+
+[[bench]]
+name = "merge_segments"
+harness = false
+
+[[bench]]
+name = "regex_all_terms"
+harness = false
+
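The diff above registers two new benchmark targets. The `regex_all_terms` source documents the invocation below, and the same form should apply to `merge_segments` (both use binggan rather than the default harness):

```bash
# Run the newly added benchmarks individually.
cargo bench --bench merge_segments
cargo bench --bench regex_all_terms
```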
benches/merge_segments.rs (new file, 224 lines)
@@ -0,0 +1,224 @@
|
||||
// Benchmarks segment merging
|
||||
//
|
||||
// Notes:
|
||||
// - Input segments are kept intact (no deletes / no IndexWriter merge).
|
||||
// - Output is written to a `NullDirectory` that discards all files except
|
||||
// fieldnorms (needed for merging).
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::{self, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use binggan::{black_box, BenchRunner};
|
||||
use rand::prelude::*;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::SeedableRng;
|
||||
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use tantivy::directory::{
|
||||
AntiCallToken, Directory, FileHandle, OwnedBytes, TerminatingWrite, WatchCallback, WatchHandle,
|
||||
WritePtr,
|
||||
};
|
||||
use tantivy::indexer::{merge_filtered_segments, NoMergePolicy};
|
||||
use tantivy::schema::{Schema, TEXT};
|
||||
use tantivy::{doc, HasLen, Index, IndexSettings, Segment};
|
||||
|
||||
#[derive(Clone, Default, Debug)]
|
||||
struct NullDirectory {
|
||||
blobs: Arc<RwLock<HashMap<PathBuf, OwnedBytes>>>,
|
||||
}
|
||||
|
||||
struct NullWriter;
|
||||
|
||||
impl Write for NullWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TerminatingWrite for NullWriter {
|
||||
fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct InMemoryWriter {
|
||||
path: PathBuf,
|
||||
buffer: Vec<u8>,
|
||||
blobs: Arc<RwLock<HashMap<PathBuf, OwnedBytes>>>,
|
||||
}
|
||||
|
||||
impl Write for InMemoryWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.buffer.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl TerminatingWrite for InMemoryWriter {
|
||||
fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> {
|
||||
let bytes = OwnedBytes::new(std::mem::take(&mut self.buffer));
|
||||
self.blobs.write().unwrap().insert(self.path.clone(), bytes);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct NullFileHandle;
|
||||
impl HasLen for NullFileHandle {
|
||||
fn len(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
||||
impl FileHandle for NullFileHandle {
|
||||
fn read_bytes(&self, _range: std::ops::Range<usize>) -> io::Result<OwnedBytes> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl Directory for NullDirectory {
|
||||
fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
|
||||
if let Some(bytes) = self.blobs.read().unwrap().get(path) {
|
||||
return Ok(Arc::new(bytes.clone()));
|
||||
}
|
||||
Ok(Arc::new(NullFileHandle))
|
||||
}
|
||||
|
||||
fn delete(&self, _path: &Path) -> Result<(), DeleteError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn exists(&self, _path: &Path) -> Result<bool, OpenReadError> {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
let path_buf = path.to_path_buf();
|
||||
if path.to_string_lossy().ends_with(".fieldnorm") {
|
||||
let writer = InMemoryWriter {
|
||||
path: path_buf,
|
||||
buffer: Vec::new(),
|
||||
blobs: Arc::clone(&self.blobs),
|
||||
};
|
||||
Ok(io::BufWriter::new(Box::new(writer)))
|
||||
} else {
|
||||
Ok(io::BufWriter::new(Box::new(NullWriter)))
|
||||
}
|
||||
}
|
||||
|
||||
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
|
||||
if let Some(bytes) = self.blobs.read().unwrap().get(path) {
|
||||
return Ok(bytes.as_slice().to_vec());
|
||||
}
|
||||
Err(OpenReadError::FileDoesNotExist(path.to_path_buf()))
|
||||
}
|
||||
|
||||
fn atomic_write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync_directory(&self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn watch(&self, _watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> {
|
||||
Ok(WatchHandle::empty())
|
||||
}
|
||||
}
|
||||
|
||||
struct MergeScenario {
|
||||
#[allow(dead_code)]
|
||||
index: Index,
|
||||
segments: Vec<Segment>,
|
||||
settings: IndexSettings,
|
||||
label: String,
|
||||
}
|
||||
|
||||
fn build_index(
|
||||
num_segments: usize,
|
||||
docs_per_segment: usize,
|
||||
tokens_per_doc: usize,
|
||||
vocab_size: usize,
|
||||
) -> MergeScenario {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let body = schema_builder.add_text_field("body", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema.clone());
|
||||
|
||||
assert!(vocab_size > 0);
|
||||
let total_tokens = num_segments * docs_per_segment * tokens_per_doc;
|
||||
let use_unique_terms = vocab_size >= total_tokens;
|
||||
let mut rng = StdRng::from_seed([7u8; 32]);
|
||||
let mut next_token_id: u64 = 0;
|
||||
|
||||
{
|
||||
let mut writer = index.writer_with_num_threads(1, 256_000_000).unwrap();
|
||||
writer.set_merge_policy(Box::new(NoMergePolicy));
|
||||
for _ in 0..num_segments {
|
||||
for _ in 0..docs_per_segment {
|
||||
let mut tokens = Vec::with_capacity(tokens_per_doc);
|
||||
for _ in 0..tokens_per_doc {
|
||||
let token_id = if use_unique_terms {
|
||||
let id = next_token_id;
|
||||
next_token_id += 1;
|
||||
id
|
||||
} else {
|
||||
rng.random_range(0..vocab_size as u64)
|
||||
};
|
||||
tokens.push(format!("term_{token_id}"));
|
||||
}
|
||||
writer.add_document(doc!(body => tokens.join(" "))).unwrap();
|
||||
}
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let segments = index.searchable_segments().unwrap();
|
||||
let settings = index.settings().clone();
|
||||
let label = format!(
|
||||
"segments={}, docs/seg={}, tokens/doc={}, vocab={}",
|
||||
num_segments, docs_per_segment, tokens_per_doc, vocab_size
|
||||
);
|
||||
|
||||
MergeScenario {
|
||||
index,
|
||||
segments,
|
||||
settings,
|
||||
label,
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let scenarios = vec![
|
||||
build_index(8, 50_000, 12, 8),
|
||||
build_index(16, 50_000, 12, 8),
|
||||
build_index(16, 100_000, 12, 8),
|
||||
build_index(8, 50_000, 8, 8 * 50_000 * 8),
|
||||
];
|
||||
|
||||
let mut runner = BenchRunner::new();
|
||||
for scenario in scenarios {
|
||||
let mut group = runner.new_group();
|
||||
group.set_name(format!("merge_segments inv_index — {}", scenario.label));
|
||||
let segments = scenario.segments.clone();
|
||||
let settings = scenario.settings.clone();
|
||||
group.register("merge", move |_| {
|
||||
let output_dir = NullDirectory::default();
|
||||
let filter_doc_ids = vec![None; segments.len()];
|
||||
let merged_index =
|
||||
merge_filtered_segments(&segments, settings.clone(), filter_doc_ids, output_dir)
|
||||
.unwrap();
|
||||
black_box(merged_index);
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
}
|
||||
benches/regex_all_terms.rs (new file, 113 lines)
@@ -0,0 +1,113 @@
|
||||
// Benchmarks regex query that matches all terms in a synthetic index.
|
||||
//
|
||||
// Corpus model:
|
||||
// - N unique terms: t000000, t000001, ...
|
||||
// - M docs
|
||||
// - K tokens per doc: doc i gets terms derived from (i, token_index)
|
||||
//
|
||||
// Query:
|
||||
// - Regex "t.*" to match all terms
|
||||
//
|
||||
// Run with:
|
||||
// - cargo bench --bench regex_all_terms
|
||||
//
|
||||
|
||||
use std::fmt::Write;
|
||||
|
||||
use binggan::{black_box, BenchRunner};
|
||||
use tantivy::collector::Count;
|
||||
use tantivy::query::RegexQuery;
|
||||
use tantivy::schema::{Schema, TEXT};
|
||||
use tantivy::{doc, Index, ReloadPolicy};
|
||||
|
||||
const HEAP_SIZE_BYTES: usize = 200_000_000;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct BenchConfig {
|
||||
num_terms: usize,
|
||||
num_docs: usize,
|
||||
tokens_per_doc: usize,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let configs = default_configs();
|
||||
|
||||
let mut runner = BenchRunner::new();
|
||||
for config in configs {
|
||||
let (index, text_field) = build_index(config, HEAP_SIZE_BYTES);
|
||||
let reader = index
|
||||
.reader_builder()
|
||||
.reload_policy(ReloadPolicy::Manual)
|
||||
.try_into()
|
||||
.expect("reader");
|
||||
let searcher = reader.searcher();
|
||||
let query = RegexQuery::from_pattern("t.*", text_field).expect("regex query");
|
||||
|
||||
let mut group = runner.new_group();
|
||||
group.set_name(format!(
|
||||
"regex_all_terms_t{}_d{}_k{}",
|
||||
config.num_terms, config.num_docs, config.tokens_per_doc
|
||||
));
|
||||
group.register("regex_count", move |_| {
|
||||
let count = searcher.search(&query, &Count).expect("search");
|
||||
black_box(count);
|
||||
});
|
||||
group.run();
|
||||
}
|
||||
}
|
||||
|
||||
fn default_configs() -> Vec<BenchConfig> {
|
||||
vec![
|
||||
BenchConfig {
|
||||
num_terms: 10_000,
|
||||
num_docs: 100_000,
|
||||
tokens_per_doc: 1,
|
||||
},
|
||||
BenchConfig {
|
||||
num_terms: 10_000,
|
||||
num_docs: 100_000,
|
||||
tokens_per_doc: 8,
|
||||
},
|
||||
BenchConfig {
|
||||
num_terms: 100_000,
|
||||
num_docs: 100_000,
|
||||
tokens_per_doc: 1,
|
||||
},
|
||||
BenchConfig {
|
||||
num_terms: 100_000,
|
||||
num_docs: 100_000,
|
||||
tokens_per_doc: 8,
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn build_index(config: BenchConfig, heap_size_bytes: usize) -> (Index, tantivy::schema::Field) {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
let term_width = config.num_terms.to_string().len();
|
||||
{
|
||||
let mut writer = index
|
||||
.writer_with_num_threads(1, heap_size_bytes)
|
||||
.expect("writer");
|
||||
let mut buffer = String::new();
|
||||
for doc_id in 0..config.num_docs {
|
||||
buffer.clear();
|
||||
for token_idx in 0..config.tokens_per_doc {
|
||||
if token_idx > 0 {
|
||||
buffer.push(' ');
|
||||
}
|
||||
let term_id = (doc_id * config.tokens_per_doc + token_idx) % config.num_terms;
|
||||
write!(&mut buffer, "t{term_id:0term_width$}").expect("write token");
|
||||
}
|
||||
writer
|
||||
.add_document(doc!(text_field => buffer.as_str()))
|
||||
.expect("add_document");
|
||||
}
|
||||
writer.commit().expect("commit");
|
||||
}
|
||||
|
||||
(index, text_field)
|
||||
}
|
||||
@@ -60,7 +60,7 @@ At indexing, tantivy will try to interpret number and strings as different type
|
||||
priority order.
|
||||
|
||||
Numbers will be interpreted as u64, i64 and f64 in that order.
|
||||
Strings will be interpreted as rfc3999 dates or simple strings.
|
||||
Strings will be interpreted as rfc3339 dates or simple strings.
|
||||
|
||||
The first working type is picked and is the only term that is emitted for indexing.
|
||||
Note this interpretation happens on a per-document basis, and there is no effort to try to sniff
|
||||
@@ -81,7 +81,7 @@ Will be interpreted as
|
||||
(my_path.my_segment, String, 233) or (my_path.my_segment, u64, 233)
|
||||
```
|
||||
|
||||
Likewise, we need to emit two tokens if the query contains an rfc3999 date.
|
||||
Likewise, we need to emit two tokens if the query contains an rfc3339 date.
|
||||
Indeed the date could have been actually a single token inside the text of a document at ingestion time. Generally speaking, we will always at least emit a string token in query parsing, and sometimes more.
|
||||
|
||||
If one more json field is defined, things get even more complicated.
|
||||
|
||||
@@ -560,7 +560,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
|
||||
(
|
||||
(
|
||||
value((), tag(">=")),
|
||||
map(word_infallible("", false), |(bound, err)| {
|
||||
map(word_infallible(")", false), |(bound, err)| {
|
||||
(
|
||||
(
|
||||
bound
|
||||
@@ -574,7 +574,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
|
||||
),
|
||||
(
|
||||
value((), tag("<=")),
|
||||
map(word_infallible("", false), |(bound, err)| {
|
||||
map(word_infallible(")", false), |(bound, err)| {
|
||||
(
|
||||
(
|
||||
UserInputBound::Unbounded,
|
||||
@@ -588,7 +588,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
|
||||
),
|
||||
(
|
||||
value((), tag(">")),
|
||||
map(word_infallible("", false), |(bound, err)| {
|
||||
map(word_infallible(")", false), |(bound, err)| {
|
||||
(
|
||||
(
|
||||
bound
|
||||
@@ -602,7 +602,7 @@ fn range_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
|
||||
),
|
||||
(
|
||||
value((), tag("<")),
|
||||
map(word_infallible("", false), |(bound, err)| {
|
||||
map(word_infallible(")", false), |(bound, err)| {
|
||||
(
|
||||
(
|
||||
UserInputBound::Unbounded,
|
||||
@@ -704,7 +704,11 @@ fn regex(inp: &str) -> IResult<&str, UserInputLeaf> {
|
||||
many1(alt((preceded(char('\\'), char('/')), none_of("/")))),
|
||||
char('/'),
|
||||
),
|
||||
peek(alt((multispace1, eof))),
|
||||
peek(alt((
|
||||
value((), multispace1),
|
||||
value((), char(')')),
|
||||
value((), eof),
|
||||
))),
|
||||
),
|
||||
|elements| UserInputLeaf::Regex {
|
||||
field: None,
|
||||
@@ -721,8 +725,12 @@ fn regex_infallible(inp: &str) -> JResult<&str, UserInputLeaf> {
|
||||
opt_i_err(char('/'), "missing delimiter /"),
|
||||
),
|
||||
opt_i_err(
|
||||
peek(alt((multispace1, eof))),
|
||||
"expected whitespace or end of input",
|
||||
peek(alt((
|
||||
value((), multispace1),
|
||||
value((), char(')')),
|
||||
value((), eof),
|
||||
))),
|
||||
"expected whitespace, closing parenthesis, or end of input",
|
||||
),
|
||||
)(inp)
|
||||
{
|
||||
@@ -1323,6 +1331,14 @@ mod test {
|
||||
test_parse_query_to_ast_helper("<a", "{\"*\" TO \"a\"}");
|
||||
test_parse_query_to_ast_helper("<=a", "{\"*\" TO \"a\"]");
|
||||
test_parse_query_to_ast_helper("<=bsd", "{\"*\" TO \"bsd\"]");
|
||||
|
||||
test_parse_query_to_ast_helper("(<=42)", "{\"*\" TO \"42\"]");
|
||||
test_parse_query_to_ast_helper("(<=42 )", "{\"*\" TO \"42\"]");
|
||||
test_parse_query_to_ast_helper("(age:>5)", "\"age\":{\"5\" TO \"*\"}");
|
||||
test_parse_query_to_ast_helper(
|
||||
"(title:bar AND age:>12)",
|
||||
"(+\"title\":bar +\"age\":{\"12\" TO \"*\"})",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1699,6 +1715,10 @@ mod test {
|
||||
test_parse_query_to_ast_helper("foo:(A OR B)", "(?\"foo\":A ?\"foo\":B)");
|
||||
test_parse_query_to_ast_helper("foo:(A* OR B*)", "(?\"foo\":A* ?\"foo\":B*)");
|
||||
test_parse_query_to_ast_helper("foo:(*A OR *B)", "(?\"foo\":*A ?\"foo\":*B)");
|
||||
|
||||
// Regexes between parentheses
|
||||
test_parse_query_to_ast_helper("foo:(/A.*/)", "\"foo\":/A.*/");
|
||||
test_parse_query_to_ast_helper("foo:(/A.*/ OR /B.*/)", "(?\"foo\":/A.*/ ?\"foo\":/B.*/)");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -66,6 +66,7 @@ impl UserInputLeaf {
|
||||
}
|
||||
UserInputLeaf::Range { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
UserInputLeaf::Set { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
UserInputLeaf::Regex { field, .. } if field.is_none() => *field = Some(default_field),
|
||||
_ => (), // field was already set, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,6 +90,19 @@ impl From<IntermediateKey> for Key {
|
||||
|
||||
impl Eq for IntermediateKey {}
|
||||
|
||||
impl std::fmt::Display for IntermediateKey {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
IntermediateKey::Str(val) => f.write_str(val),
|
||||
IntermediateKey::F64(val) => f.write_str(&val.to_string()),
|
||||
IntermediateKey::U64(val) => f.write_str(&val.to_string()),
|
||||
IntermediateKey::I64(val) => f.write_str(&val.to_string()),
|
||||
IntermediateKey::Bool(val) => f.write_str(&val.to_string()),
|
||||
IntermediateKey::IpAddr(val) => f.write_str(&val.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::hash::Hash for IntermediateKey {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
core::mem::discriminant(self).hash(state);
|
||||
@@ -105,6 +118,21 @@ impl std::hash::Hash for IntermediateKey {
|
||||
}
|
||||
|
||||
impl IntermediateAggregationResults {
|
||||
/// Returns a reference to the intermediate aggregation result for the given key.
|
||||
pub fn get(&self, key: &str) -> Option<&IntermediateAggregationResult> {
|
||||
self.aggs_res.get(key)
|
||||
}
|
||||
|
||||
/// Removes and returns the intermediate aggregation result for the given key.
|
||||
pub fn remove(&mut self, key: &str) -> Option<IntermediateAggregationResult> {
|
||||
self.aggs_res.remove(key)
|
||||
}
|
||||
|
||||
/// Returns an iterator over the keys in the intermediate aggregation results.
|
||||
pub fn keys(&self) -> impl Iterator<Item = &String> {
|
||||
self.aggs_res.keys()
|
||||
}
|
||||
|
||||
/// Add a result
|
||||
pub fn push(&mut self, key: String, value: IntermediateAggregationResult) -> crate::Result<()> {
|
||||
let entry = self.aggs_res.entry(key);
|
||||
@@ -639,6 +667,21 @@ pub struct IntermediateTermBucketResult {
|
||||
}
|
||||
|
||||
impl IntermediateTermBucketResult {
|
||||
/// Returns a reference to the map of bucket entries keyed by [`IntermediateKey`].
|
||||
pub fn entries(&self) -> &FxHashMap<IntermediateKey, IntermediateTermBucketEntry> {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Returns the count of documents not included in the returned buckets.
|
||||
pub fn sum_other_doc_count(&self) -> u64 {
|
||||
self.sum_other_doc_count
|
||||
}
|
||||
|
||||
/// Returns the upper bound of the error on document counts in the returned buckets.
|
||||
pub fn doc_count_error_upper_bound(&self) -> u64 {
|
||||
self.doc_count_error_upper_bound
|
||||
}
|
||||
|
||||
pub(crate) fn into_final_result(
|
||||
self,
|
||||
req: &TermsAggregation,
|
||||
@@ -820,7 +863,7 @@ impl IntermediateRangeBucketEntry {
|
||||
};
|
||||
|
||||
// If we have a date type on the histogram buckets, we add the `key_as_string` field as
|
||||
// rfc339
|
||||
// rfc3339
|
||||
if column_type == Some(ColumnType::DateTime) {
|
||||
if let Some(val) = range_bucket_entry.to {
|
||||
let key_as_string = format_date(val as i64)?;
|
||||
|
||||
@@ -55,6 +55,12 @@ impl IntermediateAverage {
|
||||
pub(crate) fn from_stats(stats: IntermediateStats) -> Self {
|
||||
Self { stats }
|
||||
}
|
||||
|
||||
/// Returns a reference to the underlying [`IntermediateStats`].
|
||||
pub fn stats(&self) -> &IntermediateStats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
/// Merges the other intermediate result into self.
|
||||
pub fn merge_fruits(&mut self, other: IntermediateAverage) {
|
||||
self.stats.merge_fruits(other.stats);
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{BuildHasher, Hasher};
|
||||
use std::hash::Hash;
|
||||
|
||||
use columnar::column_values::CompactSpaceU64Accessor;
|
||||
use columnar::{Column, ColumnType, Dictionary, StrColumn};
|
||||
use common::f64_to_u64;
|
||||
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
|
||||
use datasketches::hll::{HllSketch, HllType, HllUnion};
|
||||
use rustc_hash::FxHashSet;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
use crate::aggregation::agg_data::AggregationsSegmentCtx;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
@@ -16,29 +15,17 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
|
||||
use crate::aggregation::*;
|
||||
use crate::TantivyError;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
struct BuildSaltedHasher {
|
||||
salt: u8,
|
||||
}
|
||||
|
||||
impl BuildHasher for BuildSaltedHasher {
|
||||
type Hasher = DefaultHasher;
|
||||
|
||||
fn build_hasher(&self) -> Self::Hasher {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
hasher.write_u8(self.salt);
|
||||
|
||||
hasher
|
||||
}
|
||||
}
|
||||
/// Log2 of the number of registers for the HLL sketch.
|
||||
/// 2^11 = 2048 registers, giving ~2.3% relative error and ~1KB per sketch (Hll4).
|
||||
const LG_K: u8 = 11;
|
||||
|
||||
/// # Cardinality
|
||||
///
|
||||
/// The cardinality aggregation allows for computing an estimate
|
||||
/// of the number of different values in a data set based on the
|
||||
/// HyperLogLog++ algorithm. This is particularly useful for understanding the
|
||||
/// uniqueness of values in a large dataset where counting each unique value
|
||||
/// individually would be computationally expensive.
|
||||
/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for
|
||||
/// understanding the uniqueness of values in a large dataset where counting
|
||||
/// each unique value individually would be computationally expensive.
|
||||
///
|
||||
/// For example, you might use a cardinality aggregation to estimate the number
|
||||
/// of unique visitors to a website by aggregating on a field that contains
|
||||
@@ -184,7 +171,7 @@ impl SegmentCardinalityCollectorBucket {
|
||||
|
||||
term_ids.sort_unstable();
|
||||
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
|
||||
self.cardinality.sketch.insert_any(&term);
|
||||
self.cardinality.insert(term);
|
||||
Ok(())
|
||||
})?;
|
||||
if has_missing {
|
||||
@@ -195,17 +182,17 @@ impl SegmentCardinalityCollectorBucket {
|
||||
);
|
||||
match missing_key {
|
||||
Key::Str(missing) => {
|
||||
self.cardinality.sketch.insert_any(&missing);
|
||||
self.cardinality.insert(missing.as_str());
|
||||
}
|
||||
Key::F64(val) => {
|
||||
let val = f64_to_u64(*val);
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
self.cardinality.insert(val);
|
||||
}
|
||||
Key::U64(val) => {
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
Key::I64(val) => {
|
||||
self.cardinality.sketch.insert_any(&val);
|
||||
self.cardinality.insert(*val);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -296,11 +283,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
})?;
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
|
||||
bucket.cardinality.sketch.insert_any(&val);
|
||||
bucket.cardinality.insert(val);
|
||||
}
|
||||
} else {
|
||||
for val in col_block_accessor.iter_vals() {
|
||||
bucket.cardinality.sketch.insert_any(&val);
|
||||
bucket.cardinality.insert(val);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,11 +308,18 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
/// The percentiles collector used during segment collection and for merging results.
|
||||
#[derive(Clone, Debug)]
|
||||
/// The cardinality collector used during segment collection and for merging results.
|
||||
/// Uses Apache DataSketches HLL (lg_k=11, Hll4) for compact binary serialization
|
||||
/// and cross-language compatibility (e.g. Java `datasketches` library).
|
||||
pub struct CardinalityCollector {
|
||||
sketch: HyperLogLogPlus<u64, BuildSaltedHasher>,
|
||||
sketch: HllSketch,
|
||||
/// Salt derived from `ColumnType`, used to differentiate values of different column types
|
||||
/// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`).
|
||||
/// Not serialized — only needed during insertion, not after sketch registers are populated.
|
||||
salt: u8,
|
||||
}
|
||||
|
||||
impl Default for CardinalityCollector {
|
||||
fn default() -> Self {
|
||||
Self::new(0)
|
||||
@@ -338,25 +332,88 @@ impl PartialEq for CardinalityCollector {
|
||||
}
|
||||
}
|
||||
|
||||
impl CardinalityCollector {
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.clone().count().trunc())
|
||||
impl Serialize for CardinalityCollector {
|
||||
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
let bytes = self.sketch.serialize();
|
||||
serializer.serialize_bytes(&bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for CardinalityCollector {
|
||||
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
struct HllBytesVisitor;
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for HllBytesVisitor {
|
||||
type Value = Vec<u8>;
|
||||
|
||||
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.write_str("HLL sketch bytes")
|
||||
}
|
||||
|
||||
fn visit_bytes<E: serde::de::Error>(self, v: &[u8]) -> Result<Vec<u8>, E> {
|
||||
Ok(v.to_vec())
|
||||
}
|
||||
|
||||
fn visit_borrowed_bytes<E: serde::de::Error>(
|
||||
self,
|
||||
v: &'de [u8],
|
||||
) -> Result<Vec<u8>, E> {
|
||||
Ok(v.to_vec())
|
||||
}
|
||||
|
||||
fn visit_byte_buf<E: serde::de::Error>(self, v: Vec<u8>) -> Result<Vec<u8>, E> {
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
fn visit_seq<A: serde::de::SeqAccess<'de>>(
|
||||
self,
|
||||
mut seq: A,
|
||||
) -> Result<Vec<u8>, A::Error> {
|
||||
let mut bytes = Vec::with_capacity(seq.size_hint().unwrap_or(0));
|
||||
while let Some(byte) = seq.next_element()? {
|
||||
bytes.push(byte);
|
||||
}
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
let bytes = deserializer.deserialize_byte_buf(HllBytesVisitor)?;
|
||||
let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?;
|
||||
Ok(Self { sketch, salt: 0 })
|
||||
}
|
||||
}
|
||||
|
||||
impl CardinalityCollector {
|
||||
fn new(salt: u8) -> Self {
|
||||
Self {
|
||||
sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(),
|
||||
sketch: HllSketch::new(LG_K, HllType::Hll4),
|
||||
salt,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
|
||||
self.sketch.merge(&right.sketch).map_err(|err| {
|
||||
TantivyError::AggregationError(AggregationError::InternalError(format!(
|
||||
"Error while merging cardinality {err:?}"
|
||||
)))
|
||||
})?;
|
||||
/// Insert a value into the HLL sketch, salted by the column type.
|
||||
/// The salt ensures that identical u64 values from different column types
|
||||
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
|
||||
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
|
||||
self.sketch.update((self.salt, value));
|
||||
}
|
||||
|
||||
/// Compute the final cardinality estimate.
|
||||
pub fn finalize(self) -> Option<f64> {
|
||||
Some(self.sketch.estimate().trunc())
|
||||
}
|
||||
|
||||
/// Serialize the HLL sketch to its compact binary representation.
|
||||
/// The format is cross-language compatible with Apache DataSketches (Java, C++, Python).
|
||||
pub fn to_sketch_bytes(&self) -> Vec<u8> {
|
||||
self.sketch.serialize()
|
||||
}
|
||||
|
||||
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
|
||||
let mut union = HllUnion::new(LG_K);
|
||||
union.update(&self.sketch);
|
||||
union.update(&right.sketch);
|
||||
self.sketch = union.get_result(HllType::Hll4);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -518,4 +575,287 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serde_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("hello");
|
||||
collector.insert("world");
|
||||
collector.insert("hello"); // duplicate
|
||||
|
||||
let serialized = serde_json::to_vec(&collector).unwrap();
|
||||
let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap();
|
||||
|
||||
let original_estimate = collector.finalize().unwrap();
|
||||
let roundtrip_estimate = deserialized.finalize().unwrap();
|
||||
assert_eq!(original_estimate, roundtrip_estimate);
|
||||
assert_eq!(original_estimate, 2.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_postcard_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("hello");
|
||||
collector.insert("world");
|
||||
collector.insert("hello");
|
||||
|
||||
let original_estimate = collector.clone().finalize().unwrap();
|
||||
|
||||
let serialized = postcard::to_allocvec(&collector).expect("postcard serialize failed");
|
||||
let deserialized: CardinalityCollector =
|
||||
postcard::from_bytes(&serialized).expect("postcard deserialize failed");
|
||||
|
||||
let roundtrip_estimate = deserialized.finalize().unwrap();
|
||||
assert_eq!(original_estimate, roundtrip_estimate);
|
||||
assert_eq!(original_estimate, 2.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_postcard_bytes_fidelity() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
for i in 0..10u64 {
|
||||
collector.insert(i);
|
||||
}
|
||||
|
||||
let hll_bytes = collector.sketch.serialize();
|
||||
println!("HLL bytes len: {}, first 16: {:?}", hll_bytes.len(), &hll_bytes[..16.min(hll_bytes.len())]);
|
||||
|
||||
let postcard_bytes = postcard::to_allocvec(&collector).unwrap();
|
||||
println!("Postcard bytes len: {}", postcard_bytes.len());
|
||||
|
||||
let deserialized: CardinalityCollector = postcard::from_bytes(&postcard_bytes).unwrap();
|
||||
let hll_bytes_after = deserialized.sketch.serialize();
|
||||
println!(
|
||||
"HLL bytes after roundtrip len: {}, first 16: {:?}",
|
||||
hll_bytes_after.len(),
|
||||
&hll_bytes_after[..16.min(hll_bytes_after.len())]
|
||||
);
|
||||
|
||||
assert_eq!(hll_bytes, hll_bytes_after, "HLL bytes should be identical after Postcard roundtrip");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_postcard_roundtrip_large() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
for i in 0..1000u64 {
|
||||
collector.insert(i);
|
||||
}
|
||||
|
||||
let original_estimate = collector.clone().finalize().unwrap();
|
||||
assert!((original_estimate - 1000.0).abs() < 50.0);
|
||||
|
||||
let serialized = postcard::to_allocvec(&collector).expect("postcard serialize failed");
|
||||
println!(
|
||||
"Large HLL sketch serialized to {} postcard bytes",
|
||||
serialized.len()
|
||||
);
|
||||
let deserialized: CardinalityCollector =
|
||||
postcard::from_bytes(&serialized).expect("postcard deserialize failed");
|
||||
|
||||
let roundtrip_estimate = deserialized.finalize().unwrap();
|
||||
assert_eq!(original_estimate, roundtrip_estimate);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_intermediate_metric_result_postcard_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
use crate::aggregation::intermediate_agg_result::IntermediateMetricResult;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("hello");
|
||||
collector.insert("world");
|
||||
|
||||
let intermediate = IntermediateMetricResult::Cardinality(collector);
|
||||
let serialized =
|
||||
postcard::to_allocvec(&intermediate).expect("postcard serialize failed");
|
||||
let deserialized: IntermediateMetricResult =
|
||||
postcard::from_bytes(&serialized).expect("postcard deserialize failed");
|
||||
|
||||
match deserialized {
|
||||
IntermediateMetricResult::Cardinality(c) => {
|
||||
assert_eq!(c.finalize().unwrap(), 2.0);
|
||||
}
|
||||
_ => panic!("expected Cardinality variant"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_postcard_multisegment_roundtrip() {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::collector::AggregationCollector;
|
||||
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
|
||||
use crate::aggregation::AggContextParams;
|
||||
use crate::collector::{Collector, SegmentCollector};
|
||||
use crate::query::AllQuery;
|
||||
|
||||
let segment_and_terms = vec![
|
||||
vec!["terma"],
|
||||
vec!["termb"],
|
||||
vec!["termc"],
|
||||
vec!["terma"],
|
||||
];
|
||||
let index = get_test_index_from_terms(false, &segment_and_terms).unwrap();
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(json!({
|
||||
"cardinality": {
|
||||
"cardinality": {
|
||||
"field": "string_id",
|
||||
}
|
||||
},
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(
|
||||
agg_req,
|
||||
AggContextParams::new(Default::default(), index.tokenizers().clone()),
|
||||
);
|
||||
|
||||
let reader = index.reader().unwrap();
|
||||
let searcher = reader.searcher();
|
||||
|
||||
let segments = searcher.segment_readers();
|
||||
assert!(
|
||||
segments.len() > 1,
|
||||
"Need multiple segments for this test, got {}",
|
||||
segments.len()
|
||||
);
|
||||
|
||||
// Collect from each segment individually and serialize via Postcard
|
||||
let serialized_results: Vec<Vec<u8>> = segments
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(ord, segment_reader)| {
|
||||
let mut segment_collector = collector
|
||||
.for_segment(ord as u32, segment_reader)
|
||||
.unwrap();
|
||||
for doc in segment_reader.doc_ids_alive() {
|
||||
segment_collector.collect(doc, 0.0);
|
||||
}
|
||||
let fruit = segment_collector.harvest().unwrap();
|
||||
postcard::to_allocvec(&fruit).expect("postcard serialize should work")
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Deserialize and merge (this is what quickwit does)
|
||||
let merged: IntermediateAggregationResults = serialized_results
|
||||
.iter()
|
||||
.map(|bytes| {
|
||||
postcard::from_bytes::<IntermediateAggregationResults>(bytes)
|
||||
.expect("postcard deserialize should work")
|
||||
})
|
||||
.fold(None, |acc: Option<IntermediateAggregationResults>, fruits| {
|
||||
match acc {
|
||||
Some(mut merged) => {
|
||||
merged.merge_fruits(fruits).unwrap();
|
||||
Some(merged)
|
||||
}
|
||||
None => Some(fruits),
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Verify the merged result can be serialized again
|
||||
let _final_bytes =
|
||||
postcard::to_allocvec(&merged).expect("final postcard serialize should work");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_full_intermediate_agg_results_postcard_roundtrip() {
|
||||
use super::CardinalityCollector;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults,
|
||||
IntermediateMetricResult,
|
||||
};
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("hello");
|
||||
collector.insert("world");
|
||||
|
||||
let mut results = IntermediateAggregationResults::default();
|
||||
results
|
||||
.push(
|
||||
"test_card".to_string(),
|
||||
IntermediateAggregationResult::Metric(
|
||||
IntermediateMetricResult::Cardinality(collector),
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let serialized =
|
||||
postcard::to_allocvec(&results).expect("postcard serialize failed");
|
||||
let deserialized: IntermediateAggregationResults =
|
||||
postcard::from_bytes(&serialized).expect("postcard deserialize failed");
|
||||
|
||||
let result = deserialized
|
||||
.aggs_res
|
||||
.get("test_card")
|
||||
.expect("missing key");
|
||||
match result {
|
||||
IntermediateAggregationResult::Metric(IntermediateMetricResult::Cardinality(c)) => {
|
||||
assert_eq!(c.clone().finalize().unwrap(), 2.0);
|
||||
}
|
||||
_ => panic!("expected Cardinality variant"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_merge() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut left = CardinalityCollector::default();
|
||||
left.insert("a");
|
||||
left.insert("b");
|
||||
|
||||
let mut right = CardinalityCollector::default();
|
||||
right.insert("b");
|
||||
right.insert("c");
|
||||
|
||||
left.merge_fruits(right).unwrap();
|
||||
let estimate = left.finalize().unwrap();
|
||||
assert_eq!(estimate, 3.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_serialize_deserialize_binary() {
|
||||
use datasketches::hll::HllSketch;
|
||||
|
||||
use super::CardinalityCollector;
|
||||
|
||||
let mut collector = CardinalityCollector::default();
|
||||
collector.insert("apple");
|
||||
collector.insert("banana");
|
||||
collector.insert("cherry");
|
||||
|
||||
let bytes = collector.to_sketch_bytes();
|
||||
let deserialized = HllSketch::deserialize(&bytes).unwrap();
|
||||
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cardinality_collector_salt_differentiates_types() {
|
||||
use super::CardinalityCollector;
|
||||
|
||||
// Without salt, same u64 value from different column types would collide
|
||||
let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool
|
||||
collector_bool.insert(0u64); // false
|
||||
collector_bool.insert(1u64); // true
|
||||
|
||||
let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64
|
||||
collector_i64.insert(0u64);
|
||||
collector_i64.insert(1u64);
|
||||
|
||||
// Merge them
|
||||
collector_bool.merge_fruits(collector_i64).unwrap();
|
||||
let estimate = collector_bool.finalize().unwrap();
|
||||
// Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1)
|
||||
assert_eq!(estimate, 4.0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,8 +107,11 @@ pub enum PercentileValues {
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
/// The entry when requesting percentiles with keyed: false
|
||||
pub struct PercentileValuesVecEntry {
|
||||
key: f64,
|
||||
value: f64,
|
||||
/// Percentile
|
||||
pub key: f64,
|
||||
|
||||
/// Value at the percentile
|
||||
pub value: f64,
|
||||
}
|
||||
|
||||
/// Single-metric aggregations use this common result structure.
|
||||
|
||||
@@ -222,6 +222,12 @@ impl PercentilesCollector {
|
||||
self.sketch.add(val);
|
||||
}
|
||||
|
||||
/// Encode the underlying DDSketch to Java-compatible binary format
|
||||
/// for cross-language serialization with Java consumers.
|
||||
pub fn to_sketch_bytes(&self) -> Vec<u8> {
|
||||
self.sketch.to_java_bytes()
|
||||
}
|
||||
|
||||
pub(crate) fn merge_fruits(&mut self, right: PercentilesCollector) -> crate::Result<()> {
|
||||
self.sketch.merge(&right.sketch).map_err(|err| {
|
||||
TantivyError::AggregationError(AggregationError::InternalError(format!(
|
||||
@@ -325,7 +331,7 @@ mod tests {
|
||||
use crate::aggregation::AggregationCollector;
|
||||
use crate::query::AllQuery;
|
||||
use crate::schema::{Schema, FAST};
|
||||
use crate::Index;
|
||||
use crate::{assert_nearly_equals, Index};
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_percentiles_empty_index() -> crate::Result<()> {
|
||||
@@ -608,12 +614,16 @@ mod tests {
|
||||
let res = exec_request_with_query(agg_req, &index, None)?;
|
||||
assert_eq!(res["range_with_stats"]["buckets"][0]["doc_count"], 3);
|
||||
|
||||
assert_eq!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"],
|
||||
assert_nearly_equals!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"]
|
||||
.as_f64()
|
||||
.unwrap(),
|
||||
5.0028295751107414
|
||||
);
|
||||
assert_eq!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"],
|
||||
assert_nearly_equals!(
|
||||
res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"]
|
||||
.as_f64()
|
||||
.unwrap(),
|
||||
10.07469668951144
|
||||
);
|
||||
|
||||
@@ -659,8 +669,14 @@ mod tests {
|
||||
|
||||
let res = exec_request_with_query(agg_req, &index, None)?;
|
||||
|
||||
assert_eq!(res["percentiles"]["values"]["1.0"], 5.0028295751107414);
|
||||
assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951144);
|
||||
assert_nearly_equals!(
|
||||
res["percentiles"]["values"]["1.0"].as_f64().unwrap(),
|
||||
5.0028295751107414
|
||||
);
|
||||
assert_nearly_equals!(
|
||||
res["percentiles"]["values"]["99.0"].as_f64().unwrap(),
|
||||
10.07469668951144
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -110,6 +110,16 @@ impl Default for IntermediateStats {
|
||||
}
|
||||
|
||||
impl IntermediateStats {
|
||||
/// Returns the number of values collected.
|
||||
pub fn count(&self) -> u64 {
|
||||
self.count
|
||||
}
|
||||
|
||||
/// Returns the sum of all values collected.
|
||||
pub fn sum(&self) -> f64 {
|
||||
self.sum
|
||||
}
|
||||
|
||||
/// Merges the other stats intermediate result into self.
|
||||
pub fn merge_fruits(&mut self, other: IntermediateStats) {
|
||||
self.count += other.count;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod order;
|
||||
mod sort_by_bytes;
|
||||
mod sort_by_erased_type;
|
||||
mod sort_by_score;
|
||||
mod sort_by_static_fast_value;
|
||||
@@ -6,6 +7,7 @@ mod sort_by_string;
|
||||
mod sort_key_computer;
|
||||
|
||||
pub use order::*;
|
||||
pub use sort_by_bytes::SortByBytes;
|
||||
pub use sort_by_erased_type::SortByErasedType;
|
||||
pub use sort_by_score::SortBySimilarityScore;
|
||||
pub use sort_by_static_fast_value::SortByStaticFastValue;
|
||||
|
||||
src/collector/sort_key/sort_by_bytes.rs (new file, 168 lines)
@@ -0,0 +1,168 @@
|
||||
use columnar::BytesColumn;
|
||||
|
||||
use crate::collector::sort_key::NaturalComparator;
|
||||
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer};
|
||||
use crate::termdict::TermOrdinal;
|
||||
use crate::{DocId, Score};
|
||||
|
||||
/// Sort by the first value of a bytes column.
|
||||
///
|
||||
/// If the field is multivalued, only the first value is considered.
|
||||
///
|
||||
/// Documents that do not have this value are still considered.
|
||||
/// Their sort key will simply be `None`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SortByBytes {
|
||||
column_name: String,
|
||||
}
|
||||
|
||||
impl SortByBytes {
|
||||
/// Creates a new sort by bytes sort key computer.
|
||||
pub fn for_field(column_name: impl ToString) -> Self {
|
||||
SortByBytes {
|
||||
column_name: column_name.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SortKeyComputer for SortByBytes {
|
||||
type SortKey = Option<Vec<u8>>;
|
||||
type Child = ByBytesColumnSegmentSortKeyComputer;
|
||||
type Comparator = NaturalComparator;
|
||||
|
||||
fn segment_sort_key_computer(
|
||||
&self,
|
||||
segment_reader: &crate::SegmentReader,
|
||||
) -> crate::Result<Self::Child> {
|
||||
let bytes_column_opt = segment_reader.fast_fields().bytes(&self.column_name)?;
|
||||
Ok(ByBytesColumnSegmentSortKeyComputer { bytes_column_opt })
|
||||
}
|
||||
}
|
||||
|
||||
/// Segment-level sort key computer for bytes columns.
|
||||
pub struct ByBytesColumnSegmentSortKeyComputer {
|
||||
bytes_column_opt: Option<BytesColumn>,
|
||||
}
|
||||
|
||||
impl SegmentSortKeyComputer for ByBytesColumnSegmentSortKeyComputer {
|
||||
type SortKey = Option<Vec<u8>>;
|
||||
type SegmentSortKey = Option<TermOrdinal>;
|
||||
type SegmentComparator = NaturalComparator;
|
||||
|
||||
#[inline(always)]
|
||||
fn segment_sort_key(&mut self, doc: DocId, _score: Score) -> Option<TermOrdinal> {
|
||||
let bytes_column = self.bytes_column_opt.as_ref()?;
|
||||
bytes_column.ords().first(doc)
|
||||
}
|
||||
|
||||
fn convert_segment_sort_key(&self, term_ord_opt: Option<TermOrdinal>) -> Option<Vec<u8>> {
|
||||
// TODO: Individual lookups to the dictionary like this are very likely to repeatedly
|
||||
// decompress the same blocks. See https://github.com/quickwit-oss/tantivy/issues/2776
|
||||
let term_ord = term_ord_opt?;
|
||||
let bytes_column = self.bytes_column_opt.as_ref()?;
|
||||
let mut bytes = Vec::new();
|
||||
bytes_column
|
||||
.dictionary()
|
||||
.ord_to_term(term_ord, &mut bytes)
|
||||
.ok()?;
|
||||
Some(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::SortByBytes;
|
||||
use crate::collector::TopDocs;
|
||||
use crate::query::AllQuery;
|
||||
use crate::schema::{BytesOptions, Schema, FAST, INDEXED};
|
||||
use crate::{Index, IndexWriter, Order, TantivyDocument};
|
||||
|
||||
#[test]
|
||||
fn test_sort_by_bytes_asc() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let bytes_field = schema_builder
|
||||
.add_bytes_field("data", BytesOptions::default().set_fast().set_indexed());
|
||||
let id_field = schema_builder.add_u64_field("id", FAST | INDEXED);
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer: IndexWriter = index.writer_for_tests()?;
|
||||
|
||||
// Insert documents with byte values in non-sorted order
|
||||
let test_data: Vec<(u64, Vec<u8>)> = vec![
|
||||
(1, vec![0x02, 0x00]),
|
||||
(2, vec![0x00, 0x10]),
|
||||
(3, vec![0x01, 0x00]),
|
||||
(4, vec![0x00, 0x20]),
|
||||
];
|
||||
|
||||
for (id, bytes) in &test_data {
|
||||
let mut doc = TantivyDocument::new();
|
||||
doc.add_u64(id_field, *id);
|
||||
doc.add_bytes(bytes_field, bytes);
|
||||
index_writer.add_document(doc)?;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
|
||||
// Sort ascending by bytes
|
||||
let top_docs =
|
||||
TopDocs::with_limit(10).order_by((SortByBytes::for_field("data"), Order::Asc));
|
||||
let results: Vec<(Option<Vec<u8>>, _)> = searcher.search(&AllQuery, &top_docs)?;
|
||||
|
||||
// Expected order: [0x00,0x10], [0x00,0x20], [0x01,0x00], [0x02,0x00]
|
||||
let sorted_bytes: Vec<Option<Vec<u8>>> = results.into_iter().map(|(b, _)| b).collect();
|
||||
assert_eq!(
|
||||
sorted_bytes,
|
||||
vec![
|
||||
Some(vec![0x00, 0x10]),
|
||||
Some(vec![0x00, 0x20]),
|
||||
Some(vec![0x01, 0x00]),
|
||||
Some(vec![0x02, 0x00]),
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sort_by_bytes_desc() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let bytes_field = schema_builder
|
||||
.add_bytes_field("data", BytesOptions::default().set_fast().set_indexed());
|
||||
let schema = schema_builder.build();
|
||||
let index = Index::create_in_ram(schema);
|
||||
let mut index_writer: IndexWriter = index.writer_for_tests()?;
|
||||
|
||||
let test_data: Vec<Vec<u8>> = vec![vec![0x00, 0x10], vec![0x02, 0x00], vec![0x01, 0x00]];
|
||||
|
||||
for bytes in &test_data {
|
||||
let mut doc = TantivyDocument::new();
|
||||
doc.add_bytes(bytes_field, bytes);
|
||||
index_writer.add_document(doc)?;
|
||||
}
|
||||
index_writer.commit()?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
let searcher = reader.searcher();
|
||||
|
||||
// Sort descending by bytes
|
||||
let top_docs =
|
||||
TopDocs::with_limit(10).order_by((SortByBytes::for_field("data"), Order::Desc));
|
||||
let results: Vec<(Option<Vec<u8>>, _)> = searcher.search(&AllQuery, &top_docs)?;
|
||||
|
||||
// Expected order (descending): [0x02,0x00], [0x01,0x00], [0x00,0x10]
|
||||
let sorted_bytes: Vec<Option<Vec<u8>>> = results.into_iter().map(|(b, _)| b).collect();
|
||||
assert_eq!(
|
||||
sorted_bytes,
|
||||
vec![
|
||||
Some(vec![0x02, 0x00]),
|
||||
Some(vec![0x01, 0x00]),
|
||||
Some(vec![0x00, 0x10]),
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
use columnar::{ColumnType, MonotonicallyMappableToU64};

use crate::collector::sort_key::{
    NaturalComparator, SortBySimilarityScore, SortByStaticFastValue, SortByString,
    NaturalComparator, SortByBytes, SortBySimilarityScore, SortByStaticFastValue, SortByString,
};
use crate::collector::{SegmentSortKeyComputer, SortKeyComputer};
use crate::fastfield::FastFieldNotAvailableError;
@@ -114,6 +114,16 @@ impl SortKeyComputer for SortByErasedType {
                    },
                })
            }
            ColumnType::Bytes => {
                let computer = SortByBytes::for_field(column_name);
                let inner = computer.segment_sort_key_computer(segment_reader)?;
                Box::new(ErasedSegmentSortKeyComputerWrapper {
                    inner,
                    converter: |val: Option<Vec<u8>>| {
                        val.map(OwnedValue::Bytes).unwrap_or(OwnedValue::Null)
                    },
                })
            }
            ColumnType::U64 => {
                let computer = SortByStaticFastValue::<u64>::for_field(column_name);
                let inner = computer.segment_sort_key_computer(segment_reader)?;
@@ -281,6 +291,65 @@ mod tests {
        );
    }

    #[test]
    fn test_sort_by_owned_bytes() {
        let mut schema_builder = Schema::builder();
        let data_field = schema_builder.add_bytes_field("data", FAST);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);
        let mut writer = index.writer_for_tests().unwrap();
        writer
            .add_document(doc!(data_field => vec![0x03u8, 0x00]))
            .unwrap();
        writer
            .add_document(doc!(data_field => vec![0x01u8, 0x00]))
            .unwrap();
        writer
            .add_document(doc!(data_field => vec![0x02u8, 0x00]))
            .unwrap();
        writer.add_document(doc!()).unwrap();
        writer.commit().unwrap();

        let reader = index.reader().unwrap();
        let searcher = reader.searcher();

        // Sort descending (Natural - highest first)
        let collector = TopDocs::with_limit(10)
            .order_by((SortByErasedType::for_field("data"), ComparatorEnum::Natural));
        let top_docs = searcher.search(&AllQuery, &collector).unwrap();

        let values: Vec<OwnedValue> = top_docs.into_iter().map(|(key, _)| key).collect();

        assert_eq!(
            values,
            vec![
                OwnedValue::Bytes(vec![0x03, 0x00]),
                OwnedValue::Bytes(vec![0x02, 0x00]),
                OwnedValue::Bytes(vec![0x01, 0x00]),
                OwnedValue::Null
            ]
        );

        // Sort ascending (ReverseNoneLower - lowest first, nulls last)
        let collector = TopDocs::with_limit(10).order_by((
            SortByErasedType::for_field("data"),
            ComparatorEnum::ReverseNoneLower,
        ));
        let top_docs = searcher.search(&AllQuery, &collector).unwrap();

        let values: Vec<OwnedValue> = top_docs.into_iter().map(|(key, _)| key).collect();

        assert_eq!(
            values,
            vec![
                OwnedValue::Bytes(vec![0x01, 0x00]),
                OwnedValue::Bytes(vec![0x02, 0x00]),
                OwnedValue::Bytes(vec![0x03, 0x00]),
                OwnedValue::Null
            ]
        );
    }

    #[test]
    fn test_sort_by_owned_reverse() {
        let mut schema_builder = Schema::builder();

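Editor's note: the `ColumnType::Bytes` arm and `test_sort_by_owned_bytes` above hinge on folding a missing fast-field value into `OwnedValue::Null` so that documents without a value sort together at one end. A standalone sketch of that conversion, using a stand-in enum rather than tantivy's `OwnedValue` (whose full variant set is not shown here):

```rust
// Stand-in for the erased sort key: not tantivy's OwnedValue, only the two
// variants the converter closure in the diff above relies on.
#[derive(Debug, PartialEq)]
enum ErasedKey {
    Bytes(Vec<u8>),
    Null,
}

// Mirrors the converter closure: a present column value becomes Bytes,
// a missing one becomes Null.
fn erase(val: Option<Vec<u8>>) -> ErasedKey {
    val.map(ErasedKey::Bytes).unwrap_or(ErasedKey::Null)
}

fn main() {
    assert_eq!(erase(Some(vec![0x01, 0x00])), ErasedKey::Bytes(vec![0x01, 0x00]));
    assert_eq!(erase(None), ErasedKey::Null);
}
```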
@@ -291,18 +291,6 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
            }
        };

        let exclude_scorer_opt: Option<Box<dyn Scorer>> = if exclude_scorers.is_empty() {
            None
        } else {
            let exclude_specialized_scorer: SpecializedScorer =
                scorer_union(exclude_scorers, DoNothingCombiner::default, num_docs);
            Some(into_box_scorer(
                exclude_specialized_scorer,
                DoNothingCombiner::default,
                num_docs,
            ))
        };

        let include_scorer = match (should_scorers, must_scorers) {
            (ShouldScorersCombinationMethod::Ignored, must_scorers) => {
                // No SHOULD clauses (or they were absorbed into MUST).
@@ -380,16 +368,23 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
            }
        };
        if let Some(exclude_scorer) = exclude_scorer_opt {
            let include_scorer_boxed =
                into_box_scorer(include_scorer, &score_combiner_fn, num_docs);
            Ok(SpecializedScorer::Other(Box::new(Exclude::new(
                include_scorer_boxed,
                exclude_scorer,
            ))))
        } else {
            Ok(include_scorer)
        if exclude_scorers.is_empty() {
            return Ok(include_scorer);
        }

        let include_scorer_boxed = into_box_scorer(include_scorer, &score_combiner_fn, num_docs);
        let scorer: Box<dyn Scorer> = if exclude_scorers.len() == 1 {
            let exclude_scorer = exclude_scorers.pop().unwrap();
            match exclude_scorer.downcast::<TermScorer>() {
                // Cast to TermScorer succeeded
                Ok(exclude_scorer) => Box::new(Exclude::new(include_scorer_boxed, *exclude_scorer)),
                // We get back the original Box<dyn Scorer>
                Err(exclude_scorer) => Box::new(Exclude::new(include_scorer_boxed, exclude_scorer)),
            }
        } else {
            Box::new(Exclude::new(include_scorer_boxed, exclude_scorers))
        };
        Ok(SpecializedScorer::Other(scorer))
    }
}

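Editor's note: the single-exclusion fast path above keys off `downcast`, which either yields the concrete `TermScorer` or hands the original box back unchanged, so the generic path still works. The standard library's `Box<dyn Any>` exposes the same shape of API; a minimal sketch of that pattern, using `Any` in place of tantivy's `Scorer` trait:

```rust
use std::any::Any;

// Minimal sketch of the downcast-or-keep-the-box pattern used above,
// with std's Any standing in for the Scorer trait.
fn describe(boxed: Box<dyn Any>) -> String {
    match boxed.downcast::<u32>() {
        // Cast succeeded: we now own a Box<u32>.
        Ok(value) => format!("u32: {value}"),
        // Cast failed: the original Box<dyn Any> is handed back untouched.
        Err(original) => format!("not a u32 (still have the box: {})", original.is::<String>()),
    }
}

fn main() {
    assert_eq!(describe(Box::new(7u32)), "u32: 7");
    assert_eq!(
        describe(Box::new(String::from("hi"))),
        "not a u32 (still have the box: true)"
    );
}
```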
@@ -1,48 +1,71 @@
use crate::docset::{DocSet, TERMINATED};
use crate::docset::{DocSet, SeekDangerResult, TERMINATED};
use crate::query::Scorer;
use crate::{DocId, Score};

#[inline]
fn is_within<TDocSetExclude: DocSet>(docset: &mut TDocSetExclude, doc: DocId) -> bool {
    docset.doc() <= doc && docset.seek(doc) == doc
}

/// Filters a given `DocSet` by removing the docs from a given `DocSet`.
/// An exclusion set is a set of documents
/// that should be excluded from a given DocSet.
///
/// The excluding docset has no impact on scoring.
pub struct Exclude<TDocSet, TDocSetExclude> {
    underlying_docset: TDocSet,
    excluding_docset: TDocSetExclude,
/// It can be a single DocSet, or a Vec of DocSets.
pub trait ExclusionSet: Send {
    /// Returns `true` if the given `doc` is in the exclusion set.
    fn contains(&mut self, doc: DocId) -> bool;
}

impl<TDocSet, TDocSetExclude> Exclude<TDocSet, TDocSetExclude>
impl<TDocSet: DocSet> ExclusionSet for TDocSet {
    #[inline]
    fn contains(&mut self, doc: DocId) -> bool {
        self.seek_danger(doc) == SeekDangerResult::Found
    }
}

impl<TDocSet: DocSet> ExclusionSet for Vec<TDocSet> {
    #[inline]
    fn contains(&mut self, doc: DocId) -> bool {
        for docset in self.iter_mut() {
            if docset.seek_danger(doc) == SeekDangerResult::Found {
                return true;
            }
        }
        false
    }
}

/// Filters a given `DocSet` by removing the docs from an exclusion set.
///
/// The excluding docsets have no impact on scoring.
pub struct Exclude<TDocSet, TExclusionSet> {
    underlying_docset: TDocSet,
    exclusion_set: TExclusionSet,
}

impl<TDocSet, TExclusionSet> Exclude<TDocSet, TExclusionSet>
where
    TDocSet: DocSet,
    TDocSetExclude: DocSet,
    TExclusionSet: ExclusionSet,
{
    /// Creates a new `ExcludeScorer`
    pub fn new(
        mut underlying_docset: TDocSet,
        mut excluding_docset: TDocSetExclude,
    ) -> Exclude<TDocSet, TDocSetExclude> {
        mut exclusion_set: TExclusionSet,
    ) -> Exclude<TDocSet, TExclusionSet> {
        while underlying_docset.doc() != TERMINATED {
            let target = underlying_docset.doc();
            if !is_within(&mut excluding_docset, target) {
            if !exclusion_set.contains(target) {
                break;
            }
            underlying_docset.advance();
        }
        Exclude {
            underlying_docset,
            excluding_docset,
            exclusion_set,
        }
    }
}

impl<TDocSet, TDocSetExclude> DocSet for Exclude<TDocSet, TDocSetExclude>
impl<TDocSet, TExclusionSet> DocSet for Exclude<TDocSet, TExclusionSet>
where
    TDocSet: DocSet,
    TDocSetExclude: DocSet,
    TExclusionSet: ExclusionSet,
{
    fn advance(&mut self) -> DocId {
        loop {
@@ -50,7 +73,7 @@ where
            if candidate == TERMINATED {
                return TERMINATED;
            }
            if !is_within(&mut self.excluding_docset, candidate) {
            if !self.exclusion_set.contains(candidate) {
                return candidate;
            }
        }
@@ -61,7 +84,7 @@ where
        if candidate == TERMINATED {
            return TERMINATED;
        }
        if !is_within(&mut self.excluding_docset, candidate) {
        if !self.exclusion_set.contains(candidate) {
            return candidate;
        }
        self.advance()
@@ -79,10 +102,10 @@ where
    }
}

impl<TScorer, TDocSetExclude> Scorer for Exclude<TScorer, TDocSetExclude>
impl<TScorer, TExclusionSet> Scorer for Exclude<TScorer, TExclusionSet>
where
    TScorer: Scorer,
    TDocSetExclude: DocSet + 'static,
    TExclusionSet: ExclusionSet + 'static,
{
    #[inline]
    fn score(&mut self) -> Score {

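Editor's note: the `ExclusionSet` abstraction above boils down to "skip any candidate the exclusion set contains", whether that set is a single docset or several. A standalone sketch of that filtering over plain sorted id lists, ignoring tantivy's `DocSet` cursors and the `seek_danger` optimization:

```rust
// Standalone sketch of exclusion filtering over sorted doc-id lists.
// This mirrors the idea behind Exclude + ExclusionSet without tantivy's types.
// Each exclusion list must be sorted so binary_search is valid.
fn exclude(underlying: &[u32], exclusions: &[&[u32]]) -> Vec<u32> {
    underlying
        .iter()
        .copied()
        // Keep a doc only if no exclusion list contains it.
        .filter(|doc| !exclusions.iter().any(|ex| ex.binary_search(doc).is_ok()))
        .collect()
}

fn main() {
    let docs = [1u32, 2, 3, 5, 8, 13];
    let excluded_a: &[u32] = &[2, 8];
    let excluded_b: &[u32] = &[13];
    assert_eq!(exclude(&docs, &[excluded_a, excluded_b]), vec![1, 3, 5]);
}
```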
@@ -43,7 +43,7 @@ pub use self::boost_query::{BoostQuery, BoostWeight};
pub use self::const_score_query::{ConstScoreQuery, ConstScorer};
pub use self::disjunction_max_query::DisjunctionMaxQuery;
pub use self::empty_query::{EmptyQuery, EmptyScorer, EmptyWeight};
pub use self::exclude::Exclude;
pub use self::exclude::{Exclude, ExclusionSet};
pub use self::exist_query::ExistsQuery;
pub use self::explanation::Explanation;
#[cfg(test)]

@@ -2068,6 +2068,16 @@ mod test {
            format!("Regex(Field(0), {:#?})", expected_regex).as_str(),
            false,
        );
        let expected_regex2 = tantivy_fst::Regex::new(r".*a").unwrap();
        test_parse_query_to_logical_ast_helper(
            "title:(/.*b/ OR /.*a/)",
            format!(
                "(Regex(Field(0), {:#?}) Regex(Field(0), {:#?}))",
                expected_regex, expected_regex2
            )
            .as_str(),
            false,
        );

        // Invalid field
        let err = parse_query_to_logical_ast("float:/.*b/", false).unwrap_err();

@@ -19,7 +19,8 @@ pub(crate) fn is_type_valid_for_fastfield_range_query(typ: Type) -> bool {
        | Type::Bool
        | Type::Date
        | Type::Json
        | Type::IpAddr => true,
        Type::Facet | Type::Bytes => false,
        | Type::IpAddr
        | Type::Bytes => true,
        Type::Facet => false,
    }
}

@@ -6,8 +6,8 @@ use std::net::Ipv6Addr;
use std::ops::{Bound, RangeInclusive};

use columnar::{
    Cardinality, Column, ColumnType, MonotonicallyMappableToU128, MonotonicallyMappableToU64,
    NumericalType, StrColumn,
    BytesColumn, Cardinality, Column, ColumnType, MonotonicallyMappableToU128,
    MonotonicallyMappableToU64, NumericalType, StrColumn,
};
use common::bounds::{BoundsRange, TransformBound};

@@ -163,6 +163,25 @@ impl Weight for FastFieldRangeWeight {
            };
            let dict = str_dict_column.dictionary();

            let bounds = self.bounds.map_bound(get_value_bytes);
            // Get term ids for terms
            let (lower_bound, upper_bound) =
                dict.term_bounds_to_ord(bounds.lower_bound, bounds.upper_bound)?;
            let fast_field_reader = reader.fast_fields();
            let Some((column, _col_type)) =
                fast_field_reader.u64_lenient_for_type(None, &field_name)?
            else {
                return Ok(Box::new(EmptyScorer));
            };
            search_on_u64_ff(column, boost, BoundsRange::new(lower_bound, upper_bound))
        } else if field_type.is_bytes() {
            let Some(bytes_column): Option<BytesColumn> =
                reader.fast_fields().bytes(&field_name)?
            else {
                return Ok(Box::new(EmptyScorer));
            };
            let dict = bytes_column.dictionary();

            let bounds = self.bounds.map_bound(get_value_bytes);
            // Get term ids for terms
            let (lower_bound, upper_bound) =
@@ -1402,6 +1421,66 @@ mod tests {

        Ok(())
    }

    #[test]
    fn test_bytes_field_ff_range_query() -> crate::Result<()> {
        use crate::schema::BytesOptions;

        let mut schema_builder = Schema::builder();
        let bytes_field = schema_builder
            .add_bytes_field("data", BytesOptions::default().set_fast().set_indexed());
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema.clone());
        let mut index_writer: IndexWriter = index.writer_for_tests()?;

        // Insert documents with lexicographically sortable byte values
        // Using simple byte sequences that have clear ordering
        let values: Vec<Vec<u8>> = vec![
            vec![0x00, 0x10],
            vec![0x00, 0x20],
            vec![0x00, 0x30],
            vec![0x01, 0x00],
            vec![0x01, 0x10],
            vec![0x02, 0x00],
        ];

        for value in &values {
            let mut doc = TantivyDocument::new();
            doc.add_bytes(bytes_field, value);
            index_writer.add_document(doc)?;
        }
        index_writer.commit()?;

        let reader = index.reader()?;
        let searcher = reader.searcher();

        // Test: Range query [0x00, 0x20] to [0x01, 0x00] (inclusive)
        // Should match: [0x00, 0x20], [0x00, 0x30], [0x01, 0x00]
        let lower = Term::from_field_bytes(bytes_field, &[0x00, 0x20]);
        let upper = Term::from_field_bytes(bytes_field, &[0x01, 0x00]);
        let range_query = RangeQuery::new(Bound::Included(lower), Bound::Included(upper));
        let count = searcher.search(&range_query, &Count)?;
        assert_eq!(
            count, 3,
            "Expected 3 documents in range [0x00,0x20] to [0x01,0x00]"
        );

        // Test: Range query > [0x01, 0x00] (exclusive lower bound)
        // Should match: [0x01, 0x10], [0x02, 0x00]
        let lower = Term::from_field_bytes(bytes_field, &[0x01, 0x00]);
        let range_query = RangeQuery::new(Bound::Excluded(lower), Bound::Unbounded);
        let count = searcher.search(&range_query, &Count)?;
        assert_eq!(count, 2, "Expected 2 documents > [0x01,0x00]");

        // Test: Range query < [0x00, 0x30] (exclusive upper bound)
        // Should match: [0x00, 0x10], [0x00, 0x20]
        let upper = Term::from_field_bytes(bytes_field, &[0x00, 0x30]);
        let range_query = RangeQuery::new(Bound::Unbounded, Bound::Excluded(upper));
        let count = searcher.search(&range_query, &Count)?;
        assert_eq!(count, 2, "Expected 2 documents < [0x00,0x30]");

        Ok(())
    }
}

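Editor's note: the new test exercises inclusive, exclusive, and unbounded edges over byte values. The counts it asserts follow from plain lexicographic comparison of the byte slices against `std::ops::Bound`; a standalone sketch of that bound check (not tantivy code):

```rust
use std::ops::Bound;

// Standalone sketch: evaluate a byte-slice range with the same Bound semantics
// the test above relies on (lexicographic comparison of byte slices).
fn in_range(value: &[u8], lower: &Bound<Vec<u8>>, upper: &Bound<Vec<u8>>) -> bool {
    let lower_ok = match lower {
        Bound::Included(l) => value >= l.as_slice(),
        Bound::Excluded(l) => value > l.as_slice(),
        Bound::Unbounded => true,
    };
    let upper_ok = match upper {
        Bound::Included(u) => value <= u.as_slice(),
        Bound::Excluded(u) => value < u.as_slice(),
        Bound::Unbounded => true,
    };
    lower_ok && upper_ok
}

fn main() {
    let values: Vec<Vec<u8>> = vec![
        vec![0x00, 0x10],
        vec![0x00, 0x20],
        vec![0x00, 0x30],
        vec![0x01, 0x00],
        vec![0x01, 0x10],
        vec![0x02, 0x00],
    ];
    let lower = Bound::Included(vec![0x00, 0x20]);
    let upper = Bound::Included(vec![0x01, 0x00]);
    let count = values
        .iter()
        .filter(|v| in_range(v.as_slice(), &lower, &upper))
        .count();
    // Matches the "3 documents in range" assertion in the test above.
    assert_eq!(count, 3);
}
```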
#[cfg(test)]

@@ -223,6 +223,11 @@ impl FieldType {
        matches!(self, FieldType::Str(_))
    }

    /// returns true if this is a bytes field
    pub fn is_bytes(&self) -> bool {
        matches!(self, FieldType::Bytes(_))
    }

    /// returns true if this is a date field
    pub fn is_date(&self) -> bool {
        matches!(self, FieldType::Date(_))

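Editor's note: `is_bytes` follows the same `matches!` pattern as the neighboring predicates, and the `field_type.is_bytes()` branch added to `FastFieldRangeWeight` above is its call site. A stand-in illustration of the pattern, using a simplified enum rather than tantivy's `FieldType`:

```rust
// Simplified stand-in for FieldType, only to illustrate the matches!-based predicate.
#[allow(dead_code)]
enum FieldKind {
    Str,
    Bytes,
    Date,
}

impl FieldKind {
    /// Mirrors the shape of FieldType::is_bytes in the diff above.
    fn is_bytes(&self) -> bool {
        matches!(self, FieldKind::Bytes)
    }
}

fn main() {
    assert!(FieldKind::Bytes.is_bytes());
    assert!(!FieldKind::Str.is_bytes());
}
```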