mirror of https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 21:12:54 +00:00

Compare commits (1 commit): issue/1251 ... warming

Commit: d4e2d2e40e
.github/workflows/long_running.yml (vendored, 2 changes)
@@ -1,4 +1,4 @@
name: Long running tests
name: Rust

on:
  push:
.github/workflows/test.yml (vendored, 20 changes)
@@ -1,4 +1,4 @@
name: Unit tests
name: Rust

on:
  push:
@@ -18,25 +18,13 @@ jobs:
    - uses: actions/checkout@v2
    - name: Build
      run: cargo build --verbose --workspace
    - name: Install latest nightly to test also against unstable feature flag
      uses: actions-rs/toolchain@v1
      with:
        toolchain: nightly
        override: true
        components: rustfmt
    - name: Install latest nightly to test also against unstable feature flag
      uses: actions-rs/toolchain@v1
      with:
        toolchain: stable
        override: true
        components: rustfmt, clippy
        components: rustfmt
    - name: Run tests
      run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
      run: cargo test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
    - name: Check Formatting
      run: cargo +nightly fmt --all -- --check
    - uses: actions-rs/clippy-check@v1
      with:
        toolchain: stable
        token: ${{ secrets.GITHUB_TOKEN }}
        args: --tests

      run: cargo fmt --all -- --check
CHANGELOG.md (14 changes)
@@ -1,14 +1,12 @@
Tantivy 0.17
================================
- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar @fulmicoton) [#115](https://github.com/quickwit-oss/tantivy/issues/115)
- Adds a searcher Warmer API (@shikhar @fulmicoton)
- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
- Adds a searcher Warmer API (@shikhar)
- Change to non-strict schema. Fields in the data that are not defined in the schema are now ignored; previously this returned an error. #1211
- Facets are now always indexed. Existing indexes with indexed facets work out of the box. Indexes with facets marked with index: false are broken (but they were already broken in a sense). (@fulmicoton) #1195
- Bugfix that could, in theory, impact durability on some filesystems [#1224](https://github.com/quickwit-oss/tantivy/issues/1224)
- Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-oss/tantivy/issues/922)
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-oss/tantivy/issues/1225)
- Fix opening bytes index with dynamic codec (@PSeitz) [#1278](https://github.com/quickwit-oss/tantivy/issues/1278)
- Added an aggregation collector compatible with Elasticsearch (@PSeitz)
- Bugfix that could, in theory, impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
- Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)

Tantivy 0.16.2
================================
@@ -130,7 +128,7 @@ Tantivy 0.12.0
## How to update?

Crates relying on custom tokenizers, or registering tokenizers in the manager, will require some
minor changes. Check https://github.com/quickwit-oss/tantivy/blob/main/examples/custom_tokenizer.rs
minor changes. Check https://github.com/quickwit-inc/tantivy/blob/main/examples/custom_tokenizer.rs
for a code sample.

Tantivy 0.11.3
Cargo.toml (16 changes)
@@ -6,8 +6,8 @@ license = "MIT"
categories = ["database-implementations", "data-structures"]
description = """Search engine library"""
documentation = "https://docs.rs/tantivy/"
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
homepage = "https://github.com/quickwit-inc/tantivy"
repository = "https://github.com/quickwit-inc/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2018"
@@ -52,10 +52,9 @@ chrono = "0.4.19"
smallvec = "1.6.1"
rayon = "1.5"
lru = "0.7.0"
fastdivide = "0.4"
fastdivide = "0.3"
itertools = "0.10.0"
measure_time = "0.8.0"
pretty_assertions = "1.1.0"

[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
@@ -68,7 +67,6 @@ proptest = "1.0"
criterion = "0.3.5"
test-log = "0.2.8"
env_logger = "0.9.0"
pprof = { version = "0.6", features = ["flamegraph", "criterion"] }

[dev-dependencies.fail]
version = "0.5"
@@ -97,6 +95,9 @@ unstable = [] # useful for benches.
[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes"]

[badges]
travis-ci = { repository = "tantivy-search/tantivy" }

# Following the "fail" crate best practices, we isolate
# tests that define specific behavior in fail check points
# in a different binary.
@@ -112,8 +113,3 @@ required-features = ["fail/failpoints"]
[[bench]]
name = "analyzer"
harness = false

[[bench]]
name = "index-bench"
harness = false
Makefile (3 changes)
@@ -1,6 +1,3 @@
test:
	echo "Run test only... No examples."
	cargo test --tests --lib

fmt:
	cargo +nightly fmt --all
README.md (25 changes)
@@ -1,13 +1,22 @@
[](https://docs.rs/crate/tantivy/)
[](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml)
[](https://codecov.io/gh/quickwit-oss/tantivy)
[](https://github.com/quickwit-inc/tantivy/actions/workflows/test.yml)
[](https://codecov.io/gh/quickwit-inc/tantivy)
[](https://discord.gg/MT27AG5EVE)
[](https://opensource.org/licenses/MIT)
[](https://crates.io/crates/tantivy)

[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/0)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/1)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/2)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/3)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/4)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/5)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/6)
[](https://sourcerer.io/fame/fulmicoton/tantivy-search/tantivy/links/7)

**Tantivy** is a **full text search engine library** written in Rust.

It is closer to [Apache Lucene](https://lucene.apache.org/) than to [Elasticsearch](https://www.elastic.co/products/elasticsearch) or [Apache Solr](https://lucene.apache.org/solr/) in the sense it is not
@@ -18,7 +27,7 @@ Tantivy is, in fact, strongly inspired by Lucene's design.

# Benchmark

The following [benchmark](https://tantivy-search.github.io/bench/) breaks down
performance for different types of queries / collections.

Your mileage WILL vary depending on the nature of queries and their load.
@@ -26,7 +35,7 @@ Your mileage WILL vary depending on the nature of queries and their load.
# Features

- Full-text search
- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)), Japanese ([lindera](https://github.com/lindera-morphology/lindera-tantivy), [Vaporetto](https://crates.io/crates/vaporetto_tantivy), and [tantivy-tokenizer-tiny-segmenter](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter)) and Korean ([lindera](https://github.com/lindera-morphology/lindera-tantivy) + [lindera-ko-dic-builder](https://github.com/lindera-morphology/lindera-ko-dic-builder))
- Configurable tokenizer (stemming available for 17 Latin languages with third party support for Chinese ([tantivy-jieba](https://crates.io/crates/tantivy-jieba) and [cang-jie](https://crates.io/crates/cang-jie)), Japanese ([lindera](https://github.com/lindera-morphology/lindera-tantivy) and [tantivy-tokenizer-tiny-segmenter](https://crates.io/crates/tantivy-tokenizer-tiny-segmenter)) and Korean ([lindera](https://github.com/lindera-morphology/lindera-tantivy) + [lindera-ko-dic-builder](https://github.com/lindera-morphology/lindera-ko-dic-builder))
- Fast (check out the :racehorse: :sparkles: [benchmark](https://tantivy-search.github.io/bench/) :sparkles: :racehorse:)
- Tiny startup time (<10ms), perfect for command-line tools
- BM25 scoring (the same as Lucene)
@@ -48,7 +57,7 @@ Your mileage WILL vary depending on the nature of queries and their load.
## Non-features

- Distributed search is out of the scope of Tantivy. That being said, Tantivy is a
library upon which one could build a distributed search. Serializable/mergeable collector state, for instance,
is within the scope of Tantivy.
@@ -57,14 +66,14 @@ are within the scope of Tantivy.
Tantivy works on stable Rust (>= 1.27) and supports Linux, macOS, and Windows.

- [Tantivy's simple search example](https://tantivy-search.github.io/examples/basic_search.html)
- [tantivy-cli and its tutorial](https://github.com/quickwit-oss/tantivy-cli) - `tantivy-cli` is an actual command line interface that makes it easy for you to create a search engine,
- [tantivy-cli and its tutorial](https://github.com/tantivy-search/tantivy-cli) - `tantivy-cli` is an actual command line interface that makes it easy for you to create a search engine,
  index documents, and search via the CLI or a small server with a REST API.
  It walks you through getting a Wikipedia search engine up and running in a few minutes.
- [Reference doc for the last released version](https://docs.rs/tantivy/)

# How can I support this project?

There are many ways to support this project.

- Use Tantivy and tell us about your experience on [Discord](https://discord.gg/MT27AG5EVE) or by email (paul.masurel@gmail.com)
- Report bugs
@@ -83,7 +92,7 @@ Tantivy compiles on stable Rust but requires `Rust >= 1.27`.
To check out and run tests, you can simply run:

```bash
git clone https://github.com/quickwit-oss/tantivy.git
git clone https://github.com/quickwit-inc/tantivy.git
cd tantivy
cargo build
```
benches/hdfs.json (100000 changes)
File diff suppressed because it is too large.
@@ -1,79 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use tantivy::schema::{INDEXED, STORED, STRING, TEXT};
use tantivy::Index;

const HDFS_LOGS: &str = include_str!("hdfs.json");

pub fn hdfs_index_benchmark(c: &mut Criterion) {
    let schema = {
        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
        schema_builder.add_u64_field("timestamp", INDEXED);
        schema_builder.add_text_field("body", TEXT);
        schema_builder.add_text_field("severity", STRING);
        schema_builder.build()
    };
    let schema_with_store = {
        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
        schema_builder.add_u64_field("timestamp", INDEXED | STORED);
        schema_builder.add_text_field("body", TEXT | STORED);
        schema_builder.add_text_field("severity", STRING | STORED);
        schema_builder.build()
    };

    let mut group = c.benchmark_group("index-hdfs");
    group.sample_size(20);
    group.bench_function("index-hdfs-no-commit", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema.clone());
            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for _ in 0..10 {
                for doc_json in HDFS_LOGS.trim().split("\n") {
                    let doc = schema.parse_document(doc_json).unwrap();
                    index_writer.add_document(doc).unwrap();
                }
            }
        })
    });
    group.bench_function("index-hdfs-with-commit", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema.clone());
            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for _ in 0..10 {
                for doc_json in HDFS_LOGS.trim().split("\n") {
                    let doc = schema.parse_document(doc_json).unwrap();
                    index_writer.add_document(doc).unwrap();
                }
            }
            index_writer.commit().unwrap();
        })
    });
    group.bench_function("index-hdfs-no-commit-with-docstore", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema_with_store.clone());
            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for doc_json in HDFS_LOGS.trim().split("\n") {
                let doc = schema.parse_document(doc_json).unwrap();
                index_writer.add_document(doc).unwrap();
            }
        })
    });
    group.bench_function("index-hdfs-with-commit-with-docstore", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema_with_store.clone());
            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for doc_json in HDFS_LOGS.trim().split("\n") {
                let doc = schema.parse_document(doc_json).unwrap();
                index_writer.add_document(doc).unwrap();
            }
            index_writer.commit().unwrap();
        })
    });
}

criterion_group! {
    name = benches;
    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = hdfs_index_benchmark
}
criterion_main!(benches);
@@ -6,7 +6,7 @@ authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = []
description = """Tantivy sub-crate: bitpacking"""
repository = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-inc/tantivy"
keywords = []
@@ -1,5 +1,4 @@
use std::convert::TryInto;
use std::io;
use std::{convert::TryInto, io};

pub struct BitPacker {
    mini_buffer: u64,
@@ -1,11 +1,12 @@
use super::bitpacker::BitPacker;
use super::compute_num_bits;
use crate::{minmax, BitUnpacker};

use super::{bitpacker::BitPacker, compute_num_bits};

const BLOCK_SIZE: usize = 128;

/// `BlockedBitpacker` compresses data in blocks of
/// 128 elements, while keeping an index on it.
#[derive(Debug, Clone)]
pub struct BlockedBitpacker {
    // bitpacked blocks
@@ -1,7 +1,8 @@
mod bitpacker;
mod blocked_bitpacker;

pub use crate::bitpacker::{BitPacker, BitUnpacker};
pub use crate::bitpacker::BitPacker;
pub use crate::bitpacker::BitUnpacker;
pub use crate::blocked_bitpacker::BlockedBitpacker;

/// Computes the number of bits that will be used for bitpacking.
@@ -1,8 +1,8 @@
use ownedbytes::OwnedBytes;
use std::convert::TryInto;
use std::io::Write;
use std::{fmt, io, u64};

use ownedbytes::OwnedBytes;
use std::u64;
use std::{fmt, io};

#[derive(Clone, Copy, Eq, PartialEq)]
pub struct TinySet(u64);
@@ -187,6 +187,7 @@ fn num_buckets(max_val: u32) -> u32 {

impl BitSet {
    /// Serialize a `BitSet`.
    pub fn serialize<T: Write>(&self, writer: &mut T) -> io::Result<()> {
        writer.write_all(self.max_value.to_le_bytes().as_ref())?;
        for tinyset in self.tinysets.iter().cloned() {
@@ -352,6 +353,7 @@ impl ReadOnlyBitSet {
    }

    /// Iterate the tinysets on the fly from serialized data.
    #[inline]
    fn iter_tinysets(&self) -> impl Iterator<Item = TinySet> + '_ {
        self.data.chunks_exact(8).map(move |chunk| {
@@ -361,6 +363,7 @@ impl ReadOnlyBitSet {
    }

    /// Iterate over the positions of the elements.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.iter_tinysets()
@@ -412,14 +415,14 @@ impl<'a> From<&'a BitSet> for ReadOnlyBitSet {
#[cfg(test)]
mod tests {

    use std::collections::HashSet;

    use super::BitSet;
    use super::ReadOnlyBitSet;
    use super::TinySet;
    use ownedbytes::OwnedBytes;
    use rand::distributions::Bernoulli;
    use rand::rngs::StdRng;
    use rand::{Rng, SeedableRng};

    use super::{BitSet, ReadOnlyBitSet, TinySet};
    use std::collections::HashSet;

    #[test]
    fn test_read_serialized_bitset_full_multi() {
@@ -440,7 +443,7 @@ mod tests {
        bitset.serialize(&mut out).unwrap();

        let bitset = ReadOnlyBitSet::open(OwnedBytes::new(out));
        assert_eq!(bitset.len() as usize, 64);
        assert_eq!(bitset.len() as usize, 64 as usize);
    }

    #[test]
@@ -707,10 +710,10 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {

    use super::BitSet;
    use super::TinySet;
    use test;

    use super::{BitSet, TinySet};

    #[bench]
    fn bench_tinyset_pop(b: &mut test::Bencher) {
        b.iter(|| {
@@ -104,11 +104,10 @@ pub fn u64_to_f64(val: u64) -> f64 {
#[cfg(test)]
pub mod test {

    use std::f64;

    use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
    use super::{BinarySerializable, FixedSize};
    use proptest::prelude::*;

    use super::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64, BinarySerializable, FixedSize};
    use std::f64;

    fn test_i64_converter_helper(val: i64) {
        assert_eq!(u64_to_i64(i64_to_u64(val)), val);
@@ -158,10 +157,10 @@ pub mod test {
    #[test]
    fn test_f64_order() {
        assert!(!(f64_to_u64(f64::NEG_INFINITY)..f64_to_u64(f64::INFINITY))
            .contains(&f64_to_u64(f64::NAN))); // nan is not a number
        assert!(f64_to_u64(1.5) > f64_to_u64(1.0)); // same exponent, different mantissa
        assert!(f64_to_u64(2.0) > f64_to_u64(1.0)); // same mantissa, different exponent
        assert!(f64_to_u64(2.0) > f64_to_u64(1.5)); // different exponent and mantissa
        assert!(f64_to_u64(1.0) > f64_to_u64(-1.0)); // pos > neg
        assert!(f64_to_u64(-1.5) < f64_to_u64(-1.0));
        assert!(f64_to_u64(-2.0) < f64_to_u64(1.0));
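The assertions above pin down the contract of `f64_to_u64`: the mapping must be monotone, so that comparing the unsigned images is equivalent to comparing the original floats. As a minimal sketch (assuming the usual IEEE 754 sign-flip encoding; the exact bit manipulation in tantivy's `common` crate may differ), such a mapping looks like:

```rust
/// Maps an f64 to a u64 so that the u64 ordering matches the f64 ordering.
/// Positive floats: set the sign bit, so they sort above all negatives.
/// Negative floats: flip all bits, so more negative values sort lower.
fn f64_to_u64(value: f64) -> u64 {
    let bits = value.to_bits();
    if bits & (1 << 63) == 0 {
        bits | (1 << 63) // positive (or +0.0): move above all negatives
    } else {
        !bits // negative: reverse the order within the negatives
    }
}
```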
@@ -1,9 +1,10 @@
use std::io::{Read, Write};
use std::{fmt, io};

use crate::Endianness;
use crate::VInt;
use byteorder::{ReadBytesExt, WriteBytesExt};

use crate::{Endianness, VInt};
use std::fmt;
use std::io;
use std::io::Read;
use std::io::Write;

/// Trait for a simple binary serialization.
pub trait BinarySerializable: fmt::Debug + Sized {
@@ -201,7 +202,8 @@ impl BinarySerializable for String {
#[cfg(test)]
pub mod test {

    use super::{VInt, *};
    use super::VInt;
    use super::*;
    use crate::serialize::BinarySerializable;

    pub fn fixed_size_test<O: BinarySerializable + FixedSize + Default>() {
        let mut buffer = Vec::new();
@@ -1,9 +1,8 @@
use std::io;
use std::io::{Read, Write};

use byteorder::{ByteOrder, LittleEndian};

use super::BinarySerializable;
use byteorder::{ByteOrder, LittleEndian};
use std::io;
use std::io::Read;
use std::io::Write;

/// Wrapper over a `u64` that serializes as a variable int.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -175,7 +174,9 @@ impl BinarySerializable for VInt {
#[cfg(test)]
mod tests {

    use super::{serialize_vint_u32, BinarySerializable, VInt};
    use super::serialize_vint_u32;
    use super::BinarySerializable;
    use super::VInt;

    fn aux_test_vint(val: u64) {
        let mut v = [14u8; 10];
@@ -54,8 +54,7 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
    }
}

/// Struct used to prevent calling
/// [`terminate_ref`](trait.TerminatingWrite.html#tymethod.terminate_ref) directly.
///
/// The point is that while the type is public, it cannot be built by anyone
/// outside of this module.
@@ -65,7 +64,9 @@ pub struct AntiCallToken(());
pub trait TerminatingWrite: Write {
    /// Indicates that the writer will no longer be used. Internally calls `terminate_ref`.
    fn terminate(mut self) -> io::Result<()>
    where
        Self: Sized,
    {
        self.terminate_ref(AntiCallToken(()))
    }
@@ -96,9 +97,8 @@ impl<'a> TerminatingWrite for &'a mut Vec<u8> {
#[cfg(test)]
mod test {

    use std::io::Write;

    use super::CountingWriter;
    use std::io::Write;

    #[test]
    fn test_counting_writer() {
@@ -10,7 +10,6 @@
- [Index Sorting](./index_sorting.md)
- [Innerworkings](./innerworkings.md)
- [Inverted index](./inverted_index.md)
- [Json](./json.md)
- [Best practice](./inverted_index.md)

[Frequently Asked Questions](./faq.md)
@@ -38,7 +38,7 @@ Note: Tantivy 0.16 does not do this optimization yet.
In principle there are many algorithms possible that exploit the monotonically increasing nature (aggregations, maybe?).

## Usage
Index sorting can be configured by setting [`sort_by_field`](https://github.com/quickwit-oss/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/core/index_meta.rs#L238) on `IndexSettings` and passing it to an `IndexBuilder`. As of tantivy 0.16 only fast fields are allowed to be used.
Index sorting can be configured by setting [`sort_by_field`](https://github.com/quickwit-inc/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/core/index_meta.rs#L238) on `IndexSettings` and passing it to an `IndexBuilder`. As of tantivy 0.16 only fast fields are allowed to be used.

```
let settings = IndexSettings {
```
@@ -55,7 +55,7 @@ let index = index_builder.create_in_ram().unwrap();

## Implementation details

Sorting an index is applied in the serialization step. In general there are two serialization steps: [finishing a single segment](https://github.com/quickwit-oss/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/indexer/segment_writer.rs#L338) and [merging multiple segments](https://github.com/quickwit-oss/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/indexer/merger.rs#L1073).
Sorting an index is applied in the serialization step. In general there are two serialization steps: [finishing a single segment](https://github.com/quickwit-inc/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/indexer/segment_writer.rs#L338) and [merging multiple segments](https://github.com/quickwit-inc/tantivy/blob/000d76b11a139a84b16b9b95060a1c93e8b9851c/src/indexer/merger.rs#L1073).

In both cases we generate a docid mapping reflecting the sort, as sketched below. This mapping is used when serializing the different components (doc store, fast fields, posting lists, fieldnorms, facets).
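As a concrete illustration, here is a minimal sketch (hypothetical helpers, not tantivy's actual code) of how such a docid mapping can be derived from a segment's fast field values and applied to a per-document component before it is written out:

```rust
/// Given one fast-field value per document (indexed by old doc id), compute
/// `new_to_old`: position i holds the old doc id that moves to new doc id i.
fn docid_mapping(values: &[u64]) -> Vec<u32> {
    let mut new_to_old: Vec<u32> = (0..values.len() as u32).collect();
    // Sort doc ids by their fast-field value; the sort is stable, so ties
    // keep their original relative order.
    new_to_old.sort_by_key(|&old_doc| values[old_doc as usize]);
    new_to_old
}

/// Reorder any per-document component (doc store entries, fast-field values,
/// fieldnorms, ...) according to the mapping before serializing it.
fn reorder<T: Clone>(component: &[T], new_to_old: &[u32]) -> Vec<T> {
    new_to_old
        .iter()
        .map(|&old_doc| component[old_doc as usize].clone())
        .collect()
}
```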
@@ -1,82 +0,0 @@
# Json

As of tantivy 0.17, tantivy supports a json object type.
This type can be used to allow for a schema-less search index.

When indexing a json object, we "flatten" the JSON. This operation emits terms that represent a triplet `(json_path, value_type, value)`.

For instance, if `user` is a json field, the following document:

```json
{
    "user": {
        "name": "Paul Masurel",
        "address": {
            "city": "Tokyo",
            "country": "Japan"
        },
        "created_at": "2018-11-12T23:20:50.52Z"
    }
}
```

emits the following tokens:
- ("name", Text, "Paul")
- ("name", Text, "Masurel")
- ("address.city", Text, "Tokyo")
- ("address.country", Text, "Japan")
- ("created_at", Date, 15420648505)

# Bytes-encoding and lexicographical sort

Like any other terms, these triplets are encoded into a binary format as follows.
- `json_path`: the json path is a sequence of "segments". In the example above, `address.city`
  is just a debug representation of the json path `["address", "city"]`.
  It is encoded by separating segments with the unicode char `\x01` and ending the path with `\x00`.
- `value type`: one byte represents the `Value` type.
- `value`: the value representation is just the regular Value representation.
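As an illustration, a minimal sketch of this term encoding (a hypothetical helper that only assumes the `\x01` separator, `\x00` terminator, and one-byte type tag described above):

```rust
/// Encodes (json_path, value_type, value) into a single byte string whose
/// lexicographic order follows (path, type, value).
fn encode_json_term(path: &[&str], type_tag: u8, value: &[u8]) -> Vec<u8> {
    let mut term = Vec::new();
    for (i, segment) in path.iter().enumerate() {
        if i > 0 {
            term.push(0x01); // separator between path segments
        }
        term.extend_from_slice(segment.as_bytes());
    }
    term.push(0x00); // end of the json path
    term.push(type_tag); // one byte for the `Value` type
    term.extend_from_slice(value); // regular value representation
    term
}

// encode_json_term(&["address", "city"], b't', b"tokyo") sorts before
// encode_json_term(&["name"], b't', b"masurel"), as expected.
```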
This representation is designed to align the natural sort of Terms with the lexicographical sort
of their binary representation (tantivy's dictionary, whether fst or sstable, is sorted and does prefix encoding).

In the example above, the terms will be sorted as
- ("address.city", Text, "Tokyo")
- ("address.country", Text, "Japan")
- ("created_at", Date, 15420648505)
- ("name", Text, "Masurel")
- ("name", Text, "Paul")

As seen in "pitfalls", we may end up having to search for a value for the same path in several different fields. Putting the field code after the path maximizes compression opportunities and also increases the chances for the two terms to end up in the same term dictionary block.

# Pitfalls and limitations

Json gives very little information about the type of the literals it stores.
All numeric types end up mapped as a "Number" and there are no types for dates.

At ingestion time, tantivy will try to interpret numbers and strings as different types with a
priority order.
Numbers will be interpreted as u64, i64 and f64, in that order.
Strings will be interpreted as rfc3339 dates or simple strings.

The first working type is picked and only one type will be emitted for indexing.

Note this interpretation happens on a per-document basis, and there is no effort to try to sniff
a consistent field type at the scale of a segment.
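A minimal sketch of that ingestion-time priority (a hypothetical helper with a deliberately loose date check; the actual coercion logic lives in tantivy's json indexing code):

```rust
use std::str::FromStr;

/// The single type emitted for a json literal at indexing time.
#[derive(Debug, PartialEq)]
enum JsonTermValue {
    U64(u64),
    I64(i64),
    F64(f64),
    Date(String), // rfc3339 timestamp, kept as text here for simplicity
    Str(String),
}

/// Numbers: try u64, then i64, then f64. Strings: rfc3339 date, else text.
fn interpret_literal(raw: &str, is_number: bool) -> JsonTermValue {
    if is_number {
        if let Ok(v) = u64::from_str(raw) {
            return JsonTermValue::U64(v);
        }
        if let Ok(v) = i64::from_str(raw) {
            return JsonTermValue::I64(v);
        }
        return JsonTermValue::F64(f64::from_str(raw).unwrap_or(f64::NAN));
    }
    // Very loose rfc3339 shape check, for illustration only.
    let looks_like_date = raw.len() >= 20 && raw.as_bytes()[4] == b'-' && raw.contains('T');
    if looks_like_date {
        JsonTermValue::Date(raw.to_string())
    } else {
        JsonTermValue::Str(raw.to_string())
    }
}
```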
On the query parser side, on the other hand, we may end up emitting more than one type.
For instance, we do not even know if the type is number- or string-based.

So the query

```
my_path.my_segment:233
```

should be interpreted as
- `(my_path.my_segment, String, 233)`
- `(my_path.my_segment, u64, 233)`

Likewise, we need to emit two tokens if the query contains an rfc3339 date.
Indeed, the date could actually have been a single token inside the text of a document at ingestion time. Generally speaking, we will always emit at least a string token in query parsing, and sometimes more.
@@ -1,130 +0,0 @@
// # Aggregation example
//
// This example shows how you can use built-in aggregations.
// We will use range buckets and compute the average in each bucket.
//

use serde_json::Value;
use tantivy::aggregation::agg_req::{
    Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
    RangeAggregation,
};
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::metric::AverageAggregation;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::TermQuery;
use tantivy::schema::{self, Cardinality, IndexRecordOption, Schema, TextFieldIndexing};
use tantivy::{doc, Index, Term};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_fieldtype = schema::TextOptions::default()
        .set_indexing_options(
            TextFieldIndexing::default()
                .set_tokenizer("default")
                .set_index_option(IndexRecordOption::WithFreqs),
        )
        .set_stored();
    let text_field = schema_builder.add_text_field("text", text_fieldtype);
    let score_fieldtype = crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue);
    let highscore_field = schema_builder.add_f64_field("highscore", score_fieldtype.clone());
    let price_field = schema_builder.add_f64_field("price", score_fieldtype.clone());

    let schema = schema_builder.build();

    // # Indexing documents
    //
    // Let's index a bunch of documents for this example.
    let index = Index::create_in_ram(schema);

    let mut index_writer = index.writer(50_000_000)?;
    // writing the segment
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 1f64,
        price_field => 0f64,
    ))?;
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 3f64,
        price_field => 1f64,
    ))?;
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 5f64,
        price_field => 1f64,
    ))?;
    index_writer.add_document(doc!(
        text_field => "nohit",
        highscore_field => 6f64,
        price_field => 2f64,
    ))?;
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 7f64,
        price_field => 2f64,
    ))?;
    index_writer.commit()?;
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 11f64,
        price_field => 10f64,
    ))?;
    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 14f64,
        price_field => 15f64,
    ))?;

    index_writer.add_document(doc!(
        text_field => "cool",
        highscore_field => 15f64,
        price_field => 20f64,
    ))?;

    index_writer.commit()?;

    let reader = index.reader()?;
    let text_field = reader.searcher().schema().get_field("text").unwrap();

    let term_query = TermQuery::new(
        Term::from_field_text(text_field, "cool"),
        IndexRecordOption::Basic,
    );

    let sub_agg_req_1: Aggregations = vec![(
        "average_price".to_string(),
        Aggregation::Metric(MetricAggregation::Average(
            AverageAggregation::from_field_name("price".to_string()),
        )),
    )]
    .into_iter()
    .collect();

    let agg_req_1: Aggregations = vec![(
        "score_ranges".to_string(),
        Aggregation::Bucket(BucketAggregation {
            bucket_agg: BucketAggregationType::Range(RangeAggregation {
                field: "highscore".to_string(),
                ranges: vec![
                    (-1f64..9f64).into(),
                    (9f64..14f64).into(),
                    (14f64..20f64).into(),
                ],
            }),
            sub_aggregation: sub_agg_req_1.clone(),
        }),
    )]
    .into_iter()
    .collect();

    let collector = AggregationCollector::from_aggs(agg_req_1);

    let searcher = reader.searcher();
    let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();

    let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
    println!("{}", serde_json::to_string_pretty(&res)?);

    Ok(())
}
@@ -73,7 +73,7 @@ fn main() -> tantivy::Result<()> {
    // multithreaded.
    //
    // Here we give tantivy a budget of `50MB`.
    // Using a bigger memory_arena for the indexer may increase
    // Using a bigger heap for the indexer may increase
    // throughput, but 50 MB is already plenty.
    let mut index_writer = index.writer(50_000_000)?;
@@ -91,8 +91,8 @@ fn main() -> tantivy::Result<()> {
    old_man_doc.add_text(title, "The Old Man and the Sea");
    old_man_doc.add_text(
        body,
        "He was an old man who fished alone in a skiff in the Gulf Stream and he had gone \
         eighty-four days now without taking a fish.",
    );

    // ... and add it to the `IndexWriter`.
@@ -12,7 +12,8 @@
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::fastfield::{DynamicFastFieldReader, FastFieldReader};
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, INDEXED, TEXT};
use tantivy::schema::Field;
use tantivy::schema::{Schema, FAST, INDEXED, TEXT};
use tantivy::{doc, Index, Score, SegmentReader};

#[derive(Default)]
@@ -62,7 +62,7 @@ fn main() -> tantivy::Result<()> {
    // multithreaded.
    //
    // Here we use a buffer of 50MB per thread. Using a bigger
    // memory arena for the indexer can increase its throughput.
    // heap for the indexer can increase its throughput.
    let mut index_writer = index.writer(50_000_000)?;
    index_writer.add_document(doc!(
        title => "The Old Man and the Sea",
@@ -56,9 +56,8 @@ fn main() -> tantivy::Result<()> {
    // If it is `text`, let's make sure to keep it `raw` and let's avoid
    // running any text processing on it.
    // This is done by associating this field to the tokenizer named `raw`.
    // Rather than building our
    // [`TextOptions`](//docs.rs/tantivy/~0/tantivy/schema/struct.TextOptions.html) manually, we
    // use the `STRING` shortcut. `STRING` stands for indexed (without term frequency or positions)
    // and untokenized.
    //
    // Because we also want to be able to see this `id` in our returned documents,
@@ -1,9 +1,9 @@
use std::collections::HashSet;

use tantivy::collector::TopDocs;
use tantivy::doc;
use tantivy::query::BooleanQuery;
use tantivy::schema::*;
use tantivy::{doc, DocId, Index, Score, SegmentReader};
use tantivy::{DocId, Index, Score, SegmentReader};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
@@ -87,7 +87,7 @@ fn main() -> tantivy::Result<()> {
            .unwrap()
            .get_first(title)
            .unwrap()
            .as_text()
            .text()
            .unwrap()
            .to_owned()
    })
@@ -52,11 +52,11 @@ fn main() -> tantivy::Result<()> {
    let term_the = Term::from_field_text(title, "the");

    // This segment posting object is like a cursor over the documents matching the term.
    // The `IndexRecordOption` argument tells tantivy we will be interested in both term
    // frequencies and positions.
    //
    // If you don't need all this information, you may get better performance by decompressing
    // less information.
    if let Some(mut segment_postings) =
        inverted_index.read_postings(&term_the, IndexRecordOption::WithFreqsAndPositions)?
    {
@@ -109,11 +109,11 @@ fn main() -> tantivy::Result<()> {
    let inverted_index = segment_reader.inverted_index(title)?;

    // This segment posting object is like a cursor over the documents matching the term.
    // The `IndexRecordOption` argument tells tantivy we will be interested in both term
    // frequencies and positions.
    //
    // If you don't need all this information, you may get better performance by decompressing
    // less information.
    if let Some(mut block_segment_postings) =
        inverted_index.read_block_postings(&term_the, IndexRecordOption::Basic)?
    {
@@ -28,7 +28,6 @@
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::{doc, Index, IndexWriter, Opstamp, TantivyError};
@@ -91,8 +90,7 @@ fn main() -> tantivy::Result<()> {
    // # In the main thread, we commit 10 times, once every 500ms.
    for _ in 0..10 {
        let opstamp: Opstamp = {
            // Committing or rolling back, on the other hand, requires a write lock. This will
            // block other threads.
            let mut index_writer_wlock = index_writer.write().unwrap();
            index_writer_wlock.commit()?
        };
@@ -57,10 +57,7 @@ fn main() -> tantivy::Result<()> {
    let doc = searcher.doc(doc_address)?;
    let snippet = snippet_generator.snippet_from_doc(&doc);
    println!("Document score {}:", score);
    println!(
        "title: {}",
        doc.get_first(title).unwrap().as_text().unwrap()
    );
    println!("title: {}", doc.get_first(title).unwrap().text().unwrap());
    println!("snippet: {}", snippet.to_html());
    println!("custom highlighting: {}", highlight(snippet));
}
@@ -6,10 +6,8 @@ use tantivy::collector::TopDocs;
use tantivy::fastfield::FastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, TEXT};
use tantivy::{
    doc, DocAddress, DocId, Index, IndexReader, Opstamp, Searcher, SearcherGeneration, SegmentId,
    SegmentReader, Warmer,
};
use tantivy::{doc, DocAddress, DocId, Index, IndexReader, SegmentReader, TrackedObject};
use tantivy::{Opstamp, Searcher, SearcherGeneration, SegmentId, Warmer};

// This example shows how warmers can be used to
// load values from an external source using the Warmer API.
@@ -71,7 +69,7 @@ impl Warmer for DynamicPriceColumn {
        Ok(())
    }

    fn garbage_collect(&self, live_generations: &[&SearcherGeneration]) {
    fn garbage_collect(&self, live_generations: &[TrackedObject<SearcherGeneration>]) {
        let live_segment_id_and_delete_ops: HashSet<(SegmentId, Option<Opstamp>)> =
            live_generations
                .iter()
@@ -92,6 +90,7 @@ impl Warmer for DynamicPriceColumn {
/// This map represents a map (ProductId -> Price).
///
/// In practice, it could be fetching things from an external service, like a SQL table.
#[derive(Default, Clone)]
pub struct ExternalPriceTable {
    prices: Arc<RwLock<HashMap<ProductId, Price>>>,
@@ -4,14 +4,14 @@ extern crate test;

#[cfg(test)]
mod tests {
    use fastfield_codecs::bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer};
    use fastfield_codecs::linearinterpol::{
        LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer,
    use fastfield_codecs::{
        bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
        linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
        multilinearinterpol::{
            MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
        },
        *,
    };
    use fastfield_codecs::multilinearinterpol::{
        MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
    };
    use fastfield_codecs::*;

    fn get_data() -> Vec<u64> {
        let mut data: Vec<_> = (100..55000_u64)
@@ -1,9 +1,13 @@
use std::io::{self, Write};

use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use common::BinarySerializable;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
use std::io::{self, Write};
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;

use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use tantivy_bitpacker::BitUnpacker;

/// Depending on the field type, a different
/// fast field is required.
@@ -53,8 +53,7 @@ pub trait FastFieldCodecSerializer {
pub trait FastFieldDataAccess {
    /// Return the value associated to the given position.
    ///
    /// Whenever possible use the Iterator passed to the fastfield creation instead, for
    /// performance reasons.
    ///
    /// # Panics
    ///
@@ -83,10 +82,12 @@ impl FastFieldDataAccess for Vec<u64> {

#[cfg(test)]
mod tests {
    use crate::bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer};
    use crate::linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer};
    use crate::multilinearinterpol::{
        MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
    use crate::{
        bitpacked::{BitpackedFastFieldReader, BitpackedFastFieldSerializer},
        linearinterpol::{LinearInterpolFastFieldReader, LinearInterpolFastFieldSerializer},
        multilinearinterpol::{
            MultiLinearInterpolFastFieldReader, MultiLinearInterpolFastFieldSerializer,
        },
    };

    pub fn create_and_validate<S: FastFieldCodecSerializer, R: FastFieldCodecReader>(
@@ -1,10 +1,15 @@
use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use std::io::{self, Read, Write};
use std::ops::Sub;
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;

use common::{BinarySerializable, FixedSize};
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use common::BinarySerializable;
use common::FixedSize;
use tantivy_bitpacker::BitUnpacker;

/// Depending on the field type, a different
/// fast field is required.
@@ -132,7 +137,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
                // will be offset to 0
                offset = offset.max(calculated_value - actual_value);
            } else {
                // positive value, no offset required
                rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
            }
        }
@@ -166,7 +171,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
        stats: FastFieldStats,
    ) -> bool {
        if stats.num_vals < 3 {
            return false; // disable compressor for this case
        }
        // On serialization the offset is added to the actual value.
        // We need to make sure this won't run into overflow calculation issues.
@@ -206,8 +211,8 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer {
            .max()
            .unwrap_or(0);

        // the theory would be that we don't have the actual max_distance, but we are close within
        // a 50% threshold.
        // It is multiplied by 2 because in a log case scenario the line would be as much above as
        // below. So the offset would = max_distance
        //
@@ -1,8 +1,10 @@
#[macro_use]
extern crate prettytable;
use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer;
use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer;
use fastfield_codecs::{FastFieldCodecSerializer, FastFieldStats};
use fastfield_codecs::{
    linearinterpol::LinearInterpolFastFieldSerializer,
    multilinearinterpol::MultiLinearInterpolFastFieldSerializer, FastFieldCodecSerializer,
    FastFieldStats,
};
use prettytable::{Cell, Row, Table};

fn main() {
@@ -22,7 +24,7 @@ fn main() {
    );
    results.push(res);

    // let best_estimation_codec = results
    //     .iter()
    //     .min_by(|res1, res2| res1.partial_cmp(&res2).unwrap())
    //     .unwrap();
@@ -39,6 +41,7 @@ fn main() {
    } else {
        (est.to_string(), comp.to_string())
    };
    #[allow(clippy::all)]
    let style = if comp == best_compression_ratio_codec.1 {
        "Fb"
    } else {
@@ -46,7 +49,7 @@ fn main() {
    };

    table.add_row(Row::new(vec![
        Cell::new(name).style_spec("bFg"),
        Cell::new(&name.to_string()).style_spec("bFg"),
        Cell::new(&ratio_cell).style_spec(style),
        Cell::new(&est_cell).style_spec(""),
    ]));
@@ -70,7 +73,7 @@ pub fn get_codec_test_data_sets() -> Vec<(Vec<u64>, &'static str)> {
        current_cumulative
    })
    .collect::<Vec<_>>();
    // let data = (1..=200000_u64).map(|num| num + num).collect::<Vec<_>>();
    data_and_names.push((data, "Monotonically increasing concave"));

    let mut current_cumulative = 0;
@@ -1,22 +1,30 @@
//! The MultiLinearInterpol compressor uses linear interpolation to guess values and stores the
//! offsets, but in blocks of 512.
//!
//! With a CHUNK_SIZE of 512 and 29 bytes of metadata per block, we get a metadata overhead of
//! 232 / 512 = 0.45 bits per element. The additional space required per element in a block is the
//! maximum deviation of the linear interpolation estimation function.
//!
//! E.g. if the maximum deviation of an element is 12, all elements cost 4 bits.
//!
//! Size per block:
//! Num Elements * Maximum Deviation from Interpolation + 29 Byte Metadata
/*!

The MultiLinearInterpol compressor uses linear interpolation to guess values and stores the offsets, but in blocks of 512.

With a CHUNK_SIZE of 512 and 29 bytes of metadata per block, we get a metadata overhead of 232 / 512 = 0.45 bits per element.
The additional space required per element in a block is the maximum deviation of the linear interpolation estimation function.

E.g. if the maximum deviation of an element is 12, all elements cost 4 bits.

Size per block:
Num Elements * Maximum Deviation from Interpolation + 29 Byte Metadata

*/
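A quick sanity check of that arithmetic, as a sketch (the 512-element chunk size and 29-byte metadata figure are taken from the comment above; the real serializer computes costs differently):

```rust
const CHUNK_SIZE: u64 = 512;
const BLOCK_METADATA_BYTES: u64 = 29;

/// Estimated bits per element in one block: the bits needed to store the
/// worst-case deviation from the interpolation line, plus amortized metadata.
fn bits_per_element(max_deviation: u64) -> f64 {
    // Bits needed to represent max_deviation (a deviation of 0 needs 0 bits).
    let deviation_bits = 64 - max_deviation.leading_zeros() as u64;
    let metadata_bits = BLOCK_METADATA_BYTES * 8; // 29 bytes = 232 bits
    deviation_bits as f64 + metadata_bits as f64 / CHUNK_SIZE as f64
}

// bits_per_element(12) == 4.0 + 0.453125, matching the figures above.
```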
use crate::FastFieldCodecReader;
use crate::FastFieldCodecSerializer;
use crate::FastFieldDataAccess;
use crate::FastFieldStats;
use common::CountingWriter;
use std::io::{self, Read, Write};
use std::ops::Sub;
use tantivy_bitpacker::compute_num_bits;
use tantivy_bitpacker::BitPacker;

use common::{BinarySerializable, CountingWriter, DeserializeFrom};
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

use crate::{FastFieldCodecReader, FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats};
use common::BinarySerializable;
use common::DeserializeFrom;
use tantivy_bitpacker::BitUnpacker;

const CHUNK_SIZE: u64 = 512;
@@ -244,11 +252,11 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
        );
        if calculated_value > actual_value {
            // negative value, we need to apply an offset
            // we ignore negative values in the max value calculation, because negative
            // values will be offset to 0
            offset = offset.max(calculated_value - actual_value);
        } else {
            // positive value, no offset required
            rel_positive_max = rel_positive_max.max(actual_value - calculated_value);
        }
    }
@@ -342,8 +350,8 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer {
        .unwrap();

    // Estimate one block and extrapolate the cost to all blocks.
    // the theory would be that we don't have the actual max_distance, but we are close within
    // a 50% threshold.
    // It is multiplied by 2 because in a log case scenario the line would be as much above as
    // below. So the offset would = max_distance
    //
@@ -419,7 +427,10 @@ mod tests {
        let mut data = (5_000..20_000)
            .map(|_| rand::random::<u32>() as u64)
            .collect::<Vec<_>>();
        let _ = create_and_validate(&data, "random");
        let (estimate, actual_compression) = create_and_validate(&data, "random");
        dbg!(estimate);
        dbg!(actual_compression);

        data.reverse();
        create_and_validate(&data, "random");
    }
@@ -1,9 +1,11 @@
use std::convert::TryInto;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::{fmt, io, mem};

#![allow(clippy::return_self_not_must_use)]

use stable_deref_trait::StableDeref;
use std::convert::TryInto;
use std::mem;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::{fmt, io};

/// An OwnedBytes simply wraps an object that owns a slice of data and exposes
/// this data as a static slice.
@@ -100,6 +102,7 @@ impl OwnedBytes {
    }

    /// Drops the leftmost `advance_len` bytes.
    #[inline]
    pub fn advance(&mut self, advance_len: usize) {
        self.data = &self.data[advance_len..]
@@ -160,7 +163,8 @@ impl PartialEq<str> for OwnedBytes {
}

impl<'a, T: ?Sized> PartialEq<&'a T> for OwnedBytes
where OwnedBytes: PartialEq<T>
where
    OwnedBytes: PartialEq<T>,
{
    fn eq(&self, other: &&'a T) -> bool {
        *self == **other
@@ -5,8 +5,9 @@ authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
description = """Search engine library"""
homepage = "https://github.com/quickwit-oss/tantivy"
repository = "https://github.com/quickwit-oss/tantivy"
documentation = "https://quickwit-inc.github.io/tantivy/tantivy/index.html"
homepage = "https://github.com/quickwit-inc/tantivy"
repository = "https://github.com/quickwit-inc/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2018"
@@ -1,20 +1,17 @@
use combine::error::StringStreamError;
use super::user_input_ast::{UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral};
use crate::Occur;
use combine::parser::char::{char, digit, space, spaces, string};
use combine::parser::combinator::recognize;
use combine::parser::range::{take_while, take_while1};
use combine::parser::repeat::escaped;
use combine::parser::Parser;
use combine::{
    attempt, choice, eof, many, many1, one_of, optional, parser, satisfy, skip_many1, value,
};
use combine::{error::StringStreamError, parser::combinator::recognize};
use once_cell::sync::Lazy;
use regex::Regex;

use super::user_input_ast::{UserInputAst, UserInputBound, UserInputLeaf, UserInputLiteral};
use crate::Occur;

// Note: the '-' char is only forbidden at the beginning of a field name; it would be clearer to
// add it to the special characters.
const SPECIAL_CHARS: &[char] = &[
    '+', '^', '`', ':', '{', '}', '"', '[', ']', '(', ')', '~', '!', '\\', '*', ' ',
];
@@ -366,9 +363,8 @@ mod test {

    type TestParseResult = Result<(), StringStreamError>;

    use combine::parser::Parser;

    use super::*;
    use combine::parser::Parser;

    pub fn nearly_equals(a: f64, b: f64) -> bool {
        (a - b).abs() < 0.0005 * (a + b).abs()
@@ -1,7 +1 @@
comment_width = 120
format_strings = true
group_imports = "StdExternalCrate"
imports_granularity = "Module"
normalize_comments = true
where_single_line = true
wrap_comments = true
use_try_shorthand = true
@@ -1,36 +0,0 @@
# Contributing

When adding a new bucket aggregation, make sure to extend the "test_aggregation_flushing" test for at least 2 levels.


# Code Organization

Tantivy's aggregations have been designed to mimic the
[aggregations of elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html).

The code is organized in submodules:

## bucket
Contains all bucket aggregations, like the range aggregation. These bucket aggregations group documents into buckets and can contain sub-aggregations.

## metric
Contains all metric aggregations, like the average aggregation. Metric aggregations do not have sub-aggregations.

#### agg_req
agg_req contains the user's aggregation request. Deserialization from json is compatible with elasticsearch aggregation requests.

#### agg_req_with_accessor
agg_req_with_accessor contains the user's aggregation request enriched with fast field accessors etc., which are
used during collection.

#### segment_agg_result
segment_agg_result contains the aggregation result tree, which is used for collection of a segment.
The tree from agg_req_with_accessor is passed during collection.

#### intermediate_agg_result
intermediate_agg_result contains the aggregation tree for merging with other trees.

#### agg_result
agg_result contains the final aggregation tree.
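
The flow through these submodules can be illustrated end to end. Below is a minimal sketch, assuming an index that has an f64 fast field named "score"; the function and aggregation names are illustrative, not part of the API:

```rust
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;

// Runs a range aggregation over `index` and returns the final result as JSON.
fn run_range_agg(index: &tantivy::Index) -> tantivy::Result<String> {
    // Deserialized into the request tree (agg_req submodule).
    let agg_req: Aggregations = serde_json::from_str(
        r#"{ "score_ranges": { "range": { "field": "score", "ranges": [ { "from": 3.0, "to": 7.0 } ] } } }"#,
    ).unwrap();
    let collector = AggregationCollector::from_aggs(agg_req);
    let searcher = index.reader()?.searcher();
    // Collection builds the segment result trees, merges them into the
    // intermediate tree, and converts that into the final tree (agg_result).
    let agg_res = searcher.search(&AllQuery, &collector)?;
    Ok(serde_json::to_string(&agg_res).unwrap())
}
```
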
@@ -1,169 +0,0 @@
//! Contains the aggregation request tree. Used to build an
//! [AggregationCollector](super::AggregationCollector).
//!
//! [Aggregations] is the top level entry point to create a request, which is a `HashMap<String,
//! Aggregation>`.
//! Requests are compatible with the json format of elasticsearch.
//!
//! # Example
//!
//! ```
//! use tantivy::aggregation::bucket::RangeAggregation;
//! use tantivy::aggregation::agg_req::BucketAggregationType;
//! use tantivy::aggregation::agg_req::{Aggregation, Aggregations};
//! use tantivy::aggregation::agg_req::BucketAggregation;
//! let agg_req1: Aggregations = vec![
//!     (
//!         "range".to_string(),
//!         Aggregation::Bucket(BucketAggregation {
//!             bucket_agg: BucketAggregationType::Range(RangeAggregation{
//!                 field: "score".to_string(),
//!                 ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
//!             }),
//!             sub_aggregation: Default::default(),
//!         }),
//!     ),
//! ]
//! .into_iter()
//! .collect();
//!
//! let elasticsearch_compatible_json_req = r#"
//! {
//!   "range": {
//!     "range": {
//!       "field": "score",
//!       "ranges": [
//!         { "from": 3.0, "to": 7.0 },
//!         { "from": 7.0, "to": 20.0 }
//!       ]
//!     }
//!   }
//! }"#;
//! let agg_req2: Aggregations = serde_json::from_str(elasticsearch_compatible_json_req).unwrap();
//! assert_eq!(agg_req1, agg_req2);
//! ```

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};

/// The top-level aggregation request structure, which contains [Aggregation] and their
/// user-defined names.
///
/// The key is the user-defined name of the aggregation.
pub type Aggregations = HashMap<String, Aggregation>;

/// Aggregation request of [BucketAggregation] or [MetricAggregation].
///
/// An aggregation is either a bucket or a metric.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Aggregation {
    /// Bucket aggregation, see [BucketAggregation] for details.
    Bucket(BucketAggregation),
    /// Metric aggregation, see [MetricAggregation] for details.
    Metric(MetricAggregation),
}

/// BucketAggregations create buckets of documents. Each bucket is associated with a rule which
/// determines whether or not a document falls into it. In other words, the buckets
/// effectively define document sets. Buckets are not necessarily disjoint, therefore a document
/// can fall into multiple buckets. In addition to the buckets themselves, the bucket aggregations
/// also compute and return the number of documents for each bucket. Bucket aggregations, as
/// opposed to metric aggregations, can hold sub-aggregations. These sub-aggregations will be
/// aggregated for the buckets created by their "parent" bucket aggregation. There are different
/// bucket aggregators, each with a different "bucketing" strategy. Some define a single bucket,
/// some define a fixed number of buckets, and others dynamically create the buckets during the
/// aggregation process.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketAggregation {
    /// Bucket aggregation strategy to group documents.
    #[serde(flatten)]
    pub bucket_agg: BucketAggregationType,
    /// The sub_aggregations in the buckets. Each bucket will aggregate on the document set in the
    /// bucket.
    #[serde(rename = "aggs")]
    #[serde(default)]
    #[serde(skip_serializing_if = "Aggregations::is_empty")]
    pub sub_aggregation: Aggregations,
}

/// The bucket aggregation types.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum BucketAggregationType {
    /// Put data into buckets of user-defined ranges.
    #[serde(rename = "range")]
    Range(RangeAggregation),
}

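// Illustrative only: a hypothetical JSON request showing how the serde
// attributes above shape the wire format. `#[serde(flatten)]` inlines the
// bucket type (e.g. "range") into the aggregation object, and sub-aggregations
// nest under the "aggs" key:
//
// {
//   "score_ranges": {
//     "range": { "field": "score", "ranges": [ { "from": 3.0, "to": 7.0 } ] },
//     "aggs": { "avg_score": { "avg": { "field": "score" } } }
//   }
// }
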
/// The aggregations in this family compute metrics based on values extracted
/// from the documents that are being aggregated. Values are extracted from the fast field of
/// the document.
///
/// Some aggregations output a single numeric metric (e.g. Average) and are called
/// single-value numeric metrics aggregations, while others generate multiple metrics (e.g. Stats)
/// and are called multi-value numeric metrics aggregations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum MetricAggregation {
    /// Calculates the average.
    #[serde(rename = "avg")]
    Average(AverageAggregation),
    /// Calculates stats (sum, average, min, max, standard_deviation) on a field.
    #[serde(rename = "stats")]
    Stats(StatsAggregation),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serialize_to_json_test() {
        let agg_req1: Aggregations = vec![(
            "range".to_string(),
            Aggregation::Bucket(BucketAggregation {
                bucket_agg: BucketAggregationType::Range(RangeAggregation {
                    field: "score".to_string(),
                    ranges: vec![
                        (f64::MIN..3f64).into(),
                        (3f64..7f64).into(),
                        (7f64..20f64).into(),
                        (20f64..f64::MAX).into(),
                    ],
                }),
                sub_aggregation: Default::default(),
            }),
        )]
        .into_iter()
        .collect();

        let elasticsearch_compatible_json_req = r#"{
  "range": {
    "range": {
      "field": "score",
      "ranges": [
        {
          "to": 3.0
        },
        {
          "from": 3.0,
          "to": 7.0
        },
        {
          "from": 7.0,
          "to": 20.0
        },
        {
          "from": 20.0
        }
      ]
    }
  }
}"#;
        let agg_req2: String = serde_json::to_string_pretty(&agg_req1).unwrap();
        assert_eq!(agg_req2, elasticsearch_compatible_json_req);
    }
}

@@ -1,140 +0,0 @@
//! This module enhances the request tree with access to the fast field and metadata.

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::{SegmentReader, TantivyError};

#[derive(Clone, Default)]
pub(crate) struct AggregationsWithAccessor {
    pub metrics: VecWithNames<MetricAggregationWithAccessor>,
    pub buckets: VecWithNames<BucketAggregationWithAccessor>,
}

impl AggregationsWithAccessor {
    fn from_data(
        metrics: VecWithNames<MetricAggregationWithAccessor>,
        buckets: VecWithNames<BucketAggregationWithAccessor>,
    ) -> Self {
        Self { metrics, buckets }
    }

    pub fn is_empty(&self) -> bool {
        self.metrics.is_empty() && self.buckets.is_empty()
    }
}

#[derive(Clone)]
pub struct BucketAggregationWithAccessor {
    /// In general there can be buckets without fast field access, e.g. buckets that are created
    /// based on search terms. So eventually this needs to be Option or moved.
    pub(crate) accessor: DynamicFastFieldReader<u64>,
    pub(crate) field_type: Type,
    pub(crate) bucket_agg: BucketAggregationType,
    pub(crate) sub_aggregation: AggregationsWithAccessor,
}

impl BucketAggregationWithAccessor {
    fn from_bucket(
        bucket: &BucketAggregationType,
        sub_aggregation: &Aggregations,
        reader: &SegmentReader,
    ) -> crate::Result<BucketAggregationWithAccessor> {
        let (accessor, field_type) = match &bucket {
            BucketAggregationType::Range(RangeAggregation {
                field: field_name,
                ranges: _,
            }) => get_ff_reader_and_validate(reader, field_name)?,
        };
        let sub_aggregation = sub_aggregation.clone();
        Ok(BucketAggregationWithAccessor {
            accessor,
            field_type,
            sub_aggregation: get_aggregations_with_accessor(&sub_aggregation, reader)?,
            bucket_agg: bucket.clone(),
        })
    }
}

/// Contains the metric request and the fast field accessor.
#[derive(Clone)]
pub struct MetricAggregationWithAccessor {
    pub metric: MetricAggregation,
    pub field_type: Type,
    pub accessor: DynamicFastFieldReader<u64>,
}

impl MetricAggregationWithAccessor {
    fn from_metric(
        metric: &MetricAggregation,
        reader: &SegmentReader,
    ) -> crate::Result<MetricAggregationWithAccessor> {
        match &metric {
            MetricAggregation::Average(AverageAggregation { field: field_name })
            | MetricAggregation::Stats(StatsAggregation { field: field_name }) => {
                let (accessor, field_type) = get_ff_reader_and_validate(reader, field_name)?;

                Ok(MetricAggregationWithAccessor {
                    accessor,
                    field_type,
                    metric: metric.clone(),
                })
            }
        }
    }
}

pub(crate) fn get_aggregations_with_accessor(
    aggs: &Aggregations,
    reader: &SegmentReader,
) -> crate::Result<AggregationsWithAccessor> {
    let mut metrics = vec![];
    let mut buckets = vec![];
    for (key, agg) in aggs.iter() {
        match agg {
            Aggregation::Bucket(bucket) => buckets.push((
                key.to_string(),
                BucketAggregationWithAccessor::from_bucket(
                    &bucket.bucket_agg,
                    &bucket.sub_aggregation,
                    reader,
                )?,
            )),
            Aggregation::Metric(metric) => metrics.push((
                key.to_string(),
                MetricAggregationWithAccessor::from_metric(metric, reader)?,
            )),
        }
    }
    Ok(AggregationsWithAccessor::from_data(
        VecWithNames::from_entries(metrics),
        VecWithNames::from_entries(buckets),
    ))
}

fn get_ff_reader_and_validate(
    reader: &SegmentReader,
    field_name: &str,
) -> crate::Result<(DynamicFastFieldReader<u64>, Type)> {
    let field = reader
        .schema()
        .get_field(field_name)
        .ok_or_else(|| TantivyError::FieldNotFound(field_name.to_string()))?;
    let field_type = reader.schema().get_field_entry(field).field_type();
    if field_type.value_type() != Type::I64
        && field_type.value_type() != Type::U64
        && field_type.value_type() != Type::F64
    {
        return Err(TantivyError::InvalidArgument(format!(
            "Invalid field type in aggregation {:?}, only f64, u64, i64 is supported",
            field_type.value_type()
        )));
    }
    let ff_fields = reader.fast_fields();
    ff_fields
        .u64_lenient(field)
        .map(|field| (field, field_type.value_type()))
}

@@ -1,142 +0,0 @@
//! Contains the final aggregation tree.
//! This tree can be converted via the `into()` method from `IntermediateAggregationResults`.
//! This conversion computes the final result. For example, the intermediate average result
//! contains the sum and the number of values; the actual average is calculated on the step from
//! the intermediate to the final aggregation result tree.

use std::cmp::Ordering;
use std::collections::HashMap;

use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::intermediate_agg_result::{
    IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
    IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

impl From<IntermediateAggregationResults> for AggregationResults {
    fn from(tree: IntermediateAggregationResults) -> Self {
        Self(
            tree.0
                .into_iter()
                .map(|(key, agg)| (key, agg.into()))
                .collect(),
        )
    }
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
/// An aggregation is either a bucket or a metric.
pub enum AggregationResult {
    /// Bucket result variant.
    BucketResult(BucketResult),
    /// Metric result variant.
    MetricResult(MetricResult),
}
impl From<IntermediateAggregationResult> for AggregationResult {
    fn from(tree: IntermediateAggregationResult) -> Self {
        match tree {
            IntermediateAggregationResult::Bucket(bucket) => {
                AggregationResult::BucketResult(bucket.into())
            }
            IntermediateAggregationResult::Metric(metric) => {
                AggregationResult::MetricResult(metric.into())
            }
        }
    }
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
/// The final metric result.
pub enum MetricResult {
    /// Average metric result.
    Average(SingleMetricResult),
    /// Stats metric result.
    Stats(Stats),
}

impl From<IntermediateMetricResult> for MetricResult {
    fn from(metric: IntermediateMetricResult) -> Self {
        match metric {
            IntermediateMetricResult::Average(avg_data) => {
                MetricResult::Average(avg_data.finalize().into())
            }
            IntermediateMetricResult::Stats(intermediate_stats) => {
                MetricResult::Stats(intermediate_stats.finalize())
            }
        }
    }
}

/// BucketResult holds the bucket aggregation result types.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
    /// This is the default entry for a bucket, which contains a key, count, and optionally
    /// sub_aggregations.
    Range {
        /// The range buckets sorted by range.
        buckets: Vec<RangeBucketEntry>,
    },
}

impl From<IntermediateBucketResult> for BucketResult {
    fn from(result: IntermediateBucketResult) -> Self {
        match result {
            IntermediateBucketResult::Range(range_map) => {
                let mut buckets: Vec<RangeBucketEntry> = range_map
                    .into_iter()
                    .map(|(_, bucket)| bucket.into())
                    .collect_vec();

                buckets.sort_by(|a, b| {
                    a.from
                        .unwrap_or(f64::MIN)
                        .partial_cmp(&b.from.unwrap_or(f64::MIN))
                        .unwrap_or(Ordering::Equal)
                });
                BucketResult::Range { buckets }
            }
        }
    }
}

/// This is the range entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeBucketEntry {
    /// The identifier of the bucket.
    pub key: Key,
    /// Number of documents in the bucket.
    pub doc_count: u64,
    #[serde(flatten)]
    /// Sub-aggregations in this bucket.
    pub sub_aggregation: AggregationResults,
    /// The from range of the bucket. Equals f64::MIN when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from: Option<f64>,
    /// The to range of the bucket. Equals f64::MAX when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to: Option<f64>,
}

impl From<IntermediateRangeBucketEntry> for RangeBucketEntry {
    fn from(entry: IntermediateRangeBucketEntry) -> Self {
        RangeBucketEntry {
            key: entry.key,
            doc_count: entry.doc_count,
            sub_aggregation: entry.sub_aggregation.into(),
            to: entry.to,
            from: entry.from,
        }
    }
}

@@ -1,10 +0,0 @@
//! Module for all bucket aggregations.
//!
//! Results of final buckets are [BucketEntry](super::agg_result::BucketEntry).
//! Results of intermediate buckets are
//! [IntermediateBucketEntry](super::intermediate_agg_result::IntermediateBucketEntry)

mod range;

pub use range::RangeAggregation;
pub(crate) use range::SegmentRangeCollector;

@@ -1,536 +0,0 @@
use std::ops::Range;

use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::aggregation::agg_req_with_accessor::{
    AggregationsWithAccessor, BucketAggregationWithAccessor,
};
use crate::aggregation::intermediate_agg_result::IntermediateBucketResult;
use crate::aggregation::segment_agg_result::{
    SegmentAggregationResultsCollector, SegmentRangeBucketEntry,
};
use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key};
use crate::fastfield::FastFieldReader;
use crate::schema::Type;
use crate::{DocId, TantivyError};

/// Provide user-defined buckets to aggregate on.
/// Two special buckets will automatically be created to cover the whole range of values.
/// The provided buckets have to be continuous.
/// During the aggregation, the values extracted from the fast_field `field_name` will be checked
/// against each bucket range. Note that this aggregation includes the from value and excludes the
/// to value for each range.
///
/// Result type is [BucketResult](crate::aggregation::agg_result::BucketResult) with
/// [BucketEntryKeyCount](crate::aggregation::agg_result::BucketEntryKeyCount) on the
/// AggregationCollector.
///
/// Result type is
/// [crate::aggregation::intermediate_agg_result::IntermediateBucketResult] with
/// [crate::aggregation::intermediate_agg_result::IntermediateBucketEntryKeyCount] on the
/// DistributedAggregationCollector.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeAggregation {
    /// The field to aggregate on.
    pub field: String,
    /// Note that this aggregation includes the from value and excludes the to value for each
    /// range. Extra buckets will be created until the first to, and last from, if necessary.
    pub ranges: Vec<RangeAggregationRange>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeAggregationRange {
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub from: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub to: Option<f64>,
}

impl From<Range<f64>> for RangeAggregationRange {
    fn from(range: Range<f64>) -> Self {
        let from = if range.start == f64::MIN {
            None
        } else {
            Some(range.start)
        };
        let to = if range.end == f64::MAX {
            None
        } else {
            Some(range.end)
        };
        RangeAggregationRange { from, to }
    }
}

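// For example, `(3f64..7f64).into()` yields `from: Some(3.0), to: Some(7.0)`,
// while `(f64::MIN..10f64).into()` yields `from: None, to: Some(10.0)`:
// unbounded sides are represented as `None` (and rendered as "*" in keys).
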
#[derive(Clone, Debug, PartialEq)]
pub struct SegmentRangeAndBucketEntry {
    range: Range<u64>,
    bucket: SegmentRangeBucketEntry,
}

/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Clone, Debug, PartialEq)]
pub struct SegmentRangeCollector {
    /// The buckets containing the aggregation data.
    buckets: Vec<SegmentRangeAndBucketEntry>,
    field_type: Type,
}

impl SegmentRangeCollector {
    pub fn into_intermediate_bucket_result(self) -> IntermediateBucketResult {
        let field_type = self.field_type;

        let buckets = self
            .buckets
            .into_iter()
            .map(move |range_bucket| {
                (
                    range_to_key(&range_bucket.range, &field_type),
                    range_bucket.bucket.into(),
                )
            })
            .collect();

        IntermediateBucketResult::Range(buckets)
    }

    pub(crate) fn from_req(
        req: &RangeAggregation,
        sub_aggregation: &AggregationsWithAccessor,
        field_type: Type,
    ) -> crate::Result<Self> {
        // The range input on the request is f64.
        // We need to convert to u64 ranges, because we read the values as u64.
        // The mapping from the conversion is monotonic, so ordering is preserved.
        let buckets = extend_validate_ranges(&req.ranges, &field_type)?
            .iter()
            .map(|range| {
                let to = if range.end == u64::MAX {
                    None
                } else {
                    Some(f64_from_fastfield_u64(range.end, &field_type))
                };
                let from = if range.start == u64::MIN {
                    None
                } else {
                    Some(f64_from_fastfield_u64(range.start, &field_type))
                };
                let sub_aggregation = if sub_aggregation.is_empty() {
                    None
                } else {
                    Some(SegmentAggregationResultsCollector::from_req(
                        sub_aggregation,
                    )?)
                };
                Ok(SegmentRangeAndBucketEntry {
                    range: range.clone(),
                    bucket: SegmentRangeBucketEntry {
                        key: range_to_key(range, &field_type),
                        doc_count: 0,
                        sub_aggregation,
                        from,
                        to,
                    },
                })
            })
            .collect::<crate::Result<_>>()?;

        Ok(SegmentRangeCollector {
            buckets,
            field_type,
        })
    }

    #[inline]
    pub(crate) fn collect_block(
        &mut self,
        doc: &[DocId],
        bucket_with_accessor: &BucketAggregationWithAccessor,
        force_flush: bool,
    ) {
        let mut iter = doc.chunks_exact(4);
        for docs in iter.by_ref() {
            let val1 = bucket_with_accessor.accessor.get(docs[0]);
            let val2 = bucket_with_accessor.accessor.get(docs[1]);
            let val3 = bucket_with_accessor.accessor.get(docs[2]);
            let val4 = bucket_with_accessor.accessor.get(docs[3]);
            let bucket_pos1 = self.get_bucket_pos(val1);
            let bucket_pos2 = self.get_bucket_pos(val2);
            let bucket_pos3 = self.get_bucket_pos(val3);
            let bucket_pos4 = self.get_bucket_pos(val4);

            self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation);
            self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation);
            self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation);
            self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation);
        }
        for doc in iter.remainder() {
            let val = bucket_with_accessor.accessor.get(*doc);
            let bucket_pos = self.get_bucket_pos(val);
            self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation);
        }
        if force_flush {
            for bucket in &mut self.buckets {
                if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
                    sub_aggregation
                        .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush);
                }
            }
        }
    }

    #[inline]
    fn increment_bucket(
        &mut self,
        bucket_pos: usize,
        doc: DocId,
        bucket_with_accessor: &AggregationsWithAccessor,
    ) {
        let bucket = &mut self.buckets[bucket_pos];

        bucket.bucket.doc_count += 1;
        if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation {
            sub_aggregation.collect(doc, bucket_with_accessor);
        }
    }

    #[inline]
    fn get_bucket_pos(&self, val: u64) -> usize {
        let pos = self
            .buckets
            .binary_search_by_key(&val, |probe| probe.range.start)
            .unwrap_or_else(|pos| pos - 1);
        debug_assert!(self.buckets[pos].range.contains(&val));
        pos
    }
}

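// How `get_bucket_pos` above resolves a value: the buckets are contiguous and
// sorted, so a binary search over the bucket start values either hits a start
// exactly (`Ok(pos)`) or returns the insertion point (`Err(pos)`), in which
// case `pos - 1` is the bucket whose start is the largest start <= val.
// E.g. with buckets [0..10), [10..100), [100..u64::MAX), val = 50 probes the
// starts [0, 10, 100], gets Err(2), and lands in bucket 1, i.e. [10..100).
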
/// Converts the user-provided f64 range value to fast field value space.
///
/// Internally fast field values are always stored as u64.
/// If the fast field has u64 [1, 2, 5], these values are stored as is in the fast field.
/// A fast field with f64 [1.0, 2.0, 5.0] is converted to u64 space, using a
/// monotonic mapping function, so the order is preserved.
///
/// Consequently, an f64 user range 1.0..3.0 needs to be converted to fast field value space using
/// the same monotonic mapping function, so that the provided ranges contain the u64 values in the
/// fast field.
/// The alternative would be that every value read would be converted to the f64 range, but that is
/// more computationally expensive when many documents are hit.
fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> Range<u64> {
    range
        .from
        .map(|from| f64_to_fastfield_u64(from, field_type))
        .unwrap_or(u64::MIN)
        ..range
            .to
            .map(|to| f64_to_fastfield_u64(to, field_type))
            .unwrap_or(u64::MAX)
}

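// Illustration (an assumption, not necessarily the exact implementation of
// `f64_to_fastfield_u64`): one standard monotonic f64 -> u64 mapping is the
// "bit flip" trick, which keeps the total order of the floats intact, so
// -10.0 < -1.0 < 0.0 < 1.0 < 10.0 also holds for the mapped u64 values.
#[allow(dead_code)]
fn f64_to_u64_monotonic_sketch(val: f64) -> u64 {
    let bits = val.to_bits();
    if val >= 0.0 {
        // Positive floats: set the sign bit so they sort above all negatives.
        bits | (1u64 << 63)
    } else {
        // Negative floats: flip all bits to reverse their descending order.
        !bits
    }
}
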
/// Extends the provided buckets to contain the whole value range, by inserting buckets at the
/// beginning and end. It also rejects overlapping ranges and fills holes between the ranges.
fn extend_validate_ranges(
    buckets: &[RangeAggregationRange],
    field_type: &Type,
) -> crate::Result<Vec<Range<u64>>> {
    let mut converted_buckets = buckets
        .iter()
        .map(|range| to_u64_range(range, field_type))
        .collect_vec();

    converted_buckets.sort_by_key(|bucket| bucket.start);
    if converted_buckets[0].start != u64::MIN {
        converted_buckets.insert(0, u64::MIN..converted_buckets[0].start);
    }

    if converted_buckets[converted_buckets.len() - 1].end != u64::MAX {
        converted_buckets.push(converted_buckets[converted_buckets.len() - 1].end..u64::MAX);
    }

    // Fill up holes in the ranges.
    let find_hole = |converted_buckets: &[Range<u64>]| {
        for (pos, ranges) in converted_buckets.windows(2).enumerate() {
            if ranges[0].end > ranges[1].start {
                return Err(TantivyError::InvalidArgument(format!(
                    "Overlapping ranges not supported range {:?}, range+1 {:?}",
                    ranges[0], ranges[1]
                )));
            }
            if ranges[0].end != ranges[1].start {
                return Ok(Some(pos));
            }
        }
        Ok(None)
    };

    while let Some(hole_pos) = find_hole(&converted_buckets)? {
        let new_range = converted_buckets[hole_pos].end..converted_buckets[hole_pos + 1].start;
        converted_buckets.insert(hole_pos + 1, new_range);
    }

    Ok(converted_buckets)
}

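// For example (mirroring `bucket_test_extend_range_hole` below): the input
// ranges [10..20) and [30..40) are extended and hole-filled into
// [u64::MIN..10), [10..20), [20..30), [30..40), [40..u64::MAX), while inputs
// such as [10..20) and [15..40) are rejected as overlapping.
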
pub fn range_to_string(range: &Range<u64>, field_type: &Type) -> String {
    // `is_start` is there for malformed requests, e.g. if the user passes the range
    // u64::MIN..0.0, it should be rendered as "*-0" and not "*-*".
    let to_str = |val: u64, is_start: bool| {
        if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) {
            "*".to_string()
        } else {
            f64_from_fastfield_u64(val, field_type).to_string()
        }
    };

    format!("{}-{}", to_str(range.start, true), to_str(range.end, false))
}

pub fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key {
    Key::Str(range_to_string(range, field_type))
}

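// Key rendering examples, as exercised in the tests below: a fully bounded
// bucket renders as "10-100", while the automatically added open-ended
// buckets render as "*-10" and "100-*".
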
#[cfg(test)]
mod tests {

    use serde_json::Value;

    use super::*;
    use crate::aggregation::agg_req::{
        Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
    };
    use crate::aggregation::tests::get_test_index_with_num_docs;
    use crate::aggregation::AggregationCollector;
    use crate::fastfield::FastValue;
    use crate::query::AllQuery;

    pub fn get_collector_from_ranges(
        ranges: Vec<RangeAggregationRange>,
        field_type: Type,
    ) -> SegmentRangeCollector {
        let req = RangeAggregation {
            field: "dummy".to_string(),
            ranges,
        };

        SegmentRangeCollector::from_req(&req, &Default::default(), field_type).unwrap()
    }

    #[test]
    fn range_fraction_test() -> crate::Result<()> {
        let index = get_test_index_with_num_docs(false, 100)?;

        let agg_req: Aggregations = vec![(
            "range".to_string(),
            Aggregation::Bucket(BucketAggregation {
                bucket_agg: BucketAggregationType::Range(RangeAggregation {
                    field: "fraction_f64".to_string(),
                    ranges: vec![(0f64..0.1f64).into(), (0.1f64..0.2f64).into()],
                }),
                sub_aggregation: Default::default(),
            }),
        )]
        .into_iter()
        .collect();

        let collector = AggregationCollector::from_aggs(agg_req);

        let reader = index.reader()?;
        let searcher = reader.searcher();
        let agg_res = searcher.search(&AllQuery, &collector).unwrap();

        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;

        assert_eq!(res["range"]["buckets"][0]["key"], "*-0");
        assert_eq!(res["range"]["buckets"][0]["doc_count"], 0);
        assert_eq!(res["range"]["buckets"][1]["key"], "0-0.1");
        assert_eq!(res["range"]["buckets"][1]["doc_count"], 10);
        assert_eq!(res["range"]["buckets"][2]["key"], "0.1-0.2");
        assert_eq!(res["range"]["buckets"][2]["doc_count"], 10);
        assert_eq!(res["range"]["buckets"][3]["key"], "0.2-*");
        assert_eq!(res["range"]["buckets"][3]["doc_count"], 80);

        Ok(())
    }

    #[test]
    fn bucket_test_extend_range_hole() {
        let buckets = vec![(10f64..20f64).into(), (30f64..40f64).into()];
        let collector = get_collector_from_ranges(buckets, Type::F64);

        let buckets = collector.buckets;
        assert_eq!(buckets[0].range.start, u64::MIN);
        assert_eq!(buckets[0].range.end, 10f64.to_u64());
        assert_eq!(buckets[1].range.start, 10f64.to_u64());
        assert_eq!(buckets[1].range.end, 20f64.to_u64());
        // Added bucket to fill hole
        assert_eq!(buckets[2].range.start, 20f64.to_u64());
        assert_eq!(buckets[2].range.end, 30f64.to_u64());
        assert_eq!(buckets[3].range.start, 30f64.to_u64());
        assert_eq!(buckets[3].range.end, 40f64.to_u64());
    }

    #[test]
    fn bucket_test_range_conversion_special_case() {
        // The monotonic conversion between f64 and u64 does not map f64::MIN.to_u64()
        // to u64::MIN, but the `Into` trait converts f64::MIN/MAX to None.
        let buckets = vec![
            (f64::MIN..10f64).into(),
            (10f64..20f64).into(),
            (20f64..f64::MAX).into(),
        ];
        let collector = get_collector_from_ranges(buckets, Type::F64);

        let buckets = collector.buckets;
        assert_eq!(buckets[0].range.start, u64::MIN);
        assert_eq!(buckets[0].range.end, 10f64.to_u64());
        assert_eq!(buckets[1].range.start, 10f64.to_u64());
        assert_eq!(buckets[1].range.end, 20f64.to_u64());
        assert_eq!(buckets[2].range.start, 20f64.to_u64());
        assert_eq!(buckets[2].range.end, u64::MAX);
        assert_eq!(buckets.len(), 3);
    }

    #[test]
    fn bucket_range_test_negative_vals() {
        let buckets = vec![(-10f64..-1f64).into()];
        let collector = get_collector_from_ranges(buckets, Type::F64);

        let buckets = collector.buckets;
        assert_eq!(&buckets[0].bucket.key.to_string(), "*--10");
        assert_eq!(&buckets[buckets.len() - 1].bucket.key.to_string(), "-1-*");
    }
    #[test]
    fn bucket_range_test_positive_vals() {
        let buckets = vec![(0f64..10f64).into()];
        let collector = get_collector_from_ranges(buckets, Type::F64);

        let buckets = collector.buckets;
        assert_eq!(&buckets[0].bucket.key.to_string(), "*-0");
        assert_eq!(&buckets[buckets.len() - 1].bucket.key.to_string(), "10-*");
    }

    #[test]
    fn range_binary_search_test_u64() {
        let check_ranges = |ranges: Vec<RangeAggregationRange>| {
            let collector = get_collector_from_ranges(ranges, Type::U64);
            let search = |val: u64| collector.get_bucket_pos(val);

            assert_eq!(search(u64::MIN), 0);
            assert_eq!(search(9), 0);
            assert_eq!(search(10), 1);
            assert_eq!(search(11), 1);
            assert_eq!(search(99), 1);
            assert_eq!(search(100), 2);
            // Since the end of a range is never included, the max value falls into the last bucket.
            assert_eq!(search(u64::MAX - 1), 2);
        };

        let ranges = vec![(10.0..100.0).into()];
        check_ranges(ranges);

        let ranges = vec![
            RangeAggregationRange {
                to: Some(10.0),
                from: None,
            },
            (10.0..100.0).into(),
        ];
        check_ranges(ranges);

        let ranges = vec![
            RangeAggregationRange {
                to: Some(10.0),
                from: None,
            },
            (10.0..100.0).into(),
            RangeAggregationRange {
                to: None,
                from: Some(100.0),
            },
        ];
        check_ranges(ranges);
    }

    #[test]
    fn range_binary_search_test_f64() {
        let ranges = vec![
            //(f64::MIN..10.0).into(),
            (10.0..100.0).into(),
            //(100.0..f64::MAX).into(),
        ];

        let collector = get_collector_from_ranges(ranges, Type::F64);
        let search = |val: u64| collector.get_bucket_pos(val);

        assert_eq!(search(u64::MIN), 0);
        assert_eq!(search(9f64.to_u64()), 0);
        assert_eq!(search(10f64.to_u64()), 1);
        assert_eq!(search(11f64.to_u64()), 1);
        assert_eq!(search(99f64.to_u64()), 1);
        assert_eq!(search(100f64.to_u64()), 2);
        // Since the end of a range is never included, the max value falls into the last bucket.
        assert_eq!(search(u64::MAX - 1), 2);
    }
}

#[cfg(all(test, feature = "unstable"))]
mod bench {

    use rand::seq::SliceRandom;
    use rand::thread_rng;

    use super::*;
    use crate::aggregation::bucket::range::tests::get_collector_from_ranges;

    const TOTAL_DOCS: u64 = 1_000_000u64;
    const NUM_DOCS: u64 = 50_000u64;

    fn get_collector_with_buckets(num_buckets: u64, num_docs: u64) -> SegmentRangeCollector {
        let bucket_size = num_docs / num_buckets;
        let mut buckets: Vec<RangeAggregationRange> = vec![];
        for i in 0..num_buckets {
            let bucket_start = (i * bucket_size) as f64;
            buckets.push((bucket_start..bucket_start + bucket_size as f64).into());
        }

        get_collector_from_ranges(buckets, Type::U64)
    }

    fn get_rand_docs(total_docs: u64, num_docs_returned: u64) -> Vec<u64> {
        let mut rng = thread_rng();

        let all_docs = (0..total_docs - 1).collect_vec();
        let mut vals = all_docs
            .as_slice()
            .choose_multiple(&mut rng, num_docs_returned as usize)
            .cloned()
            .collect_vec();
        vals.sort();
        vals
    }

    fn bench_range_binary_search(b: &mut test::Bencher, num_buckets: u64) {
        let collector = get_collector_with_buckets(num_buckets, TOTAL_DOCS);
        let vals = get_rand_docs(TOTAL_DOCS, NUM_DOCS);
        b.iter(|| {
            let mut bucket_pos = 0;
            for val in &vals {
                bucket_pos = collector.get_bucket_pos(*val);
            }
            bucket_pos
        })
    }

    #[bench]
    fn bench_range_100_buckets(b: &mut test::Bencher) {
        bench_range_binary_search(b, 100)
    }

    #[bench]
    fn bench_range_10_buckets(b: &mut test::Bencher) {
        bench_range_binary_search(b, 10)
    }
}

@@ -1,135 +0,0 @@
use super::agg_req::Aggregations;
use super::agg_req_with_accessor::AggregationsWithAccessor;
use super::agg_result::AggregationResults;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::SegmentAggregationResultsCollector;
use crate::aggregation::agg_req_with_accessor::get_aggregations_with_accessor;
use crate::collector::{Collector, SegmentCollector};
use crate::TantivyError;

/// Collector for aggregations.
///
/// The collector collects all aggregations specified in the underlying aggregation request.
pub struct AggregationCollector {
    agg: Aggregations,
}

impl AggregationCollector {
    /// Create a collector from the aggregation request.
    pub fn from_aggs(agg: Aggregations) -> Self {
        Self { agg }
    }
}

/// Collector for distributed aggregations.
///
/// The collector collects all aggregations specified in the underlying aggregation request.
///
/// # Purpose
/// DistributedAggregationCollector returns `IntermediateAggregationResults` and not the final
/// `AggregationResults`, so that results from different indices can be merged and then converted
/// into the final `AggregationResults` via the `into()` method.
pub struct DistributedAggregationCollector {
    agg: Aggregations,
}

impl DistributedAggregationCollector {
    /// Create a collector from the aggregation request.
    pub fn from_aggs(agg: Aggregations) -> Self {
        Self { agg }
    }
}

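// A hedged sketch of the distributed flow described above; `searcher_a` and
// `searcher_b` are placeholders for searchers over different indices:
//
// let collector = DistributedAggregationCollector::from_aggs(agg_req);
// let mut fruit_a = searcher_a.search(&AllQuery, &collector)?;
// let fruit_b = searcher_b.search(&AllQuery, &collector)?;
// fruit_a.merge_fruits(&fruit_b);
// let final_result: AggregationResults = fruit_a.into();
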
impl Collector for DistributedAggregationCollector {
    type Fruit = IntermediateAggregationResults;

    type Child = AggregationSegmentCollector;

    fn for_segment(
        &self,
        _segment_local_id: crate::SegmentOrdinal,
        reader: &crate::SegmentReader,
    ) -> crate::Result<Self::Child> {
        let aggs_with_accessor = get_aggregations_with_accessor(&self.agg, reader)?;
        let result = SegmentAggregationResultsCollector::from_req(&aggs_with_accessor)?;
        Ok(AggregationSegmentCollector {
            aggs: aggs_with_accessor,
            result,
        })
    }

    fn requires_scoring(&self) -> bool {
        false
    }

    fn merge_fruits(
        &self,
        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
    ) -> crate::Result<Self::Fruit> {
        merge_fruits(segment_fruits)
    }
}

impl Collector for AggregationCollector {
    type Fruit = AggregationResults;

    type Child = AggregationSegmentCollector;

    fn for_segment(
        &self,
        _segment_local_id: crate::SegmentOrdinal,
        reader: &crate::SegmentReader,
    ) -> crate::Result<Self::Child> {
        let aggs_with_accessor = get_aggregations_with_accessor(&self.agg, reader)?;
        let result = SegmentAggregationResultsCollector::from_req(&aggs_with_accessor)?;
        Ok(AggregationSegmentCollector {
            aggs: aggs_with_accessor,
            result,
        })
    }

    fn requires_scoring(&self) -> bool {
        false
    }

    fn merge_fruits(
        &self,
        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
    ) -> crate::Result<Self::Fruit> {
        merge_fruits(segment_fruits).map(|res| res.into())
    }
}

fn merge_fruits(
    mut segment_fruits: Vec<IntermediateAggregationResults>,
) -> crate::Result<IntermediateAggregationResults> {
    if let Some(mut fruit) = segment_fruits.pop() {
        for next_fruit in segment_fruits {
            fruit.merge_fruits(&next_fruit);
        }
        Ok(fruit)
    } else {
        Err(TantivyError::InvalidArgument(
            "no fruits provided in merge_fruits".to_string(),
        ))
    }
}

pub struct AggregationSegmentCollector {
    aggs: AggregationsWithAccessor,
    result: SegmentAggregationResultsCollector,
}

impl SegmentCollector for AggregationSegmentCollector {
    type Fruit = IntermediateAggregationResults;

    #[inline]
    fn collect(&mut self, doc: crate::DocId, _score: crate::Score) {
        self.result.collect(doc, &self.aggs);
    }

    fn harvest(mut self) -> Self::Fruit {
        self.result.flush_staged_docs(&self.aggs, true);
        self.result.into()
    }
}

@@ -1,304 +0,0 @@
//! Contains the intermediate aggregation tree, which can be merged.
//! Intermediate aggregation results can be used to merge results between segments or between
//! indices.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use super::metric::{IntermediateAverage, IntermediateStats};
use super::segment_agg_result::{
    SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentMetricResultCollector,
    SegmentRangeBucketEntry,
};
use super::{Key, VecWithNames};

/// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAggregationResults(pub(crate) VecWithNames<IntermediateAggregationResult>);

impl From<SegmentAggregationResultsCollector> for IntermediateAggregationResults {
    fn from(tree: SegmentAggregationResultsCollector) -> Self {
        let mut data = vec![];
        for (key, bucket) in tree.buckets.into_iter() {
            data.push((key, IntermediateAggregationResult::Bucket(bucket.into())));
        }
        for (key, metric) in tree.metrics.into_iter() {
            data.push((key, IntermediateAggregationResult::Metric(metric.into())));
        }
        Self(VecWithNames::from_entries(data))
    }
}

impl IntermediateAggregationResults {
    /// Merge another intermediate aggregation result into this result.
    ///
    /// The order of the values needs to be the same on both results. This is ensured when the
    /// same keys are present on the underlying VecWithNames struct.
    pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) {
        for (tree_left, tree_right) in self.0.values_mut().zip(other.0.values()) {
            tree_left.merge_fruits(tree_right);
        }
    }
}

/// An aggregation is either a bucket or a metric.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateAggregationResult {
    /// Bucket variant
    Bucket(IntermediateBucketResult),
    /// Metric variant
    Metric(IntermediateMetricResult),
}

impl IntermediateAggregationResult {
    fn merge_fruits(&mut self, other: &IntermediateAggregationResult) {
        match (self, other) {
            (
                IntermediateAggregationResult::Bucket(res_left),
                IntermediateAggregationResult::Bucket(res_right),
            ) => {
                res_left.merge_fruits(res_right);
            }
            (
                IntermediateAggregationResult::Metric(res_left),
                IntermediateAggregationResult::Metric(res_right),
            ) => {
                res_left.merge_fruits(res_right);
            }
            _ => {
                panic!("incompatible types in aggregation tree on merge fruits");
            }
        }
    }
}

/// Holds the intermediate data for metric results.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateMetricResult {
    /// Average variant, containing the intermediate average data.
    Average(IntermediateAverage),
    /// Stats variant, containing the intermediate stats data.
    Stats(IntermediateStats),
}

impl From<SegmentMetricResultCollector> for IntermediateMetricResult {
    fn from(tree: SegmentMetricResultCollector) -> Self {
        match tree {
            SegmentMetricResultCollector::Average(collector) => {
                IntermediateMetricResult::Average(IntermediateAverage::from_collector(collector))
            }
            SegmentMetricResultCollector::Stats(collector) => {
                IntermediateMetricResult::Stats(collector.stats)
            }
        }
    }
}

impl IntermediateMetricResult {
    fn merge_fruits(&mut self, other: &IntermediateMetricResult) {
        match (self, other) {
            (
                IntermediateMetricResult::Average(avg_data_left),
                IntermediateMetricResult::Average(avg_data_right),
            ) => {
                avg_data_left.merge_fruits(avg_data_right);
            }
            (
                IntermediateMetricResult::Stats(stats_left),
                IntermediateMetricResult::Stats(stats_right),
            ) => {
                stats_left.merge_fruits(stats_right);
            }
            _ => {
                panic!("incompatible fruit types in tree {:?}", other);
            }
        }
    }
}

/// The intermediate bucket results. Internally they can be easily merged via the keys of the
/// buckets.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum IntermediateBucketResult {
    /// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
    /// sub_aggregations.
    Range(HashMap<Key, IntermediateRangeBucketEntry>),
}

impl From<SegmentBucketResultCollector> for IntermediateBucketResult {
    fn from(collector: SegmentBucketResultCollector) -> Self {
        match collector {
            SegmentBucketResultCollector::Range(range) => range.into_intermediate_bucket_result(),
        }
    }
}

impl IntermediateBucketResult {
    fn merge_fruits(&mut self, other: &IntermediateBucketResult) {
        match (self, other) {
            (
                IntermediateBucketResult::Range(entries_left),
                IntermediateBucketResult::Range(entries_right),
            ) => {
                for (name, entry_left) in entries_left.iter_mut() {
                    if let Some(entry_right) = entries_right.get(name) {
                        entry_left.merge_fruits(entry_right);
                    }
                }

                for (key, res) in entries_right.iter() {
                    if !entries_left.contains_key(key) {
                        entries_left.insert(key.clone(), res.clone());
                    }
                }
            }
        }
    }
}

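// Merging is keyed by bucket: shared keys have their entries merged, and keys
// that only exist on the right side are inserted as-is. E.g. merging
// {red: 50, blue: 30} with {red: 60, green: 25} yields
// {red: 110, blue: 30, green: 25} (see `test_merge_fruits_tree_2` below).
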
/// This is the range entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateRangeBucketEntry {
    /// The unique key by which the bucket is identified.
    pub key: Key,
    /// The number of documents in the bucket.
    pub doc_count: u64,
    pub(crate) values: Option<Vec<u64>>,
    /// The sub_aggregation in this bucket.
    pub sub_aggregation: IntermediateAggregationResults,
    /// The from range of the bucket. Equals f64::MIN when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub from: Option<f64>,
    /// The to range of the bucket. Equals f64::MAX when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to: Option<f64>,
}

impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry {
    fn from(entry: SegmentRangeBucketEntry) -> Self {
        let sub_aggregation = if let Some(sub_aggregation) = entry.sub_aggregation {
            sub_aggregation.into()
        } else {
            Default::default()
        };

        IntermediateRangeBucketEntry {
            key: entry.key,
            doc_count: entry.doc_count,
            values: None,
            sub_aggregation,
            to: entry.to,
            from: entry.from,
        }
    }
}

impl IntermediateRangeBucketEntry {
    fn merge_fruits(&mut self, other: &IntermediateRangeBucketEntry) {
        self.doc_count += other.doc_count;
        self.sub_aggregation.merge_fruits(&other.sub_aggregation);
    }
}

#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;

    use super::*;

    fn get_sub_test_tree(data: &[(String, u64)]) -> IntermediateAggregationResults {
        let mut map = HashMap::new();
        let mut buckets = HashMap::new();
        for (key, doc_count) in data {
            buckets.insert(
                Key::Str(key.to_string()),
                IntermediateRangeBucketEntry {
                    key: Key::Str(key.to_string()),
                    doc_count: *doc_count,
                    values: None,
                    sub_aggregation: Default::default(),
                    from: None,
                    to: None,
                },
            );
        }
        map.insert(
            "my_agg_level2".to_string(),
            IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
        );
        IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
    }

    fn get_test_tree(data: &[(String, u64, String, u64)]) -> IntermediateAggregationResults {
        let mut map = HashMap::new();
        let mut buckets = HashMap::new();
        for (key, doc_count, sub_aggregation_key, sub_aggregation_count) in data {
            buckets.insert(
                Key::Str(key.to_string()),
                IntermediateRangeBucketEntry {
                    key: Key::Str(key.to_string()),
                    doc_count: *doc_count,
                    values: None,
                    from: None,
                    to: None,
                    sub_aggregation: get_sub_test_tree(&[(
                        sub_aggregation_key.to_string(),
                        *sub_aggregation_count,
                    )]),
                },
            );
        }
        map.insert(
            "my_agg_level1".to_string(),
            IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)),
        );
        IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect()))
    }

    #[test]
    fn test_merge_fruits_tree_1() {
        let mut tree_left = get_test_tree(&[
            ("red".to_string(), 50, "1900".to_string(), 25),
            ("blue".to_string(), 30, "1900".to_string(), 30),
        ]);
        let tree_right = get_test_tree(&[
            ("red".to_string(), 60, "1900".to_string(), 30),
            ("blue".to_string(), 25, "1900".to_string(), 50),
        ]);

        tree_left.merge_fruits(&tree_right);

        let tree_expected = get_test_tree(&[
            ("red".to_string(), 110, "1900".to_string(), 55),
            ("blue".to_string(), 55, "1900".to_string(), 80),
        ]);

        assert_eq!(tree_left, tree_expected);
    }

    #[test]
    fn test_merge_fruits_tree_2() {
        let mut tree_left = get_test_tree(&[
            ("red".to_string(), 50, "1900".to_string(), 25),
            ("blue".to_string(), 30, "1900".to_string(), 30),
        ]);
        let tree_right = get_test_tree(&[
            ("red".to_string(), 60, "1900".to_string(), 30),
            ("green".to_string(), 25, "1900".to_string(), 50),
        ]);

        tree_left.merge_fruits(&tree_right);

        let tree_expected = get_test_tree(&[
            ("red".to_string(), 110, "1900".to_string(), 55),
            ("blue".to_string(), 30, "1900".to_string(), 30),
            ("green".to_string(), 25, "1900".to_string(), 50),
        ]);

        assert_eq!(tree_left, tree_expected);
    }
}

@@ -1,101 +0,0 @@
use std::fmt::Debug;

use serde::{Deserialize, Serialize};

use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::schema::Type;
use crate::DocId;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// A single-value metric aggregation that computes the average of numeric values that are
/// extracted from the aggregated documents.
/// Supported field types are u64, i64, and f64.
/// See [super::SingleMetricResult] for return value.
pub struct AverageAggregation {
    /// The field name to compute the average on.
    pub field: String,
}
impl AverageAggregation {
    /// Create new AverageAggregation from a field.
    pub fn from_field_name(field_name: String) -> Self {
        AverageAggregation { field: field_name }
    }
    /// Return the field name.
    pub fn field_name(&self) -> &str {
        &self.field
    }
}

#[derive(Clone, PartialEq)]
pub(crate) struct SegmentAverageCollector {
    pub data: IntermediateAverage,
    field_type: Type,
}

impl Debug for SegmentAverageCollector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AverageCollector")
            .field("data", &self.data)
            .finish()
    }
}

impl SegmentAverageCollector {
    pub fn from_req(field_type: Type) -> Self {
        Self {
            field_type,
            data: Default::default(),
        }
    }
    pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
        let mut iter = doc.chunks_exact(4);
        for docs in iter.by_ref() {
            let val1 = field.get(docs[0]);
            let val2 = field.get(docs[1]);
            let val3 = field.get(docs[2]);
            let val4 = field.get(docs[3]);
            let val1 = f64_from_fastfield_u64(val1, &self.field_type);
            let val2 = f64_from_fastfield_u64(val2, &self.field_type);
            let val3 = f64_from_fastfield_u64(val3, &self.field_type);
            let val4 = f64_from_fastfield_u64(val4, &self.field_type);
            self.data.collect(val1);
            self.data.collect(val2);
            self.data.collect(val3);
            self.data.collect(val4);
        }
        for doc in iter.remainder() {
            let val = field.get(*doc);
            let val = f64_from_fastfield_u64(val, &self.field_type);
            self.data.collect(val);
        }
    }
}

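// Note on `collect_block` above: the `chunks_exact(4)` loop manually unrolls
// the hot loop into four independent fast field lookups per iteration, and
// `iter.remainder()` handles the up to three trailing docs that do not fill
// a full chunk.
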
/// Contains the mergeable version of the average data.
#[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateAverage {
    pub(crate) sum: f64,
    pub(crate) doc_count: u64,
}

impl IntermediateAverage {
    pub(crate) fn from_collector(collector: SegmentAverageCollector) -> Self {
        collector.data
    }

    /// Merge average data into this instance.
    pub fn merge_fruits(&mut self, other: &IntermediateAverage) {
        self.sum += other.sum;
        self.doc_count += other.doc_count;
    }
    /// Compute the final result.
    pub fn finalize(&self) -> f64 {
        self.sum / self.doc_count as f64
    }
    #[inline]
    fn collect(&mut self, val: f64) {
        self.doc_count += 1;
        self.sum += val;
    }
}

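// Why sum + doc_count (rather than a per-segment average) is the intermediate
// representation: averages merge correctly this way. E.g. segment A holds
// [1.0, 2.0] (sum 3, count 2) and segment B holds [3.0, 4.0, 5.0] (sum 12,
// count 3); the merged result finalizes to 15 / 5 = 3.0, whereas averaging
// the two per-segment averages (1.5 and 4.0) would wrongly give 2.75.
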
@@ -1,22 +0,0 @@
//! Module for all metric aggregations.

mod average;
mod stats;
pub use average::*;
use serde::{Deserialize, Serialize};
pub use stats::*;

/// Single-metric aggregations use this common result structure.
///
/// The main reason to wrap it in a value is to match the elasticsearch output structure.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SingleMetricResult {
    /// The value of the single value metric.
    pub value: f64,
}

impl From<f64> for SingleMetricResult {
    fn from(value: f64) -> Self {
        Self { value }
    }
}

@@ -1,273 +0,0 @@
use serde::{Deserialize, Serialize};

use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::schema::Type;
use crate::DocId;

/// A multi-value metric aggregation that computes stats of numeric values that are
/// extracted from the aggregated documents.
/// Supported field types are u64, i64, and f64.
/// See [Stats] for returned statistics.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StatsAggregation {
    /// The field name to compute the stats on.
    pub field: String,
}
impl StatsAggregation {
    /// Create new StatsAggregation from a field.
    pub fn from_field_name(field_name: String) -> Self {
        StatsAggregation { field: field_name }
    }
    /// Return the field name.
    pub fn field_name(&self) -> &str {
        &self.field
    }
}

/// Stats contains a collection of statistics.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Stats {
    /// The number of documents.
    pub count: usize,
    /// The sum of the fast field values.
    pub sum: f64,
    /// The standard deviation of the fast field values.
    pub standard_deviation: f64,
    /// The min value of the fast field values.
    pub min: f64,
    /// The max value of the fast field values.
    pub max: f64,
    /// The average of the values.
    pub avg: f64,
}

/// IntermediateStats contains the mergeable version of the stats.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct IntermediateStats {
    count: usize,
    sum: f64,
    squared_sum: f64,
    min: f64,
    max: f64,
}

impl IntermediateStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
count: 0,
|
||||
sum: 0.0,
|
||||
squared_sum: 0.0,
|
||||
min: f64::MAX,
|
||||
max: f64::MIN,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn avg(&self) -> f64 {
|
||||
self.sum / (self.count as f64)
|
||||
}
|
||||
|
||||
fn square_mean(&self) -> f64 {
|
||||
self.squared_sum / (self.count as f64)
|
||||
}
|
||||
|
||||
pub(crate) fn standard_deviation(&self) -> f64 {
|
||||
let average = self.avg();
|
||||
(self.square_mean() - average * average).sqrt()
|
||||
}
|
||||
|
||||
/// Merge data from other stats into this instance.
|
||||
pub fn merge_fruits(&mut self, other: &IntermediateStats) {
|
||||
self.count += other.count;
|
||||
self.sum += other.sum;
|
||||
self.squared_sum += other.squared_sum;
|
||||
self.min = self.min.min(other.min);
|
||||
self.max = self.max.max(other.max);
|
||||
}
|
||||
|
||||
/// compute final result
|
||||
pub fn finalize(&self) -> Stats {
|
||||
Stats {
|
||||
count: self.count,
|
||||
sum: self.sum,
|
||||
standard_deviation: self.standard_deviation(),
|
||||
min: self.min,
|
||||
max: self.max,
|
||||
avg: self.avg(),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn collect(&mut self, value: f64) {
|
||||
self.count += 1;
|
||||
self.sum += value;
|
||||
self.squared_sum += value * value;
|
||||
self.min = self.min.min(value);
|
||||
self.max = self.max.max(value);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) struct SegmentStatsCollector {
|
||||
pub(crate) stats: IntermediateStats,
|
||||
field_type: Type,
|
||||
}
|
||||
|
||||
impl SegmentStatsCollector {
|
||||
pub fn from_req(field_type: Type) -> Self {
|
||||
Self {
|
||||
field_type,
|
||||
stats: IntermediateStats::new(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
|
||||
let mut iter = doc.chunks_exact(4);
|
||||
for docs in iter.by_ref() {
|
||||
let val1 = field.get(docs[0]);
|
||||
let val2 = field.get(docs[1]);
|
||||
let val3 = field.get(docs[2]);
|
||||
let val4 = field.get(docs[3]);
|
||||
let val1 = f64_from_fastfield_u64(val1, &self.field_type);
|
||||
let val2 = f64_from_fastfield_u64(val2, &self.field_type);
|
||||
let val3 = f64_from_fastfield_u64(val3, &self.field_type);
|
||||
let val4 = f64_from_fastfield_u64(val4, &self.field_type);
|
||||
self.stats.collect(val1);
|
||||
self.stats.collect(val2);
|
||||
self.stats.collect(val3);
|
||||
self.stats.collect(val4);
|
||||
}
|
||||
for doc in iter.remainder() {
|
||||
let val = field.get(*doc);
|
||||
let val = f64_from_fastfield_u64(val, &self.field_type);
|
||||
self.stats.collect(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use std::iter;
|
||||
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::aggregation::agg_req::{
|
||||
Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
|
||||
RangeAggregation,
|
||||
};
|
||||
use crate::aggregation::agg_result::AggregationResults;
|
||||
use crate::aggregation::metric::StatsAggregation;
|
||||
use crate::aggregation::tests::get_test_index_2_segments;
|
||||
use crate::aggregation::AggregationCollector;
|
||||
use crate::query::TermQuery;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::Term;
|
||||
|
||||
#[test]
|
||||
fn test_aggregation_stats() -> crate::Result<()> {
|
||||
let index = get_test_index_2_segments(false)?;
|
||||
|
||||
let reader = index.reader()?;
|
||||
let text_field = reader.searcher().schema().get_field("text").unwrap();
|
||||
|
||||
let term_query = TermQuery::new(
|
||||
Term::from_field_text(text_field, "cool"),
|
||||
IndexRecordOption::Basic,
|
||||
);
|
||||
|
||||
let agg_req_1: Aggregations = vec![
|
||||
(
|
||||
"stats_i64".to_string(),
|
||||
Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name(
|
||||
"score_i64".to_string(),
|
||||
))),
|
||||
),
|
||||
(
|
||||
"stats_f64".to_string(),
|
||||
Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name(
|
||||
"score_f64".to_string(),
|
||||
))),
|
||||
),
|
||||
(
|
||||
"stats".to_string(),
|
||||
Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name(
|
||||
"score".to_string(),
|
||||
))),
|
||||
),
|
||||
(
|
||||
"range".to_string(),
|
||||
Aggregation::Bucket(BucketAggregation {
|
||||
bucket_agg: BucketAggregationType::Range(RangeAggregation {
|
||||
field: "score".to_string(),
|
||||
ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
|
||||
}),
|
||||
sub_aggregation: iter::once((
|
||||
"stats".to_string(),
|
||||
Aggregation::Metric(MetricAggregation::Stats(
|
||||
StatsAggregation::from_field_name("score".to_string()),
|
||||
)),
|
||||
))
|
||||
.collect(),
|
||||
}),
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let collector = AggregationCollector::from_aggs(agg_req_1);
|
||||
|
||||
let searcher = reader.searcher();
|
||||
let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap();
|
||||
|
||||
let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
|
||||
assert_eq!(
|
||||
res["stats"],
|
||||
json!({
|
||||
"avg": 12.142857142857142,
|
||||
"count": 7,
|
||||
"max": 44.0,
|
||||
"min": 1.0,
|
||||
"standard_deviation": 13.65313748796613,
|
||||
"sum": 85.0
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res["stats_i64"],
|
||||
json!({
|
||||
"avg": 12.142857142857142,
|
||||
"count": 7,
|
||||
"max": 44.0,
|
||||
"min": 1.0,
|
||||
"standard_deviation": 13.65313748796613,
|
||||
"sum": 85.0
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res["stats_f64"],
|
||||
json!({
|
||||
"avg": 12.214285714285714,
|
||||
"count": 7,
|
||||
"max": 44.5,
|
||||
"min": 1.0,
|
||||
"standard_deviation": 13.819905785437443,
|
||||
"sum": 85.5
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res["range"]["buckets"][2]["stats"],
|
||||
json!({
|
||||
"avg": 10.666666666666666,
|
||||
"count": 3,
|
||||
"max": 14.0,
|
||||
"min": 7.0,
|
||||
"standard_deviation": 2.867441755680877,
|
||||
"sum": 32.0
|
||||
})
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
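Note why `IntermediateStats` stores `squared_sum` rather than a running variance: raw moments merge across segments by plain addition, and `standard_deviation` falls out at finalize time from the population-variance identity Var(X) = E[X²] − E[X]². A quick standalone check of that identity, mirroring the bookkeeping above:

```rust
fn main() {
    let vals = [1.0f64, 2.0, 3.0, 4.0];
    let n = vals.len() as f64;
    // Accumulate the same two moments IntermediateStats keeps per segment.
    let sum: f64 = vals.iter().sum();
    let squared_sum: f64 = vals.iter().map(|v| v * v).sum();
    // std = sqrt(E[x^2] - E[x]^2), exactly as in standard_deviation() above.
    let avg = sum / n;
    let std = (squared_sum / n - avg * avg).sqrt();
    // The direct two-pass definition (population variance) agrees.
    let direct = (vals.iter().map(|v| (v - avg) * (v - avg)).sum::<f64>() / n).sqrt();
    assert!((std - direct).abs() < 1e-12);
}
```

This is the population standard deviation (division by n, not n − 1), and the one-pass formula can lose precision when the variance is tiny relative to the mean.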
File diff suppressed because it is too large
@@ -1,195 +0,0 @@
//! Contains aggregation trees which is used during collection in a segment.
//! This tree contains datastructrues optimized for fast collection.
//! The tree can be converted to an intermediate tree, which contains datastructrues optimized for
//! merging.

use std::fmt::Debug;

use itertools::Itertools;

use super::agg_req::MetricAggregation;
use super::agg_req_with_accessor::{
    AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor,
};
use super::bucket::SegmentRangeCollector;
use super::metric::{
    AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation,
};
use super::{Key, VecWithNames};
use crate::aggregation::agg_req::BucketAggregationType;
use crate::DocId;

pub(crate) const DOC_BLOCK_SIZE: usize = 256;
pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE];

#[derive(Clone, PartialEq)]
pub(crate) struct SegmentAggregationResultsCollector {
    pub(crate) metrics: VecWithNames<SegmentMetricResultCollector>,
    pub(crate) buckets: VecWithNames<SegmentBucketResultCollector>,
    staged_docs: DocBlock,
    num_staged_docs: usize,
}

impl Debug for SegmentAggregationResultsCollector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SegmentAggregationResultsCollector")
            .field("metrics", &self.metrics)
            .field("buckets", &self.buckets)
            .field("staged_docs", &&self.staged_docs[..self.num_staged_docs])
            .field("num_staged_docs", &self.num_staged_docs)
            .finish()
    }
}

impl SegmentAggregationResultsCollector {
    pub(crate) fn from_req(req: &AggregationsWithAccessor) -> crate::Result<Self> {
        let buckets = req
            .buckets
            .entries()
            .map(|(key, req)| {
                Ok((
                    key.to_string(),
                    SegmentBucketResultCollector::from_req(req)?,
                ))
            })
            .collect::<crate::Result<_>>()?;
        let metrics = req
            .metrics
            .entries()
            .map(|(key, req)| (key.to_string(), SegmentMetricResultCollector::from_req(req)))
            .collect_vec();
        Ok(SegmentAggregationResultsCollector {
            metrics: VecWithNames::from_entries(metrics),
            buckets: VecWithNames::from_entries(buckets),
            staged_docs: [0; DOC_BLOCK_SIZE],
            num_staged_docs: 0,
        })
    }

    #[inline]
    pub(crate) fn collect(
        &mut self,
        doc: crate::DocId,
        agg_with_accessor: &AggregationsWithAccessor,
    ) {
        self.staged_docs[self.num_staged_docs] = doc;
        self.num_staged_docs += 1;
        if self.num_staged_docs == self.staged_docs.len() {
            self.flush_staged_docs(agg_with_accessor, false);
        }
    }

    #[inline(never)]
    pub(crate) fn flush_staged_docs(
        &mut self,
        agg_with_accessor: &AggregationsWithAccessor,
        force_flush: bool,
    ) {
        for (agg_with_accessor, collector) in agg_with_accessor
            .metrics
            .values()
            .zip(self.metrics.values_mut())
        {
            collector.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor);
        }
        for (agg_with_accessor, collector) in agg_with_accessor
            .buckets
            .values()
            .zip(self.buckets.values_mut())
        {
            collector.collect_block(
                &self.staged_docs[..self.num_staged_docs],
                agg_with_accessor,
                force_flush,
            );
        }

        self.num_staged_docs = 0;
    }
}

#[derive(Clone, Debug, PartialEq)]
pub(crate) enum SegmentMetricResultCollector {
    Average(SegmentAverageCollector),
    Stats(SegmentStatsCollector),
}

impl SegmentMetricResultCollector {
    pub fn from_req(req: &MetricAggregationWithAccessor) -> Self {
        match &req.metric {
            MetricAggregation::Average(AverageAggregation { field: _ }) => {
                SegmentMetricResultCollector::Average(SegmentAverageCollector::from_req(
                    req.field_type,
                ))
            }
            MetricAggregation::Stats(StatsAggregation { field: _ }) => {
                SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(req.field_type))
            }
        }
    }
    pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
        match self {
            SegmentMetricResultCollector::Average(avg_collector) => {
                avg_collector.collect_block(doc, &metric.accessor);
            }
            SegmentMetricResultCollector::Stats(stats_collector) => {
                stats_collector.collect_block(doc, &metric.accessor);
            }
        }
    }
}

/// SegmentBucketAggregationResultCollectors will have specialized buckets for collection inside
/// segments.
/// The typical structure of Map<Key, Bucket> is not suitable during collection for performance
/// reasons.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum SegmentBucketResultCollector {
    Range(SegmentRangeCollector),
}

impl SegmentBucketResultCollector {
    pub fn from_req(req: &BucketAggregationWithAccessor) -> crate::Result<Self> {
        match &req.bucket_agg {
            BucketAggregationType::Range(range_req) => Ok(Self::Range(
                SegmentRangeCollector::from_req(range_req, &req.sub_aggregation, req.field_type)?,
            )),
        }
    }

    #[inline]
    pub(crate) fn collect_block(
        &mut self,
        doc: &[DocId],
        bucket_with_accessor: &BucketAggregationWithAccessor,
        force_flush: bool,
    ) {
        match self {
            SegmentBucketResultCollector::Range(range) => {
                range.collect_block(doc, bucket_with_accessor, force_flush);
            }
        }
    }
}

#[derive(Clone, PartialEq)]
pub(crate) struct SegmentRangeBucketEntry {
    pub key: Key,
    pub doc_count: u64,
    pub sub_aggregation: Option<SegmentAggregationResultsCollector>,
    /// The from range of the bucket. Equals f64::MIN when None.
    pub from: Option<f64>,
    /// The to range of the bucket. Equals f64::MAX when None.
    pub to: Option<f64>,
}

impl Debug for SegmentRangeBucketEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SegmentRangeBucketEntry")
            .field("key", &self.key)
            .field("doc_count", &self.doc_count)
            .field("from", &self.from)
            .field("to", &self.to)
            .finish()
    }
}
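Worth calling out in the deleted collector above: documents are never dispatched one at a time. They are staged into a fixed `[DocId; 256]` buffer, and the whole slice is handed to every metric and bucket sub-collector when the buffer fills, or on a final forced flush. The pattern in isolation (the buffer size and the per-block work are stand-ins):

```rust
const DOC_BLOCK_SIZE: usize = 256;

struct BlockStagingCollector {
    staged_docs: [u32; DOC_BLOCK_SIZE],
    num_staged_docs: usize,
    flushed_blocks: usize, // stand-in for the real per-block collection work
}

impl BlockStagingCollector {
    fn collect(&mut self, doc: u32) {
        self.staged_docs[self.num_staged_docs] = doc;
        self.num_staged_docs += 1;
        if self.num_staged_docs == self.staged_docs.len() {
            self.flush_staged_docs();
        }
    }
    fn flush_staged_docs(&mut self) {
        // The real code passes &self.staged_docs[..self.num_staged_docs]
        // to each sub-collector's collect_block here.
        self.flushed_blocks += 1;
        self.num_staged_docs = 0;
    }
}

fn main() {
    let mut collector = BlockStagingCollector {
        staged_docs: [0; DOC_BLOCK_SIZE],
        num_staged_docs: 0,
        flushed_blocks: 0,
    };
    for doc in 0..600u32 {
        collector.collect(doc);
    }
    collector.flush_staged_docs(); // the final, forced partial flush
    assert_eq!(collector.flushed_blocks, 3); // 256 + 256 + 88 documents
}
```

Batching keeps per-document virtual dispatch out of the hot loop and is what enables the `chunks_exact(4)` unrolling in the metric collectors earlier in this diff.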
@@ -1,6 +1,9 @@
use super::Collector;
use crate::collector::SegmentCollector;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::SegmentReader;

/// `CountCollector` collector only counts how many
/// documents match the query.
@@ -77,7 +80,8 @@ impl SegmentCollector for SegmentCountCollector {
#[cfg(test)]
mod tests {
    use super::{Count, SegmentCountCollector};
    use crate::collector::{Collector, SegmentCollector};
    use crate::collector::Collector;
    use crate::collector::SegmentCollector;

    #[test]
    fn test_count_collect_does_not_requires_scoring() {

@@ -8,7 +8,8 @@ pub(crate) struct CustomScoreTopCollector<TCustomScorer, TScore = Score> {
}

impl<TCustomScorer, TScore> CustomScoreTopCollector<TCustomScorer, TScore>
where TScore: Clone + PartialOrd
where
    TScore: Clone + PartialOrd,
{
    pub(crate) fn new(
        custom_scorer: TCustomScorer,
@@ -113,7 +114,8 @@ where
}

impl<F, TScore> CustomSegmentScorer<TScore> for F
where F: 'static + FnMut(DocId) -> TScore
where
    F: 'static + FnMut(DocId) -> TScore,
{
    fn score(&mut self, doc: DocId) -> TScore {
        (self)(doc)
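The last hunk keeps the blanket implementation that lets a plain closure serve as a `CustomSegmentScorer`. The mechanism is ordinary Rust and easy to reproduce in miniature (the trait here is illustrative, not tantivy's exact one):

```rust
type DocId = u32;

trait SegmentScorer<TScore> {
    fn score(&mut self, doc: DocId) -> TScore;
}

// Blanket impl: any suitable FnMut closure is automatically a scorer.
impl<F, TScore> SegmentScorer<TScore> for F
where
    F: 'static + FnMut(DocId) -> TScore,
{
    fn score(&mut self, doc: DocId) -> TScore {
        (self)(doc)
    }
}

fn main() {
    let mut double = |doc: DocId| doc as f32 * 2.0;
    assert_eq!(double.score(21), 42.0);
}
```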
@@ -1,8 +1,9 @@
use std::collections::HashSet;

use super::{Collector, SegmentCollector};
use crate::{DocAddress, DocId, Score};

use super::{Collector, SegmentCollector};

/// Collectors that returns the set of DocAddress that matches the query.
///
/// This collector is mostly useful for tests.
@@ -1,14 +1,21 @@
use crate::collector::Collector;
use crate::collector::SegmentCollector;
use crate::fastfield::FacetReader;
use crate::schema::Facet;
use crate::schema::Field;
use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::SegmentReader;
use std::cmp::Ordering;
use std::collections::{btree_map, BTreeMap, BTreeSet, BinaryHeap};
use std::collections::btree_map;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::BinaryHeap;
use std::iter::Peekable;
use std::ops::Bound;
use std::{u64, usize};

use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FacetReader;
use crate::schema::{Facet, Field};
use crate::{DocId, Score, SegmentOrdinal, SegmentReader};

struct Hit<'a> {
    count: u64,
    facet: &'a Facet,
@@ -233,7 +240,9 @@ impl FacetCollector {
    /// If you need the correct number of unique documents for two such facets,
    /// just add them in separate `FacetCollector`.
    pub fn add_facet<T>(&mut self, facet_from: T)
    where Facet: From<T> {
    where
        Facet: From<T>,
    {
        let facet = Facet::from(facet_from);
        for old_facet in &self.facets {
            assert!(
@@ -393,7 +402,9 @@ impl FacetCounts {
    /// Returns an iterator over all of the facet count pairs inside this result.
    /// See the documentation for [FacetCollector] for a usage example.
    pub fn get<T>(&self, facet_from: T) -> FacetChildIterator<'_>
    where Facet: From<T> {
    where
        Facet: From<T>,
    {
        let facet = Facet::from(facet_from);
        let left_bound = Bound::Excluded(facet.clone());
        let right_bound = if facet.is_root() {
@@ -412,7 +423,9 @@ impl FacetCounts {
    /// Returns a vector of top `k` facets with their counts, sorted highest-to-lowest by counts.
    /// See the documentation for [FacetCollector] for a usage example.
    pub fn top_k<T>(&self, facet: T, k: usize) -> Vec<(&Facet, u64)>
    where Facet: From<T> {
    where
        Facet: From<T>,
    {
        let mut heap = BinaryHeap::with_capacity(k);
        let mut it = self.get(facet);
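`top_k` above pre-sizes a `BinaryHeap` with capacity `k` and streams every facet count through it. The underlying bounded-heap idiom — keep at most `k` entries, evicting the current minimum whenever something larger arrives — is worth seeing on its own (a generic sketch over plain integers; the real code orders `(count, facet)` pairs):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Returns the k largest values, sorted highest-to-lowest.
fn top_k(values: &[u64], k: usize) -> Vec<u64> {
    // Min-heap of at most k elements: the root is the smallest survivor.
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(k);
    for &v in values {
        if heap.len() < k {
            heap.push(Reverse(v));
        } else if heap.peek().map_or(false, |&Reverse(min)| v > min) {
            heap.pop();
            heap.push(Reverse(v));
        }
    }
    let mut out: Vec<u64> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable_by(|a, b| b.cmp(a));
    out
}

fn main() {
    assert_eq!(top_k(&[5, 1, 9, 7, 3, 8], 3), vec![9, 8, 7]);
}
```

This is O(n log k) rather than the O(n log n) of sorting everything, which matters when there are many facet values and a small k.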
@@ -445,18 +458,16 @@ impl FacetCounts {

#[cfg(test)]
mod tests {
    use std::iter;

    use rand::distributions::Uniform;
    use rand::prelude::SliceRandom;
    use rand::{thread_rng, Rng};

    use super::{FacetCollector, FacetCounts};
    use crate::collector::Count;
    use crate::core::Index;
    use crate::query::{AllQuery, QueryParser, TermQuery};
    use crate::schema::{Document, Facet, FacetOptions, Field, IndexRecordOption, Schema};
    use crate::Term;
    use rand::distributions::Uniform;
    use rand::prelude::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::iter;

    #[test]
    fn test_facet_collector_drilldown() -> crate::Result<()> {
@@ -511,9 +522,8 @@ mod tests {
    }

    #[test]
    #[should_panic(
        expected = "Tried to add a facet which is a descendant of an already added facet."
    )]
    #[should_panic(expected = "Tried to add a facet which is a descendant of \
                               an already added facet.")]
    fn test_misused_facet_collector() {
        let mut facet_collector = FacetCollector::for_field(Field::from_field_id(0));
        facet_collector.add_facet(Facet::from("/country"));
@@ -690,14 +700,13 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {

    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use test::Bencher;

    use crate::collector::FacetCollector;
    use crate::query::AllQuery;
    use crate::schema::{Facet, Schema, INDEXED};
    use crate::Index;
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use test::Bencher;

    #[bench]
    fn bench_facet_collector(b: &mut Bencher) {
@@ -17,8 +17,7 @@ use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};

/// The `FilterCollector` filters docs using a fast field value and a predicate.
/// Only the documents for which the predicate returned "true" will be passed on to the next
/// collector.
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
///
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
@@ -59,7 +58,8 @@ use crate::{Score, SegmentReader, TantivyError};
/// # }
/// ```
pub struct FilterCollector<TCollector, TPredicate, TPredicateValue: FastValue>
where TPredicate: 'static + Clone
where
    TPredicate: 'static + Clone,
{
    field: Field,
    collector: TCollector,
@@ -1,9 +1,8 @@
use fastdivide::DividerU64;

use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue};
use crate::schema::{Field, Type};
use crate::{DocId, Score};
use fastdivide::DividerU64;

/// Histogram builds an histogram of the values of a fastfield for the
/// collected DocSet.
@@ -19,7 +18,7 @@ use crate::{DocId, Score};
///
/// # Warning
///
/// f64 fields are not supported.
/// f64 field. are not supported.
#[derive(Clone)]
pub struct HistogramCollector {
    min_value: u64,
@@ -37,8 +36,8 @@ impl HistogramCollector {
    /// - `bucket_width`: the length of the interval that is associated to each buckets.
    /// - `num_buckets`: The overall number of buckets.
    ///
    /// Together, this parameters define a partition of `[min_value, min_value + num_buckets *
    /// bucket_width)` into `num_buckets` intervals of width bucket that we call `bucket`.
    /// Together, this parameters define a partition of `[min_value, min_value + num_buckets * bucket_width)`
    /// into `num_buckets` intervals of width bucket that we call `bucket`.
    ///
    /// # Disclaimer
    /// This function panics if the field given is of type f64.
@@ -148,13 +147,12 @@ fn add_vecs(mut vals_list: Vec<Vec<u64>>, len: usize) -> Vec<u64> {

#[cfg(test)]
mod tests {
    use fastdivide::DividerU64;
    use query::AllQuery;

    use super::{add_vecs, HistogramCollector, HistogramComputer};
    use crate::chrono::{TimeZone, Utc};
    use crate::schema::{Schema, FAST};
    use crate::{doc, query, Index};
    use fastdivide::DividerU64;
    use query::AllQuery;

    #[test]
    fn test_add_histograms_simple() {
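Given the partition described in `HistogramCollector`'s docs, assigning a value to a bucket is one subtraction and one division; the `DividerU64` import exists because `fastdivide` turns division by the constant `bucket_width` into a cheaper precomputed multiply-and-shift. The arithmetic itself, as a sketch:

```rust
fn bucket_index(value: u64, min_value: u64, bucket_width: u64, num_buckets: usize) -> Option<usize> {
    if value < min_value {
        return None;
    }
    let idx = ((value - min_value) / bucket_width) as usize;
    // Values outside [min_value, min_value + num_buckets * bucket_width) fall in no bucket.
    (idx < num_buckets).then(|| idx)
}

fn main() {
    // Partition [10, 10 + 4 * 5) = [10, 30) into 4 buckets of width 5.
    assert_eq!(bucket_index(12, 10, 5, 4), Some(0));
    assert_eq!(bucket_index(24, 10, 5, 4), Some(2));
    assert_eq!(bucket_index(30, 10, 5, 4), None); // right-exclusive
    assert_eq!(bucket_index(3, 10, 5, 4), None);
}
```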
@@ -1,90 +1,95 @@
//! # Collectors
//!
//! Collectors define the information you want to extract from the documents matching the queries.
//! In tantivy jargon, we call this information your search "fruit".
//!
//! Your fruit could for instance be :
//! - [the count of matching documents](./struct.Count.html)
//! - [the top 10 documents, by relevancy or by a fast field](./struct.TopDocs.html)
//! - [facet counts](./struct.FacetCollector.html)
//!
//! At one point in your code, you will trigger the actual search operation by calling
//! [the `search(...)` method of your `Searcher` object](../struct.Searcher.html#method.search).
//! This call will look like this.
//!
//! ```verbatim
//! let fruit = searcher.search(&query, &collector)?;
//! ```
//!
//! Here the type of fruit is actually determined as an associated type of the collector
//! (`Collector::Fruit`).
//!
//!
//! # Combining several collectors
//!
//! A rich search experience often requires to run several collectors on your search query.
//! For instance,
//! - selecting the top-K products matching your query
//! - counting the matching documents
//! - computing several facets
//! - computing statistics about the matching product prices
//!
//! A simple and efficient way to do that is to pass your collectors as one tuple.
//! The resulting `Fruit` will then be a typed tuple with each collector's original fruits
//! in their respective position.
//!
//! ```rust
//! # use tantivy::schema::*;
//! # use tantivy::*;
//! # use tantivy::query::*;
//! use tantivy::collector::{Count, TopDocs};
//! #
//! # fn main() -> tantivy::Result<()> {
//! # let mut schema_builder = Schema::builder();
//! # let title = schema_builder.add_text_field("title", TEXT);
//! # let schema = schema_builder.build();
//! # let index = Index::create_in_ram(schema);
//! # let mut index_writer = index.writer(3_000_000)?;
//! # index_writer.add_document(doc!(
//! # title => "The Name of the Wind",
//! # ))?;
//! # index_writer.add_document(doc!(
//! # title => "The Diary of Muadib",
//! # ))?;
//! # index_writer.commit()?;
//! # let reader = index.reader()?;
//! # let searcher = reader.searcher();
//! # let query_parser = QueryParser::for_index(&index, vec![title]);
//! # let query = query_parser.parse_query("diary")?;
//! let (doc_count, top_docs): (usize, Vec<(Score, DocAddress)>) =
//!     searcher.search(&query, &(Count, TopDocs::with_limit(2)))?;
//! # Ok(())
//! # }
//! ```
//!
//! The `Collector` trait is implemented for up to 4 collectors.
//! If you have more than 4 collectors, you can either group them into
//! tuples of tuples `(a,(b,(c,d)))`, or rely on [`MultiCollector`](./struct.MultiCollector.html).
//!
//! # Combining several collectors dynamically
//!
//! Combining collectors into a tuple is a zero-cost abstraction: everything
//! happens as if you had manually implemented a single collector
//! combining all of our features.
//!
//! Unfortunately it requires you to know at compile time your collector types.
//! If on the other hand, the collectors depend on some query parameter,
//! you can rely on `MultiCollector`'s.
//!
//!
//! # Implementing your own collectors.
//!
//! See the `custom_collector` example.
/*!

# Collectors

Collectors define the information you want to extract from the documents matching the queries.
In tantivy jargon, we call this information your search "fruit".

Your fruit could for instance be :
- [the count of matching documents](./struct.Count.html)
- [the top 10 documents, by relevancy or by a fast field](./struct.TopDocs.html)
- [facet counts](./struct.FacetCollector.html)

At one point in your code, you will trigger the actual search operation by calling
[the `search(...)` method of your `Searcher` object](../struct.Searcher.html#method.search).
This call will look like this.

```verbatim
let fruit = searcher.search(&query, &collector)?;
```

Here the type of fruit is actually determined as an associated type of the collector (`Collector::Fruit`).


# Combining several collectors

A rich search experience often requires to run several collectors on your search query.
For instance,
- selecting the top-K products matching your query
- counting the matching documents
- computing several facets
- computing statistics about the matching product prices

A simple and efficient way to do that is to pass your collectors as one tuple.
The resulting `Fruit` will then be a typed tuple with each collector's original fruits
in their respective position.

```rust
# use tantivy::schema::*;
# use tantivy::*;
# use tantivy::query::*;
use tantivy::collector::{Count, TopDocs};
#
# fn main() -> tantivy::Result<()> {
# let mut schema_builder = Schema::builder();
# let title = schema_builder.add_text_field("title", TEXT);
# let schema = schema_builder.build();
# let index = Index::create_in_ram(schema);
# let mut index_writer = index.writer(3_000_000)?;
# index_writer.add_document(doc!(
# title => "The Name of the Wind",
# ))?;
# index_writer.add_document(doc!(
# title => "The Diary of Muadib",
# ))?;
# index_writer.commit()?;
# let reader = index.reader()?;
# let searcher = reader.searcher();
# let query_parser = QueryParser::for_index(&index, vec![title]);
# let query = query_parser.parse_query("diary")?;
let (doc_count, top_docs): (usize, Vec<(Score, DocAddress)>) =
    searcher.search(&query, &(Count, TopDocs::with_limit(2)))?;
# Ok(())
# }
```

The `Collector` trait is implemented for up to 4 collectors.
If you have more than 4 collectors, you can either group them into
tuples of tuples `(a,(b,(c,d)))`, or rely on [`MultiCollector`](./struct.MultiCollector.html).

# Combining several collectors dynamically

Combining collectors into a tuple is a zero-cost abstraction: everything
happens as if you had manually implemented a single collector
combining all of our features.

Unfortunately it requires you to know at compile time your collector types.
If on the other hand, the collectors depend on some query parameter,
you can rely on `MultiCollector`'s.


# Implementing your own collectors.

See the `custom_collector` example.

*/

use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::SegmentReader;
use downcast_rs::impl_downcast;

use crate::{DocId, Score, SegmentOrdinal, SegmentReader};

mod count_collector;
pub use self::count_collector::Count;

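For the dynamic case the module docs point at, usage follows a register-then-extract shape: each `add_collector` call returns a typed handle, and after the search each handle pulls its own fruit out of the shared `MultiFruit`. A sketch against a throwaway index (method names follow `MultiCollector`/`FruitHandle` as they appear in this diff; treat it as illustrative):

```rust
use tantivy::collector::{Count, MultiCollector, TopDocs};
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer(3_000_000)?;
    index_writer.add_document(doc!(title => "The Diary of Muadib"))?;
    index_writer.commit()?;

    let searcher = index.reader()?.searcher();
    let query = QueryParser::for_index(&index, vec![title]).parse_query("diary")?;

    // Collectors are registered at runtime; each registration returns a handle.
    let mut collectors = MultiCollector::new();
    let top_docs_handle = collectors.add_collector(TopDocs::with_limit(2));
    let count_handle = collectors.add_collector(Count);

    let mut fruits = searcher.search(&query, &collectors)?;
    // Each handle extracts its collector's fruit with the right type.
    let count = count_handle.extract(&mut fruits);
    let top_docs = top_docs_handle.extract(&mut fruits);
    assert_eq!(count, 1);
    assert_eq!(top_docs.len(), 1);
    Ok(())
}
```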
@@ -106,7 +111,8 @@ mod tweak_score_top_collector;
pub use self::tweak_score_top_collector::{ScoreSegmentTweaker, ScoreTweaker};

mod facet_collector;
pub use self::facet_collector::{FacetCollector, FacetCounts};
pub use self::facet_collector::FacetCollector;
pub use self::facet_collector::FacetCounts;
use crate::query::Weight;

mod docset_collector;

@@ -1,10 +1,14 @@
use super::Collector;
use super::SegmentCollector;
use crate::collector::Fruit;
use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::SegmentReader;
use crate::TantivyError;
use std::marker::PhantomData;
use std::ops::Deref;

use super::{Collector, SegmentCollector};
use crate::collector::Fruit;
use crate::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};

pub struct MultiFruit {
    sub_fruits: Vec<Option<Box<dyn Fruit>>>,
}
@@ -100,8 +104,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
///
/// If the type of the collectors is known, you can just group yours collectors
/// in a tuple. See the
/// [Combining several collectors section of the collector
/// documentation](./index.html#combining-several-collectors).
/// [Combining several collectors section of the collector documentation](./index.html#combining-several-collectors).
///
/// ```rust
/// use tantivy::collector::{Count, TopDocs, MultiCollector};
@@ -245,8 +248,10 @@ mod tests {
    use super::*;
    use crate::collector::{Count, TopDocs};
    use crate::query::TermQuery;
    use crate::schema::{IndexRecordOption, Schema, TEXT};
    use crate::{Index, Term};
    use crate::schema::IndexRecordOption;
    use crate::schema::{Schema, TEXT};
    use crate::Index;
    use crate::Term;

    #[test]
    fn test_multi_collector() -> crate::Result<()> {
@@ -1,12 +1,20 @@
use std::str::FromStr;

use super::*;
use crate::collector::{Count, FilterCollector, TopDocs};
use crate::core::SegmentReader;
use crate::fastfield::{BytesFastFieldReader, DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::BytesFastFieldReader;
use crate::fastfield::DynamicFastFieldReader;
use crate::fastfield::FastFieldReader;
use crate::schema::Field;
use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::{DocAddress, Document, Searcher};

use crate::collector::{Count, FilterCollector, TopDocs};
use crate::query::{AllQuery, QueryParser};
use crate::schema::{Field, Schema, FAST, TEXT};
use crate::{doc, DateTime, DocAddress, DocId, Document, Index, Score, Searcher, SegmentOrdinal};
use crate::schema::{Schema, FAST, TEXT};
use crate::DateTime;
use crate::{doc, Index};
use std::str::FromStr;

pub const TEST_COLLECTOR_WITH_SCORE: TestCollector = TestCollector {
    compute_score: true,
@@ -1,9 +1,11 @@
use crate::DocAddress;
use crate::DocId;
use crate::SegmentOrdinal;
use crate::SegmentReader;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::marker::PhantomData;

use crate::{DocAddress, DocId, SegmentOrdinal, SegmentReader};

/// Contains a feature (field, score, etc.) of a document along with the document address.
///
/// It has a custom implementation of `PartialOrd` that reverses the order. This is because the
@@ -60,7 +62,8 @@ pub(crate) struct TopCollector<T> {
}

impl<T> TopCollector<T>
where T: PartialOrd + Clone
where
    T: PartialOrd + Clone,
{
    /// Creates a top collector, with a number of documents equal to "limit".
    ///
@@ -250,7 +253,7 @@ mod tests {
        // when harvesting we have to guarantee stable sorting in case of a tie
        // on the score
        let doc_ids_collection = [4, 5, 6];
        let score = 3.3f32;
        let score = 3.14;

        let mut top_collector_limit_2 = TopSegmentCollector::new(0, 2);
        for id in &doc_ids_collection {
@@ -319,9 +322,8 @@
#[cfg(all(test, feature = "unstable"))]
mod bench {
    use test::Bencher;

    use super::TopSegmentCollector;
    use test::Bencher;

    #[bench]
    fn bench_top_segment_collector_collect_not_at_capacity(b: &mut Bencher) {
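The comment on `ComparableDoc` above is the heart of the top-K machinery: Rust's `BinaryHeap` is a max-heap, so reversing the comparison turns its root into the worst retained document — exactly the one to test against and evict when a better candidate shows up. The standard library's `Reverse` wrapper demonstrates the same inversion:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // With Reverse, peek()/pop() expose the smallest (score, doc) entry,
    // i.e. the eviction candidate of a top-K collector.
    let mut heap = BinaryHeap::new();
    for &(score, doc) in &[(7u64, 0u32), (2, 1), (9, 2)] {
        heap.push(Reverse((score, doc)));
    }
    let Reverse((worst_score, worst_doc)) = heap.pop().unwrap();
    assert_eq!((worst_score, worst_doc), (2, 1));
}
```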
@@ -1,18 +1,21 @@
use std::collections::BinaryHeap;
use std::fmt;
use std::marker::PhantomData;

use super::Collector;
use crate::collector::custom_score_top_collector::CustomScoreTopCollector;
use crate::collector::top_collector::{ComparableDoc, TopCollector, TopSegmentCollector};
use crate::collector::top_collector::{ComparableDoc, TopCollector};
use crate::collector::tweak_score_top_collector::TweakedScoreTopCollector;
use crate::collector::{
    CustomScorer, CustomSegmentScorer, ScoreSegmentTweaker, ScoreTweaker, SegmentCollector,
};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, FastValue};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::query::Weight;
use crate::schema::Field;
use crate::{DocAddress, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
use crate::DocAddress;
use crate::DocId;
use crate::Score;
use crate::SegmentOrdinal;
use crate::SegmentReader;
use crate::{collector::custom_score_top_collector::CustomScoreTopCollector, fastfield::FastValue};
use crate::{collector::top_collector::TopSegmentCollector, TantivyError};
use std::fmt;
use std::{collections::BinaryHeap, marker::PhantomData};

struct FastFieldConvertCollector<
    TCollector: Collector<Fruit = Vec<(u64, DocAddress)>>,
@@ -214,12 +217,11 @@ impl TopDocs {

    /// Set top-K to rank documents by a given fast field.
    ///
    /// If the field is not a fast or does not exist, this method returns successfully (it is not
    /// aware of any schema). An error will be returned at the moment of search.
    /// If the field is not a fast or does not exist, this method returns successfully (it is not aware of any schema).
    /// An error will be returned at the moment of search.
    ///
    /// If the field is a FAST field but not a u64 field, search will return successfully but it
    /// will return returns a monotonic u64-representation (ie. the order is still correct) of
    /// the requested field type.
    /// If the field is a FAST field but not a u64 field, search will return successfully but it will return
    /// returns a monotonic u64-representation (ie. the order is still correct) of the requested field type.
    ///
    /// # Example
    ///
@@ -294,15 +296,14 @@ impl TopDocs {

    /// Set top-K to rank documents by a given fast field.
    ///
    /// If the field is not a fast field, or its field type does not match the generic type, this
    /// method does not panic, but an explicit error will be returned at the moment of
    /// collection.
    /// If the field is not a fast field, or its field type does not match the generic type, this method does not panic,
    /// but an explicit error will be returned at the moment of collection.
    ///
    /// Note that this method is a generic. The requested fast field type will be often
    /// inferred in your code by the rust compiler.
    ///
    /// Implementation-wise, for performance reason, tantivy will manipulate the u64 representation
    /// of your fast field until the last moment.
    /// Implementation-wise, for performance reason, tantivy will manipulate the u64 representation of your fast
    /// field until the last moment.
    ///
    /// # Example
    ///
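In application code, the ranking these comments describe is requested by handing the collector the fast field; with an i64 or date field collected through the untyped variant you still get the right order, just u64-encoded values. A hedged sketch (assuming `TopDocs::order_by_u64_field`, the method these doc comments sit on in this version of the crate):

```rust
use tantivy::collector::TopDocs;
use tantivy::query::AllQuery;
use tantivy::schema::{Schema, FAST, TEXT};
use tantivy::{doc, DocAddress, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let rating = schema_builder.add_u64_field("rating", FAST);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer(3_000_000)?;
    index_writer.add_document(doc!(title => "a", rating => 3u64))?;
    index_writer.add_document(doc!(title => "b", rating => 9u64))?;
    index_writer.add_document(doc!(title => "c", rating => 5u64))?;
    index_writer.commit()?;

    let searcher = index.reader()?.searcher();
    // Rank by the fast field value instead of the BM25 score.
    let top_by_rating: Vec<(u64, DocAddress)> = searcher.search(
        &AllQuery,
        &TopDocs::with_limit(2).order_by_u64_field(rating),
    )?;
    assert_eq!(top_by_rating[0].0, 9);
    assert_eq!(top_by_rating[1].0, 5);
    Ok(())
}
```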
@@ -714,7 +715,10 @@ mod tests {
    use crate::collector::Collector;
    use crate::query::{AllQuery, Query, QueryParser};
    use crate::schema::{Field, Schema, FAST, STORED, TEXT};
    use crate::{DocAddress, DocId, Index, IndexWriter, Score, SegmentReader};
    use crate::Index;
    use crate::IndexWriter;
    use crate::Score;
    use crate::{DocAddress, DocId, SegmentReader};

    fn make_index() -> crate::Result<Index> {
        let mut schema_builder = Schema::builder();

@@ -1,6 +1,7 @@
use crate::collector::top_collector::{TopCollector, TopSegmentCollector};
use crate::collector::{Collector, SegmentCollector};
use crate::{DocAddress, DocId, Result, Score, SegmentReader};
use crate::DocAddress;
use crate::{DocId, Result, Score, SegmentReader};

pub(crate) struct TweakedScoreTopCollector<TScoreTweaker, TScore = Score> {
    score_tweaker: TScoreTweaker,
@@ -8,7 +9,8 @@ pub(crate) struct TweakedScoreTopCollector<TScoreTweaker, TScore = Score> {
}

impl<TScoreTweaker, TScore> TweakedScoreTopCollector<TScoreTweaker, TScore>
where TScore: Clone + PartialOrd
where
    TScore: Clone + PartialOrd,
{
    pub fn new(
        score_tweaker: TScoreTweaker,
@@ -116,7 +118,8 @@ where
}

impl<F, TScore> ScoreSegmentTweaker<TScore> for F
where F: 'static + FnMut(DocId, Score) -> TScore
where
    F: 'static + FnMut(DocId, Score) -> TScore,
{
    fn score(&mut self, doc: DocId, score: Score) -> TScore {
        (self)(doc, score)
@@ -57,11 +57,7 @@ impl Executor {
            let (idx, arg) = arg_with_idx;
            let fruit = f(arg);
            if let Err(err) = fruit_sender.send((idx, fruit)) {
                error!(
                    "Failed to send search task. It probably means all search \
                     threads have panicked. {:?}",
                    err
                );
                error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
            }
        });
    }
@@ -1,27 +1,35 @@
use super::{segment::Segment, IndexSettings};
use crate::core::Executor;
use crate::core::IndexMeta;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentMetaInventory;
use crate::core::META_FILEPATH;
use crate::directory::error::OpenReadError;
use crate::directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
use crate::directory::INDEX_WRITER_LOCK;
use crate::directory::{Directory, RamDirectory};
use crate::error::DataCorruption;
use crate::error::TantivyError;
use crate::indexer::index_writer::{HEAP_SIZE_MIN, MAX_NUM_THREAD};
use crate::indexer::segment_updater::save_new_metas;
use crate::reader::IndexReader;
use crate::reader::IndexReaderBuilder;
use crate::schema::Field;
use crate::schema::FieldType;
use crate::schema::Schema;
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::IndexWriter;
use std::collections::HashSet;
use std::fmt;

#[cfg(feature = "mmap")]
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use super::segment::Segment;
use super::IndexSettings;
use crate::core::{
    Executor, IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory, META_FILEPATH,
};
use crate::directory::error::OpenReadError;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_new_metas;
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::{Field, FieldType, Schema};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::IndexWriter;

fn load_metas(
    directory: &dyn Directory,
    inventory: &SegmentMetaInventory,
@@ -70,6 +78,7 @@ fn load_metas(
/// let schema = schema_builder.build();
/// let settings = IndexSettings{sort_by_field: Some(IndexSortByField{field:"number".to_string(), order:Order::Asc}), ..Default::default()};
/// let index = Index::builder().schema(schema).settings(settings).create_in_ram();
///
/// ```
pub struct IndexBuilder {
    schema: Option<Schema>,
@@ -88,21 +97,16 @@ impl IndexBuilder {
            index_settings: IndexSettings::default(),
        }
    }

    /// Set the settings
    #[must_use]
    pub fn settings(mut self, settings: IndexSettings) -> Self {
        self.index_settings = settings;
        self
    }

    /// Set the schema
    #[must_use]
    pub fn schema(mut self, schema: Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Creates a new index using the `RAMDirectory`.
    ///
    /// The index will be allocated in anonymous memory.
@@ -113,7 +117,6 @@ impl IndexBuilder {
            .create(ram_directory)
            .expect("Creating a RAMDirectory should never fail"))
    }

    /// Creates a new index in a given filepath.
    /// The index will use the `MMapDirectory`.
    ///
@@ -126,7 +129,6 @@ impl IndexBuilder {
        }
        self.create(mmap_directory)
    }

    /// Creates a new index in a temp directory.
    ///
    /// The index will use the `MMapDirectory` in a newly created directory.
@@ -140,14 +142,12 @@ impl IndexBuilder {
        let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::create_from_tempdir()?);
        self.create(mmap_directory)
    }

    fn get_expect_schema(&self) -> crate::Result<Schema> {
        self.schema
            .as_ref()
            .cloned()
            .ok_or(TantivyError::IndexBuilderMissingArgument("schema"))
    }

    /// Opens or creates a new index in the provided directory
    pub fn open_or_create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
        let dir = dir.into();
@@ -397,18 +397,17 @@ impl Index {
    /// - `num_threads` defines the number of indexing workers that
    ///   should work at the same time.
    ///
    /// - `overall_memory_arena_in_bytes` sets the amount of memory
    /// - `overall_heap_size_in_bytes` sets the amount of memory
    ///   allocated for all indexing thread.
    ///   Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
    ///   Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`.
    ///
    /// # Errors
    /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
    /// If the memory arena per thread is too small or too big, returns
    /// `TantivyError::InvalidArgument`
    /// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument`
    pub fn writer_with_num_threads(
        &self,
        num_threads: usize,
        overall_memory_arena_in_bytes: usize,
        overall_heap_size_in_bytes: usize,
    ) -> crate::Result<IndexWriter> {
        let directory_lock = self
            .directory
@@ -417,25 +416,26 @@ impl Index {
                TantivyError::LockFailure(
                    err,
                    Some(
                        "Failed to acquire index lock. If you are using a regular directory, this \
                         means there is already an `IndexWriter` working on this `Directory`, in \
                         this process or in a different process."
                        "Failed to acquire index lock. If you are using \
                         a regular directory, this means there is already an \
                         `IndexWriter` working on this `Directory`, in this process \
                         or in a different process."
                            .to_string(),
                    ),
                )
            })?;
        let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
        let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
        IndexWriter::new(
            self,
            num_threads,
            memory_arena_in_bytes_per_thread,
            heap_size_in_bytes_per_thread,
            directory_lock,
        )
    }

    /// Helper to create an index writer for tests.
    ///
    /// That index writer only simply has a single thread and a memory arena of 10 MB.
    /// That index writer only simply has a single thread and a heap of 10 MB.
    /// Using a single thread gives us a deterministic allocation of DocId.
    #[cfg(test)]
    pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
@@ -446,28 +446,29 @@ impl Index {
    ///
    /// Tantivy will automatically define the number of threads to use, but
    /// no more than 8 threads.
    /// `overall_memory_arena_in_bytes` is the total target memory usage that will be split
    /// `overall_heap_size_in_bytes` is the total target memory usage that will be split
    /// between a given number of threads.
    ///
    /// # Errors
    /// If the lockfile already exists, returns `Error::FileAlreadyExists`.
    /// If the memory arena per thread is too small or too big, returns
    /// `TantivyError::InvalidArgument`
    pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
    /// If the heap size per thread is too small or too big, returns `TantivyError::InvalidArgument`
    pub fn writer(&self, overall_heap_size_in_bytes: usize) -> crate::Result<IndexWriter> {
        let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
        let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
        if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
            num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
        let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
        if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
            num_threads = (overall_heap_size_in_bytes / HEAP_SIZE_MIN).max(1);
        }
        self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
        self.writer_with_num_threads(num_threads, overall_heap_size_in_bytes)
    }

    /// Accessor to the index settings
    ///
    pub fn settings(&self) -> &IndexSettings {
        &self.settings
    }

    /// Accessor to the index settings
    ///
    pub fn settings_mut(&mut self) -> &mut IndexSettings {
        &mut self.settings
    }
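The clamp in `writer` deserves a worked example: the requested budget is split evenly across threads, and when the per-thread share would fall under the minimum, it is the thread count that gives way. With stand-in constants (the real ones are private to `index_writer`):

```rust
// Assumed stand-ins for the private constants used by Index::writer.
const HEAP_SIZE_MIN: usize = 3_000_000;
const MAX_NUM_THREAD: usize = 8;

fn writer_threads(available_cpus: usize, overall_heap_size_in_bytes: usize) -> usize {
    let mut num_threads = available_cpus.min(MAX_NUM_THREAD);
    if overall_heap_size_in_bytes / num_threads < HEAP_SIZE_MIN {
        // Shrink the pool so each surviving thread keeps a workable budget.
        num_threads = (overall_heap_size_in_bytes / HEAP_SIZE_MIN).max(1);
    }
    num_threads
}

fn main() {
    // 8 CPUs but a 10 MB budget: 10 MB / 8 < 3 MB, so fall back to 3 threads.
    assert_eq!(writer_threads(8, 10_000_000), 3);
    // A generous budget is still capped at MAX_NUM_THREAD.
    assert_eq!(writer_threads(16, 100_000_000), 8);
}
```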
@@ -555,9 +556,15 @@ impl fmt::Debug for Index {

#[cfg(test)]
mod tests {
    use crate::directory::{RamDirectory, WatchCallback};
    use crate::schema::{Field, Schema, INDEXED, TEXT};
    use crate::{Directory, Index, IndexReader, IndexSettings, ReloadPolicy};
    use crate::schema::Field;
    use crate::schema::{Schema, INDEXED, TEXT};
    use crate::IndexReader;
    use crate::ReloadPolicy;
    use crate::{
        directory::{RamDirectory, WatchCallback},
        IndexSettings,
    };
    use crate::{Directory, Index};

    #[test]
    fn test_indexer_for_field() {
@@ -666,12 +673,10 @@ mod tests {
    #[cfg(feature = "mmap")]
    mod mmap_specific {

        use std::path::PathBuf;

        use tempfile::TempDir;

        use super::*;
        use crate::Directory;
        use std::path::PathBuf;
        use tempfile::TempDir;

        #[test]
        fn test_index_on_commit_reload_policy_mmap() -> crate::Result<()> {
@@ -1,16 +1,12 @@
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use super::SegmentComponent;
use crate::core::SegmentId;
use crate::schema::Schema;
use crate::store::Compressor;
use crate::{Inventory, Opstamp, TrackedObject};
use crate::Opstamp;
use crate::{core::SegmentId, store::Compressor};
use crate::{Inventory, TrackedObject};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{collections::HashSet, sync::atomic::AtomicBool};
use std::{fmt, sync::Arc};

#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
@@ -192,7 +188,6 @@ impl SegmentMeta {
    }

    #[doc(hidden)]
    #[must_use]
    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
        assert!(
            num_deleted_docs <= self.max_doc(),
@@ -287,6 +282,7 @@ impl Order {
/// * the searchable segments,
/// * the index `docstamp`
/// * the schema
///
#[derive(Clone, Serialize)]
pub struct IndexMeta {
    /// `IndexSettings` to configure index options.
@@ -374,8 +370,10 @@ impl fmt::Debug for IndexMeta {
mod tests {

    use super::IndexMeta;
    use crate::schema::{Schema, TEXT};
    use crate::{IndexSettings, IndexSortByField, Order};
    use crate::{
        schema::{Schema, TEXT},
        IndexSettings, IndexSortByField, Order,
    };

    #[test]
    fn test_serialize_metas() {
@@ -1,12 +1,13 @@
use std::io;

use common::BinarySerializable;

use crate::directory::FileSlice;
use crate::positions::PositionReader;
use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo};
use crate::schema::{IndexRecordOption, Term};
use crate::postings::TermInfo;
use crate::postings::{BlockSegmentPostings, SegmentPostings};
use crate::schema::IndexRecordOption;
use crate::schema::Term;
use crate::termdict::TermDictionary;
use common::BinarySerializable;

/// The inverted index reader is in charge of accessing
/// the inverted index associated to a specific field.

@@ -8,10 +8,6 @@ mod segment_component;
mod segment_id;
mod segment_reader;

use std::path::Path;

use once_cell::sync::Lazy;

pub use self::executor::Executor;
pub use self::index::{Index, IndexBuilder};
pub use self::index_meta::{
@@ -24,6 +20,9 @@ pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
pub use self::segment_reader::SegmentReader;

use once_cell::sync::Lazy;
use std::path::Path;

/// The meta file contains all the information about the list of segments and the schema
/// of the index.
pub static META_FILEPATH: Lazy<&'static Path> = Lazy::new(|| Path::new("meta.json"));
@@ -1,13 +1,20 @@
use std::collections::BTreeMap;
use std::{fmt, io};

use crate::collector::Collector;
use crate::core::{Executor, SegmentReader};
use crate::core::Executor;
use crate::core::SegmentReader;
use crate::query::Query;
use crate::schema::{Document, Schema, Term};
use crate::schema::Document;
use crate::schema::Schema;
use crate::schema::Term;
use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};
use crate::DocAddress;
use crate::Index;
use crate::Opstamp;
use crate::SegmentId;
use crate::TrackedObject;

use std::collections::BTreeMap;
use std::{fmt, io};

/// Identifies the searcher generation accessed by a [Searcher].
///
@@ -62,6 +69,7 @@ impl SearcherGeneration {
///
/// It guarantees that the `Segment` will not be removed before
/// the destruction of the `Searcher`.
///
pub struct Searcher {
    schema: Schema,
    index: Index,
@@ -1,12 +1,14 @@
use std::fmt;
use std::path::PathBuf;

use super::SegmentComponent;
use crate::core::{Index, SegmentId, SegmentMeta};
use crate::core::Index;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::{Directory, FileSlice, WritePtr};
use crate::directory::Directory;
use crate::directory::{FileSlice, WritePtr};
use crate::schema::Schema;
use crate::Opstamp;
use std::fmt;
use std::path::PathBuf;

/// A segment is a piece of the index.
#[derive(Clone)]
@@ -54,7 +56,6 @@ impl Segment {
    }

    #[doc(hidden)]
    #[must_use]
    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
        Segment {
            index: self.index,
@@ -1,14 +1,14 @@
use std::cmp::{Ord, Ordering};
use std::error::Error;
use std::fmt;
use std::str::FromStr;
#[cfg(test)]
use std::sync::atomic;
use uuid::Uuid;

#[cfg(test)]
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::error::Error;
use std::str::FromStr;
#[cfg(test)]
use std::sync::atomic;

/// Uuid identifying a segment.
///
@@ -1,19 +1,28 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::{fmt, io};

use fail::fail_point;

use crate::core::{InvertedIndexReader, Segment, SegmentComponent, SegmentId};
use crate::directory::{CompositeFile, FileSlice};
use crate::core::InvertedIndexReader;
use crate::core::Segment;
use crate::core::SegmentComponent;
use crate::core::SegmentId;
use crate::directory::CompositeFile;
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
use crate::fastfield::intersect_alive_bitsets;
use crate::fastfield::AliveBitSet;
use crate::fastfield::FacetReader;
use crate::fastfield::FastFieldReaders;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
use crate::schema::{Field, FieldType, IndexRecordOption, Schema};
use crate::schema::FieldType;
use crate::schema::Schema;
use crate::schema::{Field, IndexRecordOption};
use crate::space_usage::SegmentSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermDictionary;
use crate::{DocId, Opstamp};
use crate::DocId;
use crate::Opstamp;
use fail::fail_point;
use std::fmt;
use std::sync::Arc;
use std::sync::RwLock;
use std::{collections::HashMap, io};

/// Entry point to access all of the datastructures of the `Segment`
///
@@ -121,8 +130,7 @@ impl SegmentReader {
        self.fieldnorm_readers.get_field(field)?.ok_or_else(|| {
            let field_name = self.schema.get_field_name(field);
            let err_msg = format!(
                "Field norm not found for field {:?}. Was the field set to record norm during \
                 indexing?",
                "Field norm not found for field {:?}. Was the field set to record norm during indexing?",
                field_name
            );
            crate::TantivyError::SchemaError(err_msg)
@@ -251,23 +259,18 @@ impl SegmentReader {
        let record_option = record_option_opt.unwrap();
        let postings_file = postings_file_opt.unwrap();

        let termdict_file: FileSlice =
            self.termdict_composite.open_read(field).ok_or_else(|| {
                DataCorruption::comment_only(format!(
                    "Failed to open field {:?}'s term dictionary in the composite file. Has the \
                     schema been modified?",
                    field_entry.name()
                ))
            })?;
        let termdict_file: FileSlice = self.termdict_composite.open_read(field)
            .ok_or_else(||
                DataCorruption::comment_only(format!("Failed to open field {:?}'s term dictionary in the composite file. Has the schema been modified?", field_entry.name()))
            )?;

        let positions_file = self.positions_composite.open_read(field).ok_or_else(|| {
            let error_msg = format!(
                "Failed to open field {:?}'s positions in the composite file. Has the schema been \
                 modified?",
                field_entry.name()
            );
            DataCorruption::comment_only(error_msg)
        })?;
        let positions_file = self
            .positions_composite
            .open_read(field)
            .ok_or_else(|| {
                let error_msg = format!("Failed to open field {:?}'s positions in the composite file. Has the schema been modified?", field_entry.name());
                DataCorruption::comment_only(error_msg)
            })?;

        let inv_idx_reader = Arc::new(InvertedIndexReader::new(
            TermDictionary::open(termdict_file)?,
@@ -1,14 +1,17 @@
use crate::directory::FileSlice;
use crate::directory::{TerminatingWrite, WritePtr};
use crate::schema::Field;
use crate::space_usage::FieldUsage;
use crate::space_usage::PerFieldSpaceUsage;
use common::BinarySerializable;
use common::CountingWriter;
use common::HasLen;
use common::VInt;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::iter::ExactSizeIterator;
use std::ops::Range;

use common::{BinarySerializable, CountingWriter, HasLen, VInt};

use crate::directory::{FileSlice, TerminatingWrite, WritePtr};
use crate::schema::Field;
use crate::space_usage::{FieldUsage, PerFieldSpaceUsage};

#[derive(Eq, PartialEq, Hash, Copy, Ord, PartialOrd, Clone, Debug)]
pub struct FileAddr {
    field: Field,
@@ -183,14 +186,13 @@ impl CompositeFile {
#[cfg(test)]
mod test {

    use std::io::Write;
    use std::path::Path;

    use common::{BinarySerializable, VInt};

    use super::{CompositeFile, CompositeWrite};
    use crate::directory::{Directory, RamDirectory};
    use crate::schema::Field;
    use common::BinarySerializable;
    use common::VInt;
    use std::io::Write;
    use std::path::Path;

    #[test]
    fn test_composite_file() -> crate::Result<()> {

@@ -1,12 +1,18 @@
use std::io::Write;
use std::marker::{Send, Sync};
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{fmt, io, thread};

use crate::directory::directory_lock::Lock;
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::{FileHandle, FileSlice, WatchCallback, WatchHandle, WritePtr};
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchHandle;
use crate::directory::{FileHandle, WatchCallback};
use crate::directory::{FileSlice, WritePtr};
use std::fmt;
use std::io;
use std::io::Write;
use std::marker::Send;
use std::marker::Sync;
use std::path::Path;
use std::path::PathBuf;
use std::thread;
use std::time::Duration;

/// The retry logic for acquiring locks is pretty simple.
/// We just retry `n` times after a given `duration`, both
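
As a rough illustration of the retry behavior this doc-comment describes (a generic sketch, not tantivy's actual implementation; `try_acquire`, `n` and `wait` are hypothetical names):

use std::thread;
use std::time::Duration;

// Retry a fallible acquisition up to `n` times, sleeping `wait` between attempts.
fn acquire_with_retry<T>(
    mut try_acquire: impl FnMut() -> Option<T>,
    n: usize,
    wait: Duration,
) -> Option<T> {
    for _ in 0..n {
        if let Some(lock) = try_acquire() {
            return Some(lock);
        }
        thread::sleep(wait);
    }
    None
}
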
@@ -227,7 +233,8 @@ pub trait DirectoryClone {
}

impl<T> DirectoryClone for T
where T: 'static + Directory + Clone
where
    T: 'static + Directory + Clone,
{
    fn box_clone(&self) -> Box<dyn Directory> {
        Box::new(self.clone())

@@ -1,6 +1,5 @@
use std::path::PathBuf;

use once_cell::sync::Lazy;
use std::path::PathBuf;

/// A directory lock.
///
@@ -12,6 +11,7 @@ use once_cell::sync::Lazy;
/// - [META_LOCK]
///
/// Check out these locks' documentation for more information.
///
#[derive(Debug)]
pub struct Lock {
    /// The lock needs to be associated with its own file `path`.

@@ -1,17 +1,15 @@
use std::path::PathBuf;
use std::{fmt, io};

use crate::Version;
use std::fmt;
use std::io;
use std::path::PathBuf;

/// Error while trying to acquire a directory lock.
#[derive(Debug, Error)]
pub enum LockError {
    /// Failed to acquire a lock as it is already held by another
    /// client.
    /// - In the context of a blocking lock, this means the lock was not released within some
    ///   `timeout` period.
    /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the
    ///   call.
    /// - In the context of a blocking lock, this means the lock was not released within some `timeout` period.
    /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call.
    #[error("Could not acquire lock as it is already held, possibly by a different process.")]
    LockBusy,
    /// Trying to acquire a lock failed with an `IoError`

@@ -1,11 +1,11 @@
use std::ops::{Deref, Range};
use std::sync::{Arc, Weak};
use std::{fmt, io};

use common::HasLen;
use stable_deref_trait::StableDeref;

use crate::directory::OwnedBytes;
use common::HasLen;
use std::fmt;
use std::ops::Range;
use std::sync::{Arc, Weak};
use std::{io, ops::Deref};

pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
@@ -33,7 +33,8 @@ impl FileHandle for &'static [u8] {
}

impl<B> From<B> for FileSlice
where B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync
where
    B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
{
    fn from(bytes: B) -> FileSlice {
        FileSlice::new(Box::new(OwnedBytes::new(bytes)))
@@ -43,6 +44,7 @@ where B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync
/// Logical slice of a read-only file in tantivy.
///
/// It can be cloned and sliced cheaply.
///
#[derive(Clone)]
pub struct FileSlice {
    data: Arc<dyn FileHandle>,
@@ -77,7 +79,6 @@ impl FileSlice {
    /// # Panics
    ///
    /// Panics if `byte_range.end` exceeds the filesize.
    #[must_use]
    pub fn slice(&self, byte_range: Range<usize>) -> FileSlice {
        assert!(byte_range.end <= self.len());
        FileSlice {
@@ -137,7 +138,6 @@ impl FileSlice {
    /// boundary.
    ///
    /// Equivalent to `.slice(from_offset, self.len())`
    #[must_use]
    pub fn slice_from(&self, from_offset: usize) -> FileSlice {
        self.slice(from_offset..self.len())
    }
@@ -145,7 +145,6 @@ impl FileSlice {
    /// Returns a slice from the end.
    ///
    /// Equivalent to `.slice(self.len() - from_offset, self.len())`
    #[must_use]
    pub fn slice_from_end(&self, from_offset: usize) -> FileSlice {
        self.slice(self.len() - from_offset..self.len())
    }
@@ -154,7 +153,6 @@ impl FileSlice {
    /// boundary.
    ///
    /// Equivalent to `.slice(0, to_offset)`
    #[must_use]
    pub fn slice_to(&self, to_offset: usize) -> FileSlice {
        self.slice(0..to_offset)
    }
@@ -174,11 +172,9 @@ impl HasLen for FileSlice {

#[cfg(test)]
mod tests {
    use std::io;

    use common::HasLen;

    use super::{FileHandle, FileSlice};
    use common::HasLen;
    use std::io;

    #[test]
    fn test_file_slice() -> io::Result<()> {

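To make the slicing conventions above concrete, here is a minimal sketch (assuming the `From<&'static [u8]>` conversion shown in this diff; the byte string is arbitrary):

use tantivy::directory::FileSlice;

fn demo_slicing() -> std::io::Result<()> {
    let file = FileSlice::from(&b"0123456789"[..]);
    // `.slice_from(4)` is `.slice(4..len)`; `.slice_to(4)` is `.slice(0..4)`.
    assert_eq!(file.slice_from(4).read_bytes()?.as_slice(), b"456789");
    assert_eq!(file.slice_to(4).read_bytes()?.as_slice(), b"0123");
    // `.slice_from_end(2)` keeps only the last two bytes.
    assert_eq!(file.slice_from_end(2).read_bytes()?.as_slice(), b"89");
    Ok(())
}
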
@@ -1,13 +1,13 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::{fs, io, thread};

use crc32fast::Hasher;

use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};

pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });

@@ -99,9 +99,10 @@ mod tests {

    use std::mem;

    use super::*;
    use crate::directory::mmap_directory::atomic_write;

    use super::*;

    #[test]
    fn test_file_watcher_drop_watcher() -> crate::Result<()> {
        let tmp_dir = tempfile::TempDir::new()?;

@@ -1,13 +1,14 @@
use std::io;
use std::io::Write;

use crate::directory::error::Incompatibility;
use crate::directory::FileSlice;
use crate::{
    directory::{AntiCallToken, TerminatingWrite},
    Version, INDEX_FORMAT_VERSION,
};
use common::{BinarySerializable, CountingWriter, DeserializeFrom, FixedSize, HasLen};
use crc32fast::Hasher;
use serde::{Deserialize, Serialize};

use crate::directory::error::Incompatibility;
use crate::directory::{AntiCallToken, FileSlice, TerminatingWrite};
use crate::{Version, INDEX_FORMAT_VERSION};
use std::io;
use std::io::Write;

const FOOTER_MAX_LEN: u32 = 50_000;

@@ -63,9 +64,7 @@ impl Footer {
        if footer_magic_byte != FOOTER_MAGIC_NUMBER {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Footer magic byte mismatch. File corrupted or index was created using an old \
                 tantivy version which is not supported anymore. Please use tantivy 0.15 or above \
                 to recreate the index.",
                "Footer magic byte mismatch. File corrupted or index was created using an old tantivy version which is not supported anymore. Please use tantivy 0.15 or above to recreate the index.",
            ));
        }

@@ -74,7 +73,7 @@ impl Footer {
                io::ErrorKind::InvalidData,
                format!(
                    "Footer seems invalid as it suggests a footer len of {}. File is corrupted, \
                     or the index was created with a different & old version of tantivy.",
                    or the index was created with a different & old version of tantivy.",
                    footer_len
                ),
            ));
@@ -155,12 +154,11 @@ impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
#[cfg(test)]
mod tests {

    use std::io;

    use crate::directory::footer::Footer;
    use crate::directory::OwnedBytes;
    use crate::directory::{footer::FOOTER_MAGIC_NUMBER, FileSlice};
    use common::BinarySerializable;

    use crate::directory::footer::{Footer, FOOTER_MAGIC_NUMBER};
    use crate::directory::{FileSlice, OwnedBytes};
    use std::io;

    #[test]
    fn test_deserialize_footer() {
@@ -185,9 +183,8 @@ mod tests {
        let err = Footer::extract_footer(fileslice).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Footer magic byte mismatch. File corrupted or index was created using an old tantivy \
             version which is not supported anymore. Please use tantivy 0.15 or above to recreate \
             the index."
            "Footer magic byte mismatch. File corrupted or index was created using an old tantivy version which \
             is not supported anymore. Please use tantivy 0.15 or above to recreate the index."
        );
    }
    #[test]
@@ -222,8 +219,8 @@ mod tests {
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert_eq!(
            err.to_string(),
            "Footer seems invalid as it suggests a footer len of 50001. File is corrupted, or the \
             index was created with a different & old version of tantivy."
            "Footer seems invalid as it suggests a footer len of 50001. File is corrupted, \
             or the index was created with a different & old version of tantivy."
        );
    }
}

@@ -1,21 +1,24 @@
use std::collections::HashSet;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
use std::{io, result};

use crc32fast::Hasher;

use crate::core::MANAGED_FILEPATH;
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::{
    DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
    WatchHandle, WritePtr, META_LOCK,
};
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
use crate::Directory;

use crc32fast::Hasher;
use std::collections::HashSet;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::result;
use std::sync::RwLockWriteGuard;
use std::sync::{Arc, RwLock};

/// Returns true iff the file is "managed".
/// Non-managed files are not subject to garbage collection.
///
@@ -341,14 +344,12 @@ impl Clone for ManagedDirectory {
#[cfg(test)]
mod tests_mmap_specific {

    use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
    use std::collections::HashSet;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    use tempfile::TempDir;

    use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};

    #[test]
    fn test_managed_directory() {
        let tempdir = TempDir::new().unwrap();

@@ -1,28 +1,32 @@
use std::collections::HashMap;
use std::convert::From;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::{fmt, result};

use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{ArcBytes, WeakArcBytes};
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap2::Mmap;
use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref;
use std::convert::From;
use std::fmt;
use std::fs::OpenOptions;
use std::fs::{self, File};
use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;

use crate::core::META_FILEPATH;
use crate::directory::error::{
    DeleteError, LockError, OpenDirectoryError, OpenReadError, OpenWriteError,
};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::{
    AntiCallToken, ArcBytes, Directory, DirectoryLock, FileHandle, Lock, OwnedBytes,
    TerminatingWrite, WatchCallback, WatchHandle, WeakArcBytes, WritePtr,
};

/// Create a default io error given a string.
pub(crate) fn make_io_err(msg: String) -> io::Error {
    io::Error::new(io::ErrorKind::Other, msg)
@@ -316,7 +320,8 @@ impl Directory for MmapDirectory {

        let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
            let msg = format!(
                "Failed to acquire write lock on mmap cache while reading {:?}",
                "Failed to acquire write lock \
                 on mmap cache while reading {:?}",
                path
            );
            let io_err = make_io_err(msg);
@@ -452,7 +457,6 @@ impl Directory for MmapDirectory {
        #[cfg(windows)]
        {
            use std::os::windows::fs::OpenOptionsExt;

            use winapi::um::winbase;

            open_opts
@@ -472,12 +476,15 @@ mod tests {
    // There are more tests in directory/mod.rs
    // The following tests are specific to the MmapDirectory

    use common::HasLen;

    use super::*;
    use crate::indexer::LogMergePolicy;
    use crate::schema::{Schema, SchemaBuilder, TEXT};
    use crate::{Index, IndexSettings, ReloadPolicy};
    use crate::Index;
    use crate::ReloadPolicy;
    use crate::{
        schema::{Schema, SchemaBuilder, TEXT},
        IndexSettings,
    };
    use common::HasLen;

    #[test]
    fn test_open_non_existent_path() {
@@ -514,7 +521,7 @@ mod tests {
        {
            for path in &paths {
                let mut w = mmap_directory.open_write(path).unwrap();
                w.write_all(content).unwrap();
                w.write(content).unwrap();
                w.flush().unwrap();
            }
        }

@@ -1,4 +1,8 @@
//! WORM (Write Once Read Many) directory abstraction.
/*!

WORM (Write Once Read Many) directory abstraction.

*/

#[cfg(feature = "mmap")]
mod mmap_directory;
@@ -18,19 +22,19 @@ pub mod error;

mod composite_file;

use std::io::BufWriter;
use std::path::PathBuf;

pub use common::{AntiCallToken, TerminatingWrite};

pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
pub use common::AntiCallToken;
pub use common::TerminatingWrite;
use std::io::BufWriter;
use std::path::PathBuf;

/// Outcome of the Garbage collection
pub struct GarbageCollectionResult {
@@ -46,10 +50,11 @@ pub struct GarbageCollectionResult {
    pub failed_to_delete_files: Vec<PathBuf>,
}

pub use self::managed_directory::ManagedDirectory;
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;

pub use self::managed_directory::ManagedDirectory;

/// Write object for Directory.
///
/// `WritePtr` are required to implement both Write

@@ -1,10 +1,9 @@
use crate::directory::FileHandle;
use std::io;
use std::ops::Range;

pub use ownedbytes::OwnedBytes;

use crate::directory::FileHandle;

impl FileHandle for OwnedBytes {
    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
        Ok(self.slice(range))

@@ -1,19 +1,19 @@
use std::collections::HashMap;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::{fmt, result};

use common::HasLen;
use fail::fail_point;

use super::FileHandle;
use crate::core::META_FILEPATH;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::{
    AntiCallToken, Directory, FileSlice, TerminatingWrite, WatchCallback, WatchCallbackList,
    WatchHandle, WritePtr,
};
use crate::directory::AntiCallToken;
use crate::directory::WatchCallbackList;
use crate::directory::{Directory, FileSlice, WatchCallback, WatchHandle};
use crate::directory::{TerminatingWrite, WritePtr};
use common::HasLen;
use fail::fail_point;
use std::collections::HashMap;
use std::fmt;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::{Arc, RwLock};

use super::FileHandle;

/// Writer associated with the `RamDirectory`
///
@@ -40,9 +40,7 @@ impl Drop for VecWriter {
    fn drop(&mut self) {
        if !self.is_flushed {
            warn!(
                "You forgot to flush {:?} before its writer got dropped. Do not rely on drop. This \
                 also occurs when the indexer crashed, so you may want to check the logs for the \
                 root cause.",
                "You forgot to flush {:?} before its writer got dropped. Do not rely on drop. This also occurs when the indexer crashed, so you may want to check the logs for the root cause.",
                self.path
            )
        }
@@ -125,6 +123,7 @@ impl fmt::Debug for RamDirectory {
///
/// It is mainly meant for unit testing.
/// Writes are only made visible upon flushing.
///
#[derive(Clone, Default)]
pub struct RamDirectory {
    fs: Arc<RwLock<InnerDirectory>>,
@@ -234,11 +233,10 @@ impl Directory for RamDirectory {

#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::path::Path;

    use super::RamDirectory;
    use crate::Directory;
    use std::io::Write;
    use std::path::Path;

    #[test]
    fn test_persist() {
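
A small sketch of the flush-then-visible behavior described above (a hedged example against the 0.17-era API; the file name is arbitrary):

use std::io::Write;
use std::path::Path;

use tantivy::directory::{Directory, RamDirectory, TerminatingWrite};

fn demo_ram_directory() -> tantivy::Result<()> {
    let directory = RamDirectory::create();
    let path = Path::new("demo");
    let mut writer = directory.open_write(path)?;
    writer.write_all(b"hello")?;
    // Writes only become visible to readers once the writer is flushed/terminated.
    writer.terminate()?;
    let bytes = directory.open_read(path)?.read_bytes()?;
    assert_eq!(bytes.as_slice(), b"hello");
    Ok(())
}
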
@@ -1,3 +1,6 @@
use super::*;
use futures::channel::oneshot;
use futures::executor::block_on;
use std::io::Write;
use std::mem;
use std::path::{Path, PathBuf};
@@ -6,11 +9,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
mod mmap_directory_tests {
    use crate::directory::MmapDirectory;

@@ -1,7 +1,8 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
@@ -102,14 +103,12 @@ impl WatchCallbackList {

#[cfg(test)]
mod tests {
    use crate::directory::{WatchCallback, WatchCallbackList};
    use futures::executor::block_on;
    use std::mem;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use futures::executor::block_on;

    use crate::directory::{WatchCallback, WatchCallbackList};

    #[test]
    fn test_watch_event_router_simple() {
        let watch_event_router = WatchCallbackList::default();

@@ -1,7 +1,7 @@
use std::borrow::{Borrow, BorrowMut};

use crate::fastfield::AliveBitSet;
use crate::DocId;
use std::borrow::Borrow;
use std::borrow::BorrowMut;

/// Sentinel value returned when a DocSet has been entirely consumed.
///

20
src/error.rs

@@ -1,14 +1,17 @@
//! Definition of Tantivy's error and result.

use std::io;

use crate::directory::error::{Incompatibility, LockError};
use crate::fastfield::FastFieldNotAvailableError;
use crate::query;
use crate::{
    directory::error::{OpenDirectoryError, OpenReadError, OpenWriteError},
    schema,
};
use std::fmt;
use std::path::PathBuf;
use std::sync::PoisonError;
use std::{fmt, io};

use crate::directory::error::{
    Incompatibility, LockError, OpenDirectoryError, OpenReadError, OpenWriteError,
};
use crate::fastfield::FastFieldNotAvailableError;
use crate::{query, schema};

/// Represents a `DataCorruption` error.
///
@@ -74,9 +77,6 @@ pub enum TantivyError {
    /// A thread holding the lock panicked and poisoned the lock.
    #[error("A thread holding the lock panicked and poisoned the lock")]
    Poisoned,
    /// The provided field name does not exist.
    #[error("The field does not exist: '{0}'")]
    FieldNotFound(String),
    /// Invalid argument was passed by the user.
    #[error("An invalid argument was passed: '{0}'")]
    InvalidArgument(String),

@@ -1,11 +1,11 @@
use std::io;
use std::io::Write;

use common::{intersect_bitsets, BitSet, ReadOnlyBitSet};
use ownedbytes::OwnedBytes;

use crate::space_usage::ByteCount;
use crate::DocId;
use common::intersect_bitsets;
use common::BitSet;
use common::ReadOnlyBitSet;
use ownedbytes::OwnedBytes;
use std::io;
use std::io::Write;

/// Write an alive `BitSet`
///
@@ -168,12 +168,11 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {

    use super::AliveBitSet;
    use rand::prelude::IteratorRandom;
    use rand::thread_rng;
    use test::Bencher;

    use super::AliveBitSet;

    fn get_alive() -> Vec<u32> {
        let mut data = (0..1_000_000_u32).collect::<Vec<u32>>();
        for _ in 0..(1_000_000) * 1 / 8 {

@@ -6,11 +6,10 @@ pub use self::writer::BytesFastFieldWriter;

#[cfg(test)]
mod tests {
    use std::ops::Deref;

    use crate::query::TermQuery;
    use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
    use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value};
    use crate::{query::TermQuery, schema::FAST, schema::INDEXED, schema::STORED};
    use crate::{DocAddress, DocSet, Index, Searcher, Term};
    use std::ops::Deref;

    #[test]
    fn test_bytes() -> crate::Result<()> {
@@ -63,7 +62,7 @@ mod tests {
        assert_eq!(values.len(), 2);
        let values_bytes: Vec<&[u8]> = values
            .into_iter()
            .flat_map(|value| value.as_bytes())
            .flat_map(|value| value.bytes_value())
            .collect();
        assert_eq!(values_bytes, &[&b"tantivy"[..], &b"lucene"[..]]);
        Ok(())
@@ -86,7 +85,7 @@ mod tests {
        let field = searcher.schema().get_field("string_bytes").unwrap();
        let term = Term::from_field_bytes(field, b"lucene".as_ref());
        let term_query = TermQuery::new(term, IndexRecordOption::Basic);
        let term_weight = term_query.specialized_weight(&*searcher, true)?;
        let term_weight = term_query.specialized_weight(&searcher, true)?;
        let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
        assert_eq!(term_scorer.doc(), 0u32);
        Ok(())
@@ -99,7 +98,7 @@ mod tests {
        let field = searcher.schema().get_field("string_bytes").unwrap();
        let term = Term::from_field_bytes(field, b"lucene".as_ref());
        let term_query = TermQuery::new(term, IndexRecordOption::Basic);
        let term_weight_err = term_query.specialized_weight(&*searcher, false);
        let term_weight_err = term_query.specialized_weight(&searcher, false);
        assert!(matches!(
            term_weight_err,
            Err(crate::TantivyError::SchemaError(_))

@@ -1,5 +1,6 @@
use crate::directory::{FileSlice, OwnedBytes};
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, MultiValueLength};
use crate::directory::FileSlice;
use crate::directory::OwnedBytes;
use crate::fastfield::{BitpackedFastFieldReader, FastFieldReader, MultiValueLength};
use crate::DocId;

/// Reader for byte array fast fields
@@ -14,13 +15,13 @@ use crate::DocId;
/// and the start index for the next document, and keeping the bytes in between.
#[derive(Clone)]
pub struct BytesFastFieldReader {
    idx_reader: DynamicFastFieldReader<u64>,
    idx_reader: BitpackedFastFieldReader<u64>,
    values: OwnedBytes,
}

impl BytesFastFieldReader {
    pub(crate) fn open(
        idx_reader: DynamicFastFieldReader<u64>,
        idx_reader: BitpackedFastFieldReader<u64>,
        values_file: FileSlice,
    ) -> crate::Result<BytesFastFieldReader> {
        let values = values_file.read_bytes()?;

@@ -1,9 +1,10 @@
use std::io;

use crate::fastfield::serializer::CompositeFastFieldSerializer;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::schema::{Document, Field, Value};
use crate::DocId;
use crate::{
    fastfield::serializer::CompositeFastFieldSerializer, indexer::doc_id_mapping::DocIdMapping,
};

/// Writer for byte array (as in, any number of bytes per document) fast fields
///

@@ -1,6 +1,5 @@
use std::result;

use crate::schema::FieldEntry;
use std::result;

/// `FastFieldNotAvailableError` is returned when the
/// user requested a fast field reader, and the field was not

@@ -1,10 +1,10 @@
use std::str;

use super::MultiValuedFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet;
use crate::termdict::{TermDictionary, TermOrdinal};
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
use crate::DocId;
use std::str;

/// The facet reader makes it possible to access the list of
/// facets associated to a given document in a specific
@@ -82,8 +82,11 @@ impl FacetReader {

#[cfg(test)]
mod tests {
    use crate::schema::{Facet, FacetOptions, SchemaBuilder, Value, STORED};
    use crate::{DocAddress, Document, Index};
    use crate::Index;
    use crate::{
        schema::{Facet, FacetOptions, SchemaBuilder, Value, STORED},
        DocAddress, Document,
    };

    #[test]
    fn test_facet_only_indexed() -> crate::Result<()> {
@@ -103,7 +106,7 @@ mod tests {
        facet_reader.facet_ords(0u32, &mut facet_ords);
        assert_eq!(&facet_ords, &[2u64]);
        let doc = searcher.doc(DocAddress::new(0u32, 0u32))?;
        let value = doc.get_first(facet_field).and_then(Value::as_facet);
        let value = doc.get_first(facet_field).and_then(Value::facet);
        assert_eq!(value, None);
        Ok(())
    }
@@ -126,7 +129,7 @@ mod tests {
        facet_reader.facet_ords(0u32, &mut facet_ords);
        assert_eq!(&facet_ords, &[2u64]);
        let doc = searcher.doc(DocAddress::new(0u32, 0u32))?;
        let value: Option<&Facet> = doc.get_first(facet_field).and_then(Value::as_facet);
        let value: Option<&Facet> = doc.get_first(facet_field).and_then(Value::facet);
        assert_eq!(value, Facet::from_text("/a/b").ok().as_ref());
        Ok(())
    }

@@ -1,38 +1,51 @@
//! Column oriented field storage for tantivy.
//!
//! It is the equivalent of `Lucene`'s `DocValues`.
//!
//! Fast fields are the column-oriented storage of `tantivy`.
//!
//! It is designed for the fast random access of some document
//! fields given a document id.
//!
//! `FastField`s are useful when a field is required for all or most of
//! the `DocSet`: for instance for scoring, grouping, filtering, or faceting.
//!
//!
//! Fields have to be declared as `FAST` in the schema.
//! Currently, only 64-bit integers (signed or unsigned) are
//! supported.
//!
//! They are stored in a bit-packed fashion so that their
//! memory usage is directly linear with the amplitude of the
//! values stored.
//!
//! Read access performance is comparable to that of an array lookup.
/*!
Column oriented field storage for tantivy.

pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
It is the equivalent of `Lucene`'s `DocValues`.

Fast fields are the column-oriented storage of `tantivy`.

It is designed for the fast random access of some document
fields given a document id.

`FastField`s are useful when a field is required for all or most of
the `DocSet`: for instance for scoring, grouping, filtering, or faceting.


Fields have to be declared as `FAST` in the schema.
Currently, only 64-bit integers (signed or unsigned) are
supported.

They are stored in a bit-packed fashion so that their
memory usage is directly linear with the amplitude of the
values stored.

Read access performance is comparable to that of an array lookup.
*/

pub use self::alive_bitset::intersect_alive_bitsets;
pub use self::alive_bitset::write_alive_bitset;
pub use self::alive_bitset::AliveBitSet;
pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub(crate) use self::reader::BitpackedFastFieldReader;
pub use self::reader::DynamicFastFieldReader;
pub use self::reader::FastFieldReader;
pub use self::readers::FastFieldReaders;
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
pub use self::serializer::CompositeFastFieldSerializer;
pub use self::serializer::FastFieldDataAccess;
pub use self::serializer::FastFieldStats;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::chrono::{NaiveDateTime, Utc};
use crate::schema::{Cardinality, FieldType, Type, Value};
use crate::schema::Cardinality;
use crate::schema::FieldType;
use crate::schema::Value;
use crate::DocId;
use crate::{
    chrono::{NaiveDateTime, Utc},
    schema::Type,
};

mod alive_bitset;
mod bytes;
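
To ground the module documentation above, here is a minimal sketch of declaring and reading a FAST field (a hedged example against the 0.17-era public API; the "rank" field and the heap size are arbitrary):

use tantivy::fastfield::FastFieldReader;
use tantivy::schema::{Schema, FAST};
use tantivy::{doc, Index};

fn demo_fast_field() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    // Declaring the field FAST stores it in the bit-packed column store.
    let rank = schema_builder.add_u64_field("rank", FAST);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer_with_num_threads(1, 15_000_000)?;
    let _opstamp = writer.add_document(doc!(rank => 7u64));
    writer.commit()?;
    let searcher = index.reader()?.searcher();
    // Random access by DocId, without touching the document store.
    let rank_reader = searcher.segment_reader(0).fast_fields().u64(rank)?;
    assert_eq!(rank_reader.get(0), 7u64);
    Ok(())
}
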
@@ -200,20 +213,22 @@ fn value_to_u64(value: &Value) -> u64 {
#[cfg(test)]
mod tests {

    use std::collections::HashMap;
    use std::path::Path;

    use super::*;
    use crate::directory::CompositeFile;
    use crate::directory::{Directory, RamDirectory, WritePtr};
    use crate::merge_policy::NoMergePolicy;
    use crate::schema::Field;
    use crate::schema::Schema;
    use crate::schema::FAST;
    use crate::schema::{Document, IntOptions};
    use crate::{Index, SegmentId, SegmentReader};
    use common::HasLen;
    use once_cell::sync::Lazy;
    use rand::prelude::SliceRandom;
    use rand::rngs::StdRng;
    use rand::SeedableRng;

    use super::*;
    use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
    use crate::merge_policy::NoMergePolicy;
    use crate::schema::{Document, Field, IntOptions, Schema, FAST};
    use crate::{Index, SegmentId, SegmentReader};
    use std::collections::HashMap;
    use std::path::Path;

    pub static SCHEMA: Lazy<Schema> = Lazy::new(|| {
        let mut schema_builder = Schema::builder();
@@ -392,7 +407,7 @@ mod tests {
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
        // assert_eq!(file.len(), 17710 as usize); //bitpacked size
        //assert_eq!(file.len(), 17710 as usize); //bitpacked size
        assert_eq!(file.len(), 10175_usize); // linear interpol size
        {
            let fast_fields_composite = CompositeFile::open(&file)?;
@@ -572,16 +587,16 @@ mod tests {

#[cfg(all(test, feature = "unstable"))]
mod bench {
    use super::tests::FIELD;
    use super::tests::{generate_permutation, SCHEMA};
    use super::*;
    use crate::directory::CompositeFile;
    use crate::directory::{Directory, RamDirectory, WritePtr};
    use crate::fastfield::FastFieldReader;
    use std::collections::HashMap;
    use std::path::Path;

    use test::{self, Bencher};

    use super::tests::{generate_permutation, FIELD, SCHEMA};
    use super::*;
    use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
    use crate::fastfield::FastFieldReader;

    #[bench]
    fn bench_intfastfield_linear_veclookup(b: &mut Bencher) {
        let permutation = generate_permutation();

@@ -7,17 +7,23 @@ pub use self::writer::MultiValuedFastFieldWriter;
#[cfg(test)]
mod tests {

    use chrono::Duration;
    use futures::executor::block_on;
    use proptest::strategy::Strategy;
    use proptest::{prop_oneof, proptest};
    use test_log::test;

    use crate::collector::TopDocs;
    use crate::indexer::NoMergePolicy;
    use crate::query::QueryParser;
    use crate::schema::{Cardinality, Facet, FacetOptions, IntOptions, Schema};
    use crate::{Document, Index, Term};
    use crate::schema::Cardinality;
    use crate::schema::Facet;
    use crate::schema::FacetOptions;
    use crate::schema::IntOptions;
    use crate::schema::Schema;
    use crate::Document;
    use crate::Index;
    use crate::Term;
    use chrono::Duration;
    use futures::executor::block_on;
    use proptest::prop_oneof;
    use proptest::proptest;
    use proptest::strategy::Strategy;
    use test_log::test;

    #[test]
    fn test_multivalued_u64() -> crate::Result<()> {
@@ -104,7 +110,7 @@ mod tests {
            retrieved_doc
                .get_first(date_field)
                .expect("cannot find value")
                .as_date()
                .date_value()
                .unwrap()
                .timestamp(),
            first_time_stamp.timestamp()
@@ -113,7 +119,7 @@ mod tests {
            retrieved_doc
                .get_first(time_i)
                .expect("cannot find value")
                .as_i64(),
                .i64_value(),
            Some(1i64)
        );
    }
@@ -132,7 +138,7 @@ mod tests {
            retrieved_doc
                .get_first(date_field)
                .expect("cannot find value")
                .as_date()
                .date_value()
                .unwrap()
                .timestamp(),
            two_secs_ahead.timestamp()
@@ -141,7 +147,7 @@ mod tests {
            retrieved_doc
                .get_first(time_i)
                .expect("cannot find value")
                .as_i64(),
                .i64_value(),
            Some(3i64)
        );
    }
@@ -174,7 +180,7 @@ mod tests {
            retrieved_doc
                .get_first(date_field)
                .expect("cannot find value")
                .as_date()
                .date_value()
                .expect("value not of Date type")
                .timestamp(),
            (first_time_stamp + Duration::seconds(offset_sec)).timestamp()
@@ -183,7 +189,7 @@ mod tests {
            retrieved_doc
                .get_first(time_i)
                .expect("cannot find value")
                .as_i64(),
                .i64_value(),
            Some(time_i_val)
        );
    }

@@ -10,6 +10,7 @@ use crate::DocId;
/// The `vals_reader` will access the concatenated list of all
/// values for all readers.
/// The `idx_reader` associates, for each document, the index of its first value.
///
#[derive(Clone)]
pub struct MultiValuedFastFieldReader<Item: FastValue> {
    idx_reader: DynamicFastFieldReader<u64>,

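The index/values split described in this doc-comment is the classic CSR-style layout; a toy sketch of the lookup (illustrative only, not tantivy's actual code):

// `idx[doc]..idx[doc + 1]` brackets the values belonging to `doc`
// inside one flat, concatenated `vals` array.
fn get_vals(idx: &[u64], vals: &[u64], doc: usize) -> Vec<u64> {
    let start = idx[doc] as usize;
    let stop = idx[doc + 1] as usize;
    vals[start..stop].to_vec()
}

fn main() {
    let idx = [0u64, 2, 2, 5]; // doc 0 has 2 values, doc 1 has 0, doc 2 has 3
    let vals = [10u64, 11, 20, 21, 22];
    assert_eq!(get_vals(&idx, &vals, 0), vec![10, 11]);
    assert_eq!(get_vals(&idx, &vals, 1), vec![]);
    assert_eq!(get_vals(&idx, &vals, 2), vec![20, 21, 22]);
}
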
@@ -1,15 +1,13 @@
use std::io;

use fnv::FnvHashMap;
use tantivy_bitpacker::minmax;

use crate::fastfield::serializer::BitpackedFastFieldSerializerLegacy;
use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::fastfield::CompositeFastFieldSerializer;
use crate::postings::UnorderedTermId;
use crate::schema::{Document, Field};
use crate::termdict::TermOrdinal;
use crate::DocId;
use crate::{fastfield::value_to_u64, indexer::doc_id_mapping::DocIdMapping};
use fnv::FnvHashMap;
use std::io;
use tantivy_bitpacker::minmax;

/// Writer for multi-valued (as in, more than one value per document)
/// int fast field.
@@ -22,8 +20,7 @@ use crate::DocId;
/// - add your document simply by calling `.add_document(...)`.
///
/// The `MultiValuedFastFieldWriter` can be acquired from the
/// fastfield writer, by calling
/// [`.get_multivalue_writer(...)`](./struct.FastFieldsWriter.html#method.get_multivalue_writer).
/// fastfield writer, by calling [`.get_multivalue_writer(...)`](./struct.FastFieldsWriter.html#method.get_multivalue_writer).
///
/// Once acquired, writing is done by calling
/// `.add_document_vals(&[u64])` once per document.
@@ -79,7 +76,7 @@ impl MultiValuedFastFieldWriter {
        // facets are indexed in the `SegmentWriter` as we encode their unordered id.
        if !self.is_facet {
            for field_value in doc.field_values() {
                if field_value.field == self.field {
                if field_value.field() == self.field {
                    self.add_val(value_to_u64(field_value.value()));
                }
            }
@@ -134,6 +131,7 @@ impl MultiValuedFastFieldWriter {
    /// During the serialization of the segment, terms get sorted and
    /// `tantivy` builds a mapping to convert this `UnorderedTermId` into
    /// term ordinals.
    ///
    pub fn serialize(
        &self,
        serializer: &mut CompositeFastFieldSerializer,

Some files were not shown because too many files have changed in this diff.