Compare commits

...

31 Commits

Author SHA1 Message Date
Paul Masurel
2e255c4bef Preparing for release 2022-03-09 09:59:08 +09:00
Paul Masurel
387592809f Updated CHANGELOG 2022-03-07 15:31:35 +09:00
Halvor Fladsrud Bø
cedced5bb0 Slop support for phrase queries (#1241)
Closes #1068
2022-03-07 15:29:18 +09:00
dependabot[bot]
d31f045872 Bump actions/checkout from 2 to 3 (#1300)
Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 3.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v2...v3)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2022-03-07 11:54:26 +09:00
PSeitz
6656a70d1b Merge pull request #1301 from saroh/1232-doc-fastfield
update fastfield doc
2022-03-04 08:18:21 +01:00
saroh
d36e0a9549 fix fastfield doc 2022-03-03 17:43:18 +01:00
Antoine G
8771b2673f Update src/fastfield/writer.rs
Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2022-03-03 11:25:24 +01:00
Antoine G
a41d3d51a4 Update fastfield_codecs/src/lib.rs 2022-03-03 11:25:06 +01:00
Saroh
cae34ffe47 update fastfield doc 2022-03-02 16:04:15 +01:00
PSeitz
4b62f7907d Merge pull request #1297 from PSeitz/fix_clippy
fix clippy issues
2022-03-02 10:11:56 +01:00
Pascal Seitz
7fa6a0b665 cargo fmt 2022-03-02 09:24:14 +01:00
PSeitz
458ed29a31 Merge pull request #1299 from saroh/1232-doc-lint
doc lint for errors and aggregations
2022-03-02 09:22:07 +01:00
Antoine G
e37775fe21 iff->if or if and only if (#1298)
* has_xxx is_xxx -> if, these function usualy define equivalence
xxx returns bool -> specify equivalence when appropriate

* fix doc
2022-03-02 11:00:00 +09:00
Saroh
1cd2434a32 fix(aggregations) Readme 2022-03-01 20:37:48 +01:00
Saroh
de2cba6d1e error definitions 2022-03-01 20:13:59 +01:00
Paul Masurel
c0b1a58d27 Apply suggestions from code review 2022-03-01 18:41:58 +09:00
Paul Masurel
848b795b9f Apply suggestions from code review 2022-03-01 18:37:51 +09:00
Pascal Seitz
091b668624 fix clippy issues 2022-03-01 08:58:51 +01:00
Paul Masurel
5004290daa Return an error on certain type of corruption. (#1296) 2022-03-01 11:35:56 +09:00
StyMaar
5d2c2b804c Fix link to RamDirectory and MMapDirectory in Directory's documentation (#1295) 2022-03-01 09:46:53 +09:00
PSeitz
1a92b588e0 Merge pull request #1294 from PSeitz/aggregation
fix intermediate result de/serialization
2022-02-28 08:39:23 +01:00
Pascal Seitz
010e92c118 fix intermediate result de/serialization
return None for empty average/stats metric
add test for de/serialization of intermediate result
add test for metric on empty result
2022-02-25 16:39:57 +01:00
Paul Masurel
2ead010c83 Tantivy quickwit (#1293)
* Added sstable and enabling it by default, and parallel boolean query.
* Added async API for FileSlice.
* Added async get_doc
* Reduce blocksize to 32_000
* Added debug logs

Quickwit specific feature a hidden behind the quickwit feature flag.
2022-02-25 17:32:49 +09:00
PSeitz
c4f66eb185 improve validation in aggregation, extend invalid field test (#1292)
* improve validation in aggregation, extend invalid field test

improve validation in aggregation
extend invalid field test
Fixes #1291

* collect fast field names on request structure

* fix visibility of AggregationSegmentCollector
2022-02-25 15:21:19 +09:00
Paul Masurel
d7b46d2137 Added JSON Type (#1270)
- Removed useless copy when ingesting JSON.
- Bugfix in phrase query with a missing field norms.
- Disabled range query on default fields

Closes #1251
2022-02-24 16:25:22 +09:00
PSeitz
d042ce74c7 Merge pull request #1289 from PSeitz/numeric_options
rename IntOptions to NumericOptions
2022-02-23 14:04:40 +01:00
PSeitz
7ba9e662b8 Merge pull request #1290 from PSeitz/improve_docs
improve aggregation docs
2022-02-23 14:04:20 +01:00
Pascal Seitz
fdd5ef85e5 improve aggregation docs 2022-02-22 10:37:54 +01:00
Pascal Seitz
704498a1ac rename IntOptions to NumericOptions
keep IntOptions with deprecation warning
Fixes #1286
2022-02-21 22:20:07 +01:00
PSeitz
1232af7928 fix docs (#1288) 2022-02-21 23:15:58 +09:00
Paul Masurel
d37633e034 Minor changes in indexing. (#1285) 2022-02-21 17:16:52 +09:00
115 changed files with 5470 additions and 993 deletions

View File

@@ -10,7 +10,7 @@ jobs:
coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup toolchain install nightly --component llvm-tools-preview
- name: Install cargo-llvm-cov

View File

@@ -12,13 +12,13 @@ jobs:
functional_test_unsorted:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Run indexing_unsorted
run: cargo test indexing_unsorted -- --ignored
functional_test_sorted:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Run indexing_sorted
run: cargo test indexing_sorted -- --ignored

View File

@@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose --workspace
- name: Install latest nightly to test also against unstable feature flag
@@ -24,16 +24,23 @@ jobs:
toolchain: nightly
override: true
components: rustfmt
- name: Install latest nightly to test also against unstable feature flag
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy
- name: Run tests
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
- name: Run tests quickwit feature
run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
- name: Check Formatting
run: cargo +nightly fmt --all -- --check
- uses: actions-rs/clippy-check@v1
with:
toolchain: stable

View File

@@ -9,6 +9,8 @@ Tantivy 0.17
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-oss/tantivy/issues/1225)
- Fix opening bytes index with dynamic codec (@PSeitz) [#1278](https://github.com/quickwit-oss/tantivy/issues/1278)
- Added an aggregation collector compatible with Elasticsearch (@PSeitz)
- Added a JSON schema type @fulmicoton [#1251](https://github.com/quickwit-oss/tantivy/issues/1251)
- Added support for slop in phrase queries @halvorboe [#1068](https://github.com/quickwit-oss/tantivy/issues/1068)
Tantivy 0.16.2
================================

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.17.0-dev"
version = "0.17.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -35,7 +35,7 @@ crossbeam = "0.8.1"
futures = { version = "0.3.15", features = ["thread-pool"] }
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version = "0.1", path = "./common/", package = "tantivy-common" }
common = { version = "0.2", path = "./common/", package = "tantivy-common" }
fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false }
ownedbytes = { version="0.2", path="./ownedbytes" }
stable_deref_trait = "1.2"
@@ -56,6 +56,8 @@ fastdivide = "0.4"
itertools = "0.10.0"
measure_time = "0.8.0"
pretty_assertions = "1.1.0"
serde_cbor = {version="0.11", optional=true}
async-trait = "0.1"
[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
@@ -94,6 +96,8 @@ snappy-compression = ["snap"]
failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
quickwit = ["serde_cbor"]
[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes"]

View File

@@ -1,4 +1,3 @@
[![Docs](https://docs.rs/tantivy/badge.svg)](https://docs.rs/crate/tantivy/)
[![Build Status](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml/badge.svg)](https://github.com/quickwit-oss/tantivy/actions/workflows/test.yml)
[![codecov](https://codecov.io/gh/quickwit-oss/tantivy/branch/main/graph/badge.svg)](https://codecov.io/gh/quickwit-oss/tantivy)

View File

@@ -4,6 +4,7 @@ use tantivy::schema::{INDEXED, STORED, STRING, TEXT};
use tantivy::Index;
const HDFS_LOGS: &str = include_str!("hdfs.json");
const NUM_REPEATS: usize = 2;
pub fn hdfs_index_benchmark(c: &mut Criterion) {
let schema = {
@@ -20,6 +21,11 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
schema_builder.add_text_field("severity", STRING | STORED);
schema_builder.build()
};
let dynamic_schema = {
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
schema_builder.add_json_field("json", TEXT);
schema_builder.build()
};
let mut group = c.benchmark_group("index-hdfs");
group.sample_size(20);
@@ -27,7 +33,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
b.iter(|| {
let index = Index::create_in_ram(schema.clone());
let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for _ in 0..10 {
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
@@ -39,7 +45,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
b.iter(|| {
let index = Index::create_in_ram(schema.clone());
let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for _ in 0..10 {
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
@@ -52,9 +58,11 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
b.iter(|| {
let index = Index::create_in_ram(schema_with_store.clone());
let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
}
}
})
});
@@ -62,9 +70,43 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
b.iter(|| {
let index = Index::create_in_ram(schema_with_store.clone());
let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let doc = schema.parse_document(doc_json).unwrap();
index_writer.add_document(doc).unwrap();
}
}
index_writer.commit().unwrap();
})
});
group.bench_function("index-hdfs-no-commit-json-without-docstore", |b| {
b.iter(|| {
let index = Index::create_in_ram(dynamic_schema.clone());
let json_field = dynamic_schema.get_field("json").unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let json_val: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(doc_json).unwrap();
let doc = tantivy::doc!(json_field=>json_val);
index_writer.add_document(doc).unwrap();
}
}
index_writer.commit().unwrap();
})
});
group.bench_function("index-hdfs-with-commit-json-without-docstore", |b| {
b.iter(|| {
let index = Index::create_in_ram(dynamic_schema.clone());
let json_field = dynamic_schema.get_field("json").unwrap();
let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
for _ in 0..NUM_REPEATS {
for doc_json in HDFS_LOGS.trim().split("\n") {
let json_val: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(doc_json).unwrap();
let doc = tantivy::doc!(json_field=>json_val);
index_writer.add_document(doc).unwrap();
}
}
index_writer.commit().unwrap();
})

View File

@@ -6,6 +6,7 @@ extern crate test;
mod tests {
use tantivy_bitpacker::BlockedBitpacker;
use test::Bencher;
#[bench]
fn bench_blockedbitp_read(b: &mut Bencher) {
let mut blocked_bitpacker = BlockedBitpacker::new();
@@ -20,6 +21,7 @@ mod tests {
out
});
}
#[bench]
fn bench_blockedbitp_create(b: &mut Bencher) {
b.iter(|| {

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy-common"
version = "0.1.0"
version = "0.2.0"
authors = ["Paul Masurel <paul@quickwit.io>", "Pascal Seitz <pascal@quickwit.io>"]
license = "MIT"
edition = "2018"

doc/src/json.md Normal file
View File

@@ -0,0 +1,128 @@
# Json
As of tantivy 0.17, tantivy supports a json object type.
This type can be used to allow for a schema-less search index.
When indexing a json object, we "flatten" the JSON. This operation emits terms that represent a triplet `(json_path, value_type, value)`.
For instance, if `user` is a json field, the following document:
```json
{
"user": {
"name": "Paul Masurel",
"address": {
"city": "Tokyo",
"country": "Japan"
},
"created_at": "2018-11-12T23:20:50.52Z"
}
}
```
emits the following tokens:
- ("name", Text, "Paul")
- ("name", Text, "Masurel")
- ("address.city", Text, "Tokyo")
- ("address.country", Text, "Japan")
- ("created_at", Date, 15420648505)
# Bytes-encoding and lexicographical sort.
Like any other terms, these triplets are encoded into a binary format as follows.
- `json_path`: the json path is a sequence of "segments". In the example above, `address.city`
is just a debug representation of the json path `["address", "city"]`.
It is encoded by separating segments with the unicode char `\x01` and terminating the path with `\x00`.
- `value type`: One byte represents the `Value` type.
- `value`: The value representation is just the regular Value representation.
This representation is designed to align the natural sort of Terms with the lexicographical sort
of their binary representation (tantivy's dictionary, whether fst or sstable, is sorted and does prefix encoding).
In the example above, the terms will be sorted as
- ("address.city", Text, "Tokyo")
- ("address.country", Text, "Japan")
- ("name", Text, "Masurel")
- ("name", Text, "Paul")
- ("created_at", Date, 15420648505)
As seen in "pitfalls", we may end up having to search for a value for a same path in several different fields. Putting the field code after the path makes it maximizes compression opportunities but also increases the chances for the two terms to end up in the actual same term dictionary block.
# Pitfalls, limitations and corner cases.
Json gives very little information about the type of the literals it stores.
All numeric types end up mapped as a "Number" and there are no types for dates.
At indexing time, tantivy will try to interpret numbers and strings as different types, following a
priority order.
Numbers will be interpreted as u64, i64 and f64, in that order.
Strings will be interpreted as rfc3339 dates or simple strings.
The first type that works is picked, and it is the only term emitted for indexing.
Note this interpretation happens on a per-document basis, and there is no effort to try to sniff
a consistent field type at the scale of a segment.
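As a rough sketch of that priority order (illustrative only; the `JsonTermValue` enum and the `looks_like_rfc3339` helper are assumptions made for this example, not tantivy's internal types):

```rust
use serde_json::Value;

#[derive(Debug)]
enum JsonTermValue {
    U64(u64),
    I64(i64),
    F64(f64),
    Date(String), // an rfc3339 timestamp, kept as a string in this sketch
    Str(String),
}

fn interpret(value: &Value) -> Option<JsonTermValue> {
    match value {
        // Numbers: try u64, then i64, then f64; the first representation that works wins.
        Value::Number(n) => n
            .as_u64()
            .map(JsonTermValue::U64)
            .or_else(|| n.as_i64().map(JsonTermValue::I64))
            .or_else(|| n.as_f64().map(JsonTermValue::F64)),
        // Strings: try an rfc3339 date first, otherwise fall back to a plain string.
        Value::String(s) if looks_like_rfc3339(s) => Some(JsonTermValue::Date(s.clone())),
        Value::String(s) => Some(JsonTermValue::Str(s.clone())),
        _ => None,
    }
}

// Crude placeholder; a real implementation would use a proper rfc3339 parser.
fn looks_like_rfc3339(s: &str) -> bool {
    s.len() >= 20 && s.as_bytes()[4] == b'-' && s.contains('T')
}

fn main() {
    let v: Value =
        serde_json::from_str(r#"{"retweets": 5, "created_at": "2018-11-12T23:20:50.52Z"}"#).unwrap();
    println!("{:?}", interpret(&v["retweets"]));   // U64(5)
    println!("{:?}", interpret(&v["created_at"])); // Date("2018-11-12T23:20:50.52Z")
}
```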
On the query parser side, on the other hand, we may end up emitting more than one type.
For instance, we do not even know whether the value is a number or a string.
So the query
```
my_path.my_segment:233
```
will be interpreted as
`(my_path.my_segment, String, 233) or (my_path.my_segment, u64, 233)`
Likewise, we need to emit two tokens if the query contains an rfc3339 date.
Indeed, the date could actually have been a single token inside the text of a document at ingestion time. Generally speaking, query parsing will always emit at least a string token, and sometimes more.
If one more json field is defined, things get even more complicated.
## Default json field
If the schema contains a text field called "text" and a json field that is set as a default field:
`text:hello` could reasonably be interpreted as targeting the text field, or as targeting the json field called `json_dynamic` with the json_path "text".
If there is such an ambiguity, we decide to only search in the "text" field.
In other words, the parser will not search in default json fields if there is a schema hit.
This is a product decision.
The user can still target the JSON field by specifying its name explicitly:
`json_dynamic.text:hello`.
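Below is a minimal sketch of that rule, reusing the public API demonstrated in `examples/json_field.rs` (added in this change). The field names "text" and `json_dynamic` follow the prose above; the schema and document are otherwise invented for illustration.

```rust
use tantivy::collector::Count;
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::Index;

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let json_field = schema_builder.add_json_field("json_dynamic", TEXT);
    let schema = schema_builder.build();

    let index = Index::create_in_ram(schema.clone());
    let mut index_writer = index.writer(50_000_000)?;
    // "hello" only appears under the json field's "text" path.
    let doc = schema.parse_document(r#"{"text": "goodbye", "json_dynamic": {"text": "hello"}}"#)?;
    index_writer.add_document(doc)?;
    index_writer.commit()?;

    let searcher = index.reader()?.searcher();
    let query_parser = QueryParser::for_index(&index, vec![text_field, json_field]);

    // "text" is a schema hit, so `text:hello` does not look into the default json field.
    let query = query_parser.parse_query("text:hello")?;
    assert_eq!(searcher.search(&*query, &Count)?, 0);

    // The json field can still be targeted explicitly.
    let query = query_parser.parse_query("json_dynamic.text:hello")?;
    assert_eq!(searcher.search(&*query, &Count)?, 1);
    Ok(())
}
```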
## Range queries are not supported.
Json fields do not support range queries.
## Arrays do not work like nested objects.
If a json object contains an array, a search query might return more documents
than expected.
Let's take an example.
```json
{
"cart_id": 3234234 ,
"cart": [
{"product_type": "sneakers", "attributes": {"color": "white"} },
{"product_type": "t-shirt", "attributes": {"color": "red"}},
]
}
```
Despite the array structure, a document in tantivy is a bag of terms.
The query:
```
cart.product_type:sneakers AND cart.attributes.color:red
```
actually matches the document above.
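It matches because indexing flattens the document into a single bag of terms, e.g. `("cart.product_type", Text, "sneakers")` and `("cart.attributes.color", Text, "red")`, with no record of which array element each term came from. The following sketch reuses the public API from `examples/json_field.rs`; the json field name "attributes" is an arbitrary choice for this illustration.

```rust
use tantivy::collector::Count;
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, TEXT};
use tantivy::Index;

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let attributes = schema_builder.add_json_field("attributes", TEXT);
    let schema = schema_builder.build();

    let index = Index::create_in_ram(schema.clone());
    let mut index_writer = index.writer(50_000_000)?;
    let doc = schema.parse_document(
        r#"{
            "attributes": {
                "cart_id": 3234234,
                "cart": [
                    {"product_type": "sneakers", "attributes": {"color": "white"}},
                    {"product_type": "t-shirt", "attributes": {"color": "red"}}
                ]
            }
        }"#,
    )?;
    index_writer.add_document(doc)?;
    index_writer.commit()?;

    let searcher = index.reader()?.searcher();
    let query_parser = QueryParser::for_index(&index, vec![attributes]);
    // No single array element pairs "sneakers" with "red", yet the document matches.
    let query =
        query_parser.parse_query("cart.product_type:sneakers AND cart.attributes.color:red")?;
    assert_eq!(searcher.search(&*query, &Count)?, 1);
    Ok(())
}
```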

View File

@@ -26,7 +26,8 @@ fn main() -> tantivy::Result<()> {
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue);
let score_fieldtype =
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let highscore_field = schema_builder.add_f64_field("highscore", score_fieldtype.clone());
let price_field = schema_builder.add_f64_field("price", score_fieldtype.clone());

examples/json_field.rs Normal file
View File

@@ -0,0 +1,80 @@
// # Json field example
//
// This example shows how the json field can be used
// to make tantivy partially schemaless.
use tantivy::collector::{Count, TopDocs};
use tantivy::query::QueryParser;
use tantivy::schema::{Schema, FAST, STORED, STRING, TEXT};
use tantivy::Index;
fn main() -> tantivy::Result<()> {
// # Defining the schema
//
// We need two fields:
// - a timestamp
// - a json object field
let mut schema_builder = Schema::builder();
schema_builder.add_date_field("timestamp", FAST | STORED);
let event_type = schema_builder.add_text_field("event_type", STRING | STORED);
let attributes = schema_builder.add_json_field("attributes", STORED | TEXT);
let schema = schema_builder.build();
// # Indexing documents
let index = Index::create_in_ram(schema.clone());
let mut index_writer = index.writer(50_000_000)?;
let doc = schema.parse_document(
r#"{
"timestamp": "2022-02-22T23:20:50.53Z",
"event_type": "click",
"attributes": {
"target": "submit-button",
"cart": {"product_id": 103},
"description": "the best vacuum cleaner ever"
}
}"#,
)?;
index_writer.add_document(doc)?;
let doc = schema.parse_document(
r#"{
"timestamp": "2022-02-22T23:20:51.53Z",
"event_type": "click",
"attributes": {
"target": "submit-button",
"cart": {"product_id": 133},
"description": "das keyboard"
}
}"#,
)?;
index_writer.add_document(doc)?;
index_writer.commit()?;
let reader = index.reader()?;
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![event_type, attributes]);
{
let query = query_parser.parse_query("target:submit-button")?;
let count_docs = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(count_docs.len(), 2);
}
{
let query = query_parser.parse_query("target:submit")?;
let count_docs = searcher.search(&*query, &TopDocs::with_limit(2))?;
assert_eq!(count_docs.len(), 2);
}
{
let query = query_parser.parse_query("cart.product_id:103")?;
let count_docs = searcher.search(&*query, &Count)?;
assert_eq!(count_docs, 1);
}
{
let query = query_parser
.parse_query("event_type:click AND cart.product_id:133")
.unwrap();
let hits = searcher.search(&*query, &TopDocs::with_limit(2)).unwrap();
assert_eq!(hits.len(), 1);
}
Ok(())
}

View File

@@ -9,7 +9,7 @@ description = "Fast field codecs used by tantivy"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common = { version = "0.1", path = "../common/", package = "tantivy-common" }
common = { version = "0.2", path = "../common/", package = "tantivy-common" }
tantivy-bitpacker = { version="0.1.1", path = "../bitpacker/" }
prettytable-rs = {version="0.8.0", optional= true}
rand = {version="0.8.3", optional= true}

View File

@@ -63,6 +63,7 @@ pub trait FastFieldDataAccess {
}
#[derive(Debug, Clone)]
/// Statistics are used in codec detection and stored in the fast field footer.
pub struct FastFieldStats {
pub min_value: u64,
pub max_value: u64,

View File

@@ -59,7 +59,7 @@ pub enum UserInputBound {
}
impl UserInputBound {
fn display_lower(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
fn display_lower(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "[\"{}\"", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "{{\"{}\"", word),
@@ -67,7 +67,7 @@ impl UserInputBound {
}
}
fn display_upper(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
fn display_upper(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
UserInputBound::Inclusive(ref word) => write!(formatter, "\"{}\"]", word),
UserInputBound::Exclusive(ref word) => write!(formatter, "\"{}\"}}", word),

View File

@@ -11,10 +11,10 @@ Tantivy's aggregations have been designed to mimic the
The code is organized in submodules:
##bucket
## bucket
Contains all bucket aggregations, like the range aggregation. These bucket aggregations group documents into buckets and can contain sub-aggregations.
##metric
## metric
Contains all metric aggregations, like average aggregation. Metric aggregations do not have sub aggregations.
#### agg_req

View File

@@ -3,6 +3,7 @@
//!
//! [Aggregations] is the top level entry point to create a request, which is a `HashMap<String,
//! Aggregation>`.
//!
//! Requests are compatible with the json format of elasticsearch.
//!
//! # Example
@@ -43,7 +44,7 @@
//! assert_eq!(agg_req1, agg_req2);
//! ```
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
@@ -51,11 +52,20 @@ pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
/// The top-level aggregation request structure, which contains [Aggregation] and their user defined
/// names.
/// names. It is also used in [buckets](BucketAggregation) to define sub-aggregations.
///
/// The key is the user defined name of the aggregation.
pub type Aggregations = HashMap<String, Aggregation>;
/// Extract all fast field names used in the tree.
pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
let mut fast_field_names = Default::default();
for el in aggs.values() {
el.get_fast_field_names(&mut fast_field_names)
}
fast_field_names
}
/// Aggregation request of [BucketAggregation] or [MetricAggregation].
///
/// An aggregation is either a bucket or a metric.
@@ -68,6 +78,15 @@ pub enum Aggregation {
Metric(MetricAggregation),
}
impl Aggregation {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
Aggregation::Bucket(bucket) => bucket.get_fast_field_names(fast_field_names),
Aggregation::Metric(metric) => metric.get_fast_field_names(fast_field_names),
}
}
}
/// BucketAggregations create buckets of documents. Each bucket is associated with a rule which
/// determines whether or not a document falls into it. In other words, the buckets
/// effectively define document sets. Buckets are not necessarily disjunct, therefore a document can
@@ -91,6 +110,13 @@ pub struct BucketAggregation {
pub sub_aggregation: Aggregations,
}
impl BucketAggregation {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
self.bucket_agg.get_fast_field_names(fast_field_names);
fast_field_names.extend(get_fast_field_names(&self.sub_aggregation));
}
}
/// The bucket aggregation types.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum BucketAggregationType {
@@ -99,6 +125,14 @@ pub enum BucketAggregationType {
Range(RangeAggregation),
}
impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
};
}
}
/// The aggregations in this family compute metrics based on values extracted
/// from the documents that are being aggregated. Values are extracted from the fast field of
/// the document.
@@ -116,6 +150,15 @@ pub enum MetricAggregation {
Stats(StatsAggregation),
}
impl MetricAggregation {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
MetricAggregation::Average(avg) => fast_field_names.insert(avg.field.to_string()),
MetricAggregation::Stats(stats) => fast_field_names.insert(stats.field.to_string()),
};
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -166,4 +209,62 @@ mod tests {
let agg_req2: String = serde_json::to_string_pretty(&agg_req1).unwrap();
assert_eq!(agg_req2, elasticsearch_compatible_json_req);
}
#[test]
fn test_get_fast_field_names() {
let agg_req2: Aggregations = vec![
(
"range".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score2".to_string(),
ranges: vec![
(f64::MIN..3f64).into(),
(3f64..7f64).into(),
(7f64..20f64).into(),
(20f64..f64::MAX).into(),
],
}),
sub_aggregation: Default::default(),
}),
),
(
"metric".to_string(),
Aggregation::Metric(MetricAggregation::Average(
AverageAggregation::from_field_name("field123".to_string()),
)),
),
]
.into_iter()
.collect();
let agg_req1: Aggregations = vec![(
"range".to_string(),
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score".to_string(),
ranges: vec![
(f64::MIN..3f64).into(),
(3f64..7f64).into(),
(7f64..20f64).into(),
(20f64..f64::MAX).into(),
],
}),
sub_aggregation: agg_req2,
}),
)]
.into_iter()
.collect();
assert_eq!(
get_fast_field_names(&agg_req1),
vec![
"score".to_string(),
"score2".to_string(),
"field123".to_string()
]
.into_iter()
.collect()
)
}
}

View File

@@ -4,8 +4,8 @@ use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAgg
use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
use crate::schema::{Cardinality, Type};
use crate::{SegmentReader, TantivyError};
#[derive(Clone, Default)]
@@ -38,7 +38,7 @@ pub struct BucketAggregationWithAccessor {
}
impl BucketAggregationWithAccessor {
fn from_bucket(
fn try_from_bucket(
bucket: &BucketAggregationType,
sub_aggregation: &Aggregations,
reader: &SegmentReader,
@@ -53,7 +53,7 @@ impl BucketAggregationWithAccessor {
Ok(BucketAggregationWithAccessor {
accessor,
field_type,
sub_aggregation: get_aggregations_with_accessor(&sub_aggregation, reader)?,
sub_aggregation: get_aggs_with_accessor_and_validate(&sub_aggregation, reader)?,
bucket_agg: bucket.clone(),
})
}
@@ -68,7 +68,7 @@ pub struct MetricAggregationWithAccessor {
}
impl MetricAggregationWithAccessor {
fn from_metric(
fn try_from_metric(
metric: &MetricAggregation,
reader: &SegmentReader,
) -> crate::Result<MetricAggregationWithAccessor> {
@@ -87,7 +87,7 @@ impl MetricAggregationWithAccessor {
}
}
pub(crate) fn get_aggregations_with_accessor(
pub(crate) fn get_aggs_with_accessor_and_validate(
aggs: &Aggregations,
reader: &SegmentReader,
) -> crate::Result<AggregationsWithAccessor> {
@@ -97,7 +97,7 @@ pub(crate) fn get_aggregations_with_accessor(
match agg {
Aggregation::Bucket(bucket) => buckets.push((
key.to_string(),
BucketAggregationWithAccessor::from_bucket(
BucketAggregationWithAccessor::try_from_bucket(
&bucket.bucket_agg,
&bucket.sub_aggregation,
reader,
@@ -105,7 +105,7 @@ pub(crate) fn get_aggregations_with_accessor(
)),
Aggregation::Metric(metric) => metrics.push((
key.to_string(),
MetricAggregationWithAccessor::from_metric(metric, reader)?,
MetricAggregationWithAccessor::try_from_metric(metric, reader)?,
)),
}
}
@@ -124,15 +124,21 @@ fn get_ff_reader_and_validate(
.get_field(field_name)
.ok_or_else(|| TantivyError::FieldNotFound(field_name.to_string()))?;
let field_type = reader.schema().get_field_entry(field).field_type();
if field_type.value_type() != Type::I64
&& field_type.value_type() != Type::U64
&& field_type.value_type() != Type::F64
{
if let Some((ff_type, cardinality)) = type_and_cardinality(field_type) {
if cardinality == Cardinality::MultiValues || ff_type == FastType::Date {
return Err(TantivyError::InvalidArgument(format!(
"Invalid field type in aggregation {:?}, only Cardinality::SingleValue supported",
field_type.value_type()
)));
}
} else {
return Err(TantivyError::InvalidArgument(format!(
"Invalid field type in aggregation {:?}, only f64, u64, i64 is supported",
"Only single value fast fields of type f64, u64, i64 are supported, but got {:?} ",
field_type.value_type()
)));
}
};
let ff_fields = reader.fast_fields();
ff_fields
.u64_lenient(field)

View File

@@ -112,6 +112,34 @@ impl From<IntermediateBucketResult> for BucketResult {
/// This is the range entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// {
/// ...
/// "my_ranges": {
/// "buckets": [
/// {
/// "key": "*-10",
/// "to": 10,
/// "doc_count": 5
/// },
/// {
/// "key": "10-20",
/// "from": 10,
/// "to": 20,
/// "doc_count": 2
/// },
/// {
/// "key": "20-*",
/// "from": 20,
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeBucketEntry {
/// The identifier of the bucket.

View File

@@ -1,10 +1,13 @@
//! Module for all bucket aggregations.
//!
//! Results of final buckets are [BucketEntry](super::agg_result::BucketEntry).
//! BucketAggregations create buckets of documents
//! [BucketAggregation](super::agg_req::BucketAggregation).
//!
//! Results of final buckets are [BucketResult](super::agg_result::BucketResult).
//! Results of intermediate buckets are
//! [IntermediateBucketEntry](super::intermediate_agg_result::IntermediateBucketEntry)
//! [IntermediateBucketResult](super::intermediate_agg_result::IntermediateBucketResult)
mod range;
pub use range::RangeAggregation;
pub(crate) use range::SegmentRangeCollector;
pub use range::*;

View File

@@ -1,6 +1,5 @@
use std::ops::Range;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use crate::aggregation::agg_req_with_accessor::{
@@ -18,18 +17,33 @@ use crate::{DocId, TantivyError};
/// Provide user-defined buckets to aggregate on.
/// Two special buckets will automatically be created to cover the whole range of values.
/// The provided buckets have to be continuous.
/// During the aggregation, the values extracted from the fast_field `field_name` will be checked
/// During the aggregation, the values extracted from the fast_field `field` will be checked
/// against each bucket range. Note that this aggregation includes the from value and excludes the
/// to value for each range.
///
/// Result type is [BucketResult](crate::aggregation::agg_result::BucketResult) with
/// [BucketEntryKeyCount](crate::aggregation::agg_result::BucketEntryKeyCount) on the
/// [RangeBucketEntry](crate::aggregation::agg_result::RangeBucketEntry) on the
/// AggregationCollector.
///
/// Result type is
/// [crate::aggregation::intermediate_agg_result::IntermediateBucketResult] with
/// [crate::aggregation::intermediate_agg_result::IntermediateBucketEntryKeyCount] on the
/// [crate::aggregation::intermediate_agg_result::IntermediateRangeBucketEntry] on the
/// DistributedAggregationCollector.
///
/// # Request JSON Format
/// ```json
/// {
/// "range": {
/// "field": "score",
/// "ranges": [
/// { "to": 3.0 },
/// { "from": 3.0, "to": 7.0 },
/// { "from": 7.0, "to": 20.0 }
/// { "from": 20.0 }
/// ]
/// }
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeAggregation {
/// The field to aggregate on.
@@ -40,9 +54,14 @@ pub struct RangeAggregation {
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// The range for one range bucket.
pub struct RangeAggregationRange {
/// The from range value, which is inclusive in the range.
/// None equals to an open ended interval.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub from: Option<f64>,
/// The to range value, which is not inclusive in the range.
/// None equals to an open ended interval.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub to: Option<f64>,
}
@@ -64,7 +83,7 @@ impl From<Range<f64>> for RangeAggregationRange {
}
#[derive(Clone, Debug, PartialEq)]
pub struct SegmentRangeAndBucketEntry {
pub(crate) struct SegmentRangeAndBucketEntry {
range: Range<u64>,
bucket: SegmentRangeBucketEntry,
}
@@ -87,7 +106,7 @@ impl SegmentRangeCollector {
.into_iter()
.map(move |range_bucket| {
(
range_to_key(&range_bucket.range, &field_type),
range_to_string(&range_bucket.range, &field_type),
range_bucket.bucket.into(),
)
})
@@ -96,7 +115,7 @@ impl SegmentRangeCollector {
IntermediateBucketResult::Range(buckets)
}
pub(crate) fn from_req(
pub(crate) fn from_req_and_validate(
req: &RangeAggregation,
sub_aggregation: &AggregationsWithAccessor,
field_type: Type,
@@ -120,7 +139,7 @@ impl SegmentRangeCollector {
let sub_aggregation = if sub_aggregation.is_empty() {
None
} else {
Some(SegmentAggregationResultsCollector::from_req(
Some(SegmentAggregationResultsCollector::from_req_and_validate(
sub_aggregation,
)?)
};
@@ -219,15 +238,22 @@ impl SegmentRangeCollector {
/// fast field.
/// The alternative would be that every value read would be converted to the f64 range, but that is
/// more computationally expensive when many documents are hit.
fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> Range<u64> {
range
.from
.map(|from| f64_to_fastfield_u64(from, field_type))
.unwrap_or(u64::MIN)
..range
.to
.map(|to| f64_to_fastfield_u64(to, field_type))
.unwrap_or(u64::MAX)
fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> crate::Result<Range<u64>> {
let start = if let Some(from) = range.from {
f64_to_fastfield_u64(from, field_type)
.ok_or_else(|| TantivyError::InvalidArgument("invalid field type".to_string()))?
} else {
u64::MIN
};
let end = if let Some(to) = range.to {
f64_to_fastfield_u64(to, field_type)
.ok_or_else(|| TantivyError::InvalidArgument("invalid field type".to_string()))?
} else {
u64::MAX
};
Ok(start..end)
}
/// Extends the provided buckets to contain the whole value range, by inserting buckets at the
@@ -239,7 +265,7 @@ fn extend_validate_ranges(
let mut converted_buckets = buckets
.iter()
.map(|range| to_u64_range(range, field_type))
.collect_vec();
.collect::<crate::Result<Vec<_>>>()?;
converted_buckets.sort_by_key(|bucket| bucket.start);
if converted_buckets[0].start != u64::MIN {
@@ -274,7 +300,7 @@ fn extend_validate_ranges(
Ok(converted_buckets)
}
pub fn range_to_string(range: &Range<u64>, field_type: &Type) -> String {
pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> String {
// is_start is there for malformed requests, e.g. if the user passes the range u64::MIN..0.0,
// it should be rendered as "*-0" and not "*-*"
let to_str = |val: u64, is_start: bool| {
@@ -288,7 +314,7 @@ pub fn range_to_string(range: &Range<u64>, field_type: &Type) -> String {
format!("{}-{}", to_str(range.start, true), to_str(range.end, false))
}
pub fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key {
pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key {
Key::Str(range_to_string(range, field_type))
}
@@ -315,7 +341,7 @@ mod tests {
ranges,
};
SegmentRangeCollector::from_req(&req, &Default::default(), field_type).unwrap()
SegmentRangeCollector::from_req_and_validate(&req, &Default::default(), field_type).unwrap()
}
#[test]
@@ -479,6 +505,7 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;

View File

@@ -3,9 +3,9 @@ use super::agg_req_with_accessor::AggregationsWithAccessor;
use super::agg_result::AggregationResults;
use super::intermediate_agg_result::IntermediateAggregationResults;
use super::segment_agg_result::SegmentAggregationResultsCollector;
use crate::aggregation::agg_req_with_accessor::get_aggregations_with_accessor;
use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate;
use crate::collector::{Collector, SegmentCollector};
use crate::TantivyError;
use crate::{SegmentReader, TantivyError};
/// Collector for aggregations.
///
@@ -50,12 +50,7 @@ impl Collector for DistributedAggregationCollector {
_segment_local_id: crate::SegmentOrdinal,
reader: &crate::SegmentReader,
) -> crate::Result<Self::Child> {
let aggs_with_accessor = get_aggregations_with_accessor(&self.agg, reader)?;
let result = SegmentAggregationResultsCollector::from_req(&aggs_with_accessor)?;
Ok(AggregationSegmentCollector {
aggs: aggs_with_accessor,
result,
})
AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader)
}
fn requires_scoring(&self) -> bool {
@@ -80,8 +75,9 @@ impl Collector for AggregationCollector {
_segment_local_id: crate::SegmentOrdinal,
reader: &crate::SegmentReader,
) -> crate::Result<Self::Child> {
let aggs_with_accessor = get_aggregations_with_accessor(&self.agg, reader)?;
let result = SegmentAggregationResultsCollector::from_req(&aggs_with_accessor)?;
let aggs_with_accessor = get_aggs_with_accessor_and_validate(&self.agg, reader)?;
let result =
SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?;
Ok(AggregationSegmentCollector {
aggs: aggs_with_accessor,
result,
@@ -115,11 +111,29 @@ fn merge_fruits(
}
}
/// AggregationSegmentCollector does the aggregation collection on a segment.
pub struct AggregationSegmentCollector {
aggs: AggregationsWithAccessor,
result: SegmentAggregationResultsCollector,
}
impl AggregationSegmentCollector {
/// Creates an AggregationSegmentCollector from an [Aggregations] request and a segment reader.
/// Also includes validation, e.g. checking field types and existence.
pub fn from_agg_req_and_reader(
agg: &Aggregations,
reader: &SegmentReader,
) -> crate::Result<Self> {
let aggs_with_accessor = get_aggs_with_accessor_and_validate(agg, reader)?;
let result =
SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?;
Ok(AggregationSegmentCollector {
aggs: aggs_with_accessor,
result,
})
}
}
impl SegmentCollector for AggregationSegmentCollector {
type Fruit = IntermediateAggregationResults;

View File

@@ -11,7 +11,7 @@ use super::segment_agg_result::{
SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentMetricResultCollector,
SegmentRangeBucketEntry,
};
use super::{Key, VecWithNames};
use super::{Key, SerializedKey, VecWithNames};
/// Contains the intermediate aggregation result, which is optimized to be merged with other
/// intermediate results.
@@ -124,7 +124,7 @@ impl IntermediateMetricResult {
pub enum IntermediateBucketResult {
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range(HashMap<Key, IntermediateRangeBucketEntry>),
Range(HashMap<SerializedKey, IntermediateRangeBucketEntry>),
}
impl From<SegmentBucketResultCollector> for IntermediateBucketResult {
@@ -215,7 +215,7 @@ mod tests {
let mut buckets = HashMap::new();
for (key, doc_count) in data {
buckets.insert(
Key::Str(key.to_string()),
key.to_string(),
IntermediateRangeBucketEntry {
key: Key::Str(key.to_string()),
doc_count: *doc_count,
@@ -238,7 +238,7 @@ mod tests {
let mut buckets = HashMap::new();
for (key, doc_count, sub_aggregation_key, sub_aggregation_count) in data {
buckets.insert(
Key::Str(key.to_string()),
key.to_string(),
IntermediateRangeBucketEntry {
key: Key::Str(key.to_string()),
doc_count: *doc_count,

View File

@@ -12,6 +12,15 @@ use crate::DocId;
/// extracted from the aggregated documents.
/// Supported field types are u64, i64, and f64.
/// See [super::SingleMetricResult] for return value.
///
/// # JSON Format
/// ```json
/// {
/// "avg": {
/// "field": "score",
/// }
/// }
/// ```
pub struct AverageAggregation {
/// The field name to compute the stats on.
pub field: String,
@@ -90,8 +99,12 @@ impl IntermediateAverage {
self.doc_count += other.doc_count;
}
/// compute final result
pub fn finalize(&self) -> f64 {
self.sum / self.doc_count as f64
pub fn finalize(&self) -> Option<f64> {
if self.doc_count == 0 {
None
} else {
Some(self.sum / self.doc_count as f64)
}
}
#[inline]
fn collect(&mut self, val: f64) {

View File

@@ -1,5 +1,7 @@
//! Module for all metric aggregations.
//!
//! The aggregations in this family compute metrics, see [super::agg_req::MetricAggregation] for
//! details.
mod average;
mod stats;
pub use average::*;
@@ -12,11 +14,17 @@ pub use stats::*;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SingleMetricResult {
/// The value of the single value metric.
pub value: f64,
pub value: Option<f64>,
}
impl From<f64> for SingleMetricResult {
fn from(value: f64) -> Self {
Self { value: Some(value) }
}
}
impl From<Option<f64>> for SingleMetricResult {
fn from(value: Option<f64>) -> Self {
Self { value }
}
}

View File

@@ -9,11 +9,22 @@ use crate::DocId;
/// extracted from the aggregated documents.
/// Supported field types are u64, i64, and f64.
/// See [Stats] for returned statistics.
///
/// # JSON Format
/// ```json
/// {
/// "stats": {
/// "field": "score",
/// }
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StatsAggregation {
/// The field name to compute the stats on.
pub field: String,
}
impl StatsAggregation {
/// Create new StatsAggregation from a field.
pub fn from_field_name(field_name: String) -> Self {
@@ -32,14 +43,14 @@ pub struct Stats {
pub count: usize,
/// The sum of the fast field values.
pub sum: f64,
/// The standard deviation of the fast field values.
pub standard_deviation: f64,
/// The standard deviation of the fast field values. None for count == 0.
pub standard_deviation: Option<f64>,
/// The min value of the fast field values.
pub min: f64,
pub min: Option<f64>,
/// The max value of the fast field values.
pub max: f64,
/// The average of the values.
pub avg: f64,
pub max: Option<f64>,
/// The average of the values. None for count == 0.
pub avg: Option<f64>,
}
/// IntermediateStats contains the mergeable version for stats.
@@ -63,17 +74,21 @@ impl IntermediateStats {
}
}
pub(crate) fn avg(&self) -> f64 {
self.sum / (self.count as f64)
pub(crate) fn avg(&self) -> Option<f64> {
if self.count == 0 {
None
} else {
Some(self.sum / (self.count as f64))
}
}
fn square_mean(&self) -> f64 {
self.squared_sum / (self.count as f64)
}
pub(crate) fn standard_deviation(&self) -> f64 {
let average = self.avg();
(self.square_mean() - average * average).sqrt()
pub(crate) fn standard_deviation(&self) -> Option<f64> {
self.avg()
.map(|average| (self.square_mean() - average * average).sqrt())
}
/// Merge data from other stats into this instance.
@@ -85,14 +100,24 @@ impl IntermediateStats {
self.max = self.max.max(other.max);
}
/// compute final result
/// compute final resultimprove_docs
pub fn finalize(&self) -> Stats {
let min = if self.count == 0 {
None
} else {
Some(self.min)
};
let max = if self.count == 0 {
None
} else {
Some(self.max)
};
Stats {
count: self.count,
sum: self.sum,
standard_deviation: self.standard_deviation(),
min: self.min,
max: self.max,
min,
max,
avg: self.avg(),
}
}
@@ -199,7 +224,11 @@ mod tests {
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score".to_string(),
ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
ranges: vec![
(3f64..7f64).into(),
(7f64..19f64).into(),
(19f64..20f64).into(),
],
}),
sub_aggregation: iter::once((
"stats".to_string(),
@@ -268,6 +297,18 @@ mod tests {
})
);
assert_eq!(
res["range"]["buckets"][3]["stats"],
json!({
"avg": serde_json::Value::Null,
"count": 0,
"max": serde_json::Value::Null,
"min": serde_json::Value::Null,
"standard_deviation": serde_json::Value::Null,
"sum": 0.0,
})
);
Ok(())
}
}

View File

@@ -1,7 +1,7 @@
//! # Aggregations
//!
//!
//! Aggregation summarizes your data as statistics on buckets or metrics.
//! An aggregation summarizes your data as statistics on buckets or metrics.
//!
//! Aggregations can provide answer to questions like:
//! - What is the average price of all sold articles?
@@ -13,10 +13,25 @@
//! # Usage
//!
//!
//! To use aggregations, build an aggregation request by constructing [agg_req::Aggregations].
//! To use aggregations, build an aggregation request by constructing
//! [Aggregations](agg_req::Aggregations).
//! Create an [AggregationCollector] from this request. AggregationCollector implements the
//! `Collector` trait and can be passed as collector into `searcher.search()`.
//!
//! # JSON Format
//! Aggregations request and result structures de/serialize into elasticsearch compatible JSON.
//!
//! ```verbatim
//! let agg_req: Aggregations = serde_json::from_str(json_request_string).unwrap();
//! let collector = AggregationCollector::from_aggs(agg_req);
//! let searcher = reader.searcher();
//! let agg_res = searcher.search(&term_query, &collector).unwrap();
//! let json_response_string: String = serde_json::to_string(&agg_res)?;
//! ```
//! # Limitations
//!
//! Currently aggregations work only on single value fast fields of type u64, f64 and i64.
//!
//! # Example
//! Compute the average metric, by building [agg_req::Aggregations], which is built from an (String,
//! [agg_req::Aggregation]) iterator.
@@ -137,7 +152,9 @@ mod segment_agg_result;
use std::collections::HashMap;
use std::fmt::Display;
pub use collector::{AggregationCollector, DistributedAggregationCollector};
pub use collector::{
AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@@ -209,41 +226,40 @@ impl<T: Clone> VecWithNames<T> {
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Ord, PartialOrd)]
/// The serialized key is used in a HashMap.
pub type SerializedKey = String;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, PartialOrd)]
/// The key to identify a bucket.
#[serde(untagged)]
pub enum Key {
/// String key
Str(String),
/// u64 key
U64(u64),
/// i64 key
I64(i64),
/// f64 key
F64(f64),
}
impl Display for Key {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Key::Str(val) => f.write_str(val),
Key::U64(val) => f.write_str(&val.to_string()),
Key::I64(val) => f.write_str(&val.to_string()),
Key::F64(val) => f.write_str(&val.to_string()),
}
}
}
impl Serialize for Key {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_str(&self.to_string())
}
}
/// Invert of to_fastfield_u64
/// Invert of to_fastfield_u64. Used to convert to f64 for metrics.
///
/// # Panics
/// Only u64, f64, i64 is supported
pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
match field_type {
Type::U64 => val as f64,
Type::I64 => i64::from_u64(val) as f64,
Type::F64 => f64::from_u64(val),
Type::Date | Type::Str | Type::Facet | Type::Bytes => unimplemented!(),
_ => {
panic!("unexpected type {:?}. This should not happen", field_type)
}
}
}
@@ -257,12 +273,12 @@ pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
/// A f64 value of e.g. 2.0 needs to be converted using the same monotonic
/// conversion function, so that the value matches the u64 value stored in the fast
/// field.
pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> u64 {
pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {
match field_type {
Type::U64 => val as u64,
Type::I64 => (val as i64).to_u64(),
Type::F64 => val.to_u64(),
Type::Date | Type::Str | Type::Facet | Type::Bytes => unimplemented!(),
Type::U64 => Some(val as u64),
Type::I64 => Some((val as i64).to_u64()),
Type::F64 => Some(val.to_u64()),
_ => None,
}
}
@@ -278,9 +294,10 @@ mod tests {
use super::metric::AverageAggregation;
use crate::aggregation::agg_req::{BucketAggregationType, MetricAggregation};
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use crate::aggregation::segment_agg_result::DOC_BLOCK_SIZE;
use crate::aggregation::DistributedAggregationCollector;
use crate::query::TermQuery;
use crate::query::{AllQuery, TermQuery};
use crate::schema::{Cardinality, IndexRecordOption, Schema, TextFieldIndexing};
use crate::{Index, Term};
@@ -304,13 +321,13 @@ mod tests {
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype =
crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue);
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
let fraction_field = schema_builder.add_f64_field(
"fraction_f64",
crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue),
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue),
);
let index = Index::create_in_ram(schema_builder.build());
{
@@ -451,9 +468,14 @@ mod tests {
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype =
crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue);
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
let score_field_f64 = schema_builder.add_f64_field("score_f64", score_fieldtype.clone());
let multivalue =
crate::schema::NumericOptions::default().set_fast(Cardinality::MultiValues);
let scores_field_i64 = schema_builder.add_i64_field("scores_i64", multivalue);
let score_field_i64 = schema_builder.add_i64_field("score_i64", score_fieldtype);
let index = Index::create_in_ram(schema_builder.build());
{
@@ -464,12 +486,16 @@ mod tests {
score_field => 1u64,
score_field_f64 => 1f64,
score_field_i64 => 1i64,
scores_field_i64 => 1i64,
scores_field_i64 => 2i64,
))?;
index_writer.add_document(doc!(
text_field => "cool",
score_field => 3u64,
score_field_f64 => 3f64,
score_field_i64 => 3i64,
scores_field_i64 => 5i64,
scores_field_i64 => 5i64,
))?;
index_writer.add_document(doc!(
text_field => "cool",
@@ -640,6 +666,11 @@ mod tests {
IndexRecordOption::Basic,
);
let query_with_no_hits = TermQuery::new(
Term::from_field_text(text_field, "thistermdoesnotexist"),
IndexRecordOption::Basic,
);
let sub_agg_req: Aggregations =
vec![("average_in_range".to_string(), get_avg_req("score"))]
.into_iter()
@@ -653,7 +684,8 @@ mod tests {
"ranges": [
{ "to": 3.0 },
{ "from": 3.0, "to": 7.0 },
{ "from": 7.0, "to": 20.0 },
{ "from": 7.0, "to": 19.0 },
{ "from": 19.0, "to": 20.0 },
{ "from": 20.0 }
]
},
@@ -667,7 +699,8 @@ mod tests {
"ranges": [
{ "to": 3.0 },
{ "from": 3.0, "to": 7.0 },
{ "from": 7.0, "to": 20.0 },
{ "from": 7.0, "to": 19.0 },
{ "from": 19.0, "to": 20.0 },
{ "from": 20.0 }
]
},
@@ -684,7 +717,8 @@ mod tests {
"ranges": [
{ "to": 3.0 },
{ "from": 3.0, "to": 7.0 },
{ "from": 7.0, "to": 20.0 },
{ "from": 7.0, "to": 19.0 },
{ "from": 19.0, "to": 20.0 },
{ "from": 20.0 }
]
},
@@ -705,7 +739,11 @@ mod tests {
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score".to_string(),
ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
ranges: vec![
(3f64..7f64).into(),
(7f64..19f64).into(),
(19f64..20f64).into(),
],
}),
sub_aggregation: sub_agg_req.clone(),
}),
@@ -715,7 +753,11 @@ mod tests {
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score_f64".to_string(),
ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
ranges: vec![
(3f64..7f64).into(),
(7f64..19f64).into(),
(19f64..20f64).into(),
],
}),
sub_aggregation: sub_agg_req.clone(),
}),
@@ -725,7 +767,11 @@ mod tests {
Aggregation::Bucket(BucketAggregation {
bucket_agg: BucketAggregationType::Range(RangeAggregation {
field: "score_i64".to_string(),
ranges: vec![(3f64..7f64).into(), (7f64..20f64).into()],
ranges: vec![
(3f64..7f64).into(),
(7f64..19f64).into(),
(19f64..20f64).into(),
],
}),
sub_aggregation: sub_agg_req,
}),
@@ -737,12 +783,16 @@ mod tests {
};
let agg_res: AggregationResults = if use_distributed_collector {
let collector = DistributedAggregationCollector::from_aggs(agg_req);
let collector = DistributedAggregationCollector::from_aggs(agg_req.clone());
let searcher = reader.searcher();
searcher.search(&term_query, &collector).unwrap().into()
let res = searcher.search(&term_query, &collector).unwrap();
// Test de/serialization roundtrip on intermediate_agg_result
let res: IntermediateAggregationResults =
serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
res.into()
} else {
let collector = AggregationCollector::from_aggs(agg_req);
let collector = AggregationCollector::from_aggs(agg_req.clone());
let searcher = reader.searcher();
searcher.search(&term_query, &collector).unwrap()
@@ -756,39 +806,54 @@ mod tests {
assert_eq!(res["rangei64"]["buckets"][1]["doc_count"], 2u64);
assert_eq!(res["average"]["value"], 12.142857142857142f64);
assert_eq!(res["range"]["buckets"][2]["key"], "7-20");
assert_eq!(res["range"]["buckets"][2]["key"], "7-19");
assert_eq!(res["range"]["buckets"][2]["doc_count"], 3u64);
assert_eq!(res["rangef64"]["buckets"][2]["doc_count"], 3u64);
assert_eq!(res["rangei64"]["buckets"][2]["doc_count"], 3u64);
assert_eq!(res["rangei64"]["buckets"][4], serde_json::Value::Null);
assert_eq!(res["rangei64"]["buckets"][5], serde_json::Value::Null);
assert_eq!(res["range"]["buckets"][3]["key"], "20-*");
assert_eq!(res["range"]["buckets"][3]["doc_count"], 1u64);
assert_eq!(res["rangef64"]["buckets"][3]["doc_count"], 1u64);
assert_eq!(res["rangei64"]["buckets"][3]["doc_count"], 1u64);
assert_eq!(res["range"]["buckets"][4]["key"], "20-*");
assert_eq!(res["range"]["buckets"][4]["doc_count"], 1u64);
assert_eq!(res["rangef64"]["buckets"][4]["doc_count"], 1u64);
assert_eq!(res["rangei64"]["buckets"][4]["doc_count"], 1u64);
assert_eq!(res["range"]["buckets"][3]["key"], "19-20");
assert_eq!(res["range"]["buckets"][3]["doc_count"], 0u64);
assert_eq!(res["rangef64"]["buckets"][3]["doc_count"], 0u64);
assert_eq!(res["rangei64"]["buckets"][3]["doc_count"], 0u64);
assert_eq!(
res["range"]["buckets"][3]["average_in_range"]["value"],
serde_json::Value::Null
);
assert_eq!(
res["range"]["buckets"][4]["average_in_range"]["value"],
44.0f64
);
assert_eq!(
res["rangef64"]["buckets"][3]["average_in_range"]["value"],
res["rangef64"]["buckets"][4]["average_in_range"]["value"],
44.0f64
);
assert_eq!(
res["rangei64"]["buckets"][3]["average_in_range"]["value"],
res["rangei64"]["buckets"][4]["average_in_range"]["value"],
44.0f64
);
assert_eq!(
res["range"]["7-20"]["average_in_range"]["value"],
res["rangef64"]["7-20"]["average_in_range"]["value"]
res["range"]["7-19"]["average_in_range"]["value"],
res["rangef64"]["7-19"]["average_in_range"]["value"]
);
assert_eq!(
res["range"]["7-20"]["average_in_range"]["value"],
res["rangei64"]["7-20"]["average_in_range"]["value"]
res["range"]["7-19"]["average_in_range"]["value"],
res["rangei64"]["7-19"]["average_in_range"]["value"]
);
// Test empty result set
let collector = AggregationCollector::from_aggs(agg_req);
let searcher = reader.searcher();
searcher.search(&query_with_no_hits, &collector).unwrap();
Ok(())
}
@@ -839,31 +904,42 @@ mod tests {
let index = get_test_index_2_segments(false)?;
let reader = index.reader()?;
let text_field = reader.searcher().schema().get_field("text").unwrap();
let term_query = TermQuery::new(
Term::from_field_text(text_field, "cool"),
IndexRecordOption::Basic,
);
let avg_on_field = |field_name: &str| {
let agg_req_1: Aggregations = vec![(
"average".to_string(),
Aggregation::Metric(MetricAggregation::Average(
AverageAggregation::from_field_name(field_name.to_string()),
)),
)]
.into_iter()
.collect();
let agg_req_1: Aggregations = vec![(
"average".to_string(),
Aggregation::Metric(MetricAggregation::Average(
AverageAggregation::from_field_name("text".to_string()),
)),
)]
.into_iter()
.collect();
let collector = AggregationCollector::from_aggs(agg_req_1);
let collector = AggregationCollector::from_aggs(agg_req_1);
let searcher = reader.searcher();
let searcher = reader.searcher();
let agg_res = searcher.search(&term_query, &collector).unwrap_err();
searcher.search(&AllQuery, &collector).unwrap_err()
};
let agg_res = avg_on_field("text");
assert_eq!(
format!("{:?}", agg_res),
r#"InvalidArgument("Invalid field type in aggregation Str, only f64, u64, i64 is supported")"#
r#"InvalidArgument("Only single value fast fields of type f64, u64, i64 are supported, but got Str ")"#
);
let agg_res = avg_on_field("not_exist_field");
assert_eq!(
format!("{:?}", agg_res),
r#"FieldNotFound("not_exist_field")"#
);
let agg_res = avg_on_field("scores_i64");
assert_eq!(
format!("{:?}", agg_res),
r#"InvalidArgument("Invalid field type in aggregation I64, only Cardinality::SingleValue supported")"#
);
Ok(())
}
@@ -888,7 +964,7 @@ mod tests {
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype =
crate::schema::IntOptions::default().set_fast(Cardinality::SingleValue);
crate::schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let score_field = schema_builder.add_u64_field("score", score_fieldtype.clone());
let score_field_f64 =
schema_builder.add_f64_field("score_f64", score_fieldtype.clone());

View File

@@ -5,8 +5,6 @@
use std::fmt::Debug;
use itertools::Itertools;
use super::agg_req::MetricAggregation;
use super::agg_req_with_accessor::{
AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor,
@@ -42,22 +40,27 @@ impl Debug for SegmentAggregationResultsCollector {
}
impl SegmentAggregationResultsCollector {
pub(crate) fn from_req(req: &AggregationsWithAccessor) -> crate::Result<Self> {
pub(crate) fn from_req_and_validate(req: &AggregationsWithAccessor) -> crate::Result<Self> {
let buckets = req
.buckets
.entries()
.map(|(key, req)| {
Ok((
key.to_string(),
SegmentBucketResultCollector::from_req(req)?,
SegmentBucketResultCollector::from_req_and_validate(req)?,
))
})
.collect::<crate::Result<_>>()?;
let metrics = req
.metrics
.entries()
.map(|(key, req)| (key.to_string(), SegmentMetricResultCollector::from_req(req)))
.collect_vec();
.map(|(key, req)| {
Ok((
key.to_string(),
SegmentMetricResultCollector::from_req_and_validate(req)?,
))
})
.collect::<crate::Result<_>>()?;
Ok(SegmentAggregationResultsCollector {
metrics: VecWithNames::from_entries(metrics),
buckets: VecWithNames::from_entries(buckets),
@@ -115,15 +118,17 @@ pub(crate) enum SegmentMetricResultCollector {
}
impl SegmentMetricResultCollector {
pub fn from_req(req: &MetricAggregationWithAccessor) -> Self {
pub fn from_req_and_validate(req: &MetricAggregationWithAccessor) -> crate::Result<Self> {
match &req.metric {
MetricAggregation::Average(AverageAggregation { field: _ }) => {
SegmentMetricResultCollector::Average(SegmentAverageCollector::from_req(
req.field_type,
Ok(SegmentMetricResultCollector::Average(
SegmentAverageCollector::from_req(req.field_type),
))
}
MetricAggregation::Stats(StatsAggregation { field: _ }) => {
SegmentMetricResultCollector::Stats(SegmentStatsCollector::from_req(req.field_type))
Ok(SegmentMetricResultCollector::Stats(
SegmentStatsCollector::from_req(req.field_type),
))
}
}
}
@@ -149,11 +154,15 @@ pub(crate) enum SegmentBucketResultCollector {
}
impl SegmentBucketResultCollector {
pub fn from_req(req: &BucketAggregationWithAccessor) -> crate::Result<Self> {
pub fn from_req_and_validate(req: &BucketAggregationWithAccessor) -> crate::Result<Self> {
match &req.bucket_agg {
BucketAggregationType::Range(range_req) => Ok(Self::Range(
SegmentRangeCollector::from_req(range_req, &req.sub_aggregation, req.field_type)?,
)),
BucketAggregationType::Range(range_req) => {
Ok(Self::Range(SegmentRangeCollector::from_req_and_validate(
range_req,
&req.sub_aggregation,
req.field_type,
)?))
}
}
}

View File

@@ -173,8 +173,7 @@ impl<T: PartialOrd + Clone> TopSegmentCollector<T> {
.collect()
}
/// Return true iff at least K documents have gone through
/// the collector.
/// Returns true if at least `limit` documents have been collected.
#[inline]
pub(crate) fn at_capacity(&self) -> bool {
self.heap.len() >= self.limit

View File

@@ -64,7 +64,7 @@ fn load_metas(
/// let body_field = schema_builder.add_text_field("body", TEXT);
/// let number_field = schema_builder.add_u64_field(
/// "number",
/// IntOptions::default().set_fast(Cardinality::SingleValue),
/// NumericOptions::default().set_fast(Cardinality::SingleValue),
/// );
///
/// let schema = schema_builder.build();

View File

@@ -88,7 +88,8 @@ impl InvertedIndexReader {
let postings_slice = self
.postings_file_slice
.slice(term_info.postings_range.clone());
block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
let postings_bytes = postings_slice.read_bytes()?;
block_postings.reset(term_info.doc_freq, postings_bytes)?;
Ok(())
}
@@ -197,3 +198,36 @@ impl InvertedIndexReader {
.unwrap_or(0u32))
}
}
#[cfg(feature = "quickwit")]
impl InvertedIndexReader {
pub(crate) async fn get_term_info_async(
&self,
term: &Term,
) -> crate::AsyncIoResult<Option<TermInfo>> {
self.termdict.get_async(term.value_bytes()).await
}
/// Warms up the postings (and optionally the positions) associated with a `Term`
/// by asynchronously loading the relevant byte ranges.
///
/// This method is for advanced usage only.
pub async fn warm_postings(
&self,
term: &Term,
with_positions: bool,
) -> crate::AsyncIoResult<()> {
let term_info_opt = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
self.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone())
.await?;
if with_positions {
self.positions_file_slice
.read_bytes_slice_async(term_info.positions_range.clone())
.await?;
}
}
Ok(())
}
}
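A minimal usage sketch for the warming API above, assuming the `quickwit` feature is enabled and an async executor is available; the import paths are assumptions, and the `?` relies on the `From<AsyncIoError> for TantivyError` conversion introduced later in this changeset.

```rust
// Sketch only: paths and feature flags are assumptions, not part of this diff.
use tantivy::schema::Field;
use tantivy::{InvertedIndexReader, Term};

// `inv_index` would typically come from `SegmentReader::inverted_index(field)`.
async fn warm_term(inv_index: &InvertedIndexReader, text_field: Field) -> tantivy::Result<()> {
    let term = Term::from_field_text(text_field, "cool");
    // Pre-loads the postings (and positions) byte ranges so later reads are served from memory.
    inv_index.warm_postings(&term, /* with_positions= */ true).await?;
    Ok(())
}
```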

View File

@@ -110,6 +110,13 @@ impl Searcher {
store_reader.get(doc_address.doc_id)
}
/// Fetches a document in an asynchronous manner.
#[cfg(feature = "quickwit")]
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
let store_reader = &self.store_readers[doc_address.segment_ord as usize];
store_reader.get_async(doc_address.doc_id).await
}
/// Access the schema associated to the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.schema

View File

@@ -70,7 +70,7 @@ impl SegmentReader {
self.max_doc - self.num_docs
}
/// Returns true iff some of the documents of the segment have been deleted.
/// Returns true if some of the documents of the segment have been deleted.
pub fn has_deletes(&self) -> bool {
self.num_deleted_docs() > 0
}
@@ -121,9 +121,8 @@ impl SegmentReader {
self.fieldnorm_readers.get_field(field)?.ok_or_else(|| {
let field_name = self.schema.get_field_name(field);
let err_msg = format!(
"Field norm not found for field {:?}. Was the field set to record norm during \
indexing?",
field_name
"Field norm not found for field {field_name:?}. Was the field set to record norm \
during indexing?"
);
crate::TantivyError::SchemaError(err_msg)
})
@@ -302,7 +301,7 @@ impl SegmentReader {
self.alive_bitset_opt.as_ref()
}
/// Returns true iff the `doc` is marked
/// Returns true if the `doc` is marked
/// as deleted.
pub fn is_deleted(&self, doc: DocId) -> bool {
self.alive_bitset()

View File

@@ -96,9 +96,9 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
///
/// There are currently two implementations of `Directory`
///
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
/// - The [`MMapDirectory`][crate::directory::MmapDirectory], this
/// should be your default choice.
/// - The [`RamDirectory`](struct.RamDirectory.html), which
/// - The [`RamDirectory`][crate::directory::RamDirectory], which
/// should be used mostly for tests.
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Opens a file and returns a boxed `FileHandle`.
@@ -128,7 +128,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `DeleteError::DoesNotExist`.
fn delete(&self, path: &Path) -> Result<(), DeleteError>;
/// Returns true iff the file exists
/// Returns true if and only if the file exists
fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
/// Opens a writer for the *virtual file* associated with

View File
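To make the `Directory` choice documented above concrete, here is a hedged sketch that drives both stock implementations through the trait; `atomic_write`/`atomic_read` and `MmapDirectory::open` are assumed to have the signatures shown and may need adjusting.

```rust
// Sketch only: method signatures are assumptions, not taken from this diff.
use std::path::Path;

use tantivy::directory::{Directory, MmapDirectory, RamDirectory};

fn roundtrip(dir: &dyn Directory) -> tantivy::Result<Vec<u8>> {
    let path = Path::new("example.bin");
    dir.atomic_write(path, b"hello")?;
    assert!(dir.exists(path)?);
    Ok(dir.atomic_read(path)?)
}

fn main() -> tantivy::Result<()> {
    // In-memory directory: the right default for tests.
    let ram = RamDirectory::create();
    assert_eq!(roundtrip(&ram)?, b"hello");

    // Memory-mapped directory: the default choice for a persistent index.
    // The target directory is assumed to already exist.
    let mmap = MmapDirectory::open("/tmp/tantivy-example")?;
    let _ = roundtrip(&mmap)?;
    Ok(())
}
```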

@@ -2,6 +2,7 @@ use std::ops::{Deref, Range};
use std::sync::{Arc, Weak};
use std::{fmt, io};
use async_trait::async_trait;
use common::HasLen;
use stable_deref_trait::StableDeref;
@@ -18,18 +19,35 @@ pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
/// The underlying behavior is therefore specific to the `Directory` that created it.
/// Despite its name, a `FileSlice` may or may not directly map to an actual file
/// on the filesystem.
#[async_trait]
pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug {
/// Reads a slice of bytes.
///
/// This method may panic if the range requested is invalid.
fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes>;
#[cfg(feature = "quickwit")]
#[doc(hidden)]
async fn read_bytes_async(
&self,
_byte_range: Range<usize>,
) -> crate::AsyncIoResult<OwnedBytes> {
Err(crate::error::AsyncIoError::AsyncUnsupported)
}
}
#[async_trait]
impl FileHandle for &'static [u8] {
fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
let bytes = &self[range];
Ok(OwnedBytes::new(bytes))
}
#[cfg(feature = "quickwit")]
async fn read_bytes_async(&self, byte_range: Range<usize>) -> crate::AsyncIoResult<OwnedBytes> {
Ok(self.read_bytes(byte_range)?)
}
}
impl<B> From<B> for FileSlice
@@ -102,6 +120,12 @@ impl FileSlice {
self.data.read_bytes(self.range.clone())
}
#[cfg(feature = "quickwit")]
#[doc(hidden)]
pub async fn read_bytes_async(&self) -> crate::AsyncIoResult<OwnedBytes> {
self.data.read_bytes_async(self.range.clone()).await
}
/// Reads a specific slice of data.
///
/// This is equivalent to running `file_slice.slice(from, to).read_bytes()`.
@@ -116,6 +140,23 @@ impl FileSlice {
.read_bytes(self.range.start + range.start..self.range.start + range.end)
}
#[cfg(feature = "quickwit")]
#[doc(hidden)]
pub async fn read_bytes_slice_async(
&self,
byte_range: Range<usize>,
) -> crate::AsyncIoResult<OwnedBytes> {
assert!(
self.range.start + byte_range.end <= self.range.end,
"`to` exceeds the fileslice length"
);
self.data
.read_bytes_async(
self.range.start + byte_range.start..self.range.start + byte_range.end,
)
.await
}
/// Splits the FileSlice at the given offset and return two file slices.
/// `file_slice[..split_offset]` and `file_slice[split_offset..]`.
///
@@ -160,10 +201,16 @@ impl FileSlice {
}
}
#[async_trait]
impl FileHandle for FileSlice {
fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
self.read_bytes_slice(range)
}
#[cfg(feature = "quickwit")]
async fn read_bytes_async(&self, byte_range: Range<usize>) -> crate::AsyncIoResult<OwnedBytes> {
self.read_bytes_slice_async(byte_range).await
}
}
impl HasLen for FileSlice {
@@ -172,6 +219,19 @@ impl HasLen for FileSlice {
}
}
#[async_trait]
impl FileHandle for OwnedBytes {
fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
Ok(self.slice(range))
}
#[cfg(feature = "quickwit")]
async fn read_bytes_async(&self, range: Range<usize>) -> crate::AsyncIoResult<OwnedBytes> {
let bytes = self.read_bytes(range)?;
Ok(bytes)
}
}
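Only the synchronous `read_bytes` is required by the trait above; the async method keeps its `AsyncUnsupported` default unless overridden. A hedged sketch of a third-party handle, written as if it lived next to the impls in this file and reusing the imports already present here (the type is invented for illustration, and it is assumed that no `#[async_trait]` attribute is needed when the async default is not overridden):

```rust
// Sketch only: an in-memory FileHandle satisfying the bounds
// `'static + Send + Sync + HasLen + fmt::Debug`.
#[derive(Debug)]
struct InMemoryHandle(Vec<u8>);

impl HasLen for InMemoryHandle {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl FileHandle for InMemoryHandle {
    fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
        // Copies the requested range out, mirroring the `&'static [u8]` impl above.
        // Like that impl, this panics if the range is out of bounds.
        Ok(OwnedBytes::new(self.0[range].to_vec()))
    }
}
```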
#[cfg(test)]
mod tests {
use std::io;

View File

@@ -16,7 +16,7 @@ use crate::directory::{
use crate::error::DataCorruption;
use crate::Directory;
/// Returns true iff the file is "managed".
/// Returns true if the file is "managed".
/// Non-managed files are not subject to garbage collection.
///
/// Filenames that start with a "." -typically locks-

View File

@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::convert::From;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
use std::io::{self, BufWriter, Read, Seek, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
@@ -265,7 +264,7 @@ impl Write for SafeFileWriter {
}
impl Seek for SafeFileWriter {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
self.0.seek(pos)
}
}

View File

@@ -9,7 +9,6 @@ mod file_slice;
mod file_watcher;
mod footer;
mod managed_directory;
mod owned_bytes;
mod ram_directory;
mod watch_event_router;
@@ -22,13 +21,13 @@ use std::io::BufWriter;
use std::path::PathBuf;
pub use common::{AntiCallToken, TerminatingWrite};
pub use ownedbytes::OwnedBytes;
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};

View File

@@ -1,12 +0,0 @@
use std::io;
use std::ops::Range;
pub use ownedbytes::OwnedBytes;
use crate::directory::FileHandle;
impl FileHandle for OwnedBytes {
fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> {
Ok(self.slice(range))
}
}

View File

@@ -1,9 +1,11 @@
//! Definition of Tantivy's error and result.
//! Definition of Tantivy's errors and results.
use std::path::PathBuf;
use std::sync::PoisonError;
use std::{fmt, io};
use thiserror::Error;
use crate::directory::error::{
Incompatibility, LockError, OpenDirectoryError, OpenReadError, OpenWriteError,
};
@@ -12,7 +14,7 @@ use crate::{query, schema};
/// Represents a `DataCorruption` error.
///
/// When facing data corruption, tantivy actually panic or return this error.
/// When facing data corruption, tantivy actually panics or returns this error.
pub struct DataCorruption {
filepath: Option<PathBuf>,
comment: String,
@@ -38,9 +40,9 @@ impl DataCorruption {
impl fmt::Debug for DataCorruption {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Data corruption: ")?;
write!(f, "Data corruption")?;
if let Some(ref filepath) = &self.filepath {
write!(f, "(in file `{:?}`)", filepath)?;
write!(f, " (in file `{:?}`)", filepath)?;
}
write!(f, ": {}.", self.comment)?;
Ok(())
@@ -59,10 +61,10 @@ pub enum TantivyError {
/// Failed to open a file for write.
#[error("Failed to open file for write: '{0:?}'")]
OpenWriteError(#[from] OpenWriteError),
/// Index already exists in this directory
/// Index already exists in this directory.
#[error("Index already exists")]
IndexAlreadyExists,
/// Failed to acquire file lock
/// Failed to acquire file lock.
#[error("Failed to acquire Lockfile: {0:?}. {1:?}")]
LockFailure(LockError, Option<String>),
/// IO Error.
@@ -80,23 +82,45 @@ pub enum TantivyError {
/// Invalid argument was passed by the user.
#[error("An invalid argument was passed: '{0}'")]
InvalidArgument(String),
/// An Error happened in one of the thread.
/// An Error occurred in one of the threads.
#[error("An error occurred in a thread: '{0}'")]
ErrorInThread(String),
/// An Error appeared related to opening or creating a index.
/// An Error occurred related to opening or creating an index.
#[error("Missing required index builder argument when open/create index: '{0}'")]
IndexBuilderMissingArgument(&'static str),
/// An Error appeared related to the schema.
/// An Error occurred related to the schema.
#[error("Schema error: '{0}'")]
SchemaError(String),
/// System error. (e.g.: We failed spawning a new thread)
/// System error. (e.g.: We failed spawning a new thread).
#[error("System error.'{0}'")]
SystemError(String),
/// Index incompatible with current version of tantivy
/// Index incompatible with current version of Tantivy.
#[error("{0:?}")]
IncompatibleIndex(Incompatibility),
}
#[cfg(feature = "quickwit")]
#[derive(Error, Debug)]
#[doc(hidden)]
pub enum AsyncIoError {
#[error("io::Error `{0}`")]
Io(#[from] io::Error),
#[error("Asynchronous API is unsupported by this directory")]
AsyncUnsupported,
}
#[cfg(feature = "quickwit")]
impl From<AsyncIoError> for TantivyError {
fn from(async_io_err: AsyncIoError) -> Self {
match async_io_err {
AsyncIoError::Io(io_err) => TantivyError::from(io_err),
AsyncIoError::AsyncUnsupported => {
TantivyError::SystemError(format!("{:?}", async_io_err))
}
}
}
}
impl From<DataCorruption> for TantivyError {
fn from(data_corruption: DataCorruption) -> TantivyError {
TantivyError::DataCorruption(data_corruption)

View File

@@ -7,7 +7,7 @@ use ownedbytes::OwnedBytes;
use crate::space_usage::ByteCount;
use crate::DocId;
/// Write a alive `BitSet`
/// Write an alive `BitSet`
///
/// where `alive_bitset` is the set of alive `DocId`.
/// Warning: this function does not call terminate. The caller is in charge of
@@ -55,19 +55,19 @@ impl AliveBitSet {
AliveBitSet::from(readonly_bitset)
}
/// Opens a delete bitset given its file.
/// Opens an alive bitset given its file.
pub fn open(bytes: OwnedBytes) -> AliveBitSet {
let bitset = ReadOnlyBitSet::open(bytes);
AliveBitSet::from(bitset)
}
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
/// Returns true if the document is still "alive". In other words, if it has not been deleted.
#[inline]
pub fn is_alive(&self, doc: DocId) -> bool {
self.bitset.contains(doc)
}
/// Returns true iff the document has been marked as deleted.
/// Returns true if the document has been marked as deleted.
#[inline]
pub fn is_deleted(&self, doc: DocId) -> bool {
!self.is_alive(doc)
@@ -79,13 +79,13 @@ impl AliveBitSet {
self.bitset.iter()
}
/// Get underlying bitset
/// Get underlying bitset.
#[inline]
pub fn bitset(&self) -> &ReadOnlyBitSet {
&self.bitset
}
/// The number of deleted docs
/// The number of alive documents.
pub fn num_alive_docs(&self) -> usize {
self.num_alive_docs
}

View File

@@ -7,7 +7,7 @@ use crate::DocId;
/// Writer for byte array (as in, any number of bytes per document) fast fields
///
/// This `BytesFastFieldWriter` is only useful for advanced user.
/// This `BytesFastFieldWriter` is only useful for advanced users.
/// The normal way to get your associated bytes in your index
/// is to
/// - declare your field with fast set to `Cardinality::SingleValue`

View File

@@ -2,7 +2,7 @@
//!
//! It is the equivalent of `Lucene`'s `DocValues`.
//!
//! Fast fields is a column-oriented fashion storage of `tantivy`.
//! A fast field is `tantivy`'s column-oriented storage.
//!
//! It is designed for the fast random access of some document
//! fields given a document id.
@@ -12,11 +12,10 @@
//!
//!
//! Fields have to be declared as `FAST` in the schema.
//! Currently only 64-bits integers (signed or unsigned) are
//! supported.
//! Currently supported fields are: u64, i64, f64 and bytes.
//!
//! They are stored in a bit-packed fashion so that their
//! memory usage is directly linear with the amplitude of the
//! u64, i64 and f64 fields are stored in a bit-packed fashion so that
//! their memory usage is directly linear with the amplitude of the
//! values stored.
//!
//! Read access performance is comparable to that of an array lookup.
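A hedged end-to-end sketch of the workflow this module documents: declare a single-valued fast field, index a couple of documents, and read the bit-packed column back per `DocId`. The `fast_fields().u64(...)` accessor is an assumption about the reader API and may differ slightly.

```rust
// Sketch only: the fast field accessor names are assumptions.
use tantivy::fastfield::FastFieldReader;
use tantivy::schema::{Cardinality, NumericOptions, Schema};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let score_field = schema_builder.add_u64_field(
        "score",
        NumericOptions::default().set_fast(Cardinality::SingleValue),
    );
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer(50_000_000)?;
    writer.add_document(doc!(score_field => 12u64))?;
    writer.add_document(doc!(score_field => 80u64))?;
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    let segment_reader = searcher.segment_reader(0);
    // Assumed accessor: returns a reader over the bit-packed u64 column.
    let score_reader = segment_reader.fast_fields().u64(score_field)?;
    assert_eq!(score_reader.get(0), 12u64);
    assert_eq!(score_reader.get(1), 80u64);
    Ok(())
}
```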
@@ -28,6 +27,7 @@ pub use self::facet_reader::FacetReader;
pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
pub use self::readers::FastFieldReaders;
pub(crate) use self::readers::{type_and_cardinality, FastType};
pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use crate::chrono::{NaiveDateTime, Utc};
@@ -212,7 +212,7 @@ mod tests {
use super::*;
use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
use crate::merge_policy::NoMergePolicy;
use crate::schema::{Document, Field, IntOptions, Schema, FAST};
use crate::schema::{Document, Field, NumericOptions, Schema, FAST};
use crate::{Index, SegmentId, SegmentReader};
pub static SCHEMA: Lazy<Schema> = Lazy::new(|| {
@@ -520,7 +520,7 @@ mod tests {
let date_field = schema_builder.add_date_field("date", FAST);
let multi_date_field = schema_builder.add_date_field(
"multi_date",
IntOptions::default().set_fast(Cardinality::MultiValues),
NumericOptions::default().set_fast(Cardinality::MultiValues),
);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);

View File

@@ -16,7 +16,7 @@ mod tests {
use crate::collector::TopDocs;
use crate::indexer::NoMergePolicy;
use crate::query::QueryParser;
use crate::schema::{Cardinality, Facet, FacetOptions, IntOptions, Schema};
use crate::schema::{Cardinality, Facet, FacetOptions, NumericOptions, Schema};
use crate::{Document, Index, Term};
#[test]
@@ -24,7 +24,7 @@ mod tests {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_u64_field(
"multifield",
IntOptions::default().set_fast(Cardinality::MultiValues),
NumericOptions::default().set_fast(Cardinality::MultiValues),
);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
@@ -59,14 +59,14 @@ mod tests {
let mut schema_builder = Schema::builder();
let date_field = schema_builder.add_date_field(
"multi_date_field",
IntOptions::default()
NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_indexed()
.set_fieldnorm()
.set_stored(),
);
let time_i =
schema_builder.add_i64_field("time_stamp_i", IntOptions::default().set_stored());
schema_builder.add_i64_field("time_stamp_i", NumericOptions::default().set_stored());
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
@@ -94,8 +94,11 @@ mod tests {
assert_eq!(reader.num_docs(), 5);
{
let parser = QueryParser::for_index(&index, vec![date_field]);
let query = parser.parse_query(&format!("\"{}\"", first_time_stamp.to_rfc3339()))?;
let parser = QueryParser::for_index(&index, vec![]);
let query = parser.parse_query(&format!(
"multi_date_field:\"{}\"",
first_time_stamp.to_rfc3339()
))?;
let results = searcher.search(&query, &TopDocs::with_limit(5))?;
assert_eq!(results.len(), 1);
for (_score, doc_address) in results {
@@ -150,7 +153,7 @@ mod tests {
{
let parser = QueryParser::for_index(&index, vec![date_field]);
let range_q = format!(
"[{} TO {}}}",
"multi_date_field:[{} TO {}}}",
(first_time_stamp + Duration::seconds(1)).to_rfc3339(),
(first_time_stamp + Duration::seconds(3)).to_rfc3339()
);
@@ -196,7 +199,7 @@ mod tests {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_i64_field(
"multifield",
IntOptions::default().set_fast(Cardinality::MultiValues),
NumericOptions::default().set_fast(Cardinality::MultiValues),
);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
@@ -226,7 +229,7 @@ mod tests {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_u64_field(
"multifield",
IntOptions::default()
NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_indexed(),
);

View File

@@ -90,7 +90,7 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
mod tests {
use crate::core::Index;
use crate::schema::{Cardinality, Facet, FacetOptions, IntOptions, Schema};
use crate::schema::{Cardinality, Facet, FacetOptions, NumericOptions, Schema};
#[test]
fn test_multifastfield_reader() -> crate::Result<()> {
@@ -148,7 +148,7 @@ mod tests {
#[test]
fn test_multifastfield_reader_min_max() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let field_options = IntOptions::default()
let field_options = NumericOptions::default()
.set_indexed()
.set_fast(Cardinality::MultiValues);
let item_field = schema_builder.add_i64_field("items", field_options);

View File

@@ -14,7 +14,7 @@ use crate::DocId;
/// Writer for multi-valued (as in, more than one value per document)
/// int fast field.
///
/// This `Writer` is only useful for advanced user.
/// This `Writer` is only useful for advanced users.
/// The normal way to get your multivalued int in your index
/// is to
/// - declare your field with fast set to `Cardinality::MultiValues`
@@ -23,10 +23,11 @@ use crate::DocId;
///
/// The `MultiValuedFastFieldWriter` can be acquired from the
/// fastfield writer, by calling
/// [`.get_multivalue_writer(...)`](./struct.FastFieldsWriter.html#method.get_multivalue_writer).
/// [`.get_multivalue_writer_mut(...)`](./struct.FastFieldsWriter.html#method.
/// get_multivalue_writer_mut).
///
/// Once acquired, writing is done by calling calls to
/// `.add_document_vals(&[u64])` once per document.
/// Once acquired, writing is done by calling
/// [`.add_document_vals(&[u64])`](MultiValuedFastFieldWriter::add_document_vals) once per document.
///
/// The serializer makes it possible to remap all of the values
/// that were pushed to the writer using a mapping.
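As the doc comment above says, the usual route is to declare the field with `Cardinality::MultiValues` and let the indexer drive this writer. A hedged sketch of that route; the `u64s(...)` accessor and the `get_vals` signature are assumptions about `MultiValuedFastFieldReader`.

```rust
// Sketch only: multi-valued accessor names and signatures are assumptions.
use tantivy::schema::{Cardinality, NumericOptions, Schema};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let multi_numbers = schema_builder.add_u64_field(
        "multi_numbers",
        NumericOptions::default().set_fast(Cardinality::MultiValues),
    );
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer(50_000_000)?;
    // The doc! macro accepts the same field several times; each value is kept.
    writer.add_document(doc!(multi_numbers => 2u64, multi_numbers => 3u64))?;
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    let multi_reader = searcher.segment_reader(0).fast_fields().u64s(multi_numbers)?;
    let mut vals: Vec<u64> = Vec::new();
    multi_reader.get_vals(0, &mut vals); // assumed signature: (DocId, &mut Vec<u64>)
    assert_eq!(vals, vec![2, 3]);
    Ok(())
}
```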

View File

@@ -17,14 +17,14 @@ pub struct FastFieldReaders {
fast_fields_composite: CompositeFile,
}
#[derive(Eq, PartialEq, Debug)]
enum FastType {
pub(crate) enum FastType {
I64,
U64,
F64,
Date,
}
fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality)> {
pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality)> {
match field_type {
FieldType::U64(options) => options
.get_fastfield_cardinality()
@@ -55,7 +55,8 @@ impl FastFieldReaders {
self.fast_fields_composite.space_usage()
}
fn fast_field_data(&self, field: Field, idx: usize) -> crate::Result<FileSlice> {
#[doc(hidden)]
pub fn fast_field_data(&self, field: Field, idx: usize) -> crate::Result<FileSlice> {
self.fast_fields_composite
.open_read_with_idx(field, idx)
.ok_or_else(|| {

View File

@@ -197,7 +197,7 @@ impl CompositeFastFieldSerializer {
/// Closes the serializer
///
/// After this call the data must be persistently save on disk.
/// After this call the data must be persistently saved on disk.
pub fn close(self) -> io::Result<()> {
self.composite_write.close()
}

View File

@@ -14,7 +14,7 @@ use crate::postings::UnorderedTermId;
use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema};
use crate::termdict::TermOrdinal;
/// The fastfieldswriter regroup all of the fast field writers.
/// The `FastFieldsWriter` groups all of the fast field writers.
pub struct FastFieldsWriter {
single_value_writers: Vec<IntFastFieldWriter>,
multi_values_writers: Vec<MultiValuedFastFieldWriter>,

View File

@@ -221,7 +221,7 @@ impl DeleteCursor {
}
/// Advance to the next delete operation.
/// Returns true iff there is such an operation.
/// Returns true if and only if there is such an operation.
pub fn advance(&mut self) -> bool {
if self.load_block_if_required() {
self.pos += 1;

View File

@@ -168,12 +168,12 @@ mod tests_indexsorting {
let my_string_field = schema_builder.add_text_field("string_field", STRING | STORED);
let my_number = schema_builder.add_u64_field(
"my_number",
IntOptions::default().set_fast(Cardinality::SingleValue),
NumericOptions::default().set_fast(Cardinality::SingleValue),
);
let multi_numbers = schema_builder.add_u64_field(
"multi_numbers",
IntOptions::default().set_fast(Cardinality::MultiValues),
NumericOptions::default().set_fast(Cardinality::MultiValues),
);
let schema = schema_builder.build();

View File

@@ -794,8 +794,8 @@ mod tests {
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Cardinality, Facet, FacetOptions, IndexRecordOption, IntOptions, TextFieldIndexing,
TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term};
@@ -1404,7 +1404,7 @@ mod tests {
let multi_numbers = schema_builder.add_u64_field(
"multi_numbers",
IntOptions::default()
NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_stored(),
);

View File

@@ -0,0 +1,415 @@
use chrono::Utc;
use fnv::FnvHashMap;
use murmurhash32::murmurhash2;
use crate::fastfield::FastValue;
use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::term::{JSON_END_OF_PATH, JSON_PATH_SEGMENT_SEP};
use crate::schema::Type;
use crate::tokenizer::TextAnalyzer;
use crate::{DocId, Term};
/// This object is a map storing the last position for a given path for the current document
/// being indexed.
///
/// It is key to solving the following problem:
/// If we index a JsonObject emitting several terms with the same path,
/// we do not want to create false positives in phrase queries.
///
/// For instance:
///
/// ```json
/// {"bands": [
/// {"band_name": "Elliot Smith"},
/// {"band_name": "The Who"},
/// ]}
/// ```
///
/// If we are careless and index each band name independently,
/// `Elliot` and `The` will end up indexed at position 0, and `Smith` and `Who` will be indexed at
/// position 1.
/// As a result, with lemmatization, "The Smiths" will match our object.
///
/// Worse, if the same term appears in the second object, a non-increasing value would be pushed
/// to the position recorder, probably provoking a panic.
///
/// This problem is solved for regular multivalued objects by offsetting the position
/// of values, with a position gap. Here we would like `The` and `Who` to get indexed at
/// position 2 and 3 respectively.
///
/// With regular fields, we sort the fields beforehand, so that all terms with the same
/// path are indexed consecutively.
///
/// In JSON objects, we do not have this comfort, so we need to record these position offsets in
/// a map.
///
/// Note that using a single position for the entire object would not hurt correctness.
/// It would however hurt compression.
///
/// We can therefore afford to work with a map that is not perfect. It is fine if several
/// paths map to the same index position, as long as the probability is relatively low.
#[derive(Default)]
struct IndexingPositionsPerPath {
positions_per_path: FnvHashMap<u32, IndexingPosition>,
}
impl IndexingPositionsPerPath {
fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
self.positions_per_path
.entry(murmurhash2(term.as_slice()))
.or_insert_with(Default::default)
}
}
pub(crate) fn index_json_values<'a>(
doc: DocId,
json_values: impl Iterator<Item = crate::Result<&'a serde_json::Map<String, serde_json::Value>>>,
text_analyzer: &TextAnalyzer,
term_buffer: &mut Term,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
) -> crate::Result<()> {
let mut json_term_writer = JsonTermWriter::wrap(term_buffer);
let mut positions_per_path: IndexingPositionsPerPath = Default::default();
for json_value_res in json_values {
let json_value = json_value_res?;
index_json_object(
doc,
json_value,
text_analyzer,
&mut json_term_writer,
postings_writer,
ctx,
&mut positions_per_path,
);
}
Ok(())
}
fn index_json_object<'a>(
doc: DocId,
json_value: &serde_json::Map<String, serde_json::Value>,
text_analyzer: &TextAnalyzer,
json_term_writer: &mut JsonTermWriter<'a>,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
for (json_path_segment, json_value) in json_value {
json_term_writer.push_path_segment(json_path_segment);
index_json_value(
doc,
json_value,
text_analyzer,
json_term_writer,
postings_writer,
ctx,
positions_per_path,
);
json_term_writer.pop_path_segment();
}
}
fn index_json_value<'a>(
doc: DocId,
json_value: &serde_json::Value,
text_analyzer: &TextAnalyzer,
json_term_writer: &mut JsonTermWriter<'a>,
postings_writer: &mut dyn PostingsWriter,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
match json_value {
serde_json::Value::Null => {}
serde_json::Value::Bool(val_bool) => {
let bool_u64 = if *val_bool { 1u64 } else { 0u64 };
json_term_writer.set_fast_value(bool_u64);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
}
serde_json::Value::Number(number) => {
if let Some(number_u64) = number.as_u64() {
json_term_writer.set_fast_value(number_u64);
} else if let Some(number_i64) = number.as_i64() {
json_term_writer.set_fast_value(number_i64);
} else if let Some(number_f64) = number.as_f64() {
json_term_writer.set_fast_value(number_f64);
}
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
}
serde_json::Value::String(text) => match infer_type_from_str(text) {
TextOrDateTime::Text(text) => {
let mut token_stream = text_analyzer.token_stream(text);
// TODO make sure the chain position works out.
json_term_writer.close_path_and_set_type(Type::Str);
let indexing_position = positions_per_path.get_position(json_term_writer.term());
postings_writer.index_text(
doc,
&mut *token_stream,
json_term_writer.term_buffer,
ctx,
indexing_position,
);
}
TextOrDateTime::DateTime(dt) => {
json_term_writer.set_fast_value(dt);
postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
}
},
serde_json::Value::Array(arr) => {
for val in arr {
index_json_value(
doc,
val,
text_analyzer,
json_term_writer,
postings_writer,
ctx,
positions_per_path,
);
}
}
serde_json::Value::Object(map) => {
index_json_object(
doc,
map,
text_analyzer,
json_term_writer,
postings_writer,
ctx,
positions_per_path,
);
}
}
}
enum TextOrDateTime<'a> {
Text(&'a str),
DateTime(crate::DateTime),
}
fn infer_type_from_str(text: &str) -> TextOrDateTime {
match chrono::DateTime::parse_from_rfc3339(text) {
Ok(dt) => {
let dt_utc = dt.with_timezone(&Utc);
TextOrDateTime::DateTime(dt_utc)
}
Err(_) => TextOrDateTime::Text(text),
}
}
pub struct JsonTermWriter<'a> {
term_buffer: &'a mut Term,
path_stack: Vec<usize>,
}
impl<'a> JsonTermWriter<'a> {
pub fn wrap(term_buffer: &'a mut Term) -> Self {
term_buffer.clear_with_type(Type::Json);
let mut path_stack = Vec::with_capacity(10);
path_stack.push(5);
Self {
term_buffer,
path_stack,
}
}
fn trim_to_end_of_path(&mut self) {
let end_of_path = *self.path_stack.last().unwrap();
self.term_buffer.truncate(end_of_path);
}
pub fn close_path_and_set_type(&mut self, typ: Type) {
self.trim_to_end_of_path();
let buffer = self.term_buffer.as_mut();
let buffer_len = buffer.len();
buffer[buffer_len - 1] = JSON_END_OF_PATH;
buffer.push(typ.to_code());
}
pub fn push_path_segment(&mut self, segment: &str) {
// the path stack should never be empty.
self.trim_to_end_of_path();
let buffer = self.term_buffer.as_mut();
let buffer_len = buffer.len();
if self.path_stack.len() > 1 {
buffer[buffer_len - 1] = JSON_PATH_SEGMENT_SEP;
}
buffer.extend(segment.as_bytes());
buffer.push(JSON_PATH_SEGMENT_SEP);
self.path_stack.push(buffer.len());
}
pub fn pop_path_segment(&mut self) {
self.path_stack.pop();
assert!(!self.path_stack.is_empty());
self.trim_to_end_of_path();
}
/// Returns the json path of the term being currently built.
#[cfg(test)]
pub(crate) fn path(&self) -> &[u8] {
let end_of_path = self.path_stack.last().cloned().unwrap_or(6);
&self.term().as_slice()[5..end_of_path - 1]
}
pub fn set_fast_value<T: FastValue>(&mut self, val: T) {
self.close_path_and_set_type(T::to_type());
self.term_buffer
.as_mut()
.extend_from_slice(val.to_u64().to_be_bytes().as_slice());
}
#[cfg(test)]
pub(crate) fn set_str(&mut self, text: &str) {
self.close_path_and_set_type(Type::Str);
self.term_buffer.as_mut().extend_from_slice(text.as_bytes());
}
pub fn term(&self) -> &Term {
self.term_buffer
}
}
#[cfg(test)]
mod tests {
use super::JsonTermWriter;
use crate::schema::{Field, Type};
use crate::Term;
#[test]
fn test_json_writer() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("attributes");
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
format!("{:?}", json_writer.term()),
"Term(type=Json, field=1, path=attributes.color, vtype=Str, \"red\")"
);
json_writer.set_str("blue");
assert_eq!(
format!("{:?}", json_writer.term()),
"Term(type=Json, field=1, path=attributes.color, vtype=Str, \"blue\")"
);
json_writer.pop_path_segment();
json_writer.push_path_segment("dimensions");
json_writer.push_path_segment("width");
json_writer.set_fast_value(400i64);
assert_eq!(
format!("{:?}", json_writer.term()),
"Term(type=Json, field=1, path=attributes.dimensions.width, vtype=I64, 400)"
);
json_writer.pop_path_segment();
json_writer.push_path_segment("height");
json_writer.set_fast_value(300i64);
assert_eq!(
format!("{:?}", json_writer.term()),
"Term(type=Json, field=1, path=attributes.dimensions.height, vtype=I64, 300)"
);
}
#[test]
fn test_string_term() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
}
#[test]
fn test_i64_term() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_fast_value(-4i64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00i\x7f\xff\xff\xff\xff\xff\xff\xfc"
)
}
#[test]
fn test_u64_term() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_fast_value(4u64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00u\x00\x00\x00\x00\x00\x00\x00\x04"
)
}
#[test]
fn test_f64_term() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.set_fast_value(4.0f64);
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00f\xc0\x10\x00\x00\x00\x00\x00\x00"
)
}
#[test]
fn test_push_after_set_path_segment() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("attribute");
json_writer.set_str("something");
json_writer.push_path_segment("color");
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jattribute\x01color\x00sred"
)
}
#[test]
fn test_pop_segment() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
json_writer.push_path_segment("hue");
json_writer.pop_path_segment();
json_writer.set_str("red");
assert_eq!(
json_writer.term().as_slice(),
b"\x00\x00\x00\x01jcolor\x00sred"
)
}
#[test]
fn test_json_writer_path() {
let field = Field::from_field_id(1);
let mut term = Term::new();
term.set_field(Type::Json, field);
let mut json_writer = JsonTermWriter::wrap(&mut term);
json_writer.push_path_segment("color");
assert_eq!(json_writer.path(), b"color");
json_writer.push_path_segment("hue");
assert_eq!(json_writer.path(), b"color\x01hue");
json_writer.set_str("pink");
assert_eq!(json_writer.path(), b"color\x01hue");
}
}

View File

@@ -278,7 +278,7 @@ impl IndexMerger {
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write_fast_fields");
debug_time!("write-fast-fields");
for (field, field_entry) in self.schema.fields() {
let field_type = field_entry.field_type();
@@ -307,16 +307,16 @@ impl IndexMerger {
}
None => {}
},
FieldType::Str(_) => {
// We don't handle str fast field for the moment
// They can be implemented using what is done
// for facets in the future.
}
FieldType::Bytes(byte_options) => {
if byte_options.is_fast() {
self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?;
}
}
FieldType::Str(_) | FieldType::JsonObject(_) => {
// We don't handle json / string fast fields for the moment
// They can be implemented using what is done
// for facets in the future
}
}
}
Ok(())
@@ -597,7 +597,7 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write_hierarchical_facet_field");
debug_time!("write-hierarchical-facet-field");
// Multifastfield consists of 2 fastfields.
// The first serves as an index into the second one and is strictly increasing.
@@ -827,7 +827,7 @@ impl IndexMerger {
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<Option<TermOrdinalMapping>> {
debug_time!("write_postings_for_field");
debug_time!("write-postings-for-field");
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
@@ -1023,7 +1023,8 @@ impl IndexMerger {
store_writer: &mut StoreWriter,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write_storable_fields");
debug_time!("write-storable-fields");
debug!("write-storable-field");
let store_readers: Vec<_> = self
.readers
@@ -1036,6 +1037,7 @@ impl IndexMerger {
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();
if !doc_id_mapping.is_trivial() {
debug!("non-trivial-doc-id-mapping");
for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
@@ -1050,6 +1052,7 @@ impl IndexMerger {
}
}
} else {
debug!("trivial-doc-id-mapping");
for reader in &self.readers {
let store_reader = reader.get_store_reader()?;
if reader.has_deletes()
@@ -1099,10 +1102,11 @@ impl IndexMerger {
} else {
self.get_doc_id_from_concatenated_data()?
};
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?;
}
debug!("write-postings");
let fieldnorm_data = serializer
.segment()
.open_read(SegmentComponent::FieldNorms)?;
@@ -1112,12 +1116,15 @@ impl IndexMerger {
fieldnorm_readers,
&doc_id_mapping,
)?;
debug!("write-fastfields");
self.write_fast_fields(
serializer.get_fast_field_serializer(),
term_ord_mappings,
&doc_id_mapping,
)?;
debug!("write-storagefields");
self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?;
debug!("close-serializer");
serializer.close()?;
Ok(self.max_doc)
}
@@ -1137,7 +1144,7 @@ mod tests {
use crate::fastfield::FastFieldReader;
use crate::query::{AllQuery, BooleanQuery, Scorer, TermQuery};
use crate::schema::{
Cardinality, Document, Facet, FacetOptions, IndexRecordOption, IntOptions, Term,
Cardinality, Document, Facet, FacetOptions, IndexRecordOption, NumericOptions, Term,
TextFieldIndexing, INDEXED, TEXT,
};
use crate::{
@@ -1157,7 +1164,7 @@ mod tests {
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", INDEXED);
let score_fieldtype = schema::IntOptions::default().set_fast(Cardinality::SingleValue);
let score_fieldtype = schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let bytes_score_field = schema_builder.add_bytes_field("score_bytes", FAST);
let index = Index::create_in_ram(schema_builder.build());
@@ -1306,7 +1313,7 @@ mod tests {
)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let score_fieldtype = schema::IntOptions::default().set_fast(Cardinality::SingleValue);
let score_fieldtype = schema::NumericOptions::default().set_fast(Cardinality::SingleValue);
let score_field = schema_builder.add_u64_field("score", score_fieldtype);
let bytes_score_field = schema_builder.add_bytes_field("score_bytes", FAST);
let index = Index::create_in_ram(schema_builder.build());
@@ -1666,7 +1673,7 @@ mod tests {
fn test_merge_facets(index_settings: Option<IndexSettings>, force_segment_value_overlap: bool) {
let mut schema_builder = schema::Schema::builder();
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::SingleValue)
.set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);
@@ -1830,7 +1837,7 @@ mod tests {
#[test]
fn test_merge_multivalued_int_fields_all_deleted() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
@@ -1867,7 +1874,7 @@ mod tests {
#[test]
fn test_merge_multivalued_int_fields_simple() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::MultiValues)
.set_indexed();
let int_field = schema_builder.add_u64_field("intvals", int_options);
@@ -1994,7 +2001,7 @@ mod tests {
fn merges_f64_fast_fields_correctly() -> crate::Result<()> {
let mut builder = schema::SchemaBuilder::new();
let fast_multi = IntOptions::default().set_fast(Cardinality::MultiValues);
let fast_multi = NumericOptions::default().set_fast(Cardinality::MultiValues);
let field = builder.add_f64_field("f64", schema::FAST);
let multi_field = builder.add_f64_field("f64s", fast_multi);

View File

@@ -7,14 +7,14 @@ mod tests {
use crate::fastfield::{AliveBitSet, FastFieldReader, MultiValuedFastFieldReader};
use crate::query::QueryParser;
use crate::schema::{
self, BytesOptions, Cardinality, Facet, FacetOptions, IndexRecordOption, IntOptions,
self, BytesOptions, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions,
};
use crate::{DocAddress, DocSet, IndexSettings, IndexSortByField, Order, Postings, Term};
fn create_test_index_posting_list_issue(index_settings: Option<IndexSettings>) -> Index {
let mut schema_builder = schema::Schema::builder();
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::SingleValue)
.set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);
@@ -63,7 +63,7 @@ mod tests {
force_disjunct_segment_sort_values: bool,
) -> crate::Result<Index> {
let mut schema_builder = schema::Schema::builder();
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::SingleValue)
.set_stored()
.set_indexed();
@@ -75,7 +75,7 @@ mod tests {
let multi_numbers = schema_builder.add_u64_field(
"multi_numbers",
IntOptions::default().set_fast(Cardinality::MultiValues),
NumericOptions::default().set_fast(Cardinality::MultiValues),
);
let text_field_options = TextOptions::default()
.set_indexing_options(
@@ -486,11 +486,11 @@ mod bench_sorted_index_merge {
// use cratedoc_id, readerdoc_id_mappinglet vals = reader.fate::schema;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::indexer::merger::IndexMerger;
use crate::schema::{Cardinality, Document, IntOptions, Schema};
use crate::schema::{Cardinality, Document, NumericOptions, Schema};
use crate::{IndexSettings, IndexSortByField, IndexWriter, Order};
fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
let mut schema_builder = Schema::builder();
let int_options = IntOptions::default()
let int_options = NumericOptions::default()
.set_fast(Cardinality::SingleValue)
.set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);

View File

@@ -5,6 +5,7 @@ pub mod doc_id_mapping;
mod doc_opstamp_mapping;
pub mod index_writer;
mod index_writer_status;
mod json_term_writer;
mod log_merge_policy;
mod merge_operation;
pub mod merge_policy;
@@ -24,6 +25,7 @@ use crossbeam::channel;
use smallvec::SmallVec;
pub use self::index_writer::IndexWriter;
pub(crate) use self::json_term_writer::JsonTermWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

View File

@@ -3,12 +3,13 @@ use super::operation::AddOperation;
use crate::core::Segment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::json_term_writer::index_json_values;
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::{
compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
PerFieldPostingsWriter, PostingsWriter,
};
use crate::schema::{Field, FieldEntry, FieldType, FieldValue, Schema, Term, Type, Value};
use crate::schema::{FieldEntry, FieldType, FieldValue, Schema, Term, Value};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{
BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer,
@@ -55,13 +56,13 @@ fn remap_doc_opstamps(
/// The segment is laid out on disk when the segment gets `finalized`.
pub struct SegmentWriter {
pub(crate) max_doc: DocId,
pub(crate) indexing_context: IndexingContext,
pub(crate) ctx: IndexingContext,
pub(crate) per_field_postings_writers: PerFieldPostingsWriter,
pub(crate) segment_serializer: SegmentSerializer,
pub(crate) fast_field_writers: FastFieldsWriter,
pub(crate) fieldnorms_writer: FieldNormsWriter,
pub(crate) doc_opstamps: Vec<Opstamp>,
tokenizers: Vec<Option<TextAnalyzer>>,
per_field_text_analyzers: Vec<TextAnalyzer>,
term_buffer: Term,
schema: Schema,
}
@@ -85,29 +86,33 @@ impl SegmentWriter {
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let tokenizers = schema
let per_field_text_analyzers = schema
.fields()
.map(
|(_, field_entry): (Field, &FieldEntry)| match field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
.and_then(|text_index_option| {
let tokenizer_name = &text_index_option.tokenizer();
tokenizer_manager.get(tokenizer_name)
}),
.map(|(_, field_entry): (_, &FieldEntry)| {
let text_options = match field_entry.field_type() {
FieldType::Str(ref text_options) => text_options.get_indexing_options(),
FieldType::JsonObject(ref json_object_options) => {
json_object_options.get_text_indexing_options()
}
_ => None,
},
)
};
text_options
.and_then(|text_index_option| {
let tokenizer_name = &text_index_option.tokenizer();
tokenizer_manager.get(tokenizer_name)
})
.unwrap_or_default()
})
.collect();
Ok(SegmentWriter {
max_doc: 0,
indexing_context: IndexingContext::new(table_size),
ctx: IndexingContext::new(table_size),
per_field_postings_writers,
fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
segment_serializer,
fast_field_writers: FastFieldsWriter::from_schema(&schema),
doc_opstamps: Vec::with_capacity(1_000),
tokenizers,
per_field_text_analyzers,
term_buffer: Term::new(),
schema,
})
@@ -130,7 +135,7 @@ impl SegmentWriter {
.transpose()?;
remap_and_write(
&self.per_field_postings_writers,
self.indexing_context,
self.ctx,
&self.fast_field_writers,
&self.fieldnorms_writer,
&self.schema,
@@ -142,7 +147,7 @@ impl SegmentWriter {
}
pub fn mem_usage(&self) -> usize {
self.indexing_context.mem_usage()
self.ctx.mem_usage()
+ self.fieldnorms_writer.mem_usage()
+ self.fast_field_writers.mem_usage()
+ self.segment_serializer.mem_usage()
@@ -162,13 +167,12 @@ impl SegmentWriter {
if !field_entry.is_indexed() {
continue;
}
let (term_buffer, indexing_context) =
(&mut self.term_buffer, &mut self.indexing_context);
let (term_buffer, ctx) = (&mut self.term_buffer, &mut self.ctx);
let postings_writer: &mut dyn PostingsWriter =
self.per_field_postings_writers.get_for_field_mut(field);
term_buffer.set_field(field_entry.field_type().value_type(), field);
match *field_entry.field_type() {
FieldType::Facet(_) => {
term_buffer.set_field(Type::Facet, field);
for value in values {
let facet = value.as_facet().ok_or_else(make_schema_error)?;
let facet_str = facet.encoded_str();
@@ -177,12 +181,8 @@ impl SegmentWriter {
.token_stream(facet_str)
.process(&mut |token| {
term_buffer.set_text(&token.text);
let unordered_term_id = postings_writer.subscribe(
doc_id,
0u32,
term_buffer,
indexing_context,
);
let unordered_term_id =
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
// TODO pass indexing context directly in subscribe function
unordered_term_id_opt = Some(unordered_term_id);
});
@@ -210,13 +210,11 @@ impl SegmentWriter {
.push(PreTokenizedStream::from(tok_str.clone()).into());
}
Value::Str(ref text) => {
if let Some(ref mut tokenizer) =
self.tokenizers[field.field_id() as usize]
{
offsets.push(total_offset);
total_offset += text.len();
token_streams.push(tokenizer.token_stream(text));
}
let text_analyzer =
&self.per_field_text_analyzers[field.field_id() as usize];
offsets.push(total_offset);
total_offset += text.len();
token_streams.push(text_analyzer.token_stream(text));
}
_ => (),
}
@@ -224,12 +222,12 @@ impl SegmentWriter {
let mut indexing_position = IndexingPosition::default();
for mut token_stream in token_streams {
assert_eq!(term_buffer.as_slice().len(), 5);
postings_writer.index_text(
doc_id,
field,
&mut *token_stream,
term_buffer,
indexing_context,
ctx,
&mut indexing_position,
);
}
@@ -238,44 +236,53 @@ impl SegmentWriter {
}
FieldType::U64(_) => {
for value in values {
term_buffer.set_field(Type::U64, field);
let u64_val = value.as_u64().ok_or_else(make_schema_error)?;
term_buffer.set_u64(u64_val);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
FieldType::Date(_) => {
for value in values {
term_buffer.set_field(Type::Date, field);
let date_val = value.as_date().ok_or_else(make_schema_error)?;
term_buffer.set_i64(date_val.timestamp());
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
FieldType::I64(_) => {
for value in values {
term_buffer.set_field(Type::I64, field);
let i64_val = value.as_i64().ok_or_else(make_schema_error)?;
term_buffer.set_i64(i64_val);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
FieldType::F64(_) => {
for value in values {
term_buffer.set_field(Type::F64, field);
let f64_val = value.as_f64().ok_or_else(make_schema_error)?;
term_buffer.set_f64(f64_val);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
FieldType::Bytes(_) => {
for value in values {
term_buffer.set_field(Type::Bytes, field);
let bytes = value.as_bytes().ok_or_else(make_schema_error)?;
term_buffer.set_bytes(bytes);
postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
}
}
FieldType::JsonObject(_) => {
let text_analyzer = &self.per_field_text_analyzers[field.field_id() as usize];
let json_values_it = values
.iter()
.map(|value| value.as_json().ok_or_else(make_schema_error));
index_json_values(
doc_id,
json_values_it,
text_analyzer,
term_buffer,
postings_writer,
ctx,
)?;
}
}
}
Ok(())
@@ -324,13 +331,14 @@ impl SegmentWriter {
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write(
per_field_postings_writers: &PerFieldPostingsWriter,
indexing_context: IndexingContext,
ctx: IndexingContext,
fast_field_writers: &FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
schema: &Schema,
mut serializer: SegmentSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<()> {
debug!("remap-and-write");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
fieldnorms_writer.serialize(fieldnorms_serializer, doc_id_map)?;
}
@@ -339,19 +347,21 @@ fn remap_and_write(
.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let term_ord_map = serialize_postings(
indexing_context,
ctx,
per_field_postings_writers,
fieldnorm_readers,
doc_id_map,
schema,
serializer.get_postings_serializer(),
)?;
debug!("fastfield-serialize");
fast_field_writers.serialize(
serializer.get_fast_field_serializer(),
&term_ord_map,
doc_id_map,
)?;
debug!("resort-docstore");
// finalize temp docstore and create version, which reflects the doc_id_map
if let Some(doc_id_map) = doc_id_map {
let store_write = serializer
@@ -374,6 +384,7 @@ fn remap_and_write(
}
}
debug!("serializer-close");
serializer.close()?;
Ok(())
@@ -403,10 +414,16 @@ pub fn prepare_doc_for_store(doc: Document, schema: &Schema) -> Document {
#[cfg(test)]
mod tests {
use chrono::Utc;
use super::compute_initial_table_size;
use crate::schema::{Schema, STORED, TEXT};
use crate::collector::Count;
use crate::indexer::json_term_writer::JsonTermWriter;
use crate::postings::TermInfo;
use crate::query::PhraseQuery;
use crate::schema::{IndexRecordOption, Schema, Type, STORED, STRING, TEXT};
use crate::tokenizer::{PreTokenizedString, Token};
use crate::Document;
use crate::{DocAddress, DocSet, Document, Index, Postings, Term, TERMINATED};
#[test]
fn test_hashmap_size() {
@@ -445,4 +462,247 @@ mod tests {
Some("title")
);
}
#[test]
fn test_json_indexing() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", STORED | TEXT);
let schema = schema_builder.build();
let json_val: serde_json::Map<String, serde_json::Value> = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let doc = searcher
.doc(DocAddress {
segment_ord: 0u32,
doc_id: 0u32,
})
.unwrap();
let serdeser_json_val = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(
&schema.to_json(&doc),
)
.unwrap()
.get("json")
.unwrap()[0]
.as_object()
.unwrap()
.clone();
assert_eq!(json_val, serdeser_json_val);
let segment_reader = searcher.segment_reader(0u32);
let inv_idx = segment_reader.inverted_index(json_field).unwrap();
let term_dict = inv_idx.terms();
let mut term = Term::new();
term.set_field(Type::Json, json_field);
let mut term_stream = term_dict.stream().unwrap();
let mut json_term_writer = JsonTermWriter::wrap(&mut term);
json_term_writer.push_path_segment("complexobject");
json_term_writer.push_path_segment("field.with.dot");
json_term_writer.set_fast_value(1u64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("date");
json_term_writer.set_fast_value(
chrono::DateTime::parse_from_rfc3339("1985-04-12T23:20:50.52Z")
.unwrap()
.with_timezone(&Utc),
);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("float");
json_term_writer.set_fast_value(-0.2f64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("my_arr");
json_term_writer.set_fast_value(2u64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.set_fast_value(3u64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.set_fast_value(4u64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.push_path_segment("my_key");
json_term_writer.set_str("tokens");
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.set_str("two");
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("signed");
json_term_writer.set_fast_value(-2i64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("toto");
json_term_writer.set_str("titi");
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
json_term_writer.pop_path_segment();
json_term_writer.push_path_segment("unsigned");
json_term_writer.set_fast_value(1u64);
assert!(term_stream.advance());
assert_eq!(term_stream.key(), json_term_writer.term().value_bytes());
assert!(!term_stream.advance());
}
#[test]
fn test_json_tokenized_with_position() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", STORED | TEXT);
let schema = schema_builder.build();
let mut doc = Document::default();
let json_val: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(r#"{"mykey": "repeated token token"}"#).unwrap();
doc.add_json_object(json_field, json_val);
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
let inv_index = segment_reader.inverted_index(json_field).unwrap();
let mut term = Term::new();
term.set_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term);
json_term_writer.push_path_segment("mykey");
json_term_writer.set_str("token");
let term_info = inv_index
.get_term_info(json_term_writer.term())
.unwrap()
.unwrap();
assert_eq!(
term_info,
TermInfo {
doc_freq: 1,
postings_range: 2..4,
positions_range: 2..5
}
);
let mut postings = inv_index
.read_postings(&term, IndexRecordOption::WithFreqsAndPositions)
.unwrap()
.unwrap();
assert_eq!(postings.doc(), 0);
assert_eq!(postings.term_freq(), 2);
let mut positions = Vec::new();
postings.positions(&mut positions);
assert_eq!(&positions[..], &[1, 2]);
assert_eq!(postings.advance(), TERMINATED);
}
#[test]
fn test_json_raw_no_position() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", STRING);
let schema = schema_builder.build();
let json_val: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(r#"{"mykey": "two tokens"}"#).unwrap();
let doc = doc!(json_field=>json_val);
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
let inv_index = segment_reader.inverted_index(json_field).unwrap();
let mut term = Term::new();
term.set_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term);
json_term_writer.push_path_segment("mykey");
json_term_writer.set_str("two tokens");
let term_info = inv_index
.get_term_info(json_term_writer.term())
.unwrap()
.unwrap();
assert_eq!(
term_info,
TermInfo {
doc_freq: 1,
postings_range: 0..1,
positions_range: 0..0
}
);
let mut postings = inv_index
.read_postings(&term, IndexRecordOption::WithFreqs)
.unwrap()
.unwrap();
assert_eq!(postings.doc(), 0);
assert_eq!(postings.term_freq(), 1);
let mut positions = Vec::new();
postings.positions(&mut positions);
assert_eq!(postings.advance(), TERMINATED);
}
#[test]
fn test_position_overlapping_path() {
// This test checks that we do not end up detecting a phrase match due
// to several string literals in the same json object overlapping.
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT);
let schema = schema_builder.build();
let json_val: serde_json::Map<String, serde_json::Value> = serde_json::from_str(
r#"{"mykey": [{"field": "hello happy tax payer"}, {"field": "nothello"}]}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val);
let index = Index::create_in_ram(schema);
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let mut term = Term::new();
term.set_field(Type::Json, json_field);
let mut json_term_writer = JsonTermWriter::wrap(&mut term);
json_term_writer.push_path_segment("mykey");
json_term_writer.push_path_segment("field");
json_term_writer.set_str("hello");
let hello_term = json_term_writer.term().clone();
json_term_writer.set_str("nothello");
let nothello_term = json_term_writer.term().clone();
json_term_writer.set_str("happy");
let happy_term = json_term_writer.term().clone();
let phrase_query = PhraseQuery::new(vec![hello_term, happy_term.clone()]);
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 1);
let phrase_query = PhraseQuery::new(vec![nothello_term, happy_term]);
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
}

View File

@@ -134,6 +134,10 @@ pub use crate::error::TantivyError;
/// and instead, refer to this as `crate::Result<T>`.
pub type Result<T> = std::result::Result<T, TantivyError>;
/// Result for an Async io operation.
#[cfg(feature = "quickwit")]
pub type AsyncIoResult<T> = std::result::Result<T, crate::error::AsyncIoError>;
/// Tantivy DateTime
pub type DateTime = chrono::DateTime<chrono::Utc>;

View File

@@ -1,6 +1,6 @@
use std::io;
use common::{BinarySerializable, VInt};
use common::VInt;
use crate::directory::{FileSlice, OwnedBytes};
use crate::fieldnorm::FieldNormReader;
@@ -28,9 +28,7 @@ pub struct BlockSegmentPostings {
freq_decoder: BlockDecoder,
freq_reading_option: FreqReadingOption,
block_max_score_cache: Option<Score>,
doc_freq: u32,
data: OwnedBytes,
pub(crate) skip_reader: SkipReader,
}
@@ -70,13 +68,13 @@ fn decode_vint_block(
fn split_into_skips_and_postings(
doc_freq: u32,
mut bytes: OwnedBytes,
) -> (Option<OwnedBytes>, OwnedBytes) {
) -> io::Result<(Option<OwnedBytes>, OwnedBytes)> {
if doc_freq < COMPRESSION_BLOCK_SIZE as u32 {
return (None, bytes);
return Ok((None, bytes));
}
let skip_len = VInt::deserialize(&mut bytes).expect("Data corrupted").0 as usize;
let skip_len = VInt::deserialize_u64(&mut bytes)? as usize;
let (skip_data, postings_data) = bytes.split(skip_len);
(Some(skip_data), postings_data)
Ok((Some(skip_data), postings_data))
}
impl BlockSegmentPostings {
@@ -92,8 +90,8 @@ impl BlockSegmentPostings {
(_, _) => FreqReadingOption::ReadFreq,
};
let (skip_data_opt, postings_data) =
split_into_skips_and_postings(doc_freq, data.read_bytes()?);
let bytes = data.read_bytes()?;
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
let skip_reader = match skip_data_opt {
Some(skip_data) => SkipReader::new(skip_data, doc_freq, record_option),
None => SkipReader::new(OwnedBytes::empty(), doc_freq, record_option),
@@ -166,8 +164,9 @@ impl BlockSegmentPostings {
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, postings_data);
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()> {
let (skip_data_opt, postings_data) =
split_into_skips_and_postings(doc_freq, postings_data)?;
self.data = postings_data;
self.block_max_score_cache = None;
self.loaded_offset = std::usize::MAX;
@@ -178,6 +177,7 @@ impl BlockSegmentPostings {
}
self.doc_freq = doc_freq;
self.load_block();
Ok(())
}
/// Returns the overall number of documents in the block postings.
@@ -322,7 +322,7 @@ impl BlockSegmentPostings {
/// Advance to the next block.
///
/// Returns false iff there was no remaining blocks.
/// Returns false if and only if there is no remaining block.
pub fn advance(&mut self) {
self.skip_reader.advance();
self.block_max_score_cache = None;

View File

@@ -0,0 +1,94 @@
use std::io;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{BufferLender, NothingRecorder, Recorder};
use crate::postings::stacker::Addr;
use crate::postings::{
FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter, UnorderedTermId,
};
use crate::schema::term::as_json_path_type_value_bytes;
use crate::schema::Type;
use crate::tokenizer::TokenStream;
use crate::{DocId, Term};
#[derive(Default)]
pub(crate) struct JsonPostingsWriter<Rec: Recorder> {
str_posting_writer: SpecializedPostingsWriter<Rec>,
non_str_posting_writer: SpecializedPostingsWriter<NothingRecorder>,
}
impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(json_postings_writer: JsonPostingsWriter<Rec>) -> Box<dyn PostingsWriter> {
Box::new(json_postings_writer)
}
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
fn subscribe(
&mut self,
doc: crate::DocId,
pos: u32,
term: &crate::Term,
ctx: &mut IndexingContext,
) -> UnorderedTermId {
self.non_str_posting_writer.subscribe(doc, pos, term, ctx)
}
fn index_text(
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
self.str_posting_writer.index_text(
doc_id,
token_stream,
term_buffer,
ctx,
indexing_position,
);
}
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
// TODO optimization opportunity here.
if let Some((_, typ, _)) = as_json_path_type_value_bytes(term.value_bytes()) {
if typ == Type::Str {
SpecializedPostingsWriter::<Rec>::serialize_one_term(
term,
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
} else {
SpecializedPostingsWriter::<NothingRecorder>::serialize_one_term(
term,
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
)?;
}
}
}
Ok(())
}
fn total_num_tokens(&self) -> u64 {
self.str_posting_writer.total_num_tokens() + self.non_str_posting_writer.total_num_tokens()
}
}

View File

@@ -7,6 +7,7 @@ pub(crate) use self::block_search::branchless_binary_search;
mod block_segment_postings;
pub(crate) mod compression;
mod indexing_context;
mod json_postings_writer;
mod per_field_postings_writer;
mod postings;
mod postings_writer;

View File

@@ -1,3 +1,4 @@
use crate::postings::json_postings_writer::JsonPostingsWriter;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{NothingRecorder, TermFrequencyRecorder, TfAndPositionRecorder};
use crate::postings::PostingsWriter;
@@ -33,21 +34,38 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
.get_indexing_options()
.map(|indexing_options| match indexing_options.index_option() {
IndexRecordOption::Basic => {
SpecializedPostingsWriter::<NothingRecorder>::new_boxed()
SpecializedPostingsWriter::<NothingRecorder>::default().into()
}
IndexRecordOption::WithFreqs => {
SpecializedPostingsWriter::<TermFrequencyRecorder>::new_boxed()
SpecializedPostingsWriter::<TermFrequencyRecorder>::default().into()
}
IndexRecordOption::WithFreqsAndPositions => {
SpecializedPostingsWriter::<TfAndPositionRecorder>::new_boxed()
SpecializedPostingsWriter::<TfAndPositionRecorder>::default().into()
}
})
.unwrap_or_else(SpecializedPostingsWriter::<NothingRecorder>::new_boxed),
.unwrap_or_else(|| SpecializedPostingsWriter::<NothingRecorder>::default().into()),
FieldType::U64(_)
| FieldType::I64(_)
| FieldType::F64(_)
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::Facet(_) => SpecializedPostingsWriter::<NothingRecorder>::new_boxed(),
| FieldType::Facet(_) => Box::new(SpecializedPostingsWriter::<NothingRecorder>::default()),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {
match text_indexing_option.index_option() {
IndexRecordOption::Basic => {
JsonPostingsWriter::<NothingRecorder>::default().into()
}
IndexRecordOption::WithFreqs => {
JsonPostingsWriter::<TermFrequencyRecorder>::default().into()
}
IndexRecordOption::WithFreqsAndPositions => {
JsonPostingsWriter::<TfAndPositionRecorder>::default().into()
}
}
} else {
JsonPostingsWriter::<NothingRecorder>::default().into()
}
}
}
}

View File

@@ -13,7 +13,7 @@ use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
UnorderedTermId,
};
use crate::schema::{Field, FieldType, Schema, Term, Type};
use crate::schema::{Field, FieldType, Schema, Term};
use crate::termdict::TermOrdinal;
use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
use crate::DocId;
@@ -49,7 +49,7 @@ fn make_field_partition(
/// It pushes all term, one field at a time, towards the
/// postings serializer.
pub(crate) fn serialize_postings(
indexing_context: IndexingContext,
ctx: IndexingContext,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
@@ -57,15 +57,13 @@ pub(crate) fn serialize_postings(
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(indexing_context.term_index.len());
term_offsets.extend(indexing_context.term_index.iter());
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
@@ -85,6 +83,7 @@ pub(crate) fn serialize_postings(
}
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) | FieldType::Date(_) => {}
FieldType::Bytes(_) => {}
FieldType::JsonObject(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);
@@ -94,7 +93,7 @@ pub(crate) fn serialize_postings(
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&indexing_context,
&ctx,
&mut field_serializer,
)?;
field_serializer.close()?;
@@ -118,14 +117,14 @@ pub(crate) trait PostingsWriter {
/// * doc - the document id
/// * pos - the term position (expressed in tokens)
/// * term - the term
/// * indexing_context - Contains a term hashmap and a memory arena to store all necessary
/// posting list information.
/// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list
/// information.
fn subscribe(
&mut self,
doc: DocId,
pos: u32,
term: &Term,
indexing_context: &mut IndexingContext,
ctx: &mut IndexingContext,
) -> UnorderedTermId;
/// Serializes the postings on disk.
@@ -134,7 +133,7 @@ pub(crate) trait PostingsWriter {
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
doc_id_map: Option<&DocIdMapping>,
indexing_context: &IndexingContext,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()>;
@@ -142,13 +141,12 @@ pub(crate) trait PostingsWriter {
fn index_text(
&mut self,
doc_id: DocId,
field: Field,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
indexing_context: &mut IndexingContext,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
term_buffer.set_field(Type::Str, field);
let end_of_path_idx = term_buffer.as_slice().len();
let mut num_tokens = 0;
let mut end_position = 0;
token_stream.process(&mut |token: &Token| {
@@ -162,14 +160,16 @@ pub(crate) trait PostingsWriter {
);
return;
}
term_buffer.set_text(token.text.as_str());
term_buffer.truncate(end_of_path_idx);
term_buffer.append_bytes(token.text.as_bytes());
let start_position = indexing_position.end_position + token.position as u32;
end_position = start_position + token.position_length as u32;
self.subscribe(doc_id, start_position, term_buffer, indexing_context);
self.subscribe(doc_id, start_position, term_buffer, ctx);
num_tokens += 1;
});
indexing_position.end_position = end_position + POSITION_GAP;
indexing_position.num_tokens += num_tokens;
term_buffer.truncate(end_of_path_idx);
}
fn total_num_tokens(&self) -> u64;
@@ -177,40 +177,50 @@ pub(crate) trait PostingsWriter {
/// The `SpecializedPostingsWriter` is just here to remove dynamic
/// dispatch to the recorder information.
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
#[derive(Default)]
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
}
impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
/// constructor
pub fn new() -> SpecializedPostingsWriter<Rec> {
SpecializedPostingsWriter {
total_num_tokens: 0u64,
_recorder_type: PhantomData,
}
}
/// Builds a `SpecializedPostingsWriter` storing its data in a memory arena.
pub fn new_boxed() -> Box<dyn PostingsWriter> {
Box::new(SpecializedPostingsWriter::<Rec>::new())
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(
specialized_postings_writer: SpecializedPostingsWriter<Rec>,
) -> Box<dyn PostingsWriter> {
Box::new(specialized_postings_writer)
}
}
impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec> {
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
#[inline]
pub(crate) fn serialize_one_term(
term: &Term<&[u8]>,
addr: Addr,
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
serializer.close_term()?;
Ok(())
}
}
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn subscribe(
&mut self,
doc: DocId,
position: u32,
term: &Term,
indexing_context: &mut IndexingContext,
ctx: &mut IndexingContext,
) -> UnorderedTermId {
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
let (term_index, arena) = (
&mut indexing_context.term_index,
&mut indexing_context.arena,
);
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
@@ -221,7 +231,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::new();
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
@@ -233,21 +243,12 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
doc_id_map: Option<&DocIdMapping>,
indexing_context: &IndexingContext,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
let recorder: Rec = indexing_context.term_index.read(*addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(
&indexing_context.arena,
doc_id_map,
serializer,
&mut buffer_lender,
);
serializer.close_term()?;
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
}
Ok(())
}

View File

@@ -56,9 +56,7 @@ impl<'a> Iterator for VInt32Reader<'a> {
/// * the document id
/// * the term frequency
/// * the term positions
pub(crate) trait Recorder: Copy + 'static {
///
fn new() -> Self;
pub(crate) trait Recorder: Copy + Default + 'static {
/// Returns the current document
fn current_doc(&self) -> u32;
/// Starts recording information about a new document
@@ -90,14 +88,16 @@ pub struct NothingRecorder {
current_doc: DocId,
}
impl Recorder for NothingRecorder {
fn new() -> Self {
impl Default for NothingRecorder {
fn default() -> Self {
NothingRecorder {
stack: ExpUnrolledLinkedList::new(),
current_doc: u32::max_value(),
}
}
}
impl Recorder for NothingRecorder {
fn current_doc(&self) -> DocId {
self.current_doc
}
@@ -152,8 +152,8 @@ pub struct TermFrequencyRecorder {
term_doc_freq: u32,
}
impl Recorder for TermFrequencyRecorder {
fn new() -> Self {
impl Default for TermFrequencyRecorder {
fn default() -> Self {
TermFrequencyRecorder {
stack: ExpUnrolledLinkedList::new(),
current_doc: 0,
@@ -161,7 +161,9 @@ impl Recorder for TermFrequencyRecorder {
term_doc_freq: 0u32,
}
}
}
impl Recorder for TermFrequencyRecorder {
fn current_doc(&self) -> DocId {
self.current_doc
}
@@ -223,15 +225,18 @@ pub struct TfAndPositionRecorder {
current_doc: DocId,
term_doc_freq: u32,
}
impl Recorder for TfAndPositionRecorder {
fn new() -> Self {
impl Default for TfAndPositionRecorder {
fn default() -> Self {
TfAndPositionRecorder {
stack: ExpUnrolledLinkedList::new(),
current_doc: u32::max_value(),
term_doc_freq: 0u32,
}
}
}
impl Recorder for TfAndPositionRecorder {
fn current_doc(&self) -> DocId {
self.current_doc
}

View File

@@ -76,7 +76,7 @@ impl InvertedIndexSerializer {
field: Field,
total_num_tokens: u64,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'_>> {
) -> io::Result<FieldSerializer> {
let field_entry: &FieldEntry = self.schema.get_field_entry(field);
let term_dictionary_write = self.terms_write.for_field(field);
let postings_write = self.postings_write.for_field(field);
@@ -122,24 +122,21 @@ impl<'a> FieldSerializer<'a> {
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'a>> {
total_num_tokens.serialize(postings_write)?;
let mode = match field_type {
FieldType::Str(ref text_options) => {
if let Some(text_indexing_options) = text_options.get_indexing_options() {
text_indexing_options.index_option()
} else {
IndexRecordOption::Basic
}
}
_ => IndexRecordOption::Basic,
};
let index_record_option = field_type
.index_record_option()
.unwrap_or(IndexRecordOption::Basic);
let term_dictionary_builder = TermDictionaryBuilder::create(term_dictionary_write)?;
let average_fieldnorm = fieldnorm_reader
.as_ref()
.map(|ff_reader| (total_num_tokens as Score / ff_reader.num_docs() as Score))
.unwrap_or(0.0);
let postings_serializer =
PostingsSerializer::new(postings_write, average_fieldnorm, mode, fieldnorm_reader);
let positions_serializer_opt = if mode.has_positions() {
let postings_serializer = PostingsSerializer::new(
postings_write,
average_fieldnorm,
index_record_option,
fieldnorm_reader,
);
let positions_serializer_opt = if index_record_option.has_positions() {
Some(PositionSerializer::new(positions_write))
} else {
None
@@ -203,6 +200,7 @@ impl<'a> FieldSerializer<'a> {
self.current_term_info.doc_freq += 1;
self.postings_serializer.write_doc(doc_id, term_freq);
if let Some(ref mut positions_serializer) = self.positions_serializer_opt.as_mut() {
assert_eq!(term_freq as usize, position_deltas.len());
positions_serializer.write_positions_delta(position_deltas);
}
}

View File

@@ -47,7 +47,7 @@ fn find_pivot_doc(
/// scorer in scorers[..pivot_len] and `scorer.doc()` for scorer in scorers[pivot_len..].
/// Note: before and after calling this method, scorers need to be sorted by their `.doc()`.
fn block_max_was_too_low_advance_one_scorer(
scorers: &mut Vec<TermScorerWithMaxScore>,
scorers: &mut [TermScorerWithMaxScore],
pivot_len: usize,
) {
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
@@ -82,7 +82,7 @@ fn block_max_was_too_low_advance_one_scorer(
// Given a list of term_scorers and an `ord`, and assuming that `term_scorers` is sorted
// except for term_scorers[ord], which might be ahead of its rank,
// bubble up term_scorers[ord] in order to restore the ordering.
fn restore_ordering(term_scorers: &mut Vec<TermScorerWithMaxScore>, ord: usize) {
fn restore_ordering(term_scorers: &mut [TermScorerWithMaxScore], ord: usize) {
let doc = term_scorers[ord].doc();
for i in ord + 1..term_scorers.len() {
if term_scorers[i].doc() >= doc {

View File

@@ -204,8 +204,8 @@ impl BooleanQuery {
#[cfg(test)]
mod tests {
use super::BooleanQuery;
use crate::collector::DocSetCollector;
use crate::query::{QueryClone, TermQuery};
use crate::collector::{Count, DocSetCollector};
use crate::query::{QueryClone, QueryParser, TermQuery};
use crate::schema::{IndexRecordOption, Schema, TEXT};
use crate::{DocAddress, Index, Term};
@@ -282,4 +282,42 @@ mod tests {
}
Ok(())
}
#[test]
pub fn test_json_array_pitfall_bag_of_terms() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(json_field=>json!({
"cart": [
{"product_type": "sneakers", "attributes": {"color": "white"}},
{"product_type": "t-shirt", "attributes": {"color": "red"}},
{"product_type": "cd", "attributes": {"genre": "blues"}},
]
})))?;
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let doc_matches = |query: &str| {
let query_parser = QueryParser::for_index(&index, vec![json_field]);
let query = query_parser.parse_query(query).unwrap();
searcher.search(&query, &Count).unwrap() == 1
};
// As expected
assert!(doc_matches(
r#"cart.product_type:sneakers AND cart.attributes.color:white"#
));
// Unexpected match, due to the fact that arrays do not act as nested docs.
assert!(doc_matches(
r#"cart.product_type:sneakers AND cart.attributes.color:red"#
));
// However, obviously this works...
assert!(!doc_matches(
r#"cart.product_type:sneakers AND cart.attributes.color:blues"#
));
Ok(())
}
}

View File

@@ -35,7 +35,7 @@ where TScoreCombiner: ScoreCombiner {
.iter()
.all(|scorer| scorer.freq_reading_option() == FreqReadingOption::ReadFreq)
{
// Block wand is only available iff we read frequencies.
// Block wand is only available if we read frequencies.
return SpecializedScorer::TermUnion(scorers);
} else {
return SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(

View File

@@ -9,10 +9,12 @@ pub use self::phrase_weight::PhraseWeight;
#[cfg(test)]
pub mod tests {
use serde_json::json;
use super::*;
use crate::collector::tests::{TEST_COLLECTOR_WITHOUT_SCORE, TEST_COLLECTOR_WITH_SCORE};
use crate::core::Index;
use crate::query::Weight;
use crate::query::{QueryParser, Weight};
use crate::schema::{Schema, Term, TEXT};
use crate::{assert_nearly_equals, DocAddress, DocId, TERMINATED};
@@ -179,6 +181,90 @@ pub mod tests {
Ok(())
}
#[ignore]
#[test]
pub fn test_phrase_score_with_slop() -> crate::Result<()> {
let index = create_index(&["a c b", "a b c a b"])?;
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
.map(|text| Term::from_field_text(text_field, text))
.collect();
let mut phrase_query = PhraseQuery::new(terms);
phrase_query.set_slop(1);
searcher
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.scores()
.to_vec()
};
let scores = test_query(vec!["a", "b"]);
assert_nearly_equals!(scores[0], 0.40618482);
assert_nearly_equals!(scores[1], 0.46844664);
Ok(())
}
#[test]
pub fn test_phrase_score_with_slop_size() -> crate::Result<()> {
let index = create_index(&["a b e c", "a e e e c", "a e e e e c"])?;
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
.map(|text| Term::from_field_text(text_field, text))
.collect();
let mut phrase_query = PhraseQuery::new(terms);
phrase_query.set_slop(3);
searcher
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.scores()
.to_vec()
};
let scores = test_query(vec!["a", "c"]);
assert_nearly_equals!(scores[0], 0.29086056);
assert_nearly_equals!(scores[1], 0.26706287);
Ok(())
}
#[test]
pub fn test_phrase_score_with_slop_ordering() -> crate::Result<()> {
let index = create_index(&[
"a e b e c",
"a e e e e e b e e e e c",
"a c b",
"a c e b e",
"a e c b",
"a e b c",
])?;
let schema = index.schema();
let text_field = schema.get_field("text").unwrap();
let searcher = index.reader().unwrap().searcher();
let test_query = |texts: Vec<&str>| {
let terms: Vec<Term> = texts
.iter()
.map(|text| Term::from_field_text(text_field, text))
.collect();
let mut phrase_query = PhraseQuery::new(terms);
phrase_query.set_slop(3);
searcher
.search(&phrase_query, &TEST_COLLECTOR_WITH_SCORE)
.expect("search should succeed")
.scores()
.to_vec()
};
let scores = test_query(vec!["a", "b", "c"]);
// The first and the last documents match.
assert_nearly_equals!(scores[0], 0.23091172);
assert_nearly_equals!(scores[1], 0.25024384);
Ok(())
}
#[test] // motivated by #234
pub fn test_phrase_query_docfreq_order() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
@@ -248,4 +334,56 @@ pub mod tests {
assert_eq!(test_query(vec![(1, "a"), (3, "c")]), vec![0]);
Ok(())
}
#[test]
pub fn test_phrase_query_on_json() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(json_field=>json!({
"text": "elliot smith the happy who"
})))?;
index_writer.add_document(doc!(json_field=>json!({
"text": "the who elliot smith"
})))?;
index_writer.add_document(doc!(json_field=>json!({
"arr": [{"text":"the who"}, {"text":"elliot smith"}]
})))?;
index_writer.add_document(doc!(json_field=>json!({
"text2": "the smith"
})))?;
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let matching_docs = |query: &str| {
let query_parser = QueryParser::for_index(&index, vec![json_field]);
let phrase_query = query_parser.parse_query(query).unwrap();
let phrase_weight = phrase_query.weight(&*searcher, false).unwrap();
let mut phrase_scorer = phrase_weight
.scorer(searcher.segment_reader(0), 1.0f32)
.unwrap();
let mut docs = Vec::new();
loop {
let doc = phrase_scorer.doc();
if doc == TERMINATED {
break;
}
docs.push(doc);
phrase_scorer.advance();
}
docs
};
assert!(matching_docs(r#"text:"the smith""#).is_empty());
assert_eq!(&matching_docs(r#"text:the"#), &[0u32, 1u32]);
assert_eq!(&matching_docs(r#"text:"the""#), &[0u32, 1u32]);
assert_eq!(&matching_docs(r#"text:"smith""#), &[0u32, 1u32]);
assert_eq!(&matching_docs(r#"text:"elliot smith""#), &[0u32, 1u32]);
assert_eq!(&matching_docs(r#"text2:"the smith""#), &[3u32]);
assert!(&matching_docs(r#"arr.text:"the smith""#).is_empty());
assert_eq!(&matching_docs(r#"arr.text:"elliot smith""#), &[2]);
Ok(())
}
}

View File

@@ -23,6 +23,7 @@ use crate::schema::{Field, IndexRecordOption, Term};
pub struct PhraseQuery {
field: Field,
phrase_terms: Vec<(usize, Term)>,
slop: u32,
}
impl PhraseQuery {
@@ -53,9 +54,15 @@ impl PhraseQuery {
PhraseQuery {
field,
phrase_terms: terms,
slop: 0,
}
}
/// Slop allowed for the phrase.
pub fn set_slop(&mut self, value: u32) {
self.slop = value;
}
/// The `Field` this `PhraseQuery` is targeting.
pub fn field(&self) -> Field {
self.field
@@ -94,11 +101,11 @@ impl PhraseQuery {
}
let terms = self.phrase_terms();
let bm25_weight = Bm25Weight::for_terms(searcher, &terms)?;
Ok(PhraseWeight::new(
self.phrase_terms.clone(),
bm25_weight,
scoring_enabled,
))
let mut weight = PhraseWeight::new(self.phrase_terms.clone(), bm25_weight, scoring_enabled);
if self.slop > 0 {
weight.slop(self.slop);
}
Ok(weight)
}
}
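With the plumbing above, slop is opt-in: `PhraseWeight::slop` is only called when the query's slop is non-zero. A short usage sketch mirroring the API added in this diff:
use tantivy::query::PhraseQuery;
use tantivy::schema::Field;
use tantivy::Term;
fn phrase_with_slop(text_field: Field) -> PhraseQuery {
    let terms = vec![
        Term::from_field_text(text_field, "a"),
        Term::from_field_text(text_field, "b"),
    ];
    let mut phrase_query = PhraseQuery::new(terms);
    // Allow up to one position of wiggle room between "a" and "b".
    phrase_query.set_slop(1);
    phrase_query
}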

View File

@@ -52,24 +52,25 @@ pub struct PhraseScorer<TPostings: Postings> {
fieldnorm_reader: FieldNormReader,
similarity_weight: Bm25Weight,
scoring_enabled: bool,
slop: u32,
}
/// Returns true iff the two sorted array contain a common element
/// Returns true if and only if the two sorted arrays contain a common element
fn intersection_exists(left: &[u32], right: &[u32]) -> bool {
let mut left_i = 0;
let mut right_i = 0;
while left_i < left.len() && right_i < right.len() {
let left_val = left[left_i];
let right_val = right[right_i];
let mut left_index = 0;
let mut right_index = 0;
while left_index < left.len() && right_index < right.len() {
let left_val = left[left_index];
let right_val = right[right_index];
match left_val.cmp(&right_val) {
Ordering::Less => {
left_i += 1;
left_index += 1;
}
Ordering::Equal => {
return true;
}
Ordering::Greater => {
right_i += 1;
right_index += 1;
}
}
}
@@ -77,23 +78,23 @@ fn intersection_exists(left: &[u32], right: &[u32]) -> bool {
}
fn intersection_count(left: &[u32], right: &[u32]) -> usize {
let mut left_i = 0;
let mut right_i = 0;
let mut left_index = 0;
let mut right_index = 0;
let mut count = 0;
while left_i < left.len() && right_i < right.len() {
let left_val = left[left_i];
let right_val = right[right_i];
while left_index < left.len() && right_index < right.len() {
let left_val = left[left_index];
let right_val = right[right_index];
match left_val.cmp(&right_val) {
Ordering::Less => {
left_i += 1;
left_index += 1;
}
Ordering::Equal => {
count += 1;
left_i += 1;
right_i += 1;
left_index += 1;
right_index += 1;
}
Ordering::Greater => {
right_i += 1;
right_index += 1;
}
}
}
@@ -105,38 +106,91 @@ fn intersection_count(left: &[u32], right: &[u32]) -> usize {
///
/// Returns the length of the intersection
fn intersection(left: &mut [u32], right: &[u32]) -> usize {
let mut left_i = 0;
let mut right_i = 0;
let mut left_index = 0;
let mut right_index = 0;
let mut count = 0;
let left_len = left.len();
let right_len = right.len();
while left_i < left_len && right_i < right_len {
let left_val = left[left_i];
let right_val = right[right_i];
while left_index < left_len && right_index < right_len {
let left_val = left[left_index];
let right_val = right[right_index];
match left_val.cmp(&right_val) {
Ordering::Less => {
left_i += 1;
left_index += 1;
}
Ordering::Equal => {
left[count] = left_val;
count += 1;
left_i += 1;
right_i += 1;
left_index += 1;
right_index += 1;
}
Ordering::Greater => {
right_i += 1;
right_index += 1;
}
}
}
count
}
/// Intersects two sorted arrays `left` and `right` and outputs the
/// resulting array in `left`.
///
/// The condition for a match is that the value stored in `left` is less than or equal to
/// the value in `right`, and that the distance to the previous token is less than or equal
/// to the slop.
///
/// Returns the length of the intersection.
fn intersection_with_slop(left: &mut [u32], right: &[u32], slop: u32) -> usize {
let mut left_index = 0;
let mut right_index = 0;
let mut count = 0;
let left_len = left.len();
let right_len = right.len();
while left_index < left_len && right_index < right_len {
let left_val = left[left_index];
let right_val = right[right_index];
// The three conditions are:
// left_val < right_slop -> increment the left index.
// right_slop <= left_val <= right_val -> find the best match.
// left_val > right_val -> increment the right index.
let right_slop = if right_val >= slop {
right_val - slop
} else {
0
};
if left_val < right_slop {
left_index += 1;
} else if right_slop <= left_val && left_val <= right_val {
while left_index + 1 < left_len {
// there could be a better match
let next_left_val = left[left_index + 1];
if next_left_val > right_val {
// the next value is outside the range, so current one is the best.
break;
}
// the next value is better.
left_index += 1;
}
// store the match in left.
left[count] = right_val;
count += 1;
left_index += 1;
right_index += 1;
} else if left_val > right_val {
right_index += 1;
}
}
count
}
impl<TPostings: Postings> PhraseScorer<TPostings> {
pub fn new(
term_postings: Vec<(usize, TPostings)>,
similarity_weight: Bm25Weight,
fieldnorm_reader: FieldNormReader,
scoring_enabled: bool,
slop: u32,
) -> PhraseScorer<TPostings> {
let max_offset = term_postings
.iter()
@@ -159,6 +213,7 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
similarity_weight,
fieldnorm_reader,
scoring_enabled,
slop,
};
if scorer.doc() != TERMINATED && !scorer.phrase_match() {
scorer.advance();
@@ -181,51 +236,54 @@ impl<TPostings: Postings> PhraseScorer<TPostings> {
}
fn phrase_exists(&mut self) -> bool {
self.intersection_docset
.docset_mut_specialized(0)
.positions(&mut self.left);
let mut intersection_len = self.left.len();
for i in 1..self.num_terms - 1 {
{
self.intersection_docset
.docset_mut_specialized(i)
.positions(&mut self.right);
}
intersection_len = intersection(&mut self.left[..intersection_len], &self.right[..]);
if intersection_len == 0 {
return false;
}
}
self.intersection_docset
.docset_mut_specialized(self.num_terms - 1)
.positions(&mut self.right);
let intersection_len = self.compute_phrase_match();
intersection_exists(&self.left[..intersection_len], &self.right[..])
}
fn compute_phrase_count(&mut self) -> u32 {
let intersection_len = self.compute_phrase_match();
intersection_count(&self.left[..intersection_len], &self.right[..]) as u32
}
fn compute_phrase_match(&mut self) -> usize {
{
self.intersection_docset
.docset_mut_specialized(0)
.positions(&mut self.left);
}
let mut intersection_len = self.left.len();
for i in 1..self.num_terms - 1 {
let end_term = if self.has_slop() {
self.num_terms
} else {
self.num_terms - 1
};
for i in 1..end_term {
{
self.intersection_docset
.docset_mut_specialized(i)
.positions(&mut self.right);
}
intersection_len = intersection(&mut self.left[..intersection_len], &self.right[..]);
intersection_len = if self.has_slop() {
intersection_with_slop(
&mut self.left[..intersection_len],
&self.right[..],
self.slop,
)
} else {
intersection(&mut self.left[..intersection_len], &self.right[..])
};
if intersection_len == 0 {
return 0u32;
return 0;
}
}
self.intersection_docset
.docset_mut_specialized(self.num_terms - 1)
.positions(&mut self.right);
intersection_count(&self.left[..intersection_len], &self.right[..]) as u32
intersection_len
}
fn has_slop(&self) -> bool {
self.slop > 0
}
}
@@ -268,18 +326,26 @@ impl<TPostings: Postings> Scorer for PhraseScorer<TPostings> {
#[cfg(test)]
mod tests {
use super::{intersection, intersection_count};
use super::{intersection, intersection_count, intersection_with_slop};
fn test_intersection_sym(left: &[u32], right: &[u32], expected: &[u32]) {
test_intersection_aux(left, right, expected);
test_intersection_aux(right, left, expected);
test_intersection_aux(left, right, expected, 0);
test_intersection_aux(right, left, expected, 0);
}
fn test_intersection_aux(left: &[u32], right: &[u32], expected: &[u32]) {
fn test_intersection_aux(left: &[u32], right: &[u32], expected: &[u32], slop: u32) {
let mut left_vec = Vec::from(left);
let left_mut = &mut left_vec[..];
assert_eq!(intersection_count(left_mut, right), expected.len());
let count = intersection(left_mut, right);
if slop == 0 {
let left_mut = &mut left_vec[..];
assert_eq!(intersection_count(left_mut, right), expected.len());
let count = intersection(left_mut, right);
assert_eq!(&left_mut[..count], expected);
return;
}
let mut right_vec = Vec::from(right);
let right_mut = &mut right_vec[..];
let count = intersection_with_slop(left_mut, right_mut, slop);
assert_eq!(&left_mut[..count], expected);
}
@@ -291,6 +357,36 @@ mod tests {
test_intersection_sym(&[5, 7], &[1, 5, 10, 12], &[5]);
test_intersection_sym(&[1, 5, 6, 9, 10, 12], &[6, 8, 9, 12], &[6, 9, 12]);
}
#[test]
fn test_slop() {
// The slop is not symmetric. It does not allow for the phrase to be out of order.
test_intersection_aux(&[1], &[2], &[2], 1);
test_intersection_aux(&[1], &[3], &[], 1);
test_intersection_aux(&[1], &[3], &[3], 2);
test_intersection_aux(&[], &[2], &[], 100000);
test_intersection_aux(&[5, 7, 11], &[1, 5, 10, 12], &[5, 12], 1);
test_intersection_aux(&[1, 5, 6, 9, 10, 12], &[6, 8, 9, 12], &[6, 9, 12], 1);
test_intersection_aux(&[1, 5, 6, 9, 10, 12], &[6, 8, 9, 12], &[6, 9, 12], 10);
test_intersection_aux(&[1, 3, 5], &[2, 4, 6], &[2, 4, 6], 1);
test_intersection_aux(&[1, 3, 5], &[2, 4, 6], &[], 0);
}
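// Hypothetical extra check (not part of this diff), spelling out the rule documented on
// `intersection_with_slop`: with slop = 2, a left position of 3 can pair with a right
// position of 5, and the position kept in `left` is the right-hand one.
#[test]
fn test_slop_keeps_right_position() {
    let mut left = [3u32];
    let len = intersection_with_slop(&mut left[..], &[5], 2);
    assert_eq!(&left[..len], &[5]);
}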
fn test_merge(left: &[u32], right: &[u32], expected_left: &[u32], slop: u32) {
let mut left_vec = Vec::from(left);
let left_mut = &mut left_vec[..];
let mut right_vec = Vec::from(right);
let right_mut = &mut right_vec[..];
let count = intersection_with_slop(left_mut, right_mut, slop);
assert_eq!(&left_mut[..count], expected_left);
}
#[test]
fn test_merge_slop() {
test_merge(&[1, 2], &[1], &[1], 1);
test_merge(&[3], &[4], &[4], 2);
test_merge(&[3], &[4], &[4], 2);
test_merge(&[1, 5, 6, 9, 10, 12], &[6, 8, 9, 12], &[6, 9, 12], 10);
}
}
#[cfg(all(test, feature = "unstable"))]

View File

@@ -12,6 +12,7 @@ pub struct PhraseWeight {
phrase_terms: Vec<(usize, Term)>,
similarity_weight: Bm25Weight,
scoring_enabled: bool,
slop: u32,
}
impl PhraseWeight {
@@ -21,23 +22,26 @@ impl PhraseWeight {
similarity_weight: Bm25Weight,
scoring_enabled: bool,
) -> PhraseWeight {
let slop = 0;
PhraseWeight {
phrase_terms,
similarity_weight,
scoring_enabled,
slop,
}
}
fn fieldnorm_reader(&self, reader: &SegmentReader) -> crate::Result<FieldNormReader> {
let field = self.phrase_terms[0].1.field();
if self.scoring_enabled {
reader.get_fieldnorms_reader(field)
} else {
Ok(FieldNormReader::constant(reader.max_doc(), 1))
if let Some(fieldnorm_reader) = reader.fieldnorms_readers().get_field(field)? {
return Ok(fieldnorm_reader);
}
}
Ok(FieldNormReader::constant(reader.max_doc(), 1))
}
fn phrase_scorer(
pub(crate) fn phrase_scorer(
&self,
reader: &SegmentReader,
boost: Score,
@@ -73,8 +77,13 @@ impl PhraseWeight {
similarity_weight,
fieldnorm_reader,
self.scoring_enabled,
self.slop,
)))
}
pub fn slop(&mut self, slop: u32) {
self.slop = slop;
}
}
impl Weight for PhraseWeight {

File diff suppressed because it is too large

View File

@@ -174,7 +174,7 @@ mod tests {
);
assert_eq!(
format!("{:?}", term_query),
r#"TermQuery(Term(type=Str, field=1, val="hello"))"#
r#"TermQuery(Term(type=Str, field=1, "hello"))"#
);
}

View File

@@ -93,6 +93,10 @@ impl TermWeight {
}
}
pub fn term(&self) -> &Term {
&self.term
}
pub(crate) fn specialized_scorer(
&self,
reader: &SegmentReader,

View File

@@ -122,7 +122,7 @@ impl IndexReaderBuilder {
/// Sets the number of [Searcher] to pool.
///
/// See [Self::searcher()].
/// See [IndexReader::searcher()].
#[must_use]
pub fn num_searchers(mut self, num_searchers: usize) -> IndexReaderBuilder {
self.num_searchers = num_searchers;

View File

@@ -10,7 +10,7 @@ pub const GC_INTERVAL: Duration = Duration::from_secs(1);
/// `Warmer` can be used to maintain segment-level state e.g. caches.
///
/// They must be registered with the [IndexReaderBuilder].
/// They must be registered with the [super::IndexReaderBuilder].
pub trait Warmer: Sync + Send {
/// Perform any warming work using the provided [Searcher].
fn warm(&self, searcher: &Searcher) -> crate::Result<()>;

View File

@@ -14,10 +14,10 @@ pub struct BytesOptions {
}
/// For backward compatibility we add an intermediary to interpret the
/// lack of fieldnorms attribute as "true" iff indexed.
/// lack of fieldnorms attribute as "true" if and only if indexed.
///
/// (Downstream, for the moment, this attribute is not used anyway if not indexed...)
/// Note that: newly serialized IntOptions will include the new attribute.
/// (Downstream, for the moment, this attribute is not used if not indexed...)
/// Note that newly serialized NumericOptions will include the new attribute.
#[derive(Deserialize)]
struct BytesOptionsDeser {
indexed: bool,
@@ -39,22 +39,22 @@ impl From<BytesOptionsDeser> for BytesOptions {
}
impl BytesOptions {
/// Returns true iff the value is indexed.
/// Returns true if the value is indexed.
pub fn is_indexed(&self) -> bool {
self.indexed
}
/// Returns true iff the value is normed.
/// Returns true if and only if the value is normed.
pub fn fieldnorms(&self) -> bool {
self.fieldnorms
}
/// Returns true iff the value is a fast field.
/// Returns true if the value is a fast field.
pub fn is_fast(&self) -> bool {
self.fast
}
/// Returns true iff the value is stored.
/// Returns true if the value is stored.
pub fn is_stored(&self) -> bool {
self.stored
}

View File

@@ -71,7 +71,7 @@ impl Document {
self.field_values.len()
}
/// Returns true iff the document contains no fields.
/// Returns true if the document contains no fields.
pub fn is_empty(&self) -> bool {
self.field_values.is_empty()
}
@@ -117,7 +117,16 @@ impl Document {
/// Add a bytes field
pub fn add_bytes<T: Into<Vec<u8>>>(&mut self, field: Field, value: T) {
self.add_field_value(field, value.into())
self.add_field_value(field, value.into());
}
/// Add a JSON object field
pub fn add_json_object(
&mut self,
field: Field,
json_object: serde_json::Map<String, serde_json::Value>,
) {
self.add_field_value(field, json_object);
}
/// Add a (field, value) to the document.

View File

@@ -49,7 +49,7 @@ impl Facet {
Facet("".to_string())
}
/// Returns true iff the facet is the root facet `/`.
/// Returns true if the facet is the root facet `/`.
pub fn is_root(&self) -> bool {
self.encoded_str().is_empty()
}

View File

@@ -13,7 +13,7 @@ pub struct FacetOptions {
}
impl FacetOptions {
/// Returns true iff the value is stored.
/// Returns true if the value is stored.
pub fn is_stored(&self) -> bool {
self.stored
}

View File

@@ -1,7 +1,9 @@
use serde::{Deserialize, Serialize};
use crate::schema::bytes_options::BytesOptions;
use crate::schema::{is_valid_field_name, FacetOptions, FieldType, IntOptions, TextOptions};
use crate::schema::{
is_valid_field_name, FacetOptions, FieldType, JsonObjectOptions, NumericOptions, TextOptions,
};
/// A `FieldEntry` represents a field and its configuration.
/// `Schema` are a collection of `FieldEntry`
@@ -27,71 +29,44 @@ impl FieldEntry {
}
}
/// Creates a new u64 field entry in the schema, given
/// a name, and some options.
/// Creates a new text field entry.
pub fn new_text(field_name: String, text_options: TextOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::Str(text_options),
}
Self::new(field_name, FieldType::Str(text_options))
}
/// Creates a new u64 field entry in the schema, given
/// a name, and some options.
pub fn new_u64(field_name: String, field_type: IntOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::U64(field_type),
}
/// Creates a new u64 field entry.
pub fn new_u64(field_name: String, int_options: NumericOptions) -> FieldEntry {
Self::new(field_name, FieldType::U64(int_options))
}
/// Creates a new i64 field entry in the schema, given
/// a name, and some options.
pub fn new_i64(field_name: String, field_type: IntOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::I64(field_type),
}
/// Creates a new i64 field entry.
pub fn new_i64(field_name: String, int_options: NumericOptions) -> FieldEntry {
Self::new(field_name, FieldType::I64(int_options))
}
/// Creates a new f64 field entry in the schema, given
/// a name, and some options.
pub fn new_f64(field_name: String, field_type: IntOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::F64(field_type),
}
/// Creates a new f64 field entry.
pub fn new_f64(field_name: String, f64_options: NumericOptions) -> FieldEntry {
Self::new(field_name, FieldType::F64(f64_options))
}
/// Creates a new date field entry in the schema, given
/// a name, and some options.
pub fn new_date(field_name: String, field_type: IntOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::Date(field_type),
}
/// Creates a new date field entry.
pub fn new_date(field_name: String, date_options: NumericOptions) -> FieldEntry {
Self::new(field_name, FieldType::Date(date_options))
}
/// Creates a field entry for a facet.
pub fn new_facet(field_name: String, field_type: FacetOptions) -> FieldEntry {
assert!(is_valid_field_name(&field_name));
FieldEntry {
name: field_name,
field_type: FieldType::Facet(field_type),
}
pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
Self::new(field_name, FieldType::Facet(facet_options))
}
/// Creates a field entry for a bytes field
pub fn new_bytes(field_name: String, bytes_type: BytesOptions) -> FieldEntry {
FieldEntry {
name: field_name,
field_type: FieldType::Bytes(bytes_type),
}
pub fn new_bytes(field_name: String, bytes_options: BytesOptions) -> FieldEntry {
Self::new(field_name, FieldType::Bytes(bytes_options))
}
/// Creates a field entry for a json field
pub fn new_json(field_name: String, json_object_options: JsonObjectOptions) -> FieldEntry {
Self::new(field_name, FieldType::JsonObject(json_object_options))
}
/// Returns the name of the field
@@ -104,19 +79,19 @@ impl FieldEntry {
&self.field_type
}
/// Returns true iff the field is indexed.
/// Returns true if the field is indexed.
///
/// An indexed field is searchable.
pub fn is_indexed(&self) -> bool {
self.field_type.is_indexed()
}
/// Returns true iff the field is normed
/// Returns true if the field is normed
pub fn has_fieldnorms(&self) -> bool {
self.field_type.has_fieldnorms()
}
/// Returns true iff the field is a int (signed or unsigned) fast field
/// Returns true if the field is an int (signed or unsigned) fast field
pub fn is_fast(&self) -> bool {
match self.field_type {
FieldType::U64(ref options)
@@ -127,7 +102,7 @@ impl FieldEntry {
}
}
/// Returns true iff the field is stored
/// Returns true if the field is stored
pub fn is_stored(&self) -> bool {
match self.field_type {
FieldType::U64(ref options)
@@ -137,6 +112,7 @@ impl FieldEntry {
FieldType::Str(ref options) => options.is_stored(),
FieldType::Facet(ref options) => options.is_stored(),
FieldType::Bytes(ref options) => options.is_stored(),
FieldType::JsonObject(ref options) => options.is_stored(),
}
}
}

View File

@@ -1,25 +1,32 @@
use chrono::{FixedOffset, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use crate::schema::bytes_options::BytesOptions;
use crate::schema::facet_options::FacetOptions;
use crate::schema::{Facet, IndexRecordOption, IntOptions, TextFieldIndexing, TextOptions, Value};
use crate::schema::{
Facet, IndexRecordOption, JsonObjectOptions, NumericOptions, TextFieldIndexing, TextOptions,
Value,
};
use crate::tokenizer::PreTokenizedString;
/// Possible error that may occur while parsing a field value
/// At this point the JSON is known to be valid.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Error)]
pub enum ValueParsingError {
/// Encountered a numerical value that overflows or underflow its integer type.
OverflowError(String),
/// The json node is not of the correct type.
/// (e.g. 3 for a `Str` type or `"abc"` for a u64 type)
/// Tantivy will try to autocast values.
TypeError(String),
/// The json node is a string but contains json that is
/// not valid base64.
InvalidBase64(String),
#[error("Overflow error. Expected {expected}, got {json}")]
OverflowError {
expected: &'static str,
json: serde_json::Value,
},
#[error("Type error. Expected {expected}, got {json}")]
TypeError {
expected: &'static str,
json: serde_json::Value,
},
#[error("Invalid base64: {base64}")]
InvalidBase64 { base64: String },
}
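// Hypothetical illustration (not part of this diff): the `thiserror` derive above gives the
// structured variants a `Display` impl driven by the `#[error(...)]` attributes.
#[cfg(test)]
mod value_parsing_error_display_sketch {
    use super::ValueParsingError;
    #[test]
    fn type_error_displays_expected_and_json() {
        let err = ValueParsingError::TypeError {
            expected: "rfc3339 format",
            json: serde_json::json!("not-a-date"),
        };
        assert_eq!(
            err.to_string(),
            r#"Type error. Expected rfc3339 format, got "not-a-date""#
        );
    }
}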
/// Type of the value that a field can take.
@@ -43,9 +50,11 @@ pub enum Type {
Facet = b'h',
/// `Vec<u8>`
Bytes = b'b',
/// Leaf in a Json object.
Json = b'j',
}
const ALL_TYPES: [Type; 7] = [
const ALL_TYPES: [Type; 8] = [
Type::Str,
Type::U64,
Type::I64,
@@ -53,6 +62,7 @@ const ALL_TYPES: [Type; 7] = [
Type::Date,
Type::Facet,
Type::Bytes,
Type::Json,
];
impl Type {
@@ -67,6 +77,20 @@ impl Type {
*self as u8
}
/// Returns a human readable name for the Type.
pub fn name(&self) -> &'static str {
match self {
Type::Str => "Str",
Type::U64 => "U64",
Type::I64 => "I64",
Type::F64 => "F64",
Type::Date => "Date",
Type::Facet => "Facet",
Type::Bytes => "Bytes",
Type::Json => "Json",
}
}
/// Interprets a 1byte code as a type.
/// Returns None if the code is invalid.
pub fn from_code(code: u8) -> Option<Self> {
@@ -78,6 +102,7 @@ impl Type {
b'd' => Some(Type::Date),
b'h' => Some(Type::Facet),
b'b' => Some(Type::Bytes),
b'j' => Some(Type::Json),
_ => None,
}
}
@@ -93,17 +118,19 @@ pub enum FieldType {
#[serde(rename = "text")]
Str(TextOptions),
/// Unsigned 64-bits integers field type configuration
U64(IntOptions),
U64(NumericOptions),
/// Signed 64-bits integer field type configuration
I64(IntOptions),
I64(NumericOptions),
/// 64-bits float field type configuration
F64(IntOptions),
F64(NumericOptions),
/// Signed 64-bits Date field type configuration
Date(IntOptions),
Date(NumericOptions),
/// Hierarchical Facet
Facet(FacetOptions),
/// Bytes (one per document)
Bytes(BytesOptions),
/// Json object
JsonObject(JsonObjectOptions),
}
impl FieldType {
@@ -117,10 +144,11 @@ impl FieldType {
FieldType::Date(_) => Type::Date,
FieldType::Facet(_) => Type::Facet,
FieldType::Bytes(_) => Type::Bytes,
FieldType::JsonObject(_) => Type::Json,
}
}
/// returns true iff the field is indexed.
/// returns true if the field is indexed.
pub fn is_indexed(&self) -> bool {
match *self {
FieldType::Str(ref text_options) => text_options.get_indexing_options().is_some(),
@@ -130,10 +158,32 @@ impl FieldType {
FieldType::Date(ref date_options) => date_options.is_indexed(),
FieldType::Facet(ref _facet_options) => true,
FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
}
}
/// returns true iff the field is normed.
/// Returns the index record option for the field.
///
/// If the field is not indexed, returns `None`.
pub fn index_record_option(&self) -> Option<IndexRecordOption> {
match self {
FieldType::Str(text_options) => text_options
.get_indexing_options()
.map(|text_indexing| text_indexing.index_option()),
FieldType::JsonObject(json_object_options) => json_object_options
.get_text_indexing_options()
.map(|text_indexing| text_indexing.index_option()),
field_type => {
if field_type.is_indexed() {
Some(IndexRecordOption::Basic)
} else {
None
}
}
}
}
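A small sketch of the new `index_record_option` helper, assuming the `NumericOptions` builder shown later in this diff:

use crate::schema::{FieldType, IndexRecordOption, NumericOptions};

#[test]
fn index_record_option_for_numeric_fields() {
    // A non-indexed numeric field reports no record option; an indexed one
    // falls back to `IndexRecordOption::Basic`.
    let plain = FieldType::U64(NumericOptions::default());
    assert!(plain.index_record_option().is_none());
    let indexed = FieldType::U64(NumericOptions::default().set_indexed());
    assert!(matches!(
        indexed.index_record_option(),
        Some(IndexRecordOption::Basic)
    ));
}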
/// returns true if the field is normed.
pub fn has_fieldnorms(&self) -> bool {
match *self {
FieldType::Str(ref text_options) => text_options
@@ -146,12 +196,17 @@ impl FieldType {
| FieldType::Date(ref int_options) => int_options.fieldnorms(),
FieldType::Facet(_) => false,
FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
FieldType::JsonObject(ref _json_object_options) => false,
}
}
/// Given a field configuration, return the maximal possible
/// `IndexRecordOption` available.
///
/// For the Json object, this does not necessarily mean that this index record
/// option level is available for all terms.
/// (Non string terms have the Basic indexing option at most.)
///
/// If the field is not indexed, then returns `None`.
pub fn get_index_record_option(&self) -> Option<IndexRecordOption> {
match *self {
@@ -176,6 +231,9 @@ impl FieldType {
None
}
}
FieldType::JsonObject(ref json_obj_options) => json_obj_options
.get_text_indexing_options()
.map(TextFieldIndexing::index_option),
}
}
@@ -184,91 +242,100 @@ impl FieldType {
/// Tantivy will not try to cast values.
/// For instance, if the json value is the integer `3` and the
/// target field is a `Str`, this method will return an Error.
pub fn value_from_json(&self, json: &JsonValue) -> Result<Value, ValueParsingError> {
match *json {
JsonValue::String(ref field_text) => match *self {
pub fn value_from_json(&self, json: JsonValue) -> Result<Value, ValueParsingError> {
match json {
JsonValue::String(field_text) => match *self {
FieldType::Date(_) => {
let dt_with_fixed_tz: chrono::DateTime<FixedOffset> =
chrono::DateTime::parse_from_rfc3339(field_text).map_err(|err| {
ValueParsingError::TypeError(format!(
"Failed to parse date from JSON. Expected rfc3339 format, got {}. \
{:?}",
field_text, err
))
chrono::DateTime::parse_from_rfc3339(&field_text).map_err(|_err| {
ValueParsingError::TypeError {
expected: "rfc3339 format",
json: JsonValue::String(field_text),
}
})?;
Ok(Value::Date(dt_with_fixed_tz.with_timezone(&Utc)))
}
FieldType::Str(_) => Ok(Value::Str(field_text.clone())),
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) => Err(
ValueParsingError::TypeError(format!("Expected an integer, got {:?}", json)),
),
FieldType::Facet(_) => Ok(Value::Facet(Facet::from(field_text))),
FieldType::Bytes(_) => base64::decode(field_text).map(Value::Bytes).map_err(|_| {
ValueParsingError::InvalidBase64(format!(
"Expected base64 string, got {:?}",
field_text
))
FieldType::Str(_) => Ok(Value::Str(field_text)),
FieldType::U64(_) | FieldType::I64(_) | FieldType::F64(_) => {
Err(ValueParsingError::TypeError {
expected: "an integer",
json: JsonValue::String(field_text),
})
}
FieldType::Facet(_) => Ok(Value::Facet(Facet::from(&field_text))),
FieldType::Bytes(_) => base64::decode(&field_text)
.map(Value::Bytes)
.map_err(|_| ValueParsingError::InvalidBase64 { base64: field_text }),
FieldType::JsonObject(_) => Err(ValueParsingError::TypeError {
expected: "a json object",
json: JsonValue::String(field_text),
}),
},
JsonValue::Number(ref field_val_num) => match *self {
JsonValue::Number(field_val_num) => match self {
FieldType::I64(_) | FieldType::Date(_) => {
if let Some(field_val_i64) = field_val_num.as_i64() {
Ok(Value::I64(field_val_i64))
} else {
let msg = format!("Expected an i64 int, got {:?}", json);
Err(ValueParsingError::OverflowError(msg))
Err(ValueParsingError::OverflowError {
expected: "an i64 int",
json: JsonValue::Number(field_val_num),
})
}
}
FieldType::U64(_) => {
if let Some(field_val_u64) = field_val_num.as_u64() {
Ok(Value::U64(field_val_u64))
} else {
let msg = format!("Expected a u64 int, got {:?}", json);
Err(ValueParsingError::OverflowError(msg))
Err(ValueParsingError::OverflowError {
expected: "u64",
json: JsonValue::Number(field_val_num),
})
}
}
FieldType::F64(_) => {
if let Some(field_val_f64) = field_val_num.as_f64() {
Ok(Value::F64(field_val_f64))
} else {
let msg = format!("Expected a f64 int, got {:?}", json);
Err(ValueParsingError::OverflowError(msg))
Err(ValueParsingError::OverflowError {
expected: "a f64",
json: JsonValue::Number(field_val_num),
})
}
}
FieldType::Str(_) | FieldType::Facet(_) | FieldType::Bytes(_) => {
let msg = format!("Expected a string, got {:?}", json);
Err(ValueParsingError::TypeError(msg))
Err(ValueParsingError::TypeError {
expected: "a string",
json: JsonValue::Number(field_val_num),
})
}
FieldType::JsonObject(_) => Err(ValueParsingError::TypeError {
expected: "a json object",
json: JsonValue::Number(field_val_num),
}),
},
JsonValue::Object(_) => match *self {
JsonValue::Object(json_map) => match self {
FieldType::Str(_) => {
if let Ok(tok_str_val) =
serde_json::from_value::<PreTokenizedString>(json.clone())
{
if let Ok(tok_str_val) = serde_json::from_value::<PreTokenizedString>(
serde_json::Value::Object(json_map.clone()),
) {
Ok(Value::PreTokStr(tok_str_val))
} else {
let msg = format!(
"Json value {:?} cannot be translated to PreTokenizedString.",
json
);
Err(ValueParsingError::TypeError(msg))
Err(ValueParsingError::TypeError {
expected: "a string or an pretokenized string",
json: JsonValue::Object(json_map),
})
}
}
_ => {
let msg = format!(
"Json value not supported error {:?}. Expected {:?}",
json, self
);
Err(ValueParsingError::TypeError(msg))
}
FieldType::JsonObject(_) => Ok(Value::JsonObject(json_map)),
_ => Err(ValueParsingError::TypeError {
expected: self.value_type().name(),
json: JsonValue::Object(json_map),
}),
},
_ => {
let msg = format!(
"Json value not supported error {:?}. Expected {:?}",
json, self
);
Err(ValueParsingError::TypeError(msg))
}
_ => Err(ValueParsingError::TypeError {
expected: self.value_type().name(),
json: json.clone(),
}),
}
}
}
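A hedged sketch of calling the reworked `value_from_json` (which now takes the `serde_json::Value` by value) for a date field and for the new json object field:

use serde_json::json;
use crate::schema::{FieldType, JsonObjectOptions, NumericOptions, Value};

#[test]
fn value_from_json_for_date_and_json_object() {
    // Dates are parsed from rfc3339 strings.
    let date_type = FieldType::Date(NumericOptions::default());
    let date_value = date_type
        .value_from_json(json!("1985-04-12T23:20:50Z"))
        .unwrap();
    assert!(matches!(date_value, Value::Date(_)));

    // A `JsonObject` field accepts a full json map.
    let json_type = FieldType::JsonObject(JsonObjectOptions::default());
    let json_value = json_type.value_from_json(json!({"color": "red"})).unwrap();
    assert!(json_value.as_json().is_some());
}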
@@ -276,6 +343,7 @@ impl FieldType {
#[cfg(test)]
mod tests {
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
use serde_json::json;
use super::FieldType;
use crate::schema::field_type::ValueParsingError;
@@ -311,19 +379,19 @@ mod tests {
#[test]
fn test_bytes_value_from_json() {
let result = FieldType::Bytes(Default::default())
.value_from_json(&json!("dGhpcyBpcyBhIHRlc3Q="))
.value_from_json(json!("dGhpcyBpcyBhIHRlc3Q="))
.unwrap();
assert_eq!(result, Value::Bytes("this is a test".as_bytes().to_vec()));
let result = FieldType::Bytes(Default::default()).value_from_json(&json!(521));
let result = FieldType::Bytes(Default::default()).value_from_json(json!(521));
match result {
Err(ValueParsingError::TypeError(_)) => {}
Err(ValueParsingError::TypeError { .. }) => {}
_ => panic!("Expected parse failure for wrong type"),
}
let result = FieldType::Bytes(Default::default()).value_from_json(&json!("-"));
let result = FieldType::Bytes(Default::default()).value_from_json(json!("-"));
match result {
Err(ValueParsingError::InvalidBase64(_)) => {}
Err(ValueParsingError::InvalidBase64 { .. }) => {}
_ => panic!("Expected parse failure for invalid base64"),
}
}
@@ -385,7 +453,7 @@ mod tests {
});
let deserialized_value = FieldType::Str(TextOptions::default())
.value_from_json(&serde_json::from_str(pre_tokenized_string_json).unwrap())
.value_from_json(serde_json::from_str(pre_tokenized_string_json).unwrap())
.unwrap();
assert_eq!(deserialized_value, expected_value);

View File

@@ -1,6 +1,6 @@
use std::ops::BitOr;
use crate::schema::{IntOptions, TextOptions};
use crate::schema::{NumericOptions, TextOptions};
#[derive(Clone)]
pub struct StoredFlag;
@@ -22,8 +22,8 @@ pub const STORED: SchemaFlagList<StoredFlag, ()> = SchemaFlagList {
pub struct IndexedFlag;
/// Flag to mark the field as indexed. An indexed field is searchable and has a fieldnorm.
///
/// The `INDEXED` flag can only be used when building `IntOptions` (`u64`, `i64` and `f64` fields)
/// Of course, text fields can also be indexed... But this is expressed by using either the
/// The `INDEXED` flag can only be used when building `NumericOptions` (`u64`, `i64` and `f64`
/// fields). Of course, text fields can also be indexed... But this is expressed by using either the
/// `STRING` (untokenized) or `TEXT` (tokenized with the english tokenizer) flags.
pub const INDEXED: SchemaFlagList<IndexedFlag, ()> = SchemaFlagList {
head: IndexedFlag,
@@ -36,7 +36,7 @@ pub struct FastFlag;
///
/// Fast fields can be random-accessed rapidly. Fields useful for scoring, filtering
/// or collection should be marked as fast fields.
/// The `FAST` flag can only be used when building `IntOptions` (`u64`, `i64` and `f64` fields)
/// The `FAST` flag can only be used when building `NumericOptions` (`u64`, `i64` and `f64` fields)
pub const FAST: SchemaFlagList<FastFlag, ()> = SchemaFlagList {
head: FastFlag,
tail: (),
@@ -58,10 +58,10 @@ where
}
}
impl<T: Clone + Into<IntOptions>> BitOr<IntOptions> for SchemaFlagList<T, ()> {
type Output = IntOptions;
impl<T: Clone + Into<NumericOptions>> BitOr<NumericOptions> for SchemaFlagList<T, ()> {
type Output = NumericOptions;
fn bitor(self, rhs: IntOptions) -> Self::Output {
fn bitor(self, rhs: NumericOptions) -> Self::Output {
self.head.into() | rhs
}
}

View File

@@ -30,7 +30,7 @@ pub enum IndexRecordOption {
}
impl IndexRecordOption {
/// Returns true iff this option includes encoding
/// Returns true if this option includes encoding
/// term frequencies.
pub fn has_freq(self) -> bool {
match self {
@@ -39,7 +39,7 @@ impl IndexRecordOption {
}
}
/// Returns true iff this option include encoding
/// Returns true if this option includes encoding
/// term positions.
pub fn has_positions(self) -> bool {
match self {

View File

@@ -0,0 +1,109 @@
use std::ops::BitOr;
use serde::{Deserialize, Serialize};
use crate::schema::flags::{SchemaFlagList, StoredFlag};
use crate::schema::{TextFieldIndexing, TextOptions};
/// The `JsonObjectOptions` make it possible to
/// configure how a json object field should be indexed and stored.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct JsonObjectOptions {
stored: bool,
// If set to `Some`, int, date, f64 and text will be indexed.
// Text will use the TextFieldIndexing setting for indexing.
indexing: Option<TextFieldIndexing>,
}
impl JsonObjectOptions {
/// Returns `true` if the json object should be stored.
pub fn is_stored(&self) -> bool {
self.stored
}
/// Returns `true` iff the json object should be indexed.
pub fn is_indexed(&self) -> bool {
self.indexing.is_some()
}
/// Returns the text indexing options.
///
/// If set to `Some` then both int and str values will be indexed.
/// The inner `TextFieldIndexing`, however, only applies to the str values
/// in the json object.
pub fn get_text_indexing_options(&self) -> Option<&TextFieldIndexing> {
self.indexing.as_ref()
}
}
impl From<StoredFlag> for JsonObjectOptions {
fn from(_stored_flag: StoredFlag) -> Self {
JsonObjectOptions {
stored: true,
indexing: None,
}
}
}
impl From<()> for JsonObjectOptions {
fn from(_: ()) -> Self {
Self::default()
}
}
impl<T: Into<JsonObjectOptions>> BitOr<T> for JsonObjectOptions {
type Output = JsonObjectOptions;
fn bitor(self, other: T) -> Self {
let other = other.into();
JsonObjectOptions {
indexing: self.indexing.or(other.indexing),
stored: self.stored | other.stored,
}
}
}
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for JsonObjectOptions
where
Head: Clone,
Tail: Clone,
Self: BitOr<Output = Self> + From<Head> + From<Tail>,
{
fn from(head_tail: SchemaFlagList<Head, Tail>) -> Self {
Self::from(head_tail.head) | Self::from(head_tail.tail)
}
}
impl From<TextOptions> for JsonObjectOptions {
fn from(text_options: TextOptions) -> Self {
JsonObjectOptions {
stored: text_options.is_stored(),
indexing: text_options.get_indexing_options().cloned(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{STORED, TEXT};
#[test]
fn test_json_options() {
{
let json_options: JsonObjectOptions = (STORED | TEXT).into();
assert!(json_options.is_stored());
assert!(json_options.is_indexed());
}
{
let json_options: JsonObjectOptions = TEXT.into();
assert!(!json_options.is_stored());
assert!(json_options.is_indexed());
}
{
let json_options: JsonObjectOptions = STORED.into();
assert!(json_options.is_stored());
assert!(!json_options.is_indexed());
}
}
}

View File

@@ -60,7 +60,7 @@
//! ```
//! use tantivy::schema::*;
//! let mut schema_builder = Schema::builder();
//! let num_stars_options = IntOptions::default()
//! let num_stars_options = NumericOptions::default()
//! .set_stored()
//! .set_indexed();
//! schema_builder.add_u64_field("num_stars", num_stars_options);
@@ -104,7 +104,7 @@ mod document;
mod facet;
mod facet_options;
mod schema;
mod term;
pub(crate) mod term;
mod field_entry;
mod field_type;
@@ -112,14 +112,14 @@ mod field_value;
mod bytes_options;
mod field;
mod flags;
mod index_record_option;
mod int_options;
mod json_object_options;
mod named_field_document;
mod numeric_options;
mod text_options;
mod value;
mod flags;
pub use self::bytes_options::BytesOptions;
pub use self::document::Document;
pub(crate) use self::facet::FACET_SEP_BYTE;
@@ -131,8 +131,11 @@ pub use self::field_type::{FieldType, Type};
pub use self::field_value::FieldValue;
pub use self::flags::{FAST, INDEXED, STORED};
pub use self::index_record_option::IndexRecordOption;
pub use self::int_options::{Cardinality, IntOptions};
pub use self::json_object_options::JsonObjectOptions;
pub use self::named_field_document::NamedFieldDocument;
pub use self::numeric_options::NumericOptions;
#[allow(deprecated)]
pub use self::numeric_options::{Cardinality, IntOptions};
pub use self::schema::{DocParsingError, Schema, SchemaBuilder};
pub use self::term::Term;
pub use self::text_options::{TextFieldIndexing, TextOptions, STRING, TEXT};

View File

@@ -16,10 +16,14 @@ pub enum Cardinality {
MultiValues,
}
#[deprecated(since = "0.17.0", note = "Use NumericOptions instead.")]
/// Deprecated. Use [NumericOptions] instead.
pub type IntOptions = NumericOptions;
/// Define how a u64, i64, or f64 field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(from = "IntOptionsDeser")]
pub struct IntOptions {
#[serde(from = "NumericOptionsDeser")]
pub struct NumericOptions {
indexed: bool,
// This boolean has no effect if the field is not marked as indexed too.
fieldnorms: bool, // This attribute only has an effect if indexed is true.
@@ -29,12 +33,12 @@ pub struct IntOptions {
}
/// For backward compatibility we add an intermediary to interpret the
/// lack of fieldnorms attribute as "true" iff indexed.
/// lack of fieldnorms attribute as "true" if and only if indexed.
///
/// (Downstream, for the moment, this attribute is not used anyway if not indexed...)
/// Note that: newly serialized IntOptions will include the new attribute.
/// Note that newly serialized NumericOptions will include the new attribute.
#[derive(Deserialize)]
struct IntOptionsDeser {
struct NumericOptionsDeser {
indexed: bool,
#[serde(default)]
fieldnorms: Option<bool>, // This attribute only has an effect if indexed is true.
@@ -43,9 +47,9 @@ struct IntOptionsDeser {
stored: bool,
}
impl From<IntOptionsDeser> for IntOptions {
fn from(deser: IntOptionsDeser) -> Self {
IntOptions {
impl From<NumericOptionsDeser> for NumericOptions {
fn from(deser: NumericOptionsDeser) -> Self {
NumericOptions {
indexed: deser.indexed,
fieldnorms: deser.fieldnorms.unwrap_or(deser.indexed),
fast: deser.fast,
@@ -54,7 +58,7 @@ impl From<IntOptionsDeser> for IntOptions {
}
}
impl IntOptions {
impl NumericOptions {
/// Returns true iff the value is stored.
pub fn is_stored(&self) -> bool {
self.stored
@@ -70,6 +74,15 @@ impl IntOptions {
self.fieldnorms && self.indexed
}
/// Returns true iff the value is a fast field and multivalued.
pub fn is_multivalue_fast(&self) -> bool {
if let Some(cardinality) = self.fast {
cardinality == Cardinality::MultiValues
} else {
false
}
}
/// Returns true iff the value is a fast field.
pub fn is_fast(&self) -> bool {
self.fast.is_some()
@@ -80,7 +93,7 @@ impl IntOptions {
/// Only the fields that are set as *stored* are
/// persisted into the Tantivy's store.
#[must_use]
pub fn set_stored(mut self) -> IntOptions {
pub fn set_stored(mut self) -> NumericOptions {
self.stored = true;
self
}
@@ -92,7 +105,7 @@ impl IntOptions {
///
/// This is required for the field to be searchable.
#[must_use]
pub fn set_indexed(mut self) -> IntOptions {
pub fn set_indexed(mut self) -> NumericOptions {
self.indexed = true;
self
}
@@ -102,7 +115,7 @@ impl IntOptions {
/// Setting an integer as fieldnorm will generate
/// the fieldnorm data for it.
#[must_use]
pub fn set_fieldnorm(mut self) -> IntOptions {
pub fn set_fieldnorm(mut self) -> NumericOptions {
self.fieldnorms = true;
self
}
@@ -114,7 +127,7 @@ impl IntOptions {
/// If more than one value is associated to a fast field, only the last one is
/// kept.
#[must_use]
pub fn set_fast(mut self, cardinality: Cardinality) -> IntOptions {
pub fn set_fast(mut self, cardinality: Cardinality) -> NumericOptions {
self.fast = Some(cardinality);
self
}
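A minimal sketch of the renamed builder; the call chain is unchanged from the old `IntOptions` one:

use crate::schema::{Cardinality, NumericOptions};

#[test]
fn numeric_options_builder_reads_like_int_options() {
    let popularity_options = NumericOptions::default()
        .set_stored()
        .set_indexed()
        .set_fast(Cardinality::SingleValue);
    assert!(popularity_options.is_stored());
    assert!(popularity_options.is_fast());
    // Single-valued fast fields are not reported as multivalued.
    assert!(!popularity_options.is_multivalue_fast());
}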
@@ -128,15 +141,15 @@ impl IntOptions {
}
}
impl From<()> for IntOptions {
fn from(_: ()) -> IntOptions {
IntOptions::default()
impl From<()> for NumericOptions {
fn from(_: ()) -> NumericOptions {
NumericOptions::default()
}
}
impl From<FastFlag> for IntOptions {
impl From<FastFlag> for NumericOptions {
fn from(_: FastFlag) -> Self {
IntOptions {
NumericOptions {
indexed: false,
fieldnorms: false,
stored: false,
@@ -145,9 +158,9 @@ impl From<FastFlag> for IntOptions {
}
}
impl From<StoredFlag> for IntOptions {
impl From<StoredFlag> for NumericOptions {
fn from(_: StoredFlag) -> Self {
IntOptions {
NumericOptions {
indexed: false,
fieldnorms: false,
stored: true,
@@ -156,9 +169,9 @@ impl From<StoredFlag> for IntOptions {
}
}
impl From<IndexedFlag> for IntOptions {
impl From<IndexedFlag> for NumericOptions {
fn from(_: IndexedFlag) -> Self {
IntOptions {
NumericOptions {
indexed: true,
fieldnorms: true,
stored: false,
@@ -167,12 +180,12 @@ impl From<IndexedFlag> for IntOptions {
}
}
impl<T: Into<IntOptions>> BitOr<T> for IntOptions {
type Output = IntOptions;
impl<T: Into<NumericOptions>> BitOr<T> for NumericOptions {
type Output = NumericOptions;
fn bitor(self, other: T) -> IntOptions {
fn bitor(self, other: T) -> NumericOptions {
let other = other.into();
IntOptions {
NumericOptions {
indexed: self.indexed | other.indexed,
fieldnorms: self.fieldnorms | other.fieldnorms,
stored: self.stored | other.stored,
@@ -181,7 +194,7 @@ impl<T: Into<IntOptions>> BitOr<T> for IntOptions {
}
}
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IntOptions
impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for NumericOptions
where
Head: Clone,
Tail: Clone,
@@ -202,10 +215,10 @@ mod tests {
"indexed": true,
"stored": false
}"#;
let int_options: IntOptions = serde_json::from_str(json).unwrap();
let int_options: NumericOptions = serde_json::from_str(json).unwrap();
assert_eq!(
&int_options,
&IntOptions {
&NumericOptions {
indexed: true,
fieldnorms: true,
fast: None,
@@ -220,10 +233,10 @@ mod tests {
"indexed": false,
"stored": false
}"#;
let int_options: IntOptions = serde_json::from_str(json).unwrap();
let int_options: NumericOptions = serde_json::from_str(json).unwrap();
assert_eq!(
&int_options,
&IntOptions {
&NumericOptions {
indexed: false,
fieldnorms: false,
fast: None,
@@ -239,10 +252,10 @@ mod tests {
"fieldnorms": false,
"stored": false
}"#;
let int_options: IntOptions = serde_json::from_str(json).unwrap();
let int_options: NumericOptions = serde_json::from_str(json).unwrap();
assert_eq!(
&int_options,
&IntOptions {
&NumericOptions {
indexed: true,
fieldnorms: false,
fast: None,
@@ -259,10 +272,10 @@ mod tests {
"fieldnorms": true,
"stored": false
}"#;
let int_options: IntOptions = serde_json::from_str(json).unwrap();
let int_options: NumericOptions = serde_json::from_str(json).unwrap();
assert_eq!(
&int_options,
&IntOptions {
&NumericOptions {
indexed: false,
fieldnorms: true,
fast: None,

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use serde::de::{SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{self, Map as JsonObject, Value as JsonValue};
use serde_json::{self, Value as JsonValue};
use super::*;
use crate::schema::bytes_options::BytesOptions;
@@ -52,7 +52,7 @@ impl SchemaBuilder {
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_u64_field<T: Into<IntOptions>>(
pub fn add_u64_field<T: Into<NumericOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
@@ -72,7 +72,7 @@ impl SchemaBuilder {
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_i64_field<T: Into<IntOptions>>(
pub fn add_i64_field<T: Into<NumericOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
@@ -92,7 +92,7 @@ impl SchemaBuilder {
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_f64_field<T: Into<IntOptions>>(
pub fn add_f64_field<T: Into<NumericOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
@@ -114,7 +114,7 @@ impl SchemaBuilder {
/// by the second one.
/// The first field will get a field id
/// but only the second one will be indexed
pub fn add_date_field<T: Into<IntOptions>>(
pub fn add_date_field<T: Into<NumericOptions>>(
&mut self,
field_name_str: &str,
field_options: T,
@@ -173,6 +173,16 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds a json object field to the schema.
pub fn add_json_field<T: Into<JsonObjectOptions>>(
&mut self,
field_name: &str,
field_options: T,
) -> Field {
let field_entry = FieldEntry::new_json(field_name.to_string(), field_options.into());
self.add_field(field_entry)
}
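A minimal end-to-end sketch of the new json field: declared with the familiar flag syntax, then filled through `parse_document` (the `Document::get_first` accessor used here is assumed and not part of this diff):

use crate::schema::{Schema, STORED, TEXT};

#[test]
fn add_json_field_and_parse_document() {
    let mut schema_builder = Schema::builder();
    let attributes = schema_builder.add_json_field("attributes", STORED | TEXT);
    let schema = schema_builder.build();
    // The whole object under "attributes" becomes a single `Value::JsonObject`.
    let doc = schema
        .parse_document(r#"{"attributes": {"color": "red", "stock": 3}}"#)
        .unwrap();
    assert!(doc.get_first(attributes).is_some());
}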
/// Adds a field entry to the schema in build.
pub fn add_field(&mut self, field_entry: FieldEntry) -> Field {
let field = Field::from_field_id(self.fields.len() as u32);
@@ -298,23 +308,23 @@ impl Schema {
/// Build a document object from a json string.
pub fn parse_document(&self, doc_json: &str) -> Result<Document, DocParsingError> {
let json_obj: JsonObject<String, JsonValue> =
serde_json::from_str(doc_json).map_err(|_| {
let doc_json_sample: String = if doc_json.len() < 20 {
String::from(doc_json)
} else {
format!("{:?}...", &doc_json[0..20])
};
DocParsingError::NotJson(doc_json_sample)
})?;
let json_obj: serde_json::Map<String, JsonValue> =
serde_json::from_str(doc_json).map_err(|_| DocParsingError::invalid_json(doc_json))?;
self.json_object_to_doc(json_obj)
}
/// Build a document object from a json-object.
pub fn json_object_to_doc(
&self,
json_obj: serde_json::Map<String, JsonValue>,
) -> Result<Document, DocParsingError> {
let mut doc = Document::default();
for (field_name, json_value) in json_obj.iter() {
if let Some(field) = self.get_field(field_name) {
for (field_name, json_value) in json_obj {
if let Some(field) = self.get_field(&field_name) {
let field_entry = self.get_field_entry(field);
let field_type = field_entry.field_type();
match *json_value {
JsonValue::Array(ref json_items) => {
match json_value {
JsonValue::Array(json_items) => {
for json_item in json_items {
let value = field_type
.value_from_json(json_item)
@@ -383,12 +393,24 @@ impl<'de> Deserialize<'de> for Schema {
pub enum DocParsingError {
/// The payload given is not valid JSON.
#[error("The provided string is not valid JSON")]
NotJson(String),
InvalidJson(String),
/// One of the value nodes could not be parsed.
#[error("The field '{0:?}' could not be parsed: {1:?}")]
ValueError(String, ValueParsingError),
}
impl DocParsingError {
/// Builds an InvalidJson DocParsingError
fn invalid_json(invalid_json: &str) -> Self {
let sample_json: String = if invalid_json.len() < 20 {
invalid_json.to_string()
} else {
format!("{:?}...", &invalid_json[0..20])
};
DocParsingError::InvalidJson(sample_json)
}
}
#[cfg(test)]
mod tests {
@@ -398,8 +420,8 @@ mod tests {
use serde_json;
use crate::schema::field_type::ValueParsingError;
use crate::schema::int_options::Cardinality::SingleValue;
use crate::schema::schema::DocParsingError::NotJson;
use crate::schema::numeric_options::Cardinality::SingleValue;
use crate::schema::schema::DocParsingError::InvalidJson;
use crate::schema::*;
#[test]
@@ -413,13 +435,13 @@ mod tests {
#[test]
pub fn test_schema_serialization() {
let mut schema_builder = Schema::builder();
let count_options = IntOptions::default()
let count_options = NumericOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let popularity_options = IntOptions::default()
let popularity_options = NumericOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let score_options = IntOptions::default()
let score_options = NumericOptions::default()
.set_indexed()
.set_fieldnorm()
.set_fast(Cardinality::SingleValue);
@@ -529,7 +551,7 @@ mod tests {
#[test]
pub fn test_document_to_json() {
let mut schema_builder = Schema::builder();
let count_options = IntOptions::default()
let count_options = NumericOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
schema_builder.add_text_field("title", TEXT);
@@ -594,13 +616,13 @@ mod tests {
#[test]
pub fn test_parse_document() {
let mut schema_builder = Schema::builder();
let count_options = IntOptions::default()
let count_options = NumericOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let popularity_options = IntOptions::default()
let popularity_options = NumericOptions::default()
.set_stored()
.set_fast(Cardinality::SingleValue);
let score_options = IntOptions::default()
let score_options = NumericOptions::default()
.set_indexed()
.set_fast(Cardinality::SingleValue);
let title_field = schema_builder.add_text_field("title", TEXT);
@@ -666,7 +688,7 @@ mod tests {
json_err,
Err(DocParsingError::ValueError(
_,
ValueParsingError::TypeError(_)
ValueParsingError::TypeError { .. }
))
);
}
@@ -684,7 +706,7 @@ mod tests {
json_err,
Err(DocParsingError::ValueError(
_,
ValueParsingError::OverflowError(_)
ValueParsingError::OverflowError { .. }
))
);
}
@@ -702,7 +724,7 @@ mod tests {
json_err,
Err(DocParsingError::ValueError(
_,
ValueParsingError::OverflowError(_)
ValueParsingError::OverflowError { .. }
))
));
}
@@ -720,7 +742,7 @@ mod tests {
json_err,
Err(DocParsingError::ValueError(
_,
ValueParsingError::OverflowError(_)
ValueParsingError::OverflowError { .. }
))
);
}
@@ -732,7 +754,7 @@ mod tests {
"count": 50,
}"#,
);
assert_matches!(json_err, Err(NotJson(_)));
assert_matches!(json_err, Err(InvalidJson(_)));
}
}
@@ -744,7 +766,7 @@ mod tests {
.set_tokenizer("raw")
.set_index_option(IndexRecordOption::Basic),
);
let timestamp_options = IntOptions::default()
let timestamp_options = NumericOptions::default()
.set_stored()
.set_indexed()
.set_fieldnorm()

View File

@@ -1,3 +1,4 @@
use std::convert::TryInto;
use std::hash::{Hash, Hasher};
use std::{fmt, str};
@@ -8,8 +9,26 @@ use crate::DateTime;
/// Size (in bytes) of the buffer of a fast value (u64, i64, f64, or date) term.
/// <field> + <type byte> + <value len>
///
/// - <field> is a big endian encoded u32 field id
/// - <type_byte>'s most significant bit expresses whether the term is a json term or not.
/// The remaining 7 bits are used to encode the type of the value.
/// If this is a JSON term, the type is the type of the leaf of the json.
///
/// - <value> is, if this is not a JSON term, a binary representation specific to the type.
/// If it is a JSON term, then it is prepended with the path that leads to this leaf value.
const FAST_VALUE_TERM_LEN: usize = 4 + 1 + 8;
/// Separates the different segments of
/// the json path.
pub const JSON_PATH_SEGMENT_SEP: u8 = 1u8;
pub const JSON_PATH_SEGMENT_SEP_STR: &str =
unsafe { std::str::from_utf8_unchecked(&[JSON_PATH_SEGMENT_SEP]) };
/// Separates the json path and the value in
/// a JSON term binary representation.
pub const JSON_END_OF_PATH: u8 = 0u8;
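A worked sketch of the layout documented above for a fast-value term, built by hand rather than through the `Term` API:

use crate::schema::Type;

#[test]
fn fast_value_term_layout() {
    // 4-byte big-endian field id, 1 type byte, 8-byte big-endian value:
    // exactly FAST_VALUE_TERM_LEN = 4 + 1 + 8 bytes.
    let field_id: u32 = 1;
    let value: u64 = 5;
    let mut bytes = Vec::new();
    bytes.extend_from_slice(&field_id.to_be_bytes());
    bytes.push(Type::U64.to_code());
    bytes.extend_from_slice(&value.to_be_bytes());
    assert_eq!(bytes.len(), 4 + 1 + 8);
    assert_eq!(bytes[4], Type::U64.to_code());
}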
/// Term represents the value that the token can take.
///
/// It actually wraps a `Vec<u8>`.
@@ -17,6 +36,12 @@ const FAST_VALUE_TERM_LEN: usize = 4 + 1 + 8;
pub struct Term<B = Vec<u8>>(B)
where B: AsRef<[u8]>;
impl AsMut<Vec<u8>> for Term {
fn as_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}
}
impl Term {
pub(crate) fn new() -> Term {
Term(Vec::with_capacity(100))
@@ -120,6 +145,22 @@ impl Term {
pub fn set_text(&mut self, text: &str) {
self.set_bytes(text.as_bytes());
}
/// Removes the value bytes and sets the type code.
pub fn clear_with_type(&mut self, typ: Type) {
self.truncate(5);
self.0[4] = typ.to_code();
}
/// Truncate the term right after the field and the type code.
pub fn truncate(&mut self, len: usize) {
self.0.truncate(len);
}
/// Appends bytes to the end of the term.
pub fn append_bytes(&mut self, bytes: &[u8]) {
self.0.extend_from_slice(bytes);
}
}
impl<B> Ord for Term<B>
@@ -164,13 +205,16 @@ where B: AsRef<[u8]>
Term(data)
}
fn typ_code(&self) -> u8 {
*self
.as_slice()
.get(4)
.expect("the byte representation is too short")
}
/// Return the type of the term.
pub fn typ(&self) -> Type {
assert!(
self.as_slice().len() >= 5,
"the type does byte representation is too short"
);
Type::from_code(self.as_slice()[4]).expect("The term has an invalid type code")
Type::from_code(self.typ_code()).expect("The term has an invalid type code")
}
/// Returns the field.
@@ -189,10 +233,14 @@ where B: AsRef<[u8]>
}
fn get_fast_type<T: FastValue>(&self) -> Option<T> {
if self.typ() != T::to_type() || self.as_slice().len() != FAST_VALUE_TERM_LEN {
if self.typ() != T::to_type() {
return None;
}
let mut value_bytes = [0u8; 8];
let bytes = self.value_bytes();
if bytes.len() != 8 {
return None;
}
value_bytes.copy_from_slice(self.value_bytes());
let value_u64 = u64::from_be_bytes(value_bytes);
Some(FastValue::from_u64(value_u64))
@@ -290,40 +338,74 @@ fn write_opt<T: std::fmt::Debug>(f: &mut fmt::Formatter, val_opt: Option<T>) ->
Ok(())
}
impl fmt::Debug for Term {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let field_id = self.field().field_id();
let typ = self.typ();
write!(f, "Term(type={:?}, field={}, val=", typ, field_id,)?;
match typ {
Type::Str => {
let s = str::from_utf8(self.value_bytes()).ok();
write_opt(f, s)?;
}
Type::U64 => {
write_opt(f, self.as_u64())?;
}
Type::I64 => {
let val_i64 = self.as_i64();
write_opt(f, val_i64)?;
}
Type::F64 => {
let val_f64 = self.as_f64();
write_opt(f, val_f64)?;
}
// TODO pretty print these types too.
Type::Date => {
let val_date = self.as_date();
write_opt(f, val_date)?;
}
Type::Facet => {
let facet = self.as_facet().map(|facet| facet.to_path_string());
write_opt(f, facet)?;
}
Type::Bytes => {
write_opt(f, self.as_bytes())?;
fn as_str(value_bytes: &[u8]) -> Option<&str> {
std::str::from_utf8(value_bytes).ok()
}
fn get_fast_type<T: FastValue>(bytes: &[u8]) -> Option<T> {
let value_u64 = u64::from_be_bytes(bytes.try_into().ok()?);
Some(FastValue::from_u64(value_u64))
}
/// Returns the json path (without the non-human-friendly separators), the type of the value,
/// and the value bytes. Returns None if the value is not JSON or is not valid.
pub(crate) fn as_json_path_type_value_bytes(bytes: &[u8]) -> Option<(&str, Type, &[u8])> {
let pos = bytes.iter().cloned().position(|b| b == JSON_END_OF_PATH)?;
let json_path = str::from_utf8(&bytes[..pos]).ok()?;
let type_code = *bytes.get(pos + 1)?;
let typ = Type::from_code(type_code)?;
Some((json_path, typ, &bytes[pos + 2..]))
}
fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
match typ {
Type::Str => {
let s = as_str(bytes);
write_opt(f, s)?;
}
Type::U64 => {
write_opt(f, get_fast_type::<u64>(bytes))?;
}
Type::I64 => {
write_opt(f, get_fast_type::<i64>(bytes))?;
}
Type::F64 => {
write_opt(f, get_fast_type::<f64>(bytes))?;
}
// TODO pretty print these types too.
Type::Date => {
write_opt(f, get_fast_type::<crate::DateTime>(bytes))?;
}
Type::Facet => {
let facet_str = str::from_utf8(bytes)
.ok()
.map(ToString::to_string)
.map(Facet::from_encoded_string)
.map(|facet| facet.to_path_string());
write_opt(f, facet_str)?;
}
Type::Bytes => {
write_opt(f, Some(bytes))?;
}
Type::Json => {
if let Some((path, typ, bytes)) = as_json_path_type_value_bytes(bytes) {
let path_pretty = path.replace(JSON_PATH_SEGMENT_SEP_STR, ".");
write!(f, "path={path_pretty}, vtype={typ:?}, ")?;
debug_value_bytes(typ, bytes, f)?;
}
}
}
Ok(())
}
impl<B> fmt::Debug for Term<B>
where B: AsRef<[u8]>
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let field_id = self.field().field_id();
let typ = self.typ();
write!(f, "Term(type={typ:?}, field={field_id}, ")?;
debug_value_bytes(typ, self.value_bytes(), f)?;
write!(f, ")",)?;
Ok(())
}

View File

@@ -19,7 +19,7 @@ impl TextOptions {
self.indexing.as_ref()
}
/// Returns true iff the text is to be stored.
/// Returns true if the text is to be stored.
pub fn is_stored(&self) -> bool {
self.stored
}
@@ -46,7 +46,7 @@ impl TextOptions {
/// Essentially, should we store the term frequency and/or the positions (See
/// [`IndexRecordOption`](./enum.IndexRecordOption.html)).
/// - the name of the `Tokenizer` that should be used to process the field.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Debug, Eq, Serialize, Deserialize)]
pub struct TextFieldIndexing {
record: IndexRecordOption,
fieldnorms: bool,
@@ -83,7 +83,7 @@ impl TextFieldIndexing {
self
}
/// Returns true iff fieldnorms are stored.
/// Returns true if and only if fieldnorms are stored.
pub fn fieldnorms(&self) -> bool {
self.fieldnorms
}

View File

@@ -2,6 +2,7 @@ use std::fmt;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Map;
use crate::schema::Facet;
use crate::tokenizer::PreTokenizedString;
@@ -27,6 +28,8 @@ pub enum Value {
Facet(Facet),
/// Arbitrarily sized byte array
Bytes(Vec<u8>),
/// Json object value.
JsonObject(serde_json::Map<String, serde_json::Value>),
}
impl Eq for Value {}
@@ -43,6 +46,7 @@ impl Serialize for Value {
Value::Date(ref date) => serializer.serialize_str(&date.to_rfc3339()),
Value::Facet(ref facet) => facet.serialize(serializer),
Value::Bytes(ref bytes) => serializer.serialize_bytes(bytes),
Value::JsonObject(ref obj) => obj.serialize(serializer),
}
}
}
@@ -168,6 +172,17 @@ impl Value {
None
}
}
/// Returns the json object, provided the value is of the JsonObject type.
///
/// Returns None if the value is not of type JsonObject.
pub fn as_json(&self) -> Option<&Map<String, serde_json::Value>> {
if let Value::JsonObject(json) = self {
Some(json)
} else {
None
}
}
}
impl From<String> for Value {
@@ -230,6 +245,23 @@ impl From<PreTokenizedString> for Value {
}
}
impl From<serde_json::Map<String, serde_json::Value>> for Value {
fn from(json_object: serde_json::Map<String, serde_json::Value>) -> Value {
Value::JsonObject(json_object)
}
}
impl From<serde_json::Value> for Value {
fn from(json_value: serde_json::Value) -> Value {
match json_value {
serde_json::Value::Object(json_object) => Value::JsonObject(json_object),
_ => {
panic!("Expected a json object.");
}
}
}
}
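A small sketch of the new conversions: a `serde_json` map converts into `Value::JsonObject`, and `as_json` gives it back (non-object `serde_json::Value`s would hit the panic in the `From` impl above):

use serde_json::json;
use crate::schema::Value;

#[test]
fn json_map_round_trips_through_value() {
    if let serde_json::Value::Object(map) = json!({"color": "red"}) {
        let value = Value::from(map);
        assert!(value.as_json().is_some());
    } else {
        panic!("json! object literal should be an object");
    }
}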
mod binary_serialize {
use std::io::{self, Read, Write};
@@ -248,6 +280,7 @@ mod binary_serialize {
const DATE_CODE: u8 = 5;
const F64_CODE: u8 = 6;
const EXT_CODE: u8 = 7;
const JSON_OBJ_CODE: u8 = 8;
// extended types
@@ -296,8 +329,14 @@ mod binary_serialize {
BYTES_CODE.serialize(writer)?;
bytes.serialize(writer)
}
Value::JsonObject(ref map) => {
JSON_OBJ_CODE.serialize(writer)?;
serde_json::to_writer(writer, &map)?;
Ok(())
}
}
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let type_code = u8::deserialize(reader)?;
match type_code {
@@ -347,6 +386,10 @@ mod binary_serialize {
)),
}
}
JSON_OBJ_CODE => {
let map = serde_json::from_reader(reader)?;
Ok(Value::JsonObject(map))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("No field type is associated with code {:?}", type_code),

View File

@@ -4,11 +4,12 @@ use std::sync::{Arc, Mutex};
use common::{BinarySerializable, HasLen, VInt};
use lru::LruCache;
use ownedbytes::OwnedBytes;
use super::footer::DocStoreFooter;
use super::index::SkipIndex;
use super::Compressor;
use crate::directory::{FileSlice, OwnedBytes};
use crate::directory::FileSlice;
use crate::error::DataCorruption;
use crate::fastfield::AliveBitSet;
use crate::schema::Document;
@@ -239,6 +240,60 @@ impl StoreReader {
}
}
#[cfg(feature = "quickwit")]
impl StoreReader {
async fn read_block_async(&self, checkpoint: &Checkpoint) -> crate::AsyncIoResult<Block> {
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.byte_range.start) {
self.cache_hits.fetch_add(1, Ordering::SeqCst);
return Ok(block.clone());
}
self.cache_misses.fetch_add(1, Ordering::SeqCst);
let compressed_block = self
.data
.slice(checkpoint.byte_range.clone())
.read_bytes_async()
.await?;
let mut decompressed_block = vec![];
self.compressor
.decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = OwnedBytes::new(decompressed_block);
self.cache
.lock()
.unwrap()
.put(checkpoint.byte_range.start, block.clone());
Ok(block)
}
/// Fetches a document asynchronously.
async fn get_document_bytes_async(&self, doc_id: DocId) -> crate::Result<OwnedBytes> {
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
})?;
let block = self.read_block_async(&checkpoint).await?;
let mut cursor = &block[..];
let cursor_len_before = cursor.len();
for _ in checkpoint.doc_range.start..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
let start_pos = cursor_len_before - cursor.len();
let end_pos = cursor_len_before - cursor.len() + doc_length;
Ok(block.slice(start_pos..end_pos))
}
/// Reads the raw bytes of a given document asynchronously and
/// deserializes them into a `Document`.
pub(crate) async fn get_async(&self, doc_id: DocId) -> crate::Result<Document> {
let mut doc_bytes = self.get_document_bytes_async(doc_id).await?;
Ok(Document::deserialize(&mut doc_bytes)?)
}
}
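The per-document framing inside a decompressed block is what the offset arithmetic above relies on; a minimal standalone sketch of the same VInt-length-prefixed layout, assuming only the `common::VInt` helpers already imported by this file:

use common::{BinarySerializable, VInt};

#[test]
fn vint_prefixed_doc_framing() {
    // Each document in a decompressed block is prefixed by its byte length as a
    // VInt, so reaching doc N means skipping N (length, payload) pairs.
    let docs: [&[u8]; 2] = [b"first doc", b"second doc"];
    let mut block = Vec::new();
    for doc in &docs {
        VInt(doc.len() as u64).serialize_into_vec(&mut block);
        block.extend_from_slice(doc);
    }
    let mut cursor = &block[..];
    let len0 = VInt::deserialize(&mut cursor).unwrap().val() as usize;
    cursor = &cursor[len0..]; // skip doc 0
    let len1 = VInt::deserialize(&mut cursor).unwrap().val() as usize;
    assert_eq!(&cursor[..len1], b"second doc");
}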
#[cfg(test)]
mod tests {
use std::path::Path;

View File

@@ -51,18 +51,19 @@ impl<'a> TermMerger<'a> {
/// Returns `true` if there is indeed another term
/// `false` if there is none.
pub fn advance(&mut self) -> bool {
if let Some((k, values)) = self.union.next() {
self.current_key.clear();
self.current_key.extend_from_slice(k);
self.current_segment_and_term_ordinals.clear();
self.current_segment_and_term_ordinals
.extend_from_slice(values);
self.current_segment_and_term_ordinals
.sort_by_key(|iv| iv.index);
true
let (key, values) = if let Some((key, values)) = self.union.next() {
(key, values)
} else {
false
}
return false;
};
self.current_key.clear();
self.current_key.extend_from_slice(key);
self.current_segment_and_term_ordinals.clear();
self.current_segment_and_term_ordinals
.extend_from_slice(values);
self.current_segment_and_term_ordinals
.sort_by_key(|iv| iv.index);
true
}
/// Returns the current term.

View File

@@ -18,9 +18,11 @@
//!
//! A second datastructure makes it possible to access a
//! [`TermInfo`](../postings/struct.TermInfo.html).
mod merger;
mod streamer;
mod term_info_store;
mod termdict;
pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};

View File

@@ -143,12 +143,13 @@ impl TermDictionary {
Ok(self.fst_index.get(key))
}
/// Returns the term associated to a given term ordinal.
/// Stores the term associated to a given term ordinal in
/// a `bytes` buffer.
///
/// Term ordinals are defined as the position of the term in
/// the sorted list of terms.
///
/// Returns true iff the term has been found.
/// Returns true if and only if the term has been found.
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.

View File

@@ -19,16 +19,41 @@
//! A second datastructure makes it possible to access a
//! [`TermInfo`](../postings/struct.TermInfo.html).
#[cfg(not(feature = "quickwit"))]
mod fst_termdict;
#[cfg(not(feature = "quickwit"))]
use fst_termdict as termdict;
mod merger;
#[cfg(feature = "quickwit")]
mod sstable_termdict;
#[cfg(feature = "quickwit")]
use sstable_termdict as termdict;
use tantivy_fst::automaton::AlwaysMatch;
pub use self::merger::TermMerger;
pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer};
#[cfg(test)]
mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
#[cfg(test)]
mod tests;
/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub type TermMerger<'a> = self::termdict::TermMerger<'a>;
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;

View File

@@ -0,0 +1,120 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use crate::postings::TermInfo;
use crate::termdict::{TermOrdinal, TermStreamer};
pub struct HeapItem<'a> {
pub streamer: TermStreamer<'a>,
pub segment_ord: usize,
}
impl<'a> PartialEq for HeapItem<'a> {
fn eq(&self, other: &Self) -> bool {
self.segment_ord == other.segment_ord
}
}
impl<'a> Eq for HeapItem<'a> {}
impl<'a> PartialOrd for HeapItem<'a> {
fn partial_cmp(&self, other: &HeapItem<'a>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> Ord for HeapItem<'a> {
fn cmp(&self, other: &HeapItem<'a>) -> Ordering {
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
}
}
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub struct TermMerger<'a> {
heap: BinaryHeap<HeapItem<'a>>,
current_streamers: Vec<HeapItem<'a>>,
}
impl<'a> TermMerger<'a> {
/// Stream of merged term dictionary
pub fn new(streams: Vec<TermStreamer<'a>>) -> TermMerger<'a> {
TermMerger {
heap: BinaryHeap::new(),
current_streamers: streams
.into_iter()
.enumerate()
.map(|(ord, streamer)| HeapItem {
streamer,
segment_ord: ord,
})
.collect(),
}
}
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
}
fn advance_segments(&mut self) {
let streamers = &mut self.current_streamers;
let heap = &mut self.heap;
for mut heap_item in streamers.drain(..) {
if heap_item.streamer.advance() {
heap.push(heap_item);
}
}
}
/// Advance the term iterator to the next term.
/// Returns true if there is indeed another term,
/// false if there is none.
pub fn advance(&mut self) -> bool {
self.advance_segments();
if let Some(head) = self.heap.pop() {
self.current_streamers.push(head);
while let Some(next_streamer) = self.heap.peek() {
if self.current_streamers[0].streamer.key() != next_streamer.streamer.key() {
break;
}
let next_heap_it = self.heap.pop().unwrap(); // safe : we peeked beforehand
self.current_streamers.push(next_heap_it);
}
true
} else {
false
}
}
/// Returns the current term.
///
/// This method may be called
/// if and only if advance() has been called before
/// and "true" was returned.
pub fn key(&self) -> &[u8] {
self.current_streamers[0].streamer.key()
}
/// Returns the sorted list of segment ordinals
/// that include the current term.
///
/// This method may be called
/// if and only if advance() has been called before
/// and "true" was returned.
pub fn current_segment_ords_and_term_infos<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermInfo)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.value().clone()))
}
}
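A hedged usage sketch of the merger: `advance` must return true before `key` (or the segment accessors) may be called, mirroring the contract documented above; the `streamers` argument is assumed to hold one sorted stream per segment being merged:

use crate::termdict::{TermMerger, TermStreamer};

fn count_unique_terms(streamers: Vec<TermStreamer<'_>>) -> usize {
    let mut merger = TermMerger::new(streamers);
    let mut unique_terms = 0;
    while merger.advance() {
        // `key()` is only valid after `advance()` returned true.
        let _term_bytes: &[u8] = merger.key();
        unique_terms += 1;
    }
    unique_terms
}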

View File

@@ -0,0 +1,144 @@
use std::io;
mod merger;
mod sstable;
mod streamer;
mod termdict;
use std::iter::ExactSizeIterator;
use common::VInt;
pub use self::merger::TermMerger;
use self::sstable::value::{ValueReader, ValueWriter};
use self::sstable::{BlockReader, SSTable};
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
use crate::postings::TermInfo;
pub struct TermSSTable;
impl SSTable for TermSSTable {
type Value = TermInfo;
type Reader = TermInfoReader;
type Writer = TermInfoWriter;
}
#[derive(Default)]
pub struct TermInfoReader {
term_infos: Vec<TermInfo>,
}
impl ValueReader for TermInfoReader {
type Value = TermInfo;
fn value(&self, idx: usize) -> &TermInfo {
&self.term_infos[idx]
}
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
self.term_infos.clear();
let num_els = VInt::deserialize_u64(reader)?;
let mut postings_start = VInt::deserialize_u64(reader)? as usize;
let mut positions_start = VInt::deserialize_u64(reader)? as usize;
for _ in 0..num_els {
let doc_freq = VInt::deserialize_u64(reader)? as u32;
let postings_num_bytes = VInt::deserialize_u64(reader)?;
let positions_num_bytes = VInt::deserialize_u64(reader)?;
let postings_end = postings_start + postings_num_bytes as usize;
let positions_end = positions_start + positions_num_bytes as usize;
let term_info = TermInfo {
doc_freq,
postings_range: postings_start..postings_end,
positions_range: positions_start..positions_end,
};
self.term_infos.push(term_info);
postings_start = postings_end;
positions_start = positions_end;
}
Ok(())
}
}
#[derive(Default)]
pub struct TermInfoWriter {
term_infos: Vec<TermInfo>,
}
impl ValueWriter for TermInfoWriter {
type Value = TermInfo;
fn write(&mut self, term_info: &TermInfo) {
self.term_infos.push(term_info.clone());
}
fn write_block(&mut self, buffer: &mut Vec<u8>) {
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
if self.term_infos.is_empty() {
return;
}
VInt(self.term_infos[0].postings_range.start as u64).serialize_into_vec(buffer);
VInt(self.term_infos[0].positions_range.start as u64).serialize_into_vec(buffer);
for term_info in &self.term_infos {
VInt(term_info.doc_freq as u64).serialize_into_vec(buffer);
VInt(term_info.postings_range.len() as u64).serialize_into_vec(buffer);
VInt(term_info.positions_range.len() as u64).serialize_into_vec(buffer);
}
self.term_infos.clear();
}
}
#[cfg(test)]
mod tests {
use std::io;
use super::BlockReader;
use crate::directory::OwnedBytes;
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::sstable::value::{ValueReader, ValueWriter};
use crate::termdict::sstable_termdict::TermInfoReader;
#[test]
fn test_block_terminfos() -> io::Result<()> {
let mut term_info_writer = super::TermInfoWriter::default();
term_info_writer.write(&TermInfo {
doc_freq: 120u32,
postings_range: 17..45,
positions_range: 10..122,
});
term_info_writer.write(&TermInfo {
doc_freq: 10u32,
postings_range: 45..450,
positions_range: 122..1100,
});
term_info_writer.write(&TermInfo {
doc_freq: 17u32,
postings_range: 450..462,
positions_range: 1100..1302,
});
let mut buffer = Vec::new();
term_info_writer.write_block(&mut buffer);
let mut block_reader = make_block_reader(&buffer[..]);
let mut term_info_reader = TermInfoReader::default();
term_info_reader.read(&mut block_reader)?;
assert_eq!(
term_info_reader.value(0),
&TermInfo {
doc_freq: 120u32,
postings_range: 17..45,
positions_range: 10..122
}
);
assert!(block_reader.buffer().is_empty());
Ok(())
}
fn make_block_reader(data: &[u8]) -> BlockReader {
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
buffer.extend_from_slice(data);
let owned_bytes = OwnedBytes::new(buffer);
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
block_reader.read_block().unwrap();
block_reader
}
}

View File

@@ -0,0 +1,81 @@
use std::io::{self, Read};
use byteorder::{LittleEndian, ReadBytesExt};
pub struct BlockReader<'a> {
buffer: Vec<u8>,
reader: Box<dyn io::Read + 'a>,
offset: usize,
}
impl<'a> BlockReader<'a> {
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
BlockReader {
buffer: Vec::new(),
reader,
offset: 0,
}
}
pub fn deserialize_u64(&mut self) -> u64 {
let (num_bytes, val) = super::vint::deserialize_read(self.buffer());
self.advance(num_bytes);
val
}
#[inline(always)]
pub fn buffer_from_to(&self, start: usize, end: usize) -> &[u8] {
&self.buffer[start..end]
}
pub fn read_block(&mut self) -> io::Result<bool> {
self.offset = 0;
let block_len_res = self.reader.read_u32::<LittleEndian>();
if let Err(err) = &block_len_res {
if err.kind() == io::ErrorKind::UnexpectedEof {
return Ok(false);
}
}
let block_len = block_len_res?;
if block_len == 0u32 {
self.buffer.clear();
return Ok(false);
}
self.buffer.resize(block_len as usize, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
Ok(true)
}
pub fn offset(&self) -> usize {
self.offset
}
pub fn advance(&mut self, num_bytes: usize) {
self.offset += num_bytes;
}
pub fn buffer(&self) -> &[u8] {
&self.buffer[self.offset..]
}
}
impl<'a> io::Read for BlockReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.buffer().read(buf)?;
self.advance(len);
Ok(len)
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
let len = self.buffer.len();
buf.extend_from_slice(self.buffer());
self.advance(len);
Ok(len)
}
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
self.buffer().read_exact(buf)?;
self.advance(buf.len());
Ok(())
}
}

Some files were not shown because too many files have changed in this diff.